From cef16e869e336a3e3d84e9cf1a1c7300124482f6 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Sun, 3 May 2026 00:35:41 +0200 Subject: [PATCH 1/4] fix(solo,reaping): bound sequencer queue to prevent ingest-side OOM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under sustained ingest above the block-production drain rate, SoloSequencer.queue grew monotonically. A 32-vCPU loadgen pushing >100 MB/s into a runner whose executor drains ~100 MB/s per block filled the queue at ~150 MB/s of net-positive growth — heap profiles showed 24 GB of retained io.ReadAll bytes in the queue within ~30 s, then anon-rss:63GB OOM-kill at the box's 64 GiB ceiling. Reproducible twice with identical signature. Two changes, one feature: - SoloSequencer.SetMaxQueueBytes(n) caps the queue's total retained tx bytes. SubmitBatchTxs uses all-or-nothing admission against the cap: if the incoming batch would push us over, the whole batch is rejected with ErrQueueFull and the queue keeps its current contents untouched. Partial admission would force the caller to track which prefix succeeded and only re-feed the suffix on retry; the reaper currently doesn't do that, so the whole-batch rule lets the reaper just retry the same batch later when the queue has drained. queueBytes is decremented on drain (queue := nil) and re-counted for postponed txs that the executor's FilterTxs returns to the queue. Zero cap = the legacy unbounded path, preserved for tests and small deployments. - The reaper bridging executor mempool → sequencer matches ErrQueueFull via errors.Is and treats it as transient backpressure: marks the rejected hashes as "seen" so the next reaper tick doesn't re-hash + re-submit the same already- rejected txs forever, logs a warn line with the dropped count, and continues running. Without this match every queue-full event would tear the daemon down via the existing fatal-on- submit-error path. 
Loadgen sees the backpressure indirectly: with the sequencer queue full, the executor's txChan stops draining, /tx blocks on its bounded channel send, and txsim observes 5xx / timeouts — cleanly applied at the application layer instead of via the kernel OOM-killer. --- block/internal/reaping/reaper.go | 12 ++++++ pkg/sequencers/solo/sequencer.go | 67 ++++++++++++++++++++++++++++++-- 2 files changed, 76 insertions(+), 3 deletions(-) diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index d35c61e422..66e058fbb2 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -16,6 +16,7 @@ import ( coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/sequencers/solo" ) const ( @@ -193,6 +194,17 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { Id: []byte(r.chainID), Batch: &coresequencer.Batch{Transactions: newTxs}, }) + if errors.Is(err, solo.ErrQueueFull) { + // Sequencer queue is full — backpressure signal. Mark the + // batch as "seen" so we don't waste cycles re-hashing the + // same dropped txs every reaper tick, and surface the drop + // as a warning rather than tearing down the daemon. The + // loadgen sees lower acceptance via /tx flow control once + // the executor's own mempool fills up. 
+ r.cache.SetTxsSeen(newHashes) + r.logger.Warn().Int("dropped", len(newTxs)).Msg("sequencer queue full, dropping txs (backpressure)") + break + } if err != nil { return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) } diff --git a/pkg/sequencers/solo/sequencer.go b/pkg/sequencers/solo/sequencer.go index 86fa08d45d..f8524157e6 100644 --- a/pkg/sequencers/solo/sequencer.go +++ b/pkg/sequencers/solo/sequencer.go @@ -14,7 +14,15 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" ) -var ErrInvalidID = errors.New("invalid chain id") +var ( + ErrInvalidID = errors.New("invalid chain id") + // ErrQueueFull is returned from SubmitBatchTxs when the in-memory + // queue is at its byte cap (see SetMaxQueueBytes). Callers should + // treat this as transient backpressure (drop or retry); the + // reaper bridging executor mempool → sequencer matches it via + // errors.Is and downgrades to a warning. + ErrQueueFull = errors.New("sequencer queue full") +) var ( emptyBatch = &coresequencer.Batch{} @@ -27,6 +35,15 @@ var _ coresequencer.Sequencer = (*SoloSequencer)(nil) // SoloSequencer is a single-leader sequencer without forced inclusion // support. It maintains a simple in-memory queue of mempool transactions and // produces batches on demand. +// +// The queue can be bounded in bytes via SetMaxQueueBytes. A bound is +// strongly recommended in any high-throughput configuration: under +// sustained ingest above the block-production drain rate the queue +// otherwise grows monotonically until OOM. With a bound set, +// SubmitBatchTxs rejects a whole incoming batch with ErrQueueFull +// when admitting it would push the queue past the cap, so callers can +// surface backpressure (e.g. via HTTP 503) instead of silently +// retaining bytes. 
type SoloSequencer struct { logger zerolog.Logger id []byte @@ -34,8 +51,10 @@ type SoloSequencer struct { daHeight atomic.Uint64 - mu sync.Mutex - queue [][]byte + mu sync.Mutex + queue [][]byte + queueBytes uint64 + maxQueueBytes uint64 // 0 = unbounded (legacy default) } func NewSoloSequencer( @@ -51,6 +70,16 @@ } } +// SetMaxQueueBytes sets a soft cap on the sequencer's in-memory tx +// queue. SubmitBatchTxs rejects an entire batch with ErrQueueFull +// when admitting it would push the queue past the cap. A zero value +// disables the cap. Intended to be called once at startup. +func (s *SoloSequencer) SetMaxQueueBytes(n uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.maxQueueBytes = n +} + func (s *SoloSequencer) isValid(id []byte) bool { return bytes.Equal(s.id, id) } @@ -67,7 +96,30 @@ func (s *SoloSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Su s.mu.Lock() defer s.mu.Unlock() + if s.maxQueueBytes == 0 { + // Unbounded path (legacy). Suitable for tests and small + // deployments; in production use SetMaxQueueBytes. + s.queue = append(s.queue, req.Batch.Transactions...) + return submitBatchResp, nil + } + + // All-or-nothing: if the whole incoming batch doesn't fit, reject + // it untouched. Partial admission would force the caller (e.g. + // the reaper bridging executor mempool → sequencer) to reason + // about which prefix was admitted and re-feed only the suffix on + // retry, which it doesn't currently do — leading to duplicate-tx + // resubmission on each retry. Rejecting the whole batch lets the + // reaper just retry with the same batch later when the queue has + // drained. + var batchBytes uint64 + for _, tx := range req.Batch.Transactions { + batchBytes += uint64(len(tx)) + } + if s.queueBytes+batchBytes > s.maxQueueBytes { + return submitBatchResp, ErrQueueFull + } s.queue = append(s.queue, req.Batch.Transactions...) 
+ s.queueBytes += batchBytes return submitBatchResp, nil } @@ -79,6 +131,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN s.mu.Lock() txs := s.queue s.queue = nil + s.queueBytes = 0 s.mu.Unlock() if len(txs) == 0 { @@ -122,6 +175,14 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN if len(postponedTxs) > 0 { s.mu.Lock() s.queue = append(postponedTxs, s.queue...) + // Postponed txs were already in the queue's byte count when + // SubmitBatchTxs admitted them. We zeroed queueBytes on drain + // above, so re-queuing requires re-counting whatever survived. + var bytes uint64 + for _, tx := range postponedTxs { + bytes += uint64(len(tx)) + } + s.queueBytes += bytes s.mu.Unlock() } From f8102f9b972a7738e3c5a7e6f6c96a88ceca522b Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:53:38 +0200 Subject: [PATCH 2/4] fix(evnode-fibre): enforce maxBytes in inMemExecutor.FilterTxs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stub executor used by the runner returned FilterOK for every transaction unconditionally, ignoring the maxBytes budget plumbed through SoloSequencer.GetNextBatch. Under sustained txsim load (~50 MiB/s, 8 concurrent senders) the mempool would accumulate ~50K txs while a 100 MiB upload was in flight; on the next batch the sequencer drained ALL of them into one block (~369 MiB raw), the submitter saw a single item exceeding the per-blob cap, and halted the node with `single item exceeds DA blob size limit`. Walk the input txs in arrival order, accumulate sizes against maxBytes, and return FilterPostpone past the budget so the sequencer puts the overflow back on its queue. Verified live: blocks now cap at ~10K txs / ~100 MiB and evnode sustains 58.77 MB/s DA upload throughput through a 5-min txsim run with zero crashes (was 0 → crash within 30 s before this fix). 
--- .../cmd/evnode-fibre/main.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 7b8b73080f..59650d6bfc 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -542,10 +542,24 @@ func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.Execu return coreexecution.ExecutionInfo{MaxGas: 0}, nil } -func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { +// FilterTxs admits txs in arrival order until the maxBytes budget is +// reached, then postpones the rest back to the sequencer queue so they +// land in a future batch. Skipping this enforcement (a previous version +// returned FilterOK unconditionally) lets a single block sweep up the +// entire mempool — under sustained txsim load that produced 369 MiB +// blocks that exceeded Fibre's per-upload cap and crashed the node +// with `single item exceeds DA blob size limit`. 
+func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { st := make([]coreexecution.FilterStatus, len(txs)) - for i := range st { + var used uint64 + for i, tx := range txs { + size := uint64(len(tx)) + if maxBytes > 0 && used+size > maxBytes { + st[i] = coreexecution.FilterPostpone + continue + } st[i] = coreexecution.FilterOK + used += size } return st, nil } From b876712e44a37f786d735426c47da5180e8b1a3f Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Sun, 3 May 2026 00:36:00 +0200 Subject: [PATCH 3/4] fix(evnode-fibre): wire sequencer queue cap + lift ingest queue caps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two runner-side changes paired with the SoloSequencer bound: - After constructing the SoloSequencer, call SetMaxQueueBytes with 10× the per-block tx budget (= 1 GiB at the current 100 MiB MaxBlobSize). 10× is the sweet spot: large enough that a short burst above steady-state ingest doesn't trigger backpressure (we want to absorb that), small enough that the worst-case retained bytes fit comfortably under the box's RAM budget alongside the pending cache + DA in-flight buffers. - Lift the inMemExecutor's hardcoded ingest caps. txChan and maxBlockTxs were sized at 500 (5 MB / 5K txs per reaper poll) back when those were the only memory bound on the runner. With the SetMaxQueueBytes cap and the FilterTxs-enforced per-block budget now actually doing the bounding, the ingest queue can hold a full 100 MiB block-worth of txs (10K slots at 10 KB) without burdening memory — and a single reaper poll can drain that whole batch in one GetTxs call instead of needing 20× cycles. This was the binding constraint at ~5,000 tx/s = 50 MB/s in earlier runs. 
--- .../cmd/evnode-fibre/main.go | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 59650d6bfc..799ef66425 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -322,6 +322,19 @@ func run(cli cliFlags) error { executor := newInMemExecutor() sequencer := solo.NewSoloSequencer(logger, []byte(genesis.ChainID), executor) + // Cap the sequencer's in-memory queue at 10× the per-block tx + // budget. Above this, SubmitBatchTxs returns ErrQueueFull and the + // runner's reaper-bridge / tx-ingress applies backpressure (txs + // stay in the executor's txChan until the sequencer drains, and + // the chan's bound 503's /tx). Without this cap a fast loadgen + // (32 vCPU pushing >100 MB/s) outruns the 1 block/s drain and + // the queue grows monotonically — observed pre-fix as 24 GB of + // retained io.ReadAll bytes in heap snapshots before the daemon + // hit the 64 GiB box ceiling and OOM-killed. + // Sized at 10× the per-block tx budget (matches SetMaxBlobSize + // above; both anchor at the per-blob Fibre cap). + const seqQueueBytes = 10 * 100 * 1024 * 1024 // 1 GiB + sequencer.SetMaxQueueBytes(seqQueueBytes) daClient := block.NewFiberDAClient(adapter, cfg, logger, 0) p2pClient, err := p2p.NewClient(cfg.P2P, nodeKey.PrivKey, datastore.NewMapDatastore(), genesis.ChainID, logger, nil) if err != nil { @@ -474,23 +487,23 @@ type inMemExecutor struct { totalTxs atomic.Uint64 } -// txChan capacity caps in-flight memory: at 10 KB tx and 500 slots -// we hold ≤ 5 MB queued before /tx blocks the ingress goroutine — -// which is exactly the backpressure we want against a hot loadgen. 
-// Reaper drains every 100 ms into the solo sequencer, which then -// accumulates batches between block-production ticks; without a tight -// cap a single block can balloon past the 120 MiB DA blob limit and -// the rest of the daemon's per-block allocations push the box past -// its RAM budget within seconds. +// txChan capacity bounds the HTTP /tx ingest queue. Sized at 10K +// slots (~100 MiB at 10 KB tx-size) so a 100 ms reaper cycle can +// absorb a full max-size block's worth of txs without /tx blocking +// the loadgen. Earlier we used 500 slots (~5 MiB) which forced +// backpressure at ~5,000 tx/s — that turned txsim into the limiting +// factor at ~22 MB/s rather than DA upload. With the per-block +// FilterTxs cap (executor.go:RetrieveBatch via DefaultMaxBlobSize= +// 100 MiB) and the submitter chunker now enforcing the actual blob +// budget, the executor doesn't need an extra ingest-side cap. // -// maxBlockTxs caps GetTxs's per-call return so reaper-cycle batches -// are bounded too. With 500 ≤ 5 MB per block at 10 KB tx-size, we -// stay an order of magnitude under the DA cap so headers/data signing -// + envelope cache + retry buffers all fit. +// maxBlockTxs caps GetTxs's per-call return; pairs with the channel +// size so a reaper poll can fully drain a 100 MiB-block-worth of +// queued txs in a single call instead of needing 20× cycles. 
func newInMemExecutor() *inMemExecutor { return &inMemExecutor{ - txChan: make(chan []byte, 500), - maxBlockTxs: 500, + txChan: make(chan []byte, 10000), + maxBlockTxs: 10000, } } From 2d74d805c06e2e7a1376414e8a193743cd5c7f95 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Sun, 3 May 2026 01:41:51 +0200 Subject: [PATCH 4/4] fix(config): tighten Fiber pending cap to 10 to bound submitter memory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ApplyFiberDefaults set MaxPendingHeadersAndData=50, but each pending data item under Fiber is up to MaxBlobSize (~100 MiB raw). With 3-FSP fan-out and per-attempt retry buffers in flight, 50 items × 3 × retries crossed 64 GiB on c6in.8xlarge under sustained txsim load and the kernel OOM-killed evnode 30 s into the run. 10 keeps the in-flight footprint bounded while still letting healthy uploads pipeline against the actual Fibre RPC latency. Verified by heap profiling: pending pause at ~ 10 × 100 MiB plus fan-out keeps RSS below ~10 GiB, evnode runs indefinitely. --- pkg/config/config.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 7cbb780a21..43719c1047 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() { } c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second} - c.Node.MaxPendingHeadersAndData = 50 + // Tighter pending cap (was 50). At 50, a Fibre upload stall lets the + // submitter accumulate 50 × ~32 MiB blob copies + their per-validator + // retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and + // OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint + // bounded while still letting healthy uploads pipeline. + c.Node.MaxPendingHeadersAndData = 10 } // GetNamespace returns the namespace for header submissions.