
refactor(submitter): concurrent submitter #3287

Open
julienrbrt wants to merge 10 commits into main from julien/speedup-submitter

Conversation

@julienrbrt
Member

@julienrbrt julienrbrt commented Apr 24, 2026

Overview

Attempt to improve the submitter by doing concurrent sends: instead of blocking until the DA layer answers, we submit and move on to the next batch.
Useful when blob throughput needs to be high.

Related to #3244: Fiber takes time to return, so concurrent submission is necessary there. This PR mainly investigates whether that improvement can be generalized to mainline evnode.
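Roughly the shape of the change (illustrative sketch only, not the exact code; the callback bodies below are placeholders):

// Before: the submission loop blocked until the DA layer answered.
// After: SubmitHeaders hands the batch to a goroutine and returns immediately;
// the outcome comes back through callbacks.
err := daSubmitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer,
	func() { // onSubmitSuccess
		// e.g. record the last successful header submission time
	},
	func(err error) { // onSubmitError
		// re-expose the claimed height range so it is retried
		cm.ResetInFlightHeaderRange(headers[0].Height(), headers[len(headers)-1].Height())
	},
)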

Summary by CodeRabbit

  • Refactor
    • Enhanced submission handling with improved retry logic and error recovery mechanisms.
    • Optimized pending item management with better batch processing and size-limit enforcement for oversized submissions.

@coderabbitai
Contributor

coderabbitai Bot commented Apr 24, 2026

Warning

Rate limit exceeded

@julienrbrt has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 38 minutes and 8 seconds before requesting another review.

To keep reviews running without waiting, you can enable the usage-based add-on for your organization. This allows additional reviews beyond the hourly cap. Account admins can enable it under billing.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c03190c7-0f87-4662-8158-2da77bdae6fc

📥 Commits

Reviewing files that changed from the base of the PR and between 31093e6 and ea69d67.

📒 Files selected for processing (3)
  • block/internal/cache/pending_base.go
  • block/internal/submitting/da_submitter.go
  • block/internal/submitting/submitter.go
📝 Walkthrough

Walkthrough

The pull request extends the cache manager with in-flight range reset methods, refactors pending items tracking to maintain claims and gaps for retry management, and converts DA submissions from synchronous to asynchronous with centralized retry logic and callbacks.

Changes

Cohort / File(s) Summary
Cache In-Flight Range Reset
block/internal/cache/manager.go, block/internal/cache/pending_headers.go, block/internal/cache/pending_data.go
Added ResetInFlightHeaderRange(start, end) and ResetInFlightDataRange(start, end) methods to the PendingManager interface and implementing types to support re-exposure of previously claimed ranges after failures.
Pending Base Claims and Gaps Tracking
block/internal/cache/pending_base.go
Refactored to track in-flight claims and gap ranges per height; getPending now finds contiguous unclaimed ranges, registers claims, and removes gaps; setLastSubmittedHeight trims claims/gaps; new resetInFlightRange method re-introduces failed ranges as gaps for retry.
Cache Tests
block/internal/cache/manager_test.go, block/internal/cache/pending_base_test.go, block/internal/cache/pending_headers_test.go, block/internal/cache/pending_data_test.go
Updated tests to call range-reset methods after advancing submitted heights and added four new test cases exercising in-flight claim depletion and range reset behavior.
DA Submitter Async and Retry
block/internal/submitting/da_submitter.go, block/internal/submitting/da_submitter_tracing.go, block/internal/submitting/submitter.go
Converted SubmitHeaders and SubmitData to accept success/error callbacks, spawn async submission goroutines with retry/backoff, implement size-limit batching, and added Close() lifecycle method.
DA Submitter Tests
block/internal/submitting/da_submitter_test.go, block/internal/submitting/da_submitter_integration_test.go, block/internal/submitting/da_submitter_mocks_test.go, block/internal/submitting/da_submitter_tracing_test.go, block/internal/submitting/submitter_test.go
Updated test signatures to match callback-based API; restructured mock tests to verify batch-level retry behavior and edge cases; added explicit Close() calls.

Sequence Diagrams

sequenceDiagram
    participant Client
    participant Submitter as Submitter<br/>(submission loop)
    participant DASubmitter as DASubmitter<br/>(async handler)
    participant Cache as Cache<br/>(pending manager)
    participant DA as DA Layer

    Client->>Submitter: SubmitHeaders(headers, cache, callbacks)
    Submitter->>DASubmitter: SubmitHeaders(..., onSuccess, onError)
    DASubmitter->>DASubmitter: spawn goroutine
    DASubmitter-->>Submitter: return nil immediately
    
    DASubmitter->>DASubmitter: submitWithRetry loop
    DASubmitter->>DASubmitter: limitBatchBySize
    DASubmitter->>DA: submitToDA(batch)
    
    alt Success
        DA-->>DASubmitter: success result
        DASubmitter->>Cache: Update cache (post-submit)
        DASubmitter->>Cache: Append DA height hint
        DASubmitter->>DASubmitter: Call onSubmitSuccess()
    else Failure (retryable)
        DA-->>DASubmitter: error result
        DASubmitter->>DASubmitter: backoff & retry
    else Failure (non-retryable)
        DA-->>DASubmitter: error result
        DASubmitter->>DASubmitter: Call onError(error)
    end
    
    Note over Submitter,DASubmitter: Success/error callbacks<br/>update submitter state
    Submitter->>Cache: ResetInFlightRange (on error callback)
    Client->>Submitter: Close()
    Submitter->>DASubmitter: Close()
    DASubmitter->>DASubmitter: Wait for goroutines

sequenceDiagram
    participant Submitter as Submitter
    participant Cache as Cache<br/>(pendingBase)
    participant DASubmitter as DASubmitter
    participant DA as DA Layer

    Submitter->>Cache: GetPendingHeaders()
    Cache->>Cache: Snapshot claims/gaps
    Cache->>Cache: Find first contiguous<br/>unclaimed range
    Cache->>Cache: Register as in-flight claim
    Cache-->>Submitter: pending items & range
    
    Submitter->>DASubmitter: SubmitHeaders(..., onSuccess, onError)
    DASubmitter-->>Submitter: nil (async)
    
    DASubmitter->>DA: submitWithRetry(batch)
    
    alt Success
        DA-->>DASubmitter: success
        DASubmitter->>DASubmitter: onSuccess callback fires
        Note over Submitter: Update lastHeaderSubmit<br/>via callback
    else Failure
        DA-->>DASubmitter: error
        DASubmitter->>DASubmitter: onError callback fires
        Note over Submitter: ResetInFlightRange<br/>via callback
        Submitter->>Cache: ResetInFlightHeaderRange(start, end)
        Cache->>Cache: Remove overlapping claims
        Cache->>Cache: Add failed portion as gaps
        Submitter->>Submitter: Retry or handle critical error
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

T:code-hygiene, T:testing

Suggested reviewers

  • chatton
  • tac0turtle
  • alpe

Poem

🐰 A submit, a retry, a callback so sweet,
In-flight claims and gaps make the retry complete.
No more blocking waits—async all the way,
Ranges reset on error for another fine day! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 12.12%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
  • Title check: ✅ Passed. The title 'refactor(submitter): concurrent submitter' clearly and concisely summarizes the main change, introducing concurrent submission handling in the submitter component, which is the primary focus of this changeset.
  • Description check: ✅ Passed. The description provides adequate context explaining the concurrent submission improvement, its rationale for high blob throughput, and relevant background via a related PR link, meeting template requirements.
  • Linked Issues check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check: ✅ Passed. Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Review rate limit: 0/1 reviews remaining, refill in 38 minutes and 8 seconds.

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Contributor

github-actions Bot commented Apr 24, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed | Format: ⏩ skipped | Lint: ✅ passed | Breaking: ✅ passed | Updated (UTC): Apr 29, 2026, 8:30 AM

@claude
Contributor

claude Bot commented Apr 24, 2026

Claude finished @julienrbrt's task in 2m 37s (View job)


Code Review: refactor(submitter): concurrent submitter

This is a substantial refactor converting synchronous DA submissions to an async callback-based model with in-flight claim tracking. The architecture is sound and the general approach is well thought out. I've found a few bugs — one silent data-integrity issue and one that can cause an infinite loop or panic — plus the tracing regression and the lock ordering race flagged by CodeRabbit. Details below.


Critical Bugs

1. Partial-success callback slices from wrong offset (da_submitter.go:228-235, 284-293)

submitWithRetry advances marshaled = marshaled[submitted:] after a partial DA success, but the onSuccess closure captures the original headers/signedDataList slice and always passes headers[:submittedCount]. On the second iteration, it would re-mark the same leading items instead of the next batch — so items beyond the first submitted count never get their DA-inclusion recorded. This is a silent liveness bug.

// Bug: offset never advances across iterations
postSubmit(headers[:submittedCount], ...)   // always relative to original slice

Fix: track an explicit offset alongside marshaled:

offset := 0
// in submitWithRetry onSuccess:
postSubmit(headers[offset:offset+submittedCount], ...)
offset += submittedCount

Fix this →

2. Unvalidated SubmittedCount causes infinite loop or panic (da_submitter.go:385-394)

case datypes.StatusSuccess:
    submitted := int(res.SubmittedCount)
    if onSuccess != nil {
        onSuccess(submitted, res.Height)
    }
    if submitted == len(marshaled) {
        return
    }
    marshaled = marshaled[submitted:]  // panics if submitted > len(marshaled)
  • If submitted == 0: marshaled doesn't advance, the loop retries the same batch forever until MaxAttempts.
  • If submitted > len(marshaled): slice operation panics.

Both values come from an external DA client and must be validated. Fix this →


Major Issues

3. TOCTOU race: lastHeight read outside inFlightMu lock (pending_base.go:80-91)

getPending reads pb.lastHeight.Load() before acquiring inFlightMu, while setLastSubmittedHeight updates lastHeight then acquires inFlightMu to trim claims:

// getPending — lastHeight read BEFORE lock
lastSubmitted := pb.lastHeight.Load()     // <-- outside lock
...
pb.inFlightMu.Lock()
rangeStart, rangeEnd := findAvailableRange(pb.gaps, pb.inFlightClaims, lastSubmitted, storeHeight)
pb.inFlightMu.Unlock()

// setLastSubmittedHeight — lastHeight updated BEFORE lock
pb.lastHeight.CompareAndSwap(lsh, newLastSubmittedHeight)  // <-- outside lock
...
pb.inFlightMu.Lock()
pb.inFlightClaims = trimClaimsBelow(...)
pb.inFlightMu.Unlock()

With concurrent submissions, getPending can use a stale lastHeight while claims are already trimmed, causing findAvailableRange to re-expose already-submitted heights. The fix is to read lastHeight inside the same inFlightMu critical section and apply the same locking in setLastSubmittedHeight. Fix this →
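
A minimal sketch of the fix, reusing the names from the snippets above (keeping lastHeight as an atomic is an assumption; it could equally become a plain field once it is only read and written under inFlightMu):

// getPending: read lastHeight inside the same critical section as claims/gaps
pb.inFlightMu.Lock()
lastSubmitted := pb.lastHeight.Load()
rangeStart, rangeEnd := findAvailableRange(pb.gaps, pb.inFlightClaims, lastSubmitted, storeHeight)
pb.inFlightMu.Unlock()

// setLastSubmittedHeight: update lastHeight and trim claims under the same lock
pb.inFlightMu.Lock()
if pb.lastHeight.CompareAndSwap(lsh, newLastSubmittedHeight) {
	pb.inFlightClaims = trimClaimsBelow(...)
}
pb.inFlightMu.Unlock()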

4. Tracing spans end before async work completes (da_submitter_tracing.go:33-62)

defer span.End() fires the moment inner.SubmitHeaders/SubmitData returns — which now happens immediately after spawning the goroutine. Async failures routed via onSubmitError never reach the span, and recorded latency is only enqueue time (microseconds), not actual DA round-trip time.

The callbacks need to be wrapped to close the span on actual completion:

wrappedOnSuccess := func() {
    span.SetStatus(codes.Ok, "")
    span.End()
    if onSubmitSuccess != nil { onSubmitSuccess() }
}
wrappedOnError := func(err error) {
    if err != nil { span.RecordError(err); span.SetStatus(codes.Error, err.Error()) }
    span.End()
    if onSubmitError != nil { onSubmitError(err) }
}

Fix this →


Minor Issues

5. Missing doc comments on exported reset methods

ResetInFlightHeaderRange (pending_headers.go:83) and ResetInFlightDataRange (pending_data.go:88) are exported but have no doc comments, violating the repo's Go conventions ("Document exported types and functions").

6. Test resource leaks — Close() not deferred

In da_submitter_test.go (e.g. around line 218) and da_submitter_integration_test.go (line 110), Close() is called only on the happy path. An assertion failure before that point leaks the async worker goroutine into subsequent tests. Use defer submitter.Close() or t.Cleanup(func() { submitter.Close() }) immediately after construction.

7. fakeDASubmitter discards callbacks (submitter_test.go:424-440)

The test double drops onSubmitSuccess / onSubmitError without invoking them. The submission-loop tests therefore never exercise the timestamp-update or cache-reset paths that were central to this refactor. Consider invoking onSubmitSuccess() from the fake to cover that contract, or adding a dedicated test.
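
A sketch of what that could look like (the simulateErr field is hypothetical; the channel signaling stays whatever the fake already does):

// inside fakeDASubmitter.SubmitHeaders / SubmitData, after the existing channel signal
if f.simulateErr != nil { // hypothetical knob to drive the failure path
	if onSubmitError != nil {
		onSubmitError(f.simulateErr)
	}
	return nil
}
if onSubmitSuccess != nil {
	onSubmitSuccess() // exercises the timestamp-update / cache-reset contract
}
return nil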

8. onError(nil) on context cancellation / empty batch (da_submitter.go:327-330, 356-359)

Calling onError(nil) on context cancellation triggers cache.ResetInFlightRange in the caller's error handler, which is correct for crash recovery but misleading for a clean shutdown. Consider passing a sentinel (e.g. context.Cause(ctx)) so the caller can distinguish orderly shutdown from a submission failure before resetting ranges.
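
For example (sketch; assumes the submission goroutine checks ctx before giving up):

// on context cancellation, surface the cause instead of nil
if ctx.Err() != nil {
	if onError != nil {
		onError(context.Cause(ctx)) // non-nil: context.Canceled or the registered cause
	}
	return
}

The caller can then distinguish an orderly shutdown from a submission failure with errors.Is(err, context.Canceled) before deciding whether to reset in-flight ranges.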


Summary

# Severity File Issue
1 🔴 Critical da_submitter.go:228 Partial-success callback indexes wrong slice
2 🔴 Critical da_submitter.go:385 Unvalidated SubmittedCount → infinite loop / panic
3 🟠 Major pending_base.go:80 TOCTOU race on lastHeight outside inFlightMu
4 🟠 Major da_submitter_tracing.go:39 Span ends before async work; latency/error not recorded
5 🟡 Minor pending_headers.go:83, pending_data.go:88 Missing doc comments on exported methods
6 🟡 Minor da_submitter_test.go:218, integration test Close() not deferred — goroutine leaks on test failure
7 🟡 Minor submitter_test.go:424 fakeDASubmitter doesn't invoke callbacks
8 🟡 Minor da_submitter.go:327 onError(nil) on context cancel is misleading

Overall the structural direction is solid — the in-flight claim tracking logic in pending_base.go is clever and the recovery paths for oversized items and mempool pressure look well-thought-out. The partial-success slice bug (issue 1) and the missing SubmittedCount validation (issue 2) are the most important to address before merge.

@codecov

codecov Bot commented Apr 24, 2026

Codecov Report

❌ Patch coverage is 70.16575% with 54 lines in your changes missing coverage. Please review.
✅ Project coverage is 62.52%. Comparing base (49ef5c9) to head (3b5682c).
⚠️ Report is 5 commits behind head on main.

Files with missing lines Patch % Lines
block/internal/submitting/da_submitter.go 73.49% 29 Missing and 15 partials ⚠️
block/internal/submitting/submitter.go 11.11% 2 Missing and 6 partials ⚠️
block/internal/submitting/da_submitter_tracing.go 66.66% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3287      +/-   ##
==========================================
+ Coverage   62.45%   62.52%   +0.06%     
==========================================
  Files         122      122              
  Lines       13047    13078      +31     
==========================================
+ Hits         8149     8177      +28     
+ Misses       4012     4011       -1     
- Partials      886      890       +4     
Flag Coverage Δ
combined 62.52% <70.16%> (+0.06%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.

@julienrbrt julienrbrt marked this pull request as ready for review April 29, 2026 08:05
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
block/internal/submitting/da_submitter_tracing.go (1)

33-61: ⚠️ Potential issue | 🟠 Major

The span now ends before the async submission does.

inner.SubmitHeaders/SubmitData now return right after scheduling background work, so defer span.End() closes the span before retries/callbacks run. Async failures routed through onSubmitError never reach the span, and the recorded latency is only the enqueue time. Wrap the callbacks and end/annotate the span from the actual completion path instead.

Also applies to: 64-92

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter_tracing.go` around lines 33 - 61, The
current tracedDASubmitter starts a span in SubmitHeaders (and similarly in
SubmitData) but defers span.End(), which closes the span before async work
completes; instead remove the immediate defer span.End() and wrap the
onSubmitSuccess and onSubmitError callbacks with closures that record
errors/status on the span and call span.End() when the async completion path
runs; i.e., in tracedDASubmitter.SubmitHeaders and SubmitData, create
wrappedOnSuccess := func(){ span.SetStatus(codes.Ok, ""); onSubmitSuccess();
span.End() } and wrappedOnError := func(err error){ span.RecordError(err);
span.SetStatus(codes.Error, err.Error()); onSubmitError(err); span.End() } (or
equivalent wrappers matching original signatures) and pass those to
t.inner.SubmitHeaders/SubmitData so the span lifetime and annotations reflect
actual completion.
block/internal/cache/pending_base.go (1)

80-96: ⚠️ Potential issue | 🟠 Major

Take lastHeight and the in-flight ranges under one synchronization boundary.

getPending() reads lastHeight before cloning inFlightClaims/gaps, while setLastSubmittedHeight() updates lastHeight and trims those slices independently. With the new concurrent submitter, an interleaving where lastHeight is still the old value but the claims have already been trimmed can make findAvailableRange() hand out heights that were just acknowledged, causing duplicate DA submissions.

As per coding guidelines "Be careful with concurrent access to shared state".

Also applies to: 175-189

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/cache/pending_base.go` around lines 80 - 96, getPending()
currently reads pb.lastHeight (pb.lastHeight.Load()) outside the pb.inFlightMu
critical section and then clones pb.inFlightClaims and pb.gaps, which allows an
interleaving with setLastSubmittedHeight() that trims those slices and updates
lastHeight causing findAvailableRange() to return already-acknowledged heights;
fix by moving the read of pb.lastHeight inside the same
pb.inFlightMu.Lock()/Unlock() block where you clone inFlightClaims and gaps so
lastHeight and the in-flight ranges are read atomically, and apply the same
locking discipline to setLastSubmittedHeight() (acquire pb.inFlightMu while
trimming inFlightClaims/gaps and updating pb.lastHeight) to prevent races when
findAvailableRange, getPending, and setLastSubmittedHeight interact.
🧹 Nitpick comments (3)
block/internal/cache/pending_data.go (1)

88-90: Document the new exported reset helper.

ResetInFlightDataRange is public, but it has no doc comment. That violates the repo's Go guideline for exported identifiers and will likely fail linting.

💡 Suggested fix
+// ResetInFlightDataRange clears the in-flight data claim state for heights in [start, end].
 func (pd *PendingData) ResetInFlightDataRange(start, end uint64) {

As per coding guidelines, Document exported types and functions in Go code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/cache/pending_data.go` around lines 88 - 90, Add a Go doc
comment for the exported method ResetInFlightDataRange on type PendingData:
describe what the method does, its parameters (start, end uint64) and any
important behavior or side-effects (it delegates to pd.base.resetInFlightRange
to reset the in-flight data range). Place the comment immediately above the func
declaration for ResetInFlightDataRange so it satisfies Go exported identifier
documentation guidelines.
block/internal/cache/pending_headers.go (1)

83-85: Document the new exported reset helper.

ResetInFlightHeaderRange is public, but it has no doc comment. That violates the repo's Go guideline for exported identifiers and will likely fail linting.

💡 Suggested fix
+// ResetInFlightHeaderRange clears the in-flight header claim state for heights in [start, end].
 func (ph *PendingHeaders) ResetInFlightHeaderRange(start, end uint64) {

As per coding guidelines, Document exported types and functions in Go code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/cache/pending_headers.go` around lines 83 - 85, Add a Go doc
comment for the exported method PendingHeaders.ResetInFlightHeaderRange
describing its purpose and behavior: explain that it resets the in-flight header
range tracked by the PendingHeaders instance between the inclusive start and end
uint64 indices, mention any side effects (it delegates to
ph.base.resetInFlightRange), and document the parameters (start, end) and any
expectations (e.g., inclusive bounds or preconditions). Place the comment
immediately above the ResetInFlightHeaderRange method.
block/internal/submitting/submitter_test.go (1)

424-440: Exercise the new callbacks in the fake submitter.

This test double now accepts onSubmitSuccess/onSubmitError, but it still discards them. That means the loop test only verifies enqueueing, not the success/error lifecycle that now updates timestamps and resets in-flight cache state.

Consider invoking the callbacks when non-nil or adding a focused test that covers that contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/submitter_test.go` around lines 424 - 440, The
fakeDASubmitter currently drops the provided callbacks in SubmitHeaders and
SubmitData; update these methods (SubmitHeaders and SubmitData on
fakeDASubmitter) to call the supplied on-success and on-error callbacks when
they are non-nil so the test exercises the full success/error lifecycle (e.g.,
invoke the success callback when you want the fake to simulate success, or
invoke the error callback with a test error to simulate failure), while
preserving the existing signaling to chHdr/chData; alternatively add a focused
test that uses a fake submitter which invokes those callbacks to assert
timestamps and in-flight cache resets.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/submitting/da_submitter_integration_test.go`:
- Around line 101-110: The test currently calls daSubmitter.Close() only at the
end, risking resource leaks if earlier assertions fail; after creating the async
submitter (the daSubmitter variable), ensure cleanup is guaranteed by
registering a deferred close or t.Cleanup call—e.g., immediately after
daSubmitter is constructed call defer daSubmitter.Close() or t.Cleanup(func(){
daSubmitter.Close() }) so the Close() method on daSubmitter always runs even if
the test fails early.

In `@block/internal/submitting/da_submitter_test.go`:
- Around line 216-218: The test currently calls submitter.Close() after
assertions which can leak the submitter's async worker if an assertion fails;
change the teardown to run immediately after setup by invoking defer
submitter.Close() (or t.Cleanup(func(){ submitter.Close() })) right after the
submitter is created so Close() always runs even on test failures — update the
tests that call submitter.SubmitHeaders(...) and later submitter.Close() (e.g.,
the cases around SubmitHeaders and the other similar test) to use deferred
cleanup instead.

In `@block/internal/submitting/da_submitter.go`:
- Around line 388-398: The datalayer success branch uses res.SubmittedCount
directly which can be 0 or >len(marshaled) and cause infinite loops or panics;
in the datypes.StatusSuccess case (around res.SubmittedCount handling) validate
that submitted := int(res.SubmittedCount) is >0 and <= len(marshaled) before
calling onSuccess or advancing the window (marshaled = marshaled[submitted:]);
if submitted==0 treat as a reject/error (update rs.Attempt or return/log and do
not spin) and if submitted>len(marshaled) treat as malformed input (log/error
and reject) so only a validated count is passed to onSuccess and used to slice
marshaled.

In `@block/internal/submitting/submitter.go`:
- Around line 236-250: The code enqueues a batch as in-flight via
GetPendingHeaders/GetPendingData but if s.daSubmitter.SubmitHeaders or
SubmitData returns an immediate error the in-flight claim is never released;
update the error path in submitter.go around s.daSubmitter.SubmitHeaders and the
analogous SubmitData call so that before logging or returning on synchronous
error you call s.cache.ResetInFlightHeaderRange(headers[0].Height(),
headers[len(headers)-1].Height()) (and for data use the corresponding
ResetInFlightDataRange with the first/last data heights), then proceed to
log/handle the error (including the existing ErrOversizedItem handling) so the
claimed heights are retried.

---

Outside diff comments:
In `@block/internal/cache/pending_base.go`:
- Around line 80-96: getPending() currently reads pb.lastHeight
(pb.lastHeight.Load()) outside the pb.inFlightMu critical section and then
clones pb.inFlightClaims and pb.gaps, which allows an interleaving with
setLastSubmittedHeight() that trims those slices and updates lastHeight causing
findAvailableRange() to return already-acknowledged heights; fix by moving the
read of pb.lastHeight inside the same pb.inFlightMu.Lock()/Unlock() block where
you clone inFlightClaims and gaps so lastHeight and the in-flight ranges are
read atomically, and apply the same locking discipline to
setLastSubmittedHeight() (acquire pb.inFlightMu while trimming
inFlightClaims/gaps and updating pb.lastHeight) to prevent races when
findAvailableRange, getPending, and setLastSubmittedHeight interact.

In `@block/internal/submitting/da_submitter_tracing.go`:
- Around line 33-61: The current tracedDASubmitter starts a span in
SubmitHeaders (and similarly in SubmitData) but defers span.End(), which closes
the span before async work completes; instead remove the immediate defer
span.End() and wrap the onSubmitSuccess and onSubmitError callbacks with
closures that record errors/status on the span and call span.End() when the
async completion path runs; i.e., in tracedDASubmitter.SubmitHeaders and
SubmitData, create wrappedOnSuccess := func(){ span.SetStatus(codes.Ok, "");
onSubmitSuccess(); span.End() } and wrappedOnError := func(err error){
span.RecordError(err); span.SetStatus(codes.Error, err.Error());
onSubmitError(err); span.End() } (or equivalent wrappers matching original
signatures) and pass those to t.inner.SubmitHeaders/SubmitData so the span
lifetime and annotations reflect actual completion.

---

Nitpick comments:
In `@block/internal/cache/pending_data.go`:
- Around line 88-90: Add a Go doc comment for the exported method
ResetInFlightDataRange on type PendingData: describe what the method does, its
parameters (start, end uint64) and any important behavior or side-effects (it
delegates to pd.base.resetInFlightRange to reset the in-flight data range).
Place the comment immediately above the func declaration for
ResetInFlightDataRange so it satisfies Go exported identifier documentation
guidelines.

In `@block/internal/cache/pending_headers.go`:
- Around line 83-85: Add a Go doc comment for the exported method
PendingHeaders.ResetInFlightHeaderRange describing its purpose and behavior:
explain that it resets the in-flight header range tracked by the PendingHeaders
instance between the inclusive start and end uint64 indices, mention any side
effects (it delegates to ph.base.resetInFlightRange), and document the
parameters (start, end) and any expectations (e.g., inclusive bounds or
preconditions). Place the comment immediately above the ResetInFlightHeaderRange
method.

In `@block/internal/submitting/submitter_test.go`:
- Around line 424-440: The fakeDASubmitter currently drops the provided
callbacks in SubmitHeaders and SubmitData; update these methods (SubmitHeaders
and SubmitData on fakeDASubmitter) to call the supplied on-success and on-error
callbacks when they are non-nil so the test exercises the full success/error
lifecycle (e.g., invoke the success callback when you want the fake to simulate
success, or invoke the error callback with a test error to simulate failure),
while preserving the existing signaling to chHdr/chData; alternatively add a
focused test that uses a fake submitter which invokes those callbacks to assert
timestamps and in-flight cache resets.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 996c9001-1856-439f-b61e-aa2ea4669b4e

📥 Commits

Reviewing files that changed from the base of the PR and between 49ef5c9 and 31093e6.

📒 Files selected for processing (16)
  • block/internal/cache/manager.go
  • block/internal/cache/manager_test.go
  • block/internal/cache/pending_base.go
  • block/internal/cache/pending_base_test.go
  • block/internal/cache/pending_data.go
  • block/internal/cache/pending_data_test.go
  • block/internal/cache/pending_headers.go
  • block/internal/cache/pending_headers_test.go
  • block/internal/submitting/da_submitter.go
  • block/internal/submitting/da_submitter_integration_test.go
  • block/internal/submitting/da_submitter_mocks_test.go
  • block/internal/submitting/da_submitter_test.go
  • block/internal/submitting/da_submitter_tracing.go
  • block/internal/submitting/da_submitter_tracing_test.go
  • block/internal/submitting/submitter.go
  • block/internal/submitting/submitter_test.go

Comment on lines 101 to +110
 // Submit headers and data - cache returns both items and marshalled bytes
 headers, marshalledHeaders, err := cm.GetPendingHeaders(context.Background())
 require.NoError(t, err)
-require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n))
+require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n, nil, nil))

 dataList, marshalledData, err := cm.GetPendingData(context.Background())
 require.NoError(t, err)
-require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen))
+require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen, nil, nil))

+daSubmitter.Close()
Contributor


⚠️ Potential issue | 🟡 Minor

Defer Close() so cleanup always runs.

Calling Close() only at the end of the successful path can leak the async submitter if an earlier assertion fails. Move it into a defer or t.Cleanup immediately after construction.

💡 Suggested fix
 daSubmitter := NewDASubmitter(client, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop(), noopDAHintAppender{}, noopDAHintAppender{})
+defer daSubmitter.Close()
@@
- daSubmitter.Close()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (drops the trailing Close() in favor of the deferred close shown above):

// Submit headers and data - cache returns both items and marshalled bytes
headers, marshalledHeaders, err := cm.GetPendingHeaders(context.Background())
require.NoError(t, err)
require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n, nil, nil))

dataList, marshalledData, err := cm.GetPendingData(context.Background())
require.NoError(t, err)
require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen, nil, nil))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter_integration_test.go` around lines 101
- 110, The test currently calls daSubmitter.Close() only at the end, risking
resource leaks if earlier assertions fail; after creating the async submitter
(the daSubmitter variable), ensure cleanup is guaranteed by registering a
deferred close or t.Cleanup call—e.g., immediately after daSubmitter is
constructed call defer daSubmitter.Close() or t.Cleanup(func(){
daSubmitter.Close() }) so the Close() method on daSubmitter always runs even if
the test fails early.

Comment on lines +216 to +218
err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer, nil, nil)
require.NoError(t, err)
submitter.Close()
Contributor


⚠️ Potential issue | 🟡 Minor

Defer Close() in the success-path tests.

Both tests close the submitter only after the assertions. If one of those assertions fails, teardown is skipped and the async worker can leak into later tests. Use defer or t.Cleanup right after setup instead.

💡 Suggested fix
 submitter, st, cm, mockDA, gen := setupDASubmitterTest(t)
+defer submitter.Close()
@@
- submitter.Close()

Also applies to: 331-333

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter_test.go` around lines 216 - 218, The
test currently calls submitter.Close() after assertions which can leak the
submitter's async worker if an assertion fails; change the teardown to run
immediately after setup by invoking defer submitter.Close() (or
t.Cleanup(func(){ submitter.Close() })) right after the submitter is created so
Close() always runs even on test failures — update the tests that call
submitter.SubmitHeaders(...) and later submitter.Close() (e.g., the cases around
SubmitHeaders and the other similar test) to use deferred cleanup instead.

Comment on lines +388 to +398
case datypes.StatusSuccess:
submitted := int(res.SubmittedCount)
if onSuccess != nil {
onSuccess(submitted, res.Height)
}
s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer")
if submitted == len(marshaled) {
return
}
// partial success: advance window
marshaled = marshaled[submitted:]
Contributor


⚠️ Potential issue | 🔴 Critical

Validate SubmittedCount before using it.

res.SubmittedCount comes from the DA client. A value of 0 leaves marshaled unchanged and rs.Attempt unmodified, so this loop spins forever; a value larger than the batch size panics here and in the outer headers[:submittedCount] / signedDataList[:submittedCount] callbacks. Reject out-of-range counts before calling onSuccess.

Suggested fix
 		case datypes.StatusSuccess:
 			submitted := int(res.SubmittedCount)
+			if submitted <= 0 || submitted > len(marshaled) {
+				err := fmt.Errorf("invalid submitted count %d for batch size %d", submitted, len(marshaled))
+				s.recordFailure(common.DASubmitterFailureReasonUnknown)
+				s.logger.Error().Err(err).Str("itemType", itemType).Msg("DA layer returned invalid submitted count")
+				if onError != nil {
+					onError(err)
+				}
+				return
+			}
 			if onSuccess != nil {
 				onSuccess(submitted, res.Height)
 			}
 			s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer")

As per coding guidelines "Validate all inputs from external sources in Go code".

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
case datypes.StatusSuccess:
submitted := int(res.SubmittedCount)
if onSuccess != nil {
onSuccess(submitted, res.Height)
}
s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer")
if submitted == len(marshaled) {
return
}
// partial success: advance window
marshaled = marshaled[submitted:]
case datypes.StatusSuccess:
submitted := int(res.SubmittedCount)
if submitted <= 0 || submitted > len(marshaled) {
err := fmt.Errorf("invalid submitted count %d for batch size %d", submitted, len(marshaled))
s.recordFailure(common.DASubmitterFailureReasonUnknown)
s.logger.Error().Err(err).Str("itemType", itemType).Msg("DA layer returned invalid submitted count")
if onError != nil {
onError(err)
}
return
}
if onSuccess != nil {
onSuccess(submitted, res.Height)
}
s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer")
if submitted == len(marshaled) {
return
}
// partial success: advance window
marshaled = marshaled[submitted:]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/submitting/da_submitter.go` around lines 388 - 398, The
datalayer success branch uses res.SubmittedCount directly which can be 0 or
>len(marshaled) and cause infinite loops or panics; in the datypes.StatusSuccess
case (around res.SubmittedCount handling) validate that submitted :=
int(res.SubmittedCount) is >0 and <= len(marshaled) before calling onSuccess or
advancing the window (marshaled = marshaled[submitted:]); if submitted==0 treat
as a reject/error (update rs.Attempt or return/log and do not spin) and if
submitted>len(marshaled) treat as malformed input (log/error and reject) so only
a validated count is passed to onSuccess and used to slice marshaled.

Comment thread on block/internal/submitting/submitter.go (Outdated)