12 changes: 12 additions & 0 deletions block/internal/reaping/reaper.go
@@ -16,6 +16,7 @@ import (
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/sequencers/solo"
)

const (
@@ -193,6 +194,17 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
Id: []byte(r.chainID),
Batch: &coresequencer.Batch{Transactions: newTxs},
})
if errors.Is(err, solo.ErrQueueFull) {
// Sequencer queue is full — backpressure signal. Mark the
// batch as "seen" so we don't waste cycles re-hashing the
// same dropped txs every reaper tick, and surface the drop
// as a warning rather than tearing down the daemon. The
// loadgen sees lower acceptance via /tx flow control once
// the executor's own mempool fills up.
r.cache.SetTxsSeen(newHashes)
r.logger.Warn().Int("dropped", len(newTxs)).Msg("sequencer queue full, dropping txs (backpressure)")
break
}
if err != nil {
return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
}
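
The break above relies on the solo sequencer surfacing its byte cap as a sentinel error that survives wrapping. A minimal test-style sketch of that contract, not part of this diff: it would sit in pkg/sequencers/solo alongside sequencer.go, assumes a nil executor is acceptable because SubmitBatchTxs only touches the queue, and assumes the request type is coresequencer.SubmitBatchTxsRequest (its name is truncated in the hunk further down).

func TestSubmitBatchTxsQueueFull(t *testing.T) {
	// Cap far below the incoming batch so the submit must be rejected.
	seq := NewSoloSequencer(zerolog.Nop(), []byte("test-chain"), nil)
	seq.SetMaxQueueBytes(16)

	_, err := seq.SubmitBatchTxs(context.Background(), coresequencer.SubmitBatchTxsRequest{
		Id:    []byte("test-chain"),
		Batch: &coresequencer.Batch{Transactions: [][]byte{make([]byte, 32)}},
	})
	if !errors.Is(err, ErrQueueFull) {
		t.Fatalf("expected ErrQueueFull, got %v", err)
	}
}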
7 changes: 6 additions & 1 deletion pkg/config/config.go
@@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() {
}

c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second}
c.Node.MaxPendingHeadersAndData = 50
// Tighter pending cap (was 50). At 50, a Fibre upload stall lets the
// submitter accumulate 50 × ~32 MiB blob copies + their per-validator
// retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and
// OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint
// bounded while still letting healthy uploads pipeline.
c.Node.MaxPendingHeadersAndData = 10
}

// GetNamespace returns the namespace for header submissions.
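
Rough arithmetic behind the tighter cap (illustrative only; the ~32 MiB per-blob figure comes from the comment above, and per-validator retry buffers multiply it further under load):

package main

import "fmt"

func main() {
	const blobCopyBytes = 32 << 20 // ~32 MiB per pending header/data blob (assumed, per the comment above)
	fmt.Println("old cap (50):", 50*blobCopyBytes>>20, "MiB of raw in-flight copies") // 1600
	fmt.Println("new cap (10):", 10*blobCopyBytes>>20, "MiB of raw in-flight copies") // 320
}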
67 changes: 64 additions & 3 deletions pkg/sequencers/solo/sequencer.go
@@ -14,7 +14,15 @@ import (
coresequencer "github.com/evstack/ev-node/core/sequencer"
)

var ErrInvalidID = errors.New("invalid chain id")
var (
ErrInvalidID = errors.New("invalid chain id")
// ErrQueueFull is returned from SubmitBatchTxs when admitting the
// incoming batch would push the in-memory queue past its byte cap
// (see SetMaxQueueBytes). Callers should
// treat this as transient backpressure (drop or retry); the
// reaper bridging executor mempool → sequencer matches it via
// errors.Is and downgrades to a warning.
ErrQueueFull = errors.New("sequencer queue full")
)

var (
emptyBatch = &coresequencer.Batch{}
@@ -27,15 +35,26 @@ var _ coresequencer.Sequencer = (*SoloSequencer)(nil)
// SoloSequencer is a single-leader sequencer without forced inclusion
// support. It maintains a simple in-memory queue of mempool transactions and
// produces batches on demand.
//
// The queue can be bounded in bytes via SetMaxQueueBytes. A bound is
// strongly recommended in any high-throughput configuration: under
// sustained ingest above the block-production drain rate the queue
// otherwise grows monotonically until OOM. With a bound set,
// SubmitBatchTxs rejects any batch that would overflow the cap and
// returns ErrQueueFull, so callers can surface backpressure (e.g.
// via HTTP 503) instead of silently retaining bytes.
type SoloSequencer struct {
logger zerolog.Logger
id []byte
executor execution.Executor

daHeight atomic.Uint64

mu sync.Mutex
queue [][]byte
mu sync.Mutex
queue [][]byte
queueBytes uint64
maxQueueBytes uint64 // 0 = unbounded (legacy default)
}

func NewSoloSequencer(
@@ -51,6 +70,16 @@ func NewSoloSequencer(
}
}

// SetMaxQueueBytes sets a byte cap on the sequencer's in-memory tx
// queue. SubmitBatchTxs rejects a batch outright, returning
// ErrQueueFull, if admitting all of it would push the queue past the
// cap. A zero value disables the cap. Intended to be called once at
// startup.
func (s *SoloSequencer) SetMaxQueueBytes(n uint64) {
s.mu.Lock()
defer s.mu.Unlock()
s.maxQueueBytes = n
}

func (s *SoloSequencer) isValid(id []byte) bool {
return bytes.Equal(s.id, id)
}
@@ -67,7 +96,30 @@ func (s *SoloSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Su
s.mu.Lock()
defer s.mu.Unlock()

if s.maxQueueBytes == 0 {
// Unbounded path (legacy). Suitable for tests and small
// deployments; in production use SetMaxQueueBytes.
s.queue = append(s.queue, req.Batch.Transactions...)
return submitBatchResp, nil
}

// All-or-nothing: if the whole incoming batch doesn't fit, reject
// it untouched. Partial admission would force the caller (e.g.
// the reaper bridging executor mempool → sequencer) to reason
// about which prefix was admitted and re-feed only the suffix on
// retry, which it doesn't currently do — leading to duplicate-tx
// resubmission on each retry. Rejecting the whole batch lets the
// reaper just retry with the same batch later when the queue has
// drained.
var batchBytes uint64
for _, tx := range req.Batch.Transactions {
batchBytes += uint64(len(tx))
}
if s.queueBytes+batchBytes > s.maxQueueBytes {
return submitBatchResp, ErrQueueFull
}
s.queue = append(s.queue, req.Batch.Transactions...)
s.queueBytes += batchBytes
return submitBatchResp, nil
}
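
To make the all-or-nothing semantics concrete, a continuation of a sketch like the one under reaper.go above, with a 100-byte cap; oneTxBatch is a hypothetical helper that builds a SubmitBatchTxsRequest holding a single tx of the given size:

	seq.SetMaxQueueBytes(100)
	_, err1 := seq.SubmitBatchTxs(ctx, oneTxBatch(60)) // admitted: queueBytes = 60
	_, err2 := seq.SubmitBatchTxs(ctx, oneTxBatch(60)) // rejected whole: 60+60 > 100, despite 40 bytes of headroom
	// err1 == nil; errors.Is(err2, ErrQueueFull) == true. Retrying the
	// identical second batch after GetNextBatch drains the queue succeeds,
	// which is why the reaper can simply retry later with the same batch.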

@@ -79,6 +131,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN
s.mu.Lock()
txs := s.queue
s.queue = nil
s.queueBytes = 0
s.mu.Unlock()

if len(txs) == 0 {
@@ -122,6 +175,14 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN
if len(postponedTxs) > 0 {
s.mu.Lock()
s.queue = append(postponedTxs, s.queue...)
// Postponed txs were already in the queue's byte count when
// SubmitBatchTxs admitted them. We zeroed queueBytes on drain
// above, so re-queuing requires re-counting whatever survived.
var postponedBytes uint64
for _, tx := range postponedTxs {
postponedBytes += uint64(len(tx))
}
s.queueBytes += postponedBytes
s.mu.Unlock()
}
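
Because queueBytes is now maintained at three sites (admit, drain, re-queue), a small invariant check helps while iterating on these paths. A sketch, not part of this diff; checkQueueBytes is a hypothetical name:

// checkQueueBytes recomputes the queued byte total under the lock and
// reports whether it matches the cached counter.
func (s *SoloSequencer) checkQueueBytes() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	var total uint64
	for _, tx := range s.queue {
		total += uint64(len(tx))
	}
	return total == s.queueBytes
}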

59 changes: 43 additions & 16 deletions tools/celestia-node-fiber/cmd/evnode-fibre/main.go
@@ -322,6 +322,19 @@ func run(cli cliFlags) error {

executor := newInMemExecutor()
sequencer := solo.NewSoloSequencer(logger, []byte(genesis.ChainID), executor)
// Cap the sequencer's in-memory queue at 10× the per-block tx
// budget (matches SetMaxBlobSize above; both anchor at the per-blob
// Fibre cap). Above this, SubmitBatchTxs returns ErrQueueFull and
// the runner's reaper-bridge / tx-ingress applies backpressure (txs
// stay in the executor's txChan until the sequencer drains, and the
// chan's bound 503's /tx). Without this cap a fast loadgen (32 vCPU
// pushing >100 MB/s) outruns the 1 block/s drain and the queue
// grows monotonically — observed pre-fix as 24 GB of retained
// io.ReadAll bytes in heap snapshots before the daemon hit the
// 64 GiB box ceiling and OOM-killed.
const seqQueueBytes = 10 * 100 * 1024 * 1024 // 1,000 MiB
sequencer.SetMaxQueueBytes(seqQueueBytes)
daClient := block.NewFiberDAClient(adapter, cfg, logger, 0)
p2pClient, err := p2p.NewClient(cfg.P2P, nodeKey.PrivKey, datastore.NewMapDatastore(), genesis.ChainID, logger, nil)
if err != nil {
@@ -474,23 +487,23 @@ type inMemExecutor struct {
totalTxs atomic.Uint64
}

// txChan capacity caps in-flight memory: at 10 KB tx and 500 slots
// we hold ≤ 5 MB queued before /tx blocks the ingress goroutine —
// which is exactly the backpressure we want against a hot loadgen.
// Reaper drains every 100 ms into the solo sequencer, which then
// accumulates batches between block-production ticks; without a tight
// cap a single block can balloon past the 120 MiB DA blob limit and
// the rest of the daemon's per-block allocations push the box past
// its RAM budget within seconds.
// txChan capacity bounds the HTTP /tx ingest queue. Sized at 10K
// slots (~100 MiB at 10 KB tx-size) so a 100 ms reaper cycle can
// absorb a full max-size block's worth of txs without /tx blocking
// the loadgen. Earlier we used 500 slots (~5 MiB) which forced
// backpressure at ~5,000 tx/s — that turned txsim into the limiting
// factor at ~22 MB/s rather than DA upload. With the per-block
// FilterTxs cap (executor.go:RetrieveBatch via DefaultMaxBlobSize=
// 100 MiB) and the submitter chunker now enforcing the actual blob
// budget, the executor doesn't need an extra ingest-side cap.
//
// maxBlockTxs caps GetTxs's per-call return so reaper-cycle batches
// are bounded too. With 500 ≤ 5 MB per block at 10 KB tx-size, we
// stay an order of magnitude under the DA cap so headers/data signing
// + envelope cache + retry buffers all fit.
// maxBlockTxs caps GetTxs's per-call return; pairs with the channel
// size so a reaper poll can fully drain a 100 MiB-block-worth of
// queued txs in a single call instead of needing 20× cycles.
func newInMemExecutor() *inMemExecutor {
return &inMemExecutor{
txChan: make(chan []byte, 500),
maxBlockTxs: 500,
txChan: make(chan []byte, 10000),
maxBlockTxs: 10000,
}
}
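
The comments above lean on /tx flow control against the channel bound; the handler itself is outside this diff, but one plausible shape (hypothetical handleTx name, non-blocking send that answers 503 when the ingest queue is full, assuming net/http and io are imported) is:

func (e *inMemExecutor) handleTx(w http.ResponseWriter, r *http.Request) {
	tx, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	select {
	case e.txChan <- tx:
		e.totalTxs.Add(1) // counter from the struct above
		w.WriteHeader(http.StatusOK)
	default:
		// Ingest queue full: surface backpressure to the loadgen.
		http.Error(w, "tx queue full", http.StatusServiceUnavailable)
	}
}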

@@ -542,10 +555,24 @@ func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.Execu
return coreexecution.ExecutionInfo{MaxGas: 0}, nil
}

func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) {
// FilterTxs walks txs in arrival order, admitting each tx that still
// fits within the maxBytes budget and postponing any that would push
// the running total past it back to the sequencer queue so they land
// in a future batch (a later, smaller tx can still be admitted after a
// larger one is postponed). Skipping this enforcement (a previous version
// returned FilterOK unconditionally) lets a single block sweep up the
// entire mempool — under sustained txsim load that produced 369 MiB
// blocks that exceeded Fibre's per-upload cap and crashed the node
// with `single item exceeds DA blob size limit`.
func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) {
st := make([]coreexecution.FilterStatus, len(txs))
for i := range st {
var used uint64
for i, tx := range txs {
size := uint64(len(tx))
if maxBytes > 0 && used+size > maxBytes {
st[i] = coreexecution.FilterPostpone
continue
}
st[i] = coreexecution.FilterOK
used += size
}
return st, nil
}
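
A minimal test-style sketch of the budget behavior, not part of this diff (it would sit in a _test.go next to main.go): with maxBytes = 25 and three 10-byte txs, the first two are admitted and the third is postponed.

func TestFilterTxsPostponesOverBudget(t *testing.T) {
	e := newInMemExecutor()
	txs := [][]byte{make([]byte, 10), make([]byte, 10), make([]byte, 10)}
	st, err := e.FilterTxs(context.Background(), txs, 25, 0, false)
	if err != nil {
		t.Fatal(err)
	}
	want := []coreexecution.FilterStatus{
		coreexecution.FilterOK,
		coreexecution.FilterOK,
		coreexecution.FilterPostpone,
	}
	for i := range want {
		if st[i] != want[i] {
			t.Fatalf("tx %d: got %v, want %v", i, st[i], want[i])
		}
	}
}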