diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index d35c61e422..66e058fbb2 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -16,6 +16,7 @@ import ( coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/sequencers/solo" ) const ( @@ -193,6 +194,17 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { Id: []byte(r.chainID), Batch: &coresequencer.Batch{Transactions: newTxs}, }) + if errors.Is(err, solo.ErrQueueFull) { + // Sequencer queue is full — backpressure signal. Mark the + // batch as "seen" so we don't waste cycles re-hashing the + // same dropped txs every reaper tick, and surface the drop + // as a warning rather than tearing down the daemon. The + // loadgen sees lower acceptance via /tx flow control once + // the executor's own mempool fills up. + r.cache.SetTxsSeen(newHashes) + r.logger.Warn().Int("dropped", len(newTxs)).Msg("sequencer queue full, dropping txs (backpressure)") + break + } if err != nil { return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) } diff --git a/pkg/config/config.go b/pkg/config/config.go index 7cbb780a21..43719c1047 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() { } c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second} - c.Node.MaxPendingHeadersAndData = 50 + // Tighter pending cap (was 50). At 50, a Fibre upload stall lets the + // submitter accumulate 50 × ~32 MiB blob copies + their per-validator + // retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and + // OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint + // bounded while still letting healthy uploads pipeline. 
+	c.Node.MaxPendingHeadersAndData = 10
 }
 
 // GetNamespace returns the namespace for header submissions.
diff --git a/pkg/sequencers/solo/sequencer.go b/pkg/sequencers/solo/sequencer.go
index 86fa08d45d..f8524157e6 100644
--- a/pkg/sequencers/solo/sequencer.go
+++ b/pkg/sequencers/solo/sequencer.go
@@ -14,7 +14,15 @@ import (
 	coresequencer "github.com/evstack/ev-node/core/sequencer"
 )
 
-var ErrInvalidID = errors.New("invalid chain id")
+var (
+	ErrInvalidID = errors.New("invalid chain id")
+	// ErrQueueFull is returned from SubmitBatchTxs when the in-memory
+	// queue is at its byte cap (see SetMaxQueueBytes). Callers should
+	// treat this as transient backpressure (drop or retry); the
+	// reaper bridging executor mempool → sequencer matches it via
+	// errors.Is and downgrades to a warning.
+	ErrQueueFull = errors.New("sequencer queue full")
+)
 
 var (
 	emptyBatch = &coresequencer.Batch{}
@@ -27,6 +35,15 @@ var _ coresequencer.Sequencer = (*SoloSequencer)(nil)
 // SoloSequencer is a single-leader sequencer without forced inclusion
 // support. It maintains a simple in-memory queue of mempool transactions and
 // produces batches on demand.
+//
+// The queue can be bounded in bytes via SetMaxQueueBytes. A bound is
+// strongly recommended in any high-throughput configuration: under
+// sustained ingest above the block-production drain rate the queue
+// otherwise grows monotonically until OOM. With a bound set,
+// SubmitBatchTxs rejects a batch that does not fit in full and
+// returns ErrQueueFull (all-or-nothing admission), so callers can
+// surface backpressure (e.g. via HTTP 503) instead of silently
+// retaining bytes.
 type SoloSequencer struct {
 	logger zerolog.Logger
 	id     []byte
@@ -34,8 +51,10 @@ type SoloSequencer struct {
 
 	daHeight atomic.Uint64
 
-	mu    sync.Mutex
-	queue [][]byte
+	mu            sync.Mutex
+	queue         [][]byte
+	queueBytes    uint64
+	maxQueueBytes uint64 // 0 = unbounded (legacy default)
 }
 
 func NewSoloSequencer(
@@ -51,6 +70,16 @@ func NewSoloSequencer(
 	}
 }
 
+// SetMaxQueueBytes sets a soft cap on the sequencer's in-memory tx
+// queue. When a cap is set, SubmitBatchTxs rejects in full any batch
+// that would push the queue past it, returning ErrQueueFull. A zero
+// value disables the cap. Intended to be called once at startup.
+func (s *SoloSequencer) SetMaxQueueBytes(n uint64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.maxQueueBytes = n
+}
+
 func (s *SoloSequencer) isValid(id []byte) bool {
 	return bytes.Equal(s.id, id)
 }
@@ -67,7 +96,30 @@ func (s *SoloSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Su
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
+	if s.maxQueueBytes == 0 {
+		// Unbounded path (legacy). Suitable for tests and small
+		// deployments; in production use SetMaxQueueBytes.
+		s.queue = append(s.queue, req.Batch.Transactions...)
+		return submitBatchResp, nil
+	}
+
+	// All-or-nothing: if the whole incoming batch doesn't fit, reject
+	// it untouched. Partial admission would force the caller (e.g.
+	// the reaper bridging executor mempool → sequencer) to reason
+	// about which prefix was admitted and re-feed only the suffix on
+	// retry, which it doesn't currently do — leading to duplicate-tx
+	// resubmission on each retry. Rejecting the whole batch keeps the
+	// accounting trivial; note the reaper currently drops (marks seen)
+	// a rejected batch rather than retrying it.
+	var batchBytes uint64
+	for _, tx := range req.Batch.Transactions {
+		batchBytes += uint64(len(tx))
+	}
+	if s.queueBytes+batchBytes > s.maxQueueBytes {
+		return submitBatchResp, ErrQueueFull
+	}
 	s.queue = append(s.queue, req.Batch.Transactions...)
+ s.queueBytes += batchBytes return submitBatchResp, nil } @@ -79,6 +131,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN s.mu.Lock() txs := s.queue s.queue = nil + s.queueBytes = 0 s.mu.Unlock() if len(txs) == 0 { @@ -122,6 +175,14 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN if len(postponedTxs) > 0 { s.mu.Lock() s.queue = append(postponedTxs, s.queue...) + // Postponed txs were already in the queue's byte count when + // SubmitBatchTxs admitted them. We zeroed queueBytes on drain + // above, so re-queuing requires re-counting whatever survived. + var bytes uint64 + for _, tx := range postponedTxs { + bytes += uint64(len(tx)) + } + s.queueBytes += bytes s.mu.Unlock() } diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 7b8b73080f..799ef66425 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -322,6 +322,19 @@ func run(cli cliFlags) error { executor := newInMemExecutor() sequencer := solo.NewSoloSequencer(logger, []byte(genesis.ChainID), executor) + // Cap the sequencer's in-memory queue at 10× the per-block tx + // budget. Above this, SubmitBatchTxs returns ErrQueueFull and the + // runner's reaper-bridge / tx-ingress applies backpressure (txs + // stay in the executor's txChan until the sequencer drains, and + // the chan's bound 503's /tx). Without this cap a fast loadgen + // (32 vCPU pushing >100 MB/s) outruns the 1 block/s drain and + // the queue grows monotonically — observed pre-fix as 24 GB of + // retained io.ReadAll bytes in heap snapshots before the daemon + // hit the 64 GiB box ceiling and OOM-killed. + // Sized at 10× the per-block tx budget (matches SetMaxBlobSize + // above; both anchor at the per-blob Fibre cap). 
+	const seqQueueBytes = 10 * 100 * 1024 * 1024 // 1000 MiB (~1 GiB)
+	sequencer.SetMaxQueueBytes(seqQueueBytes)
 	daClient := block.NewFiberDAClient(adapter, cfg, logger, 0)
 	p2pClient, err := p2p.NewClient(cfg.P2P, nodeKey.PrivKey, datastore.NewMapDatastore(), genesis.ChainID, logger, nil)
 	if err != nil {
@@ -474,23 +487,23 @@ type inMemExecutor struct {
 	totalTxs atomic.Uint64
 }
 
-// txChan capacity caps in-flight memory: at 10 KB tx and 500 slots
-// we hold ≤ 5 MB queued before /tx blocks the ingress goroutine —
-// which is exactly the backpressure we want against a hot loadgen.
-// Reaper drains every 100 ms into the solo sequencer, which then
-// accumulates batches between block-production ticks; without a tight
-// cap a single block can balloon past the 120 MiB DA blob limit and
-// the rest of the daemon's per-block allocations push the box past
-// its RAM budget within seconds.
+// txChan capacity bounds the HTTP /tx ingest queue. Sized at 10K
+// slots (~100 MiB at 10 KB tx-size) so a 100 ms reaper cycle can
+// absorb a full max-size block's worth of txs without /tx blocking
+// the loadgen. Earlier we used 500 slots (~5 MiB) which forced
+// backpressure at ~5,000 tx/s — that turned txsim into the limiting
+// factor at ~22 MB/s rather than DA upload. With the per-block
+// FilterTxs cap (executor.go:RetrieveBatch via DefaultMaxBlobSize=
+// 100 MiB) and the submitter chunker now enforcing the actual blob
+// budget, the executor doesn't need an extra ingest-side cap.
 //
-// maxBlockTxs caps GetTxs's per-call return so reaper-cycle batches
-// are bounded too. With 500 ≤ 5 MB per block at 10 KB tx-size, we
-// stay an order of magnitude under the DA cap so headers/data signing
-// + envelope cache + retry buffers all fit.
+// maxBlockTxs caps GetTxs's per-call return; pairs with the channel
+// size so a reaper poll can fully drain a 100 MiB-block-worth of
+// queued txs in a single call instead of needing 20× cycles.
 func newInMemExecutor() *inMemExecutor {
 	return &inMemExecutor{
-		txChan:      make(chan []byte, 500),
-		maxBlockTxs: 500,
+		txChan:      make(chan []byte, 10000),
+		maxBlockTxs: 10000,
 	}
 }
 
@@ -542,10 +555,24 @@ func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.Execu
 	return coreexecution.ExecutionInfo{MaxGas: 0}, nil
 }
 
-func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) {
+// FilterTxs admits each tx in arrival order that still fits in the
+// maxBytes budget and postpones those that don't back to the sequencer
+// queue for a future batch (it keeps scanning, so later smaller txs
+// may still be admitted). Skipping this enforcement (a previous
+// version returned FilterOK unconditionally) lets a single block sweep
+// up the entire mempool — under sustained txsim load that produced
+// 369 MiB blocks that crashed the node with `single item exceeds DA blob size limit`.
+func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) {
 	st := make([]coreexecution.FilterStatus, len(txs))
-	for i := range st {
+	var used uint64
+	for i, tx := range txs {
+		size := uint64(len(tx))
+		if maxBytes > 0 && used+size > maxBytes {
+			st[i] = coreexecution.FilterPostpone
+			continue
+		}
 		st[i] = coreexecution.FilterOK
+		used += size
 	}
 	return st, nil
 }