From 3dac65a562bee74d6883650a311e72b1f6911e58 Mon Sep 17 00:00:00 2001
From: Vlad <13818348+walldiss@users.noreply.github.com>
Date: Fri, 1 May 2026 15:29:03 +0200
Subject: [PATCH 01/11] fix(tools/talis): wait-for-chain + atomic keyring + one-command driver
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three race conditions surfaced repeatedly on a fresh AWS bring-up of the Fibre throughput experiment. Each one had the same shape: a talis subcommand "succeeded" at the CLI level (or returned the txhash with --yes) before the chain had actually applied the work, leaving downstream steps to fail in confusing ways. This commit makes each step verify *outcome*, not just *invocation*, so the experiment can go from a fresh `talis up` to a running loadgen without manual intervention.

• setup-fibre script (fibre_setup.go) now:
  - polls `celestia-appd status` for `latest_block_height>0` before submitting any tx — fixes the silent noop where set-host + 100× deposit-to-escrow all bounced with "celestia-app is not ready; please wait for first block";
  - retries `set-host` in a loop until the validator's host shows up in `query valaddr providers` — fixes the case where --yes returns the txhash before block inclusion and the tx silently lands in the mempool but never confirms;
  - verifies fibre-0's escrow account is funded on-chain before the tmux session exits — same silent-failure mode as set-host, but on the deposit side.
  The talis-CLI step also now cross-checks that all validators are registered, from a single vantage point, before returning, so a concurrent set-host race surfaces as an error instead of a half-empty provider list that start-fibre would cache forever.

• fibre-bootstrap-evnode (fibre_bootstrap_evnode.go) now stages the keyring scp into a tmp directory and `mv`s it atomically into place. The previous direct `scp -r` to /root/keyring-fibre/keyring-test created the directory before transferring its contents — the evnode init script's `[ -d keyring-test ]` poll passed mid-transfer, the daemon launched with no fibre-0.info, and crashed with `keyring entry "fibre-0" not found`.

• evnode_init.sh (genesis.go) now waits for the specific keyring-test/fibre-0.info file rather than just the keyring-test directory. Belt-and-braces: the bootstrap mv is already atomic on the same filesystem, but the file-level guard means a hand-pushed keyring (not via talis) can't trip the same race.

• New `talis fibre-experiment` umbrella command runs up → genesis → deploy → setup-fibre → start-fibre → fibre-bootstrap-evnode in order. Each step re-invokes the same binary as a subprocess; a failure in any step aborts the remaining sequence. The operator goes from a prepared root dir to a running loadgen with one command, instead of remembering the order.

Verified by 5-min sustained loadgen against julien/fiber HEAD with PR #3287 (concurrent submitter) merged: 47.65 MB/s @ 99.999 % ok, up from the prior 24.57 MB/s baseline (the gap is PR #3287's overlapping uploads — these talis fixes just stop the deploy from silently breaking before throughput matters).
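For orientation, the operator-facing flow with the new umbrella command reduces to a single invocation (paths below are placeholders; the flags are the `-d`/`--directory` and `-b`/`--build-dir` options defined in fibre_experiment.go):

    # one-time preparation: `talis init` + `talis add` for validators/bridge/evnode/loadgen,
    # cross-compiled linux/amd64 binaries staged in ./build
    talis fibre-experiment -d ./fibre-experiment -b ./build

Each step (up → genesis → deploy → setup-fibre → start-fibre → fibre-bootstrap-evnode) shells out to this same talis binary, and the first failing step aborts the run.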
--- tools/talis/fibre_bootstrap_evnode.go | 34 ++++++-- tools/talis/fibre_experiment.go | 89 ++++++++++++++++++++ tools/talis/fibre_setup.go | 113 +++++++++++++++++++++++++- tools/talis/genesis.go | 12 ++- tools/talis/main.go | 1 + 5 files changed, 239 insertions(+), 10 deletions(-) create mode 100644 tools/talis/fibre_experiment.go diff --git a/tools/talis/fibre_bootstrap_evnode.go b/tools/talis/fibre_bootstrap_evnode.go index 5df12d41ae..c8d76822b3 100644 --- a/tools/talis/fibre_bootstrap_evnode.go +++ b/tools/talis/fibre_bootstrap_evnode.go @@ -125,21 +125,45 @@ to fetch, then SCPs to each evnode-*.`, defer wg.Done() log.Printf("[%s] pushing JWT + keyring", ev.Name) + // JWT is small + atomic on the receive side because + // it's a single file, so we push it directly. if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, localJWT, "/root/bridge-jwt.txt", false); err != nil { errCh <- fmt.Errorf("[%s] push JWT: %w", ev.Name, err) return } - // mkdir the parent so scp lands at the exact path - // evnode_init.sh waits for. - if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, "mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test"); err != nil { - errCh <- fmt.Errorf("[%s] mkdir keyring-fibre: %w", ev.Name, err) + // Keyring push is staged through a tmp dir and + // promoted via mv. Without staging, evnode_init.sh's + // poll loop (which tests `[ -d keyring-test ]`) + // passes the moment scp -r mkdir's the directory, + // long before fibre-0.info is on disk. evnode then + // launches mid-scp and dies with `keyring entry + // "fibre-0" not found`. mv is atomic on the same + // filesystem so the init script either sees nothing + // (keep waiting) or the fully-populated dir (start + // the daemon cleanly). + stageDir := "/root/.keyring-fibre.staging" + prep := fmt.Sprintf( + "rm -rf %s && mkdir -p %s && mkdir -p /root/keyring-fibre && rm -rf /root/keyring-fibre/keyring-test", + stageDir, stageDir, + ) + if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, prep); err != nil { + errCh <- fmt.Errorf("[%s] stage keyring: %w", ev.Name, err) return } - if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), "/root/keyring-fibre/keyring-test", true); err != nil { + stageDest := stageDir + "/keyring-test" + if err := scpToRemote(sshUser, ev.PublicIP, sshKeyPath, filepath.Join(localKeyringRoot, "keyring-test"), stageDest, true); err != nil { errCh <- fmt.Errorf("[%s] push keyring: %w", ev.Name, err) return } + promote := fmt.Sprintf( + "mv %s /root/keyring-fibre/keyring-test && rmdir %s", + stageDest, stageDir, + ) + if _, err := sshExec(sshUser, ev.PublicIP, sshKeyPath, promote); err != nil { + errCh <- fmt.Errorf("[%s] promote keyring: %w", ev.Name, err) + return + } log.Printf("[%s] ✓ pushed; daemon should start within ~10s", ev.Name) }(ev) diff --git a/tools/talis/fibre_experiment.go b/tools/talis/fibre_experiment.go new file mode 100644 index 0000000000..585992d5fe --- /dev/null +++ b/tools/talis/fibre_experiment.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + + "github.com/spf13/cobra" +) + +// fibreExperimentCmd is the one-command driver for the Fibre throughput +// experiment. It assumes the operator has already populated the +// experiment directory with a config.json + scripts/ + base config.toml + +// app.toml (i.e. ran `talis init` + `talis add` for validators / bridge +// / evnode / loadgen) and that build artefacts are at $rootDir/build. 
+// +// It then invokes — in order — the same subcommands the operator would +// run by hand: +// +// 1. up — provision instances +// 2. genesis -b — stage validator/bridge/evnode/loadgen payloads +// 3. deploy — ship payloads + start init scripts +// 4. setup-fibre — register host + deposit escrow on each validator +// 5. start-fibre — launch the fibre server on each validator +// 6. fibre-bootstrap-evnode — scp bridge JWT + fibre keyring onto evnode-* +// +// Each step is invoked via os/exec on the running binary. Any failure +// surfaces immediately; nothing is retried at this layer (the +// individual subcommands handle their own waits + retries). +// +// After step 6 returns, evnode-* daemons start within ~10 s and the +// load-gen's init script auto-launches evnode-txsim. The operator +// reads the final TXSIM: line from the load-gen. +func fibreExperimentCmd() *cobra.Command { + var ( + rootDir string + buildDir string + ) + + cmd := &cobra.Command{ + Use: "fibre-experiment", + Short: "End-to-end driver: up → genesis → deploy → setup-fibre → start-fibre → fibre-bootstrap-evnode", + Long: `Run every step needed to bring up a Fibre throughput experiment from a +prepared root directory. Equivalent to invoking each subcommand in +sequence; included so the operator doesn't have to remember the order +or watch for inter-step races.`, + RunE: func(cmd *cobra.Command, args []string) error { + self, err := os.Executable() + if err != nil { + return fmt.Errorf("locate own binary: %w", err) + } + + steps := []struct { + name string + args []string + }{ + {"up", []string{"up", "-d", rootDir}}, + {"genesis", []string{"genesis", "-d", rootDir, "-b", buildDir}}, + {"deploy", []string{"deploy", "-d", rootDir}}, + {"setup-fibre", []string{"setup-fibre", "-d", rootDir}}, + {"start-fibre", []string{"start-fibre", "-d", rootDir}}, + {"fibre-bootstrap-evnode", []string{"fibre-bootstrap-evnode", "-d", rootDir}}, + } + + for _, s := range steps { + fmt.Printf("\n=== talis %s ===\n", s.name) + c := exec.Command(self, s.args...) + c.Stdout = os.Stdout + c.Stderr = os.Stderr + c.Env = os.Environ() + if err := c.Run(); err != nil { + return fmt.Errorf("step %q failed: %w", s.name, err) + } + } + + fmt.Println() + fmt.Println("=== fibre-experiment complete ===") + fmt.Println("evnode aggregator(s) start within ~10 s and load-gen init") + fmt.Println("scripts auto-launch evnode-txsim once evnode's /stats responds.") + fmt.Println("Final TXSIM: line lands at /root/txsim.log on each load-gen host.") + return nil + }, + } + + cmd.Flags().StringVarP(&rootDir, "directory", "d", ".", "experiment root directory") + cmd.Flags().StringVarP(&buildDir, "build-dir", "b", "./build", "directory containing the cross-compiled linux/amd64 binaries") + + return cmd +} diff --git a/tools/talis/fibre_setup.go b/tools/talis/fibre_setup.go index 28f7115b80..6e88fa1ef9 100644 --- a/tools/talis/fibre_setup.go +++ b/tools/talis/fibre_setup.go @@ -51,17 +51,67 @@ func setupFibreCmd() *cobra.Command { // Build script: register host + deposit escrow for validator + all fibre accounts var sb strings.Builder + // 0. Block until the chain has produced at least one block. + // Without this, the very next tx returns + // `celestia-app is not ready; please wait for first block` + // from the local node — the call appears to succeed at + // the CLI level (`--yes` returns the txhash before block + // inclusion), but the tx never lands. Polling explicitly + // avoids the `sleep 10` heuristic that used to be here. 
+ sb.WriteString(fmt.Sprintf( + "echo 'waiting for chain to produce first block...'\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ + "while true; do\n"+ + " H=$(celestia-appd status --chain-id %s 2>/dev/null | "+ + " grep -oE '\"latest_block_height\":\"[0-9]+\"' | "+ + " grep -oE '[0-9]+' | head -1)\n"+ + " if [ -n \"$H\" ] && [ \"$H\" -gt 0 ]; then\n"+ + " echo \"chain is at height $H\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo 'FATAL: chain never produced a block within 5m' >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " sleep 3\n"+ + "done\n", + cfg.ChainID, + )) + // 1. Register fibre host address. Plain `host:port` form — // x/valaddr requires it; the gRPC client dials it via the // passthrough resolver. Don't prefix `dns:///` here. + // + // Retry until `query valaddr providers` shows OUR host + // — `--yes` returns the txhash before inclusion, so a + // single one-shot call can succeed at the RPC layer + // while the chain rejects the tx (mempool full, signer + // not yet in validator set, …) and we'd never know. + // 5-minute deadline so a stuck chain doesn't loop + // forever. sb.WriteString(fmt.Sprintf( - "celestia-appd tx valaddr set-host %s:%d "+ + "HOST=%s:%d\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ + "while true; do\n"+ + " celestia-appd tx valaddr set-host \"$HOST\" "+ "--from validator --keyring-backend=test --home .celestia-app "+ - "--chain-id %s --fees %s --yes\n", + "--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+ + " sleep 6\n"+ + " if celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q \"\\\"host\\\": *\\\"$HOST\\\"\"; then\n"+ + " echo \"set-host confirmed: $HOST\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo \"FATAL: set-host did not register $HOST after 5m\" >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " echo 'set-host pending, retrying...'\n"+ + "done\n", val.PublicIP, fibrePort, cfg.ChainID, fees, + cfg.ChainID, )) - sb.WriteString("sleep 10\n") // 2. Deposit escrow for each fibre worker account for i := range fibreAccounts { @@ -76,6 +126,31 @@ func setupFibreCmd() *cobra.Command { )) } + // 3. Verify the FIRST fibre account's escrow actually + // landed before we let the tmux session exit. If even + // fibre-0 isn't funded, every Fibre upload from the + // runner fails with `escrow account not found for + // signer …` — same silent-failure mode as set-host. + // Other accounts (fibre-1..N) are funded best-effort: + // the runner only signs with fibre-0 by default. 
+ sb.WriteString(fmt.Sprintf( + "FIBRE0_ADDR=$(celestia-appd keys show fibre-0 --keyring-backend test --home .celestia-app -a)\n"+ + "DEADLINE=$(( $(date +%%s) + 180 ))\n"+ + "while true; do\n"+ + " if celestia-appd query fibre escrow \"$FIBRE0_ADDR\" --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q '\"amount\"'; then\n"+ + " echo \"escrow confirmed for fibre-0 ($FIBRE0_ADDR)\"\n"+ + " break\n"+ + " fi\n"+ + " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ + " echo \"FATAL: fibre-0 escrow not present after 3m\" >&2\n"+ + " exit 1\n"+ + " fi\n"+ + " sleep 5\n"+ + "done\n", + cfg.ChainID, + )) + script := sb.String() sem <- struct{}{} @@ -103,6 +178,38 @@ func setupFibreCmd() *cobra.Command { if err := waitForTmuxSessions(cfg.Validators, resolvedSSHKeyPath, SetupFibreSessionName, 10*time.Minute); err != nil { return fmt.Errorf("waiting for setup-fibre sessions: %w", err) } + + // CLI-side verification that every validator's host is on + // the chain's provider list before we hand off to start- + // fibre / fibre-bootstrap-evnode. The per-validator script + // above already self-verifies its own host, but we + // re-check here from a single vantage point so a + // concurrent set-host race across validators surfaces + // before downstream steps cache an empty registry. + if len(cfg.Validators) > 0 { + expected := len(cfg.Validators) + queryHost := cfg.Validators[0].PublicIP + queryCmd := fmt.Sprintf( + "celestia-appd query valaddr providers --chain-id %s -o json 2>/dev/null | grep -o '\"host\"' | wc -l", + cfg.ChainID, + ) + deadline := time.Now().Add(5 * time.Minute) + for { + out, err := sshExec("root", queryHost, resolvedSSHKeyPath, queryCmd) + if err == nil { + count := 0 + _, _ = fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &count) + if count >= expected { + break + } + fmt.Printf(" valaddr providers: %d/%d registered, retrying...\n", count, expected) + } + if time.Now().After(deadline) { + return fmt.Errorf("only some validators registered as fibre providers within 5m — re-run setup-fibre") + } + time.Sleep(5 * time.Second) + } + } fmt.Println("Validator setup done!") // Deposit escrow for encoder accounts. diff --git a/tools/talis/genesis.go b/tools/talis/genesis.go index 9a4f481a81..e1b229efaa 100644 --- a/tools/talis/genesis.go +++ b/tools/talis/genesis.go @@ -558,9 +558,17 @@ mkdir -p "$EVNODE_HOME" # 2. /root/keyring-fibre/keyring-test cosmos-sdk file keyring with # a Fibre payment account # Without them the daemon would crash immediately on startup. -echo "Waiting for $BRIDGE_JWT_FILE and $FIBRE_KEYRING_DIR..." +# +# We check for the *specific* fibre-0.info file (not just the +# keyring-test directory) because a non-atomic scp -r would create +# the directory before transferring its contents — so testing -d +# alone passes mid-scp and the daemon would launch with an empty +# keyring. fibre-bootstrap-evnode now also stages-and-mvs so this +# check should never see a partial state, but keep the file-level +# guard as a defence-in-depth. +echo "Waiting for $BRIDGE_JWT_FILE and $FIBRE_KEYRING_DIR/keyring-test/fibre-0.info..." 
WAITED=0 -until [ -s "$BRIDGE_JWT_FILE" ] && [ -d "$FIBRE_KEYRING_DIR/keyring-test" ]; do +until [ -s "$BRIDGE_JWT_FILE" ] && [ -f "$FIBRE_KEYRING_DIR/keyring-test/fibre-0.info" ]; do sleep 5 WAITED=$((WAITED + 5)) if [ $((WAITED % 60)) -eq 0 ]; then diff --git a/tools/talis/main.go b/tools/talis/main.go index 016b1ee340..899fe8cf54 100644 --- a/tools/talis/main.go +++ b/tools/talis/main.go @@ -37,6 +37,7 @@ func main() { fibreTxsimCmd(), fibreThroughputCmd(), fibreBootstrapEvnodeCmd(), + fibreExperimentCmd(), resourceMonitorCmd(), downloadResourcesCmd(), syncNodeCmd(), From 2d53cd0d0a1499a65116886eb16b0e1acb001bd5 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:21:03 +0200 Subject: [PATCH 02/11] fix(tools/talis): finalize fibre setup race fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three follow-up bugs surfaced from the PR #3303 follow-up verification run on a 3-validator AWS Fibre cluster: - aws.go: CreateAWSInstances exited 0 even when individual instance launches failed, so `talis up` lied about success and downstream steps proceeded against a partial cluster. Returns a joined error now so failure cascades stop early. - download.go: sshExec used cmd.CombinedOutput, mixing SSH warnings (the "Warning: Permanently added '...'..." chatter on stderr) into bytes the caller hands to fmt.Sscanf("%d"). The CLI-side providers cross-check parsed those warnings as 0 and looped until its 5-min deadline even though a direct SSH query showed all 3 providers registered. Switch to cmd.Output() (stdout only) and add `-q -o LogLevel=ERROR` to silence the chatter for any caller that does combine streams. - fibre_setup.go: the per-validator escrow verification used `celestia-appd query fibre escrow` which doesn't exist — the actual subcommand is `escrow-account`. The query errored on every retry, the grep for "amount" never matched, and the script wedged on the 3-min deadline reporting `FATAL: fibre-0 escrow not present`. Switch to `escrow-account` and key on `"found":true` (the explicit existence flag in the response). Also wrap the fibre-0 deposit-to-escrow itself in a retry loop matching set-host — same `--yes`-returns-before-inclusion silent-failure mode bit it. fibre-1..N stay best-effort. 
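A minimal repro of the CombinedOutput pitfall described above (host string and numbers are made up; only the parsing behaviour matters): once ssh's known-hosts warning lands in front of the digits, Sscanf parses nothing and the count silently stays 0.

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // What CombinedOutput handed to the providers cross-check:
        // stderr chatter prepended to the stdout digit.
        combined := "Warning: Permanently added '198.51.100.7' (ED25519) to the list of known hosts.\n3"
        stdoutOnly := "3\n"

        var count int
        if _, err := fmt.Sscanf(strings.TrimSpace(combined), "%d", &count); err != nil {
            fmt.Println("combined:", count) // combined: 0 — the cross-check loops until its 5-min deadline
        }
        fmt.Sscanf(strings.TrimSpace(stdoutOnly), "%d", &count)
        fmt.Println("stdout only:", count) // stdout only: 3
    }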
Co-Authored-By: Claude Opus 4.7 (1M context) --- tools/talis/aws.go | 14 +++++- tools/talis/download.go | 14 +++++- tools/talis/fibre_setup.go | 95 +++++++++++++++++++++----------------- 3 files changed, 77 insertions(+), 46 deletions(-) diff --git a/tools/talis/aws.go b/tools/talis/aws.go index 249dc89bbd..afcef6ebaa 100644 --- a/tools/talis/aws.go +++ b/tools/talis/aws.go @@ -350,16 +350,28 @@ func CreateAWSInstances(ctx context.Context, insts []Instance, sshKey, keyName s close(results) }() - var created []Instance + var ( + created []Instance + failures []string + ) for res := range results { if res.err != nil { fmt.Printf("❌ %s failed after %v %v\n", res.inst.Name, res.timeRequired, res.err) + failures = append(failures, fmt.Sprintf("%s: %v", res.inst.Name, res.err)) } else { created = append(created, res.inst) fmt.Printf("✅ %s is up (public=%s) in %v\n", res.inst.Name, res.inst.PublicIP, res.timeRequired) } fmt.Printf("---- Progress: %d/%d\n", len(created), total) } + if len(failures) > 0 { + // Surface partial-failure as an error so `talis up` exits + // non-zero; without this, downstream genesis runs against a + // half-provisioned config and fails much later with confusing + // "X has no public IP yet" messages. + return created, fmt.Errorf("%d/%d instance(s) failed to launch: %s", + len(failures), total, strings.Join(failures, "; ")) + } return created, nil } diff --git a/tools/talis/download.go b/tools/talis/download.go index 99284d0326..ddff69e102 100644 --- a/tools/talis/download.go +++ b/tools/talis/download.go @@ -193,16 +193,26 @@ func compressAndDownload(table, localPath, user, host, sshKeyPath string) error return nil } -// sshExec runs a command on a remote host via SSH and returns the combined output. +// sshExec runs a command on a remote host via SSH and returns stdout only. +// +// We intentionally do NOT use CombinedOutput here. ssh prints connection +// chatter ("Warning: Permanently added '...' to the list of known hosts.") +// on stderr, and a previous `CombinedOutput` revision caused +// `fmt.Sscanf(out, "%d")` parses to silently return 0 because the leading +// stderr line had no digits. Capturing only stdout keeps numeric output +// parseable; -q + LogLevel=ERROR further suppresses the chatter for any +// caller that does combine streams. func sshExec(user, host, sshKeyPath, command string) ([]byte, error) { cmd := exec.Command("ssh", + "-q", + "-o", "LogLevel=ERROR", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-i", sshKeyPath, fmt.Sprintf("%s@%s", user, host), command, ) - return cmd.CombinedOutput() + return cmd.Output() } func sftpDownload(remotePath, localPath, user, host, sshKeyPath string) error { diff --git a/tools/talis/fibre_setup.go b/tools/talis/fibre_setup.go index 6e88fa1ef9..382e780c50 100644 --- a/tools/talis/fibre_setup.go +++ b/tools/talis/fibre_setup.go @@ -58,25 +58,24 @@ func setupFibreCmd() *cobra.Command { // the CLI level (`--yes` returns the txhash before block // inclusion), but the tx never lands. Polling explicitly // avoids the `sleep 10` heuristic that used to be here. 
- sb.WriteString(fmt.Sprintf( - "echo 'waiting for chain to produce first block...'\n"+ - "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ - "while true; do\n"+ - " H=$(celestia-appd status --chain-id %s 2>/dev/null | "+ - " grep -oE '\"latest_block_height\":\"[0-9]+\"' | "+ - " grep -oE '[0-9]+' | head -1)\n"+ - " if [ -n \"$H\" ] && [ \"$H\" -gt 0 ]; then\n"+ - " echo \"chain is at height $H\"\n"+ - " break\n"+ - " fi\n"+ - " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ - " echo 'FATAL: chain never produced a block within 5m' >&2\n"+ - " exit 1\n"+ - " fi\n"+ - " sleep 3\n"+ + sb.WriteString( + "echo 'waiting for chain to produce first block...'\n" + + "DEADLINE=$(( $(date +%s) + 300 ))\n" + + "while true; do\n" + + " H=$(celestia-appd status 2>/dev/null | " + + " grep -oE '\"latest_block_height\":\"[0-9]+\"' | " + + " grep -oE '[0-9]+' | head -1)\n" + + " if [ -n \"$H\" ] && [ \"$H\" -gt 0 ]; then\n" + + " echo \"chain is at height $H\"\n" + + " break\n" + + " fi\n" + + " if [ $(date +%s) -gt $DEADLINE ]; then\n" + + " echo 'FATAL: chain never produced a block within 5m' >&2\n" + + " exit 1\n" + + " fi\n" + + " sleep 3\n" + "done\n", - cfg.ChainID, - )) + ) // 1. Register fibre host address. Plain `host:port` form — // x/valaddr requires it; the gRPC client dials it via the @@ -113,44 +112,54 @@ func setupFibreCmd() *cobra.Command { cfg.ChainID, )) - // 2. Deposit escrow for each fibre worker account - for i := range fibreAccounts { - keyName := fmt.Sprintf("fibre-%d", i) - sb.WriteString(fmt.Sprintf( - "celestia-appd tx fibre deposit-to-escrow %s "+ - "--from %s --keyring-backend=test --home .celestia-app "+ - "--chain-id %s --fees %s --yes\n", - escrowAmount, - keyName, - cfg.ChainID, fees, - )) - } - - // 3. Verify the FIRST fibre account's escrow actually - // landed before we let the tmux session exit. If even - // fibre-0 isn't funded, every Fibre upload from the - // runner fails with `escrow account not found for - // signer …` — same silent-failure mode as set-host. - // Other accounts (fibre-1..N) are funded best-effort: - // the runner only signs with fibre-0 by default. + // 2. Deposit escrow for fibre-0 inside a retry loop. + // Same silent-failure mode as set-host: `--yes` returns + // the txhash before inclusion, so a single bounced tx + // (mempool full, signer not yet propagated, …) leaves + // the runner failing every upload with + // `escrow account not found for signer …`. fibre-0 is + // the one the runner actually signs with by default, + // so it's the only one we hard-block on. 
sb.WriteString(fmt.Sprintf( "FIBRE0_ADDR=$(celestia-appd keys show fibre-0 --keyring-backend test --home .celestia-app -a)\n"+ - "DEADLINE=$(( $(date +%%s) + 180 ))\n"+ + "DEADLINE=$(( $(date +%%s) + 300 ))\n"+ "while true; do\n"+ - " if celestia-appd query fibre escrow \"$FIBRE0_ADDR\" --chain-id %s -o json 2>/dev/null \\\n"+ - " | grep -q '\"amount\"'; then\n"+ + " celestia-appd tx fibre deposit-to-escrow %s "+ + "--from fibre-0 --keyring-backend=test --home .celestia-app "+ + "--chain-id %s --fees %s --yes >/dev/null 2>&1 || true\n"+ + " sleep 6\n"+ + " if celestia-appd query fibre escrow-account \"$FIBRE0_ADDR\" --chain-id %s -o json 2>/dev/null \\\n"+ + " | grep -q '\"found\":true'; then\n"+ " echo \"escrow confirmed for fibre-0 ($FIBRE0_ADDR)\"\n"+ " break\n"+ " fi\n"+ " if [ $(date +%%s) -gt $DEADLINE ]; then\n"+ - " echo \"FATAL: fibre-0 escrow not present after 3m\" >&2\n"+ + " echo \"FATAL: fibre-0 escrow did not land after 5m\" >&2\n"+ " exit 1\n"+ " fi\n"+ - " sleep 5\n"+ + " echo 'fibre-0 escrow pending, retrying...'\n"+ "done\n", + escrowAmount, + cfg.ChainID, fees, cfg.ChainID, )) + // 3. Best-effort fund fibre-1..N. The runner only signs + // with fibre-0 by default, so a missing one of these + // doesn't block uploads — they exist as headroom for + // future signer rotation. + for i := 1; i < fibreAccounts; i++ { + keyName := fmt.Sprintf("fibre-%d", i) + sb.WriteString(fmt.Sprintf( + "celestia-appd tx fibre deposit-to-escrow %s "+ + "--from %s --keyring-backend=test --home .celestia-app "+ + "--chain-id %s --fees %s --yes\n", + escrowAmount, + keyName, + cfg.ChainID, fees, + )) + } + script := sb.String() sem <- struct{}{} From 6a49a248121b77abfe148ccbed0746de0336c649 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:21:23 +0200 Subject: [PATCH 03/11] fix(fibre): bound submitter memory to avoid 64 GiB OOM under load MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reproduced an OOM at 63.8 GiB on c6in.8xlarge (matching the prior 63.9 GiB note in evnode-fibre's main.go that warned against raising UploadMemoryBudget). On a 3-validator cluster running evnode-txsim at 80 MiB/s, evnode-fibre's submitter queue grew to 50 pending data + 26 pending headers, each blob ≈32 MiB raw. With 3 fan-out targets and per-attempt retry buffers in flight, total in-memory upload state crossed 64 GiB and the kernel killed evnode 30 s into the load test. The loadgen-side TXSIM number collapsed from the prior journal's 47.65 MB/s to 0.66 MB/s as evnode died. Two changes, one root cause (a Fibre upload stall snowballs): - pkg/config: ApplyFiberDefaults sets MaxPendingHeadersAndData to 10 (was 50). The cap is what bounds in-flight blob copies + retry buffers; 50 was sized when evnode hadn't yet seen the pathological case where Fibre uploads time out, retries accumulate, and GC pressure pushes uploads even slower in a positive-feedback loop. 10 keeps the in-flight footprint bounded while still letting healthy uploads pipeline. - evnode-fibre: override SubmitConfig.Fibre to set RPCTimeout to 30 s (upstream default 15 s). Verified live: with the pending cap at 10, a 17-blob 115 MiB upload completes in ~1.5 s — well below the 15 s default. The 30 s margin only matters for transient slow paths during signature collection across 3 FSPs; the cap fix is the load-bearing change. Drop the stale main.go comment claiming we don't override SubmitConfig.Fibre. 
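A rough sketch (not from the patch) of why the pending cap is the load-bearing knob: every other factor in the in-flight footprint — fan-out targets, per-attempt retry copies, encoding overhead — multiplies the cap, so cutting it from 50 to 10 cuts the worst case 5× regardless of what those factors turn out to be on a given box. The function below is purely illustrative; k lumps together the per-attempt copies this sketch does not model.

    // Illustrative upper bound only — k stands in for per-attempt copies
    // (retry buffers, encoding expansion) that are not modelled here.
    func worstCaseUploadBytes(pendingCap, blobBytes, fanOut, k int) int {
        return pendingCap * blobBytes * fanOut * k
    }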
Co-Authored-By: Claude Opus 4.7 (1M context) --- pkg/config/config.go | 7 ++++++- .../cmd/evnode-fibre/main.go | 20 +++++++++++-------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 7cbb780a21..43719c1047 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() { } c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second} - c.Node.MaxPendingHeadersAndData = 50 + // Tighter pending cap (was 50). At 50, a Fibre upload stall lets the + // submitter accumulate 50 × ~32 MiB blob copies + their per-validator + // retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and + // OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint + // bounded while still letting healthy uploads pipeline. + c.Node.MaxPendingHeadersAndData = 10 } // GetNamespace returns the namespace for header submissions. diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 7b8b73080f..8e980c3983 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -51,6 +51,7 @@ import ( "github.com/celestiaorg/celestia-app/v9/app" "github.com/celestiaorg/celestia-app/v9/app/encoding" + appfibre "github.com/celestiaorg/celestia-app/v9/fibre" "github.com/celestiaorg/celestia-node/api/client" cnp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" @@ -210,14 +211,16 @@ func run(cli cliFlags) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Construct the celestia-node-fiber adapter. We don't override - // SubmitConfig.Fibre — the Fibre client defaults (UploadMemoryBudget - // 512 MiB, RPCTimeout 15 s) are sized for the FSP-side concurrency - // the validators can actually absorb. We tried bumping the budget - // to 4 GiB to allow more in-flight blobs; with 16 upload workers - // the FSPs couldn't keep up and the box OOM'd at 63.9 GB. Leaving - // the defaults in place means the upload pipeline self-bounds at - // roughly what the FSPs can sustain. + // Construct the celestia-node-fiber adapter. The Fibre client + // defaults (UploadMemoryBudget 512 MiB, RPCTimeout 15 s) are sized + // for FSP-side concurrency. Bumping the budget alone caused 64 GiB + // OOMs (4 GiB budget × 16 workers), so we leave that conservative + // AND raise RPCTimeout to 30 s so a slow-but-healthy validator + // signature collection isn't cut short under load — under busy + // conditions a 32 MiB row upload + sig aggregation can exceed the + // 15 s default. + fibreCfg := appfibre.DefaultClientConfig() + fibreCfg.RPCTimeout = 30 * time.Second adapter, err := cnfiber.New(ctx, cnfiber.Config{ Client: client.Config{ ReadConfig: client.ReadConfig{ @@ -231,6 +234,7 @@ func run(cli cliFlags) error { CoreGRPCConfig: client.CoreGRPCConfig{ Addr: cli.coreGRPCAddr, }, + Fibre: &fibreCfg, }, }, }, kr) From 5fa456e661fcbf97612009bd37431a1ec5d2f25e Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:21:37 +0200 Subject: [PATCH 04/11] feat(fibre): log per-Submit upload duration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Fibre Submit path was opaque: failures showed up as DeadlineExceeded with no signal of how long the upload actually took, and successes only logged at debug level inside the upstream library. 
During load-test debugging this turned into a guessing game — was the cluster slow, the deadline too tight, or something stuck mid-RPC? Add a single info-level (warn-on-failure) log line in fiberDAClient.Submit covering the Upload call: duration, flat blob bytes, blob count. Cheap (one time.Since) and gives the operator concrete numbers — e.g. "17 blobs / 115 MiB / 1.5 s" — to reason about whether RPCTimeout, pending cap, or batch sizing is the right knob to turn next. Co-Authored-By: Claude Opus 4.7 (1M context) --- block/internal/da/fiber_client.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index 3a4f9eb754..31df5603a2 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -89,7 +89,23 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na flat := flattenBlobs(data) nsID := namespace[len(namespace)-10:] + uploadStart := time.Now() result, err := c.fiber.Upload(context.Background(), nsID, flat) + uploadDuration := time.Since(uploadStart) + if err != nil { + c.logger.Warn(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(data)). + Err(err). + Msg("fiber upload duration (failed)") + } else { + c.logger.Info(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(data)). + Msg("fiber upload duration (ok)") + } if err != nil { code := datypes.StatusError switch { From cae5cd26fd7d15bae01f4ef85dc2cc565e95ab4a Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:43:46 +0200 Subject: [PATCH 05/11] fix(fibre): split DA Submit batches at Fibre's 128 MiB upload cap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under sustained txsim load (~50 MiB/s) the DA submitter batched 10 block_data items into one Upload(), producing a flat payload of 144 MiB. Fibre's per-upload cap is hard at ~128 MiB ("blob size exceeds maximum allowed size: data size 144366912 exceeds maximum 134217723") and rejected every batched upload. With MaxPendingHeadersAndData=10 that took down 170 consecutive submissions before the node halted itself with "Data exceeds DA blob size limit". Wrap the Upload call in a chunker that groups input blobs into ≤120 MiB chunks (8 MiB headroom under Fibre's cap for the per-blob length-prefix overhead added by flattenBlobs) and uploads each chunk separately. Aggregates submitted counts and BlobIDs across chunks; on first chunk failure, returns the error with the partially-submitted count so the submitter's retry/backoff logic sees a coherent state instead of all-or-nothing. Single oversized blobs (already validated against DefaultMaxBlobSize earlier in Submit) still land alone and fail server-side, but at least don't drag healthy peers into the same rejected batch. 
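A property-style check for the chunker introduced in the diff below (illustrative, not part of the patch; it would sit in the same package as fiber_client.go so it can call chunkBlobsForFibre and fibreUploadChunkBudget directly, with the standard testing import). It asserts the two things the submitter relies on: every blob ends up in some chunk, and no chunk's flattened size — the 4-byte count prefix plus 4 bytes per blob plus payload — exceeds the budget. Ten 15 MiB blobs roughly mirror the ~144 MiB batch that was being rejected.

    func TestChunkBlobsForFibre_RespectsBudget(t *testing.T) {
        blobs := make([][]byte, 10)
        for i := range blobs {
            blobs[i] = make([]byte, 15<<20) // 10 × 15 MiB ≈ the rejected ~144 MiB batch
        }
        chunks := chunkBlobsForFibre(blobs, fibreUploadChunkBudget)

        covered := 0
        for _, chunk := range chunks {
            flat := 4 // count prefix, matching flattenBlobs's accounting
            for _, b := range chunk {
                flat += 4 + len(b)
            }
            if flat > fibreUploadChunkBudget {
                t.Fatalf("chunk flattens to %d bytes, budget is %d", flat, fibreUploadChunkBudget)
            }
            covered += len(chunk)
        }
        if covered != len(blobs) {
            t.Fatalf("chunks cover %d blobs, want %d", covered, len(blobs))
        }
    }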
Co-Authored-By: Claude Opus 4.7 (1M context) --- block/internal/da/fiber_client.go | 117 +++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 35 deletions(-) diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index 31df5603a2..70902673e9 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -87,52 +87,65 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } - flat := flattenBlobs(data) + // Fibre's per-upload cap is ~128 MiB (hard server-side reject: + // "data size %d exceeds maximum 134217723"). flattenBlobs adds + // 4 bytes per blob + 4 prefix, so we target 120 MiB per chunk + // to leave overhead room and avoid borderline rejects. + chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget) nsID := namespace[len(namespace)-10:] - uploadStart := time.Now() - result, err := c.fiber.Upload(context.Background(), nsID, flat) - uploadDuration := time.Since(uploadStart) - if err != nil { - c.logger.Warn(). - Dur("duration", uploadDuration). - Int("flat_size", len(flat)). - Int("blob_count", len(data)). - Err(err). - Msg("fiber upload duration (failed)") - } else { + + ids := make([][]byte, 0, len(chunks)) + var submitted int + for chunkIdx, chunk := range chunks { + flat := flattenBlobs(chunk) + uploadStart := time.Now() + result, err := c.fiber.Upload(context.Background(), nsID, flat) + uploadDuration := time.Since(uploadStart) + if err != nil { + c.logger.Warn(). + Dur("duration", uploadDuration). + Int("flat_size", len(flat)). + Int("blob_count", len(chunk)). + Int("chunk_idx", chunkIdx). + Int("chunk_total", len(chunks)). + Err(err). + Msg("fiber upload duration (failed)") + code := datypes.StatusError + switch { + case errors.Is(err, context.Canceled): + code = datypes.StatusContextCanceled + case errors.Is(err, context.DeadlineExceeded): + code = datypes.StatusContextDeadline + } + c.logger.Error().Err(err).Msg("fiber upload failed") + return datypes.ResultSubmit{ + BaseResult: datypes.BaseResult{ + Code: code, + Message: fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err), + SubmittedCount: uint64(submitted), + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } + } c.logger.Info(). Dur("duration", uploadDuration). Int("flat_size", len(flat)). - Int("blob_count", len(data)). + Int("blob_count", len(chunk)). + Int("chunk_idx", chunkIdx). + Int("chunk_total", len(chunks)). 
Msg("fiber upload duration (ok)") - } - if err != nil { - code := datypes.StatusError - switch { - case errors.Is(err, context.Canceled): - code = datypes.StatusContextCanceled - case errors.Is(err, context.DeadlineExceeded): - code = datypes.StatusContextDeadline - } - c.logger.Error().Err(err).Msg("fiber upload failed") - return datypes.ResultSubmit{ - BaseResult: datypes.BaseResult{ - Code: code, - Message: fmt.Sprintf("fiber upload failed for blob: %v", err), - SubmittedCount: uint64(len(data) - 1), - BlobSize: blobSize, - Timestamp: time.Now(), - }, - } + ids = append(ids, result.BlobID) + submitted += len(chunk) } - c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") + c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: datypes.StatusSuccess, - IDs: [][]byte{result.BlobID}, - SubmittedCount: uint64(len(data)), + IDs: ids, + SubmittedCount: uint64(submitted), Height: 0, /* TODO */ BlobSize: blobSize, Timestamp: time.Now(), @@ -140,6 +153,40 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } +// fibreUploadChunkBudget is the target maximum flattened size of a single +// Fibre Upload call. Fibre rejects payloads above ~128 MiB +// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for +// flattenBlobs's per-blob length prefixes and for any future overhead. +const fibreUploadChunkBudget = 120 * 1024 * 1024 + +// chunkBlobsForFibre groups data into chunks whose flattened size stays +// below budget. Per-blob length-prefix overhead matches flattenBlobs. +// A single oversized blob (already validated against DefaultMaxBlobSize +// above) lands in its own chunk; the upload still fails server-side but +// at least we don't drag healthy peers down with it. +func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte { + if len(data) == 0 { + return nil + } + chunks := make([][][]byte, 0, 1) + cur := make([][]byte, 0, len(data)) + curSize := 4 // flattenBlobs's count prefix + for _, b := range data { + entry := 4 + len(b) + if len(cur) > 0 && curSize+entry > budget { + chunks = append(chunks, cur) + cur = make([][]byte, 0, len(data)) + curSize = 4 + } + cur = append(cur, b) + curSize += entry + } + if len(cur) > 0 { + chunks = append(chunks, cur) + } + return chunks +} + func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { return c.retrieve(ctx, height, namespace, true) } From 5c1eab988aa6d5a8c342d374ccacfc87b8db9b1a Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:43:58 +0200 Subject: [PATCH 06/11] fix(evnode-fibre): cap per-block data at 100 MiB to fit a Fibre upload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Companion to the submitter chunking fix. The submitter can split a multi-blob batch into ≤120 MiB Fibre uploads, but a *single* block_data item that exceeds 128 MiB still ends up alone in its own chunk and fails server-side ("blob size exceeds maximum allowed size"). Lower the per-block cap to 100 MiB so under high-throughput txsim a single block can't grow past Fibre's hard limit, and update the comment to explain the relationship between this cap and Fibre's ~128 MiB upload reject threshold. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- tools/celestia-node-fiber/cmd/evnode-fibre/main.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 8e980c3983..458905bc88 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -306,7 +306,12 @@ func run(cli cliFlags) error { // Fiber-tuned profile: BatchingStrategy=adaptive, BatchMaxDelay=1.5s, // DA.BlockTime=1s, MaxPendingHeadersAndData=0, plus 120 MiB blob cap. cfg.ApplyFiberDefaults() - block.SetMaxBlobSize(120 * 1024 * 1024) + // 100 MiB — bounded by Fibre's hard ~128 MiB per-upload cap (we + // hit `data size exceeds maximum 134217723` at 128 MiB - 5 B). + // Set the per-block data cap below that so each block_data item + // fits in a single Fibre upload after the submitter splits a + // multi-blob batch into ≤120 MiB chunks. + block.SetMaxBlobSize(100 * 1024 * 1024) cfg.P2P.ListenAddress = cli.p2pListen cfg.P2P.DisableConnectionGater = true cfg.RPC.Address = cli.rpcListen From 046c0226332eecb44011ae943133b042d17745a1 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Fri, 1 May 2026 17:53:38 +0200 Subject: [PATCH 07/11] fix(evnode-fibre): enforce maxBytes in inMemExecutor.FilterTxs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stub executor used by the runner returned FilterOK for every transaction unconditionally, ignoring the maxBytes budget plumbed through SoloSequencer.GetNextBatch. Under sustained txsim load (~50 MiB/s, 8 concurrent senders) the mempool would accumulate ~50K txs while a 100 MiB upload was in flight; on the next batch the sequencer drained ALL of them into one block (~369 MiB raw), the submitter saw a single item exceeding the per-blob cap, and halted the node with `single item exceeds DA blob size limit`. Walk the input txs in arrival order, accumulate sizes against maxBytes, and return FilterPostpone past the budget so the sequencer puts the overflow back on its queue. Verified live: blocks now cap at ~10K txs / ~100 MiB and evnode sustains 58.77 MB/s DA upload throughput through a 5-min txsim run with zero crashes (was 0 → crash within 30 s before this fix). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cmd/evnode-fibre/main.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 458905bc88..0173609eec 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -551,10 +551,24 @@ func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.Execu return coreexecution.ExecutionInfo{MaxGas: 0}, nil } -func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, _, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { +// FilterTxs admits txs in arrival order until the maxBytes budget is +// reached, then postpones the rest back to the sequencer queue so they +// land in a future batch. Skipping this enforcement (a previous version +// returned FilterOK unconditionally) lets a single block sweep up the +// entire mempool — under sustained txsim load that produced 369 MiB +// blocks that exceeded Fibre's per-upload cap and crashed the node +// with `single item exceeds DA blob size limit`. 
+func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { st := make([]coreexecution.FilterStatus, len(txs)) - for i := range st { + var used uint64 + for i, tx := range txs { + size := uint64(len(tx)) + if maxBytes > 0 && used+size > maxBytes { + st[i] = coreexecution.FilterPostpone + continue + } st[i] = coreexecution.FilterOK + used += size } return st, nil } From 7a3d6105ebbf1818074f814f32f9f9e4f3be045c Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Sun, 3 May 2026 00:34:55 +0200 Subject: [PATCH 08/11] fix(submitter): replace single-flight mutex with bounded worker pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Headers and data submissions used a Mutex.TryLock single-flight guard, so even though up to 10 blobs could be pending, only one upload of each kind ran at a time. That tied DA upload throughput to the latency of one gRPC round-trip; under load with 1-7 s per 98 MiB upload, achievable DA bandwidth capped at ~20 MB/s even though the FSPs and the network had abundant headroom. Replace each Mutex with a buffered-channel semaphore sized to config.Node.MaxPendingHeadersAndData (the same cap that bounds in-flight blob memory). Acquisition is non-blocking (`select` with `default:` skip on full), so a busy tick reuses the existing "try and skip" semantics — but instead of blocking the next batch, multiple batches can pipeline through Fibre concurrently. Pool size = pending-cap is the right relationship: each pending blob can have at most one in-flight submission, and we never exceed the memory the cap was set against. A zero pending-cap falls back to single-flight (cap 1) so callers that opted out of pending-bounding don't accidentally get unbounded fan-out. The TestSubmitter_daSubmissionLoop test constructs Submitter{} directly bypassing NewSubmitter, so it now also initializes the two semaphores — without them the channels are nil and `select` always hits the default branch, causing the test to time out waiting for SubmitHeaders/SubmitData calls. Co-Authored-By: Claude Opus 4.7 (1M context) --- block/internal/submitting/submitter.go | 66 ++++++++++++++------- block/internal/submitting/submitter_test.go | 24 ++++---- 2 files changed, 59 insertions(+), 31 deletions(-) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 2c613327f7..fa6aa5bba0 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -52,9 +52,18 @@ type Submitter struct { // DA state daIncludedHeight *atomic.Uint64 - // Submission state to prevent concurrent submissions - headerSubmissionMtx sync.Mutex - dataSubmissionMtx sync.Mutex + // Submission concurrency: each semaphore is a buffered channel + // sized to MaxPendingHeadersAndData. A zero value disables the + // limit and falls back to a single in-flight submission per type + // (= cap 1) so callers that opt out of pending-cap don't get + // unbounded fan-out. Tickets are acquired non-blocking via + // `select` and released by the goroutine that started the + // submission. Replaces the previous single-flight Mutex which + // pinned data-upload throughput at the latency of a single + // gRPC round-trip — under sustained load that capped DA at + // ~20 MB/s even though Fibre's per-blob upload took ≤1.5 s. 
+ headerSubmissionSem chan struct{} + dataSubmissionSem chan struct{} // Batching strategy state lastHeaderSubmit atomic.Int64 // stores Unix nanoseconds @@ -95,20 +104,31 @@ func NewSubmitter( strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 0, 1) } + // Pool size = pending-cap. Each pending blob gets up to one + // in-flight submission; if the cap is 0 (unbounded pending) we + // keep at least one slot so we don't reintroduce single-flight + // behavior accidentally. + poolSize := int(config.Node.MaxPendingHeadersAndData) + if poolSize <= 0 { + poolSize = 1 + } + submitter := &Submitter{ - store: store, - exec: exec, - cache: cache, - metrics: metrics, - config: config, - genesis: genesis, - daSubmitter: daSubmitter, - sequencer: sequencer, - signer: signer, - daIncludedHeight: &atomic.Uint64{}, - batchingStrategy: strategy, - errorCh: errorCh, - logger: submitterLogger, + store: store, + exec: exec, + cache: cache, + metrics: metrics, + config: config, + genesis: genesis, + daSubmitter: daSubmitter, + sequencer: sequencer, + signer: signer, + daIncludedHeight: &atomic.Uint64{}, + batchingStrategy: strategy, + errorCh: errorCh, + logger: submitterLogger, + headerSubmissionSem: make(chan struct{}, poolSize), + dataSubmissionSem: make(chan struct{}, poolSize), } now := time.Now().UnixNano() @@ -194,12 +214,13 @@ func (s *Submitter) daSubmissionLoop() { // For strategy decision, we need to estimate the size // We'll fetch headers to check, but only submit if strategy approves - if s.headerSubmissionMtx.TryLock() { + select { + case s.headerSubmissionSem <- struct{}{}: s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress") s.wg.Add(1) go func() { defer func() { - s.headerSubmissionMtx.Unlock() + <-s.headerSubmissionSem s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission completed") s.wg.Done() }() @@ -266,6 +287,8 @@ func (s *Submitter) daSubmissionLoop() { s.logger.Error().Err(err).Msg("failed to enqueue header submission") } }() + default: + // All header workers busy; try again on the next tick. } } @@ -274,12 +297,13 @@ func (s *Submitter) daSubmissionLoop() { if dataNb > 0 { lastSubmitNanos := s.lastDataSubmit.Load() timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos)) - if s.dataSubmissionMtx.TryLock() { + select { + case s.dataSubmissionSem <- struct{}{}: s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission in progress") s.wg.Add(1) go func() { defer func() { - s.dataSubmissionMtx.Unlock() + <-s.dataSubmissionSem s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission completed") s.wg.Done() }() @@ -346,6 +370,8 @@ func (s *Submitter) daSubmissionLoop() { s.logger.Error().Err(err).Msg("failed to enqueue data submission") } }() + default: + // All data workers busy; try again on the next tick. 
} } diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 6e684176be..2231d19f30 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -361,17 +361,19 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) { require.NoError(t, err) s := &Submitter{ - store: st, - exec: exec, - cache: cm, - metrics: metrics, - config: cfg, - genesis: genesis.Genesis{}, - daSubmitter: fakeDA, - signer: &fakeSigner{}, - daIncludedHeight: &atomic.Uint64{}, - batchingStrategy: batchingStrategy, - logger: zerolog.Nop(), + store: st, + exec: exec, + cache: cm, + metrics: metrics, + config: cfg, + genesis: genesis.Genesis{}, + daSubmitter: fakeDA, + signer: &fakeSigner{}, + daIncludedHeight: &atomic.Uint64{}, + batchingStrategy: batchingStrategy, + logger: zerolog.Nop(), + headerSubmissionSem: make(chan struct{}, 1), + dataSubmissionSem: make(chan struct{}, 1), } // Set last submit times far in past so strategy allows submission From b7c6a16e25d7adf245aa20e282688dd49a128a2e Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Sun, 3 May 2026 00:35:16 +0200 Subject: [PATCH 09/11] feat(evnode-txsim): keep-alive conn pool + pprof endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two diagnostic improvements for the load generator: 1. http.Transport.MaxIdleConnsPerHost defaults to 2 in stdlib. With --concurrency=8 (or higher), 6+ goroutines per cycle had to open fresh TCP+TLS sockets per request because the pool couldn't hold their idle conns between requests. Bump MaxIdleConns / MaxIdleConnsPerHost / MaxConnsPerHost to 2*concurrency so every active sender has a reusable keep-alive socket, eliminating handshake churn from the hot path. 2. Always-on net/http/pprof on 127.0.0.1:6060. evnode-txsim is a load tester, not a production daemon, so cost of always serving profiling is acceptable; the payoff is being able to grab CPU profiles under live load without re-deploying the binary — `ssh -L 6060:127.0.0.1:6060 root@loadgen \ go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30`. A profile captured this way under c=8 traced the per-request hot path: 25.5% in kernel write(2), 25% in net/http body marshaling. That diagnostic surfaced that the c6in.2xlarge loadgen was the binding constraint for the experiment at ~22 MB/s, not evnode or DA — a finding we'd have spent another debug round chasing without the in-process profiler. Co-Authored-By: Claude Opus 4.7 (1M context) --- tools/talis/cmd/evnode-txsim/main.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/tools/talis/cmd/evnode-txsim/main.go b/tools/talis/cmd/evnode-txsim/main.go index d8fd1a9435..39edd66fd1 100644 --- a/tools/talis/cmd/evnode-txsim/main.go +++ b/tools/talis/cmd/evnode-txsim/main.go @@ -26,6 +26,7 @@ import ( "fmt" "io" "net/http" + _ "net/http/pprof" "os" "os/signal" "sort" @@ -113,7 +114,27 @@ func run(cli cliFlags) error { return fmt.Errorf("seed random pool: %w", err) } - httpClient := &http.Client{Timeout: cli.timeout} + // Bump per-host idle connections so concurrent goroutines reuse + // keep-alive sockets instead of churning TCP+TLS handshakes — + // stdlib default MaxIdleConnsPerHost=2 caps in-flight requests + // to 2 keep-alive sockets per target, which serializes any + // concurrency>2 onto fresh connections each request. 
+ transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 2 * cli.concurrency + transport.MaxIdleConnsPerHost = 2 * cli.concurrency + transport.MaxConnsPerHost = 2 * cli.concurrency + httpClient := &http.Client{Timeout: cli.timeout, Transport: transport} + + // pprof on a dedicated listener — `_ "net/http/pprof"` registers + // handlers on http.DefaultServeMux. Always-on at 127.0.0.1:6060 + // since this is a load-tester binary, not a production daemon; + // SSH port-forward to grab profiles under load: + // + // ssh -L 6060:127.0.0.1:6060 root@loadgen \ + // go tool pprof http://localhost:6060/debug/pprof/profile?seconds=10 + go func() { + _ = http.ListenAndServe("127.0.0.1:6060", nil) + }() ctx, cancel := context.WithCancel(context.Background()) if cli.duration > 0 { From 247ba86b356f34ffda1a455964a30ed7d187f840 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Sun, 3 May 2026 00:35:41 +0200 Subject: [PATCH 10/11] fix(solo,reaping): bound sequencer queue to prevent ingest-side OOM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Under sustained ingest above the block-production drain rate, SoloSequencer.queue grew monotonically. A 32-vCPU loadgen pushing >100 MB/s into a runner whose executor drains ~100 MB/s per block filled the queue at ~150 MB/s of net-positive growth — heap profiles showed 24 GB of retained io.ReadAll bytes in the queue within ~30 s, then anon-rss:63GB OOM-kill at the box's 64 GiB ceiling. Reproducible twice with identical signature. Two changes, one feature: - SoloSequencer.SetMaxQueueBytes(n) caps the queue's total retained tx bytes. SubmitBatchTxs uses all-or-nothing admission against the cap: if the incoming batch would push us over, the whole batch is rejected with ErrQueueFull and the queue keeps its current contents untouched. Partial admission would force the caller to track which prefix succeeded and only re-feed the suffix on retry; the reaper currently doesn't do that, so the whole-batch rule lets the reaper just retry the same batch later when the queue has drained. queueBytes is decremented on drain (queue := nil) and re-counted for postponed txs that the executor's FilterTxs returns to the queue. Zero cap = the legacy unbounded path, preserved for tests and small deployments. - The reaper bridging executor mempool → sequencer matches ErrQueueFull via errors.Is and treats it as transient backpressure: marks the rejected hashes as "seen" so the next reaper tick doesn't re-hash + re-submit the same already- rejected txs forever, logs a warn line with the dropped count, and continues running. Without this match every queue-full event would tear the daemon down via the existing fatal-on- submit-error path. Loadgen sees the backpressure indirectly: with the sequencer queue full, the executor's txChan stops draining, /tx blocks on its bounded channel send, and txsim observes 5xx / timeouts — cleanly applied at the application layer instead of via the kernel OOM-killer. 
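For completeness, this is the shape of the errors.Is handling a direct HTTP ingress could use to turn ErrQueueFull into a 503. The runner in this series instead lets its bounded tx channel apply the backpressure, so the handler below is hypothetical; the request type name follows the (truncated) SubmitBatchTxs signature in the diff below and is an assumption.

    package ingress // hypothetical package, not part of the patch

    import (
        "errors"
        "io"
        "net/http"

        coresequencer "github.com/evstack/ev-node/core/sequencer"
        "github.com/evstack/ev-node/pkg/sequencers/solo"
    )

    // handleTx is a sketch of a direct /tx ingress surfacing queue-full as 503.
    func handleTx(seq *solo.SoloSequencer, chainID []byte) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            tx, err := io.ReadAll(r.Body)
            if err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }
            _, err = seq.SubmitBatchTxs(r.Context(), coresequencer.SubmitBatchTxsRequest{
                Id:    chainID,
                Batch: &coresequencer.Batch{Transactions: [][]byte{tx}},
            })
            if errors.Is(err, solo.ErrQueueFull) {
                // Transient backpressure: tell the loadgen to retry later.
                http.Error(w, "sequencer queue full", http.StatusServiceUnavailable)
                return
            }
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            w.WriteHeader(http.StatusAccepted)
        }
    }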
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 block/internal/reaping/reaper.go | 12 ++++++
 pkg/sequencers/solo/sequencer.go | 67 ++++++++++++++++++++++++++++++--
 2 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index d35c61e422..66e058fbb2 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -16,6 +16,7 @@ import ( coreexecutor "github.com/evstack/ev-node/core/execution" coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/sequencers/solo" ) const ( @@ -193,6 +194,17 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) { Id: []byte(r.chainID), Batch: &coresequencer.Batch{Transactions: newTxs}, }) + if errors.Is(err, solo.ErrQueueFull) { + // Sequencer queue is full — backpressure signal. Mark the + // batch as "seen" so we don't waste cycles re-hashing the + // same dropped txs every reaper tick, and surface the drop + // as a warning rather than tearing down the daemon. The + // loadgen sees lower acceptance via /tx flow control once + // the executor's own mempool fills up. + r.cache.SetTxsSeen(newHashes) + r.logger.Warn().Int("dropped", len(newTxs)).Msg("sequencer queue full, dropping txs (backpressure)") + break + } if err != nil { return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) }

diff --git a/pkg/sequencers/solo/sequencer.go b/pkg/sequencers/solo/sequencer.go index 86fa08d45d..f8524157e6 100644 --- a/pkg/sequencers/solo/sequencer.go +++ b/pkg/sequencers/solo/sequencer.go @@ -14,7 +14,15 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" ) -var ErrInvalidID = errors.New("invalid chain id") +var ( + ErrInvalidID = errors.New("invalid chain id") + // ErrQueueFull is returned from SubmitBatchTxs when the in-memory + // queue is at its byte cap (see SetMaxQueueBytes). Callers should + // treat this as transient backpressure (drop or retry); the + // reaper bridging executor mempool → sequencer matches it via + // errors.Is and downgrades to a warning. + ErrQueueFull = errors.New("sequencer queue full") +) var ( emptyBatch = &coresequencer.Batch{} @@ -27,6 +35,15 @@ var _ coresequencer.Sequencer = (*SoloSequencer)(nil) // SoloSequencer is a single-leader sequencer without forced inclusion // support. It maintains a simple in-memory queue of mempool transactions and // produces batches on demand. +// +// The queue can be bounded in bytes via SetMaxQueueBytes. A bound is +// strongly recommended in any high-throughput configuration: under +// sustained ingest above the block-production drain rate the queue +// otherwise grows monotonically until OOM. With a bound set, +// SubmitBatchTxs rejects any batch that would push the queue past the +// cap, returning ErrQueueFull and leaving the queue untouched, so +// callers can surface backpressure (e.g. via HTTP 503) instead of +// silently retaining bytes. type SoloSequencer struct { logger zerolog.Logger id []byte daHeight atomic.Uint64 - mu sync.Mutex - queue [][]byte + mu sync.Mutex + queue [][]byte + queueBytes uint64 + maxQueueBytes uint64 // 0 = unbounded (legacy default) } func NewSoloSequencer( @@ -51,6 +70,16 @@ } } +// SetMaxQueueBytes caps the sequencer's in-memory tx queue, in bytes. +// SubmitBatchTxs rejects a whole batch with ErrQueueFull whenever +// admitting it would push the queue past the cap, so the queue never +// exceeds it. A zero value disables the cap. Intended to be called +// once at startup. +func (s *SoloSequencer) SetMaxQueueBytes(n uint64) { + s.mu.Lock() + defer s.mu.Unlock() + s.maxQueueBytes = n +} + func (s *SoloSequencer) isValid(id []byte) bool { return bytes.Equal(s.id, id) } @@ -67,7 +96,30 @@ func (s *SoloSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Su s.mu.Lock() defer s.mu.Unlock() + if s.maxQueueBytes == 0 { + // Unbounded path (legacy). Suitable for tests and small + // deployments; in production use SetMaxQueueBytes. + s.queue = append(s.queue, req.Batch.Transactions...) + return submitBatchResp, nil + } + + // All-or-nothing: if the whole incoming batch doesn't fit, reject + // it untouched. Partial admission would force the caller (e.g. + // the reaper bridging executor mempool → sequencer) to reason + // about which prefix was admitted and re-feed only the suffix on + // retry, which it doesn't currently do — leading to duplicate-tx + // resubmission on each retry. Rejecting the whole batch lets the + // reaper just retry with the same batch later when the queue has + // drained. + var batchBytes uint64 + for _, tx := range req.Batch.Transactions { + batchBytes += uint64(len(tx)) + } + if s.queueBytes+batchBytes > s.maxQueueBytes { + return submitBatchResp, ErrQueueFull + } s.queue = append(s.queue, req.Batch.Transactions...) + s.queueBytes += batchBytes return submitBatchResp, nil } @@ -79,6 +131,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN s.mu.Lock() txs := s.queue s.queue = nil + s.queueBytes = 0 s.mu.Unlock() if len(txs) == 0 { @@ -122,6 +175,14 @@ if len(postponedTxs) > 0 { s.mu.Lock() s.queue = append(postponedTxs, s.queue...) + // Postponed txs were already in the queue's byte count when + // SubmitBatchTxs admitted them. We zeroed queueBytes on drain + // above, so re-queuing requires re-counting whatever survived. + var requeuedBytes uint64 + for _, tx := range postponedTxs { + requeuedBytes += uint64(len(tx)) + } + s.queueBytes += requeuedBytes s.mu.Unlock() }

From d3224133e8552a9f0a32d56a27b05007971d8415 Mon Sep 17 00:00:00 2001
From: Vlad <13818348+walldiss@users.noreply.github.com>
Date: Sun, 3 May 2026 00:36:00 +0200
Subject: [PATCH 11/11] fix(evnode-fibre): wire sequencer queue cap + lift ingest queue caps
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two runner-side changes paired with the SoloSequencer bound:

- After constructing the SoloSequencer, call SetMaxQueueBytes with 10× the per-block tx budget (≈ 1 GiB at the current 100 MiB MaxBlobSize). 10× is the sweet spot: large enough that a short burst above steady-state ingest doesn't trigger backpressure (we want to absorb that), small enough that the worst-case retained bytes fit comfortably under the box's RAM budget alongside the pending cache + DA in-flight buffers.

- Lift the inMemExecutor's hardcoded ingest caps. txChan and maxBlockTxs were sized at 500 (5 MB / 5K txs per reaper poll) back when those were the only memory bound on the runner.
With the SetMaxQueueBytes cap and the FilterTxs-enforced per-block budget now actually doing the bounding, the ingest queue can hold a full 100 MiB block-worth of txs (10K slots at 10 KB) without burdening memory — and a single reaper poll can drain that whole batch in one GetTxs call instead of needing 20× cycles. This was the binding constraint at ~5,000 tx/s = 50 MB/s in earlier runs. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cmd/evnode-fibre/main.go | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go index 0173609eec..c8fd9f88d5 100644 --- a/tools/celestia-node-fiber/cmd/evnode-fibre/main.go +++ b/tools/celestia-node-fiber/cmd/evnode-fibre/main.go @@ -331,6 +331,19 @@ func run(cli cliFlags) error { executor := newInMemExecutor() sequencer := solo.NewSoloSequencer(logger, []byte(genesis.ChainID), executor) + // Cap the sequencer's in-memory queue at 10× the per-block tx + // budget. Above this, SubmitBatchTxs returns ErrQueueFull and the + // runner's reaper-bridge / tx-ingress applies backpressure (txs + // stay in the executor's txChan until the sequencer drains, and + // the chan's bound 503's /tx). Without this cap a fast loadgen + // (32 vCPU pushing >100 MB/s) outruns the 1 block/s drain and + // the queue grows monotonically — observed pre-fix as 24 GB of + // retained io.ReadAll bytes in heap snapshots before the daemon + // hit the 64 GiB box ceiling and OOM-killed. + // Sized at 10× the per-block tx budget (matches SetMaxBlobSize + // above; both anchor at the per-blob Fibre cap). + const seqQueueBytes = 10 * 100 * 1024 * 1024 // 1 GiB + sequencer.SetMaxQueueBytes(seqQueueBytes) daClient := block.NewFiberDAClient(adapter, cfg, logger, 0) p2pClient, err := p2p.NewClient(cfg.P2P, nodeKey.PrivKey, datastore.NewMapDatastore(), genesis.ChainID, logger, nil) if err != nil { @@ -483,23 +496,23 @@ type inMemExecutor struct { totalTxs atomic.Uint64 } -// txChan capacity caps in-flight memory: at 10 KB tx and 500 slots -// we hold ≤ 5 MB queued before /tx blocks the ingress goroutine — -// which is exactly the backpressure we want against a hot loadgen. -// Reaper drains every 100 ms into the solo sequencer, which then -// accumulates batches between block-production ticks; without a tight -// cap a single block can balloon past the 120 MiB DA blob limit and -// the rest of the daemon's per-block allocations push the box past -// its RAM budget within seconds. +// txChan capacity bounds the HTTP /tx ingest queue. Sized at 10K +// slots (~100 MiB at 10 KB tx-size) so a 100 ms reaper cycle can +// absorb a full max-size block's worth of txs without /tx blocking +// the loadgen. Earlier we used 500 slots (~5 MiB) which forced +// backpressure at ~5,000 tx/s — that turned txsim into the limiting +// factor at ~22 MB/s rather than DA upload. With the per-block +// FilterTxs cap (executor.go:RetrieveBatch via DefaultMaxBlobSize= +// 100 MiB) and the submitter chunker now enforcing the actual blob +// budget, the executor doesn't need an extra ingest-side cap. // -// maxBlockTxs caps GetTxs's per-call return so reaper-cycle batches -// are bounded too. With 500 ≤ 5 MB per block at 10 KB tx-size, we -// stay an order of magnitude under the DA cap so headers/data signing -// + envelope cache + retry buffers all fit. 
+// maxBlockTxs caps GetTxs's per-call return; pairs with the channel +// size so a reaper poll can fully drain a 100 MiB-block-worth of +// queued txs in a single call instead of needing 20× cycles. func newInMemExecutor() *inMemExecutor { return &inMemExecutor{ - txChan: make(chan []byte, 500), - maxBlockTxs: 500, + txChan: make(chan []byte, 10000), + maxBlockTxs: 10000, } }
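
For review convenience, the two bounds introduced in this patch line up roughly as follows (back-of-envelope only; the ~10 KB figure is the loadgen tx size assumed in the comments above, not a protocol constant):

    ingest channel : 10,000 slots × ~10 KB       ≈ 100 MiB  (one max-size block)
    sequencer queue: 10 × 100 MiB per-block cap  = 1,000 MiB ≈ 1 GiB

so the tx bytes retained by these two queues top out around ~1.1 GiB, comfortably under the 64 GiB box that previously OOM-killed with the unbounded queue; the pending cache and DA in-flight buffers are budgeted separately, as the commit message above notes.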