Skip to content

feat(firehose): async emission, shutdown drain, and startup gap detection#1

Draft
Copilot wants to merge 3 commits into
mainfrom
copilot/update-firehose-emission-mode
Draft

feat(firehose): async emission, shutdown drain, and startup gap detection#1
Copilot wants to merge 3 commits into
mainfrom
copilot/update-firehose-emission-mode

Conversation

Copy link
Copy Markdown

Copilot AI commented Apr 30, 2026

Wires firehose-tracer v5.1.0 into the reth node with emission-mode selection, graceful shutdown draining, and cursor-based gap detection before the sync pipeline starts.

New crate: crates/firehose (reth-firehose)

  • FirehoseArgs — clap argument group flattened into the node command:
    • --firehose.emission-mode blocking|async|auto (default: auto)
    • --firehose.channel-capacity <N> (default: 32)
    • --firehose.live-threshold <SECS> (default: 60)
    • --firehose.cursor-path <PATH> (default: <datadir>/firehose.cursor)
  • init_tracer(config) -> Option<ShutdownHandle> — one-shot global OnceLock<Mutex<Tracer>> init; panics on double-init to surface programming errors early
  • check_gap_and_re_trace(provider, cursor_path) — reads the cursor file, compares against the execution stage checkpoint, and calls firehose_re_trace_range when a gap exists
  • firehose_re_trace_range(range, provider) — iterates the gap; pruning detection surfaces naturally from execution errors with a clear fatal message pointing to archive-node requirements; full call-level tracing pending FirehoseEvmInspector integration (documented as TODO in re_trace_single_block)

bin/reth/src/main.rs

Switch to Cli::<EthereumChainSpecParser, FirehoseArgs> and add an on_component_initialized hook — runs before the sync pipeline starts:

.on_component_initialized(move |node| {
    let cfg = firehose_args.to_tracer_config(&data_dir);
    let cursor_path = cfg.cursor_path.clone();
    let shutdown_handle = reth_firehose::init_tracer(cfg);

    reth_firehose::check_gap_and_re_trace(&node.provider, cursor_path.as_deref())?;

    if let Some(handle) = shutdown_handle {
        node.task_executor.spawn_with_graceful_shutdown_signal(|shutdown| async move {
            let _guard = shutdown.await;
            handle.drain(); // flush async writer before process exit
        });
    }
    Ok(())
})

Dependency conflict: vendor/rbase64

firehose-tracer depends on rbase64 v2.0.3, which unconditionally sets #[global_allocator] (MiMalloc) in its library crate — a bug that conflicts with reth's jemalloc. A minimal vendor patch removes that declaration; wired via [patch.crates-io] in the workspace Cargo.toml.

Original prompt

Plan: Reth Integration — Async Firehose Emission

Prerequisite: evm-firehose-tracer-rs plan fully implemented and published.


1. Bump dependency

Update Cargo.toml to the new firehose-tracer version that includes
EmissionMode, ShutdownHandle, and last_confirmed_block().


2. Expose EmissionMode via CLI / config

In bin/reth/src/main.rs (or wherever firehose_tracer::Config is built),
wire a CLI flag or env var to Config::emission_mode:

--firehose.emission-mode  blocking | async | auto   (default: auto)
--firehose.channel-capacity  <n>                    (default: 32)
--firehose.live-threshold    <secs>                 (default: 60)
--firehose.cursor-path       <path>                 (default: <datadir>/firehose.cursor)

Keep it minimal — env-var override is fine if CLI flags feel heavy.


3. Shutdown wiring

In the node startup code (where init_tracer(...) is called), capture the
shutdown handle and register it with reth's shutdown lifecycle:

let tracer = firehose_tracer::Tracer::new(config);
let shutdown_handle = tracer.shutdown_handle(); // Option<ShutdownHandle>
init_tracer(tracer);

// Wire drain into node shutdown — one line
if let Some(handle) = shutdown_handle {
    node.on_shutdown(move || handle.drain());
}

The exact node.on_shutdown API depends on reth's node builder; adapt as needed.
The invariant: ShutdownHandle::drain() must be called before the process exits,
and after the execution stage has stopped producing blocks.


4. Gap detection and re-trace on startup

Before the sync pipeline starts, check for a gap between the cursor and the
execution stage checkpoint:

// In the Firehose startup hook (before pipeline launch)
let last_written = reth_firehose::tracer().last_confirmed_block();
let exec_tip     = provider.get_stage_checkpoint(StageId::Execution)?.block_number;

if let Some(last) = last_written {
    if last < exec_tip {
        firehose_re_trace_range(last + 1..=exec_tip, &provider).await?;
    }
}

firehose_re_trace_range lives in reth/crates/firehose/src/runner.rs (or a new
re_trace.rs). It:

  1. Iterates block numbers in the range.
  2. For each block, fetches the sealed block + pre-execution state from the provider
    (same approach as debug_traceBlock).
  3. Runs FirehoseWrappedExecutor with the Firehose inspector in emit-only mode —
    no DB writes.
  4. Calls on_block_end() which emits the FIRE BLOCK line (and updates the cursor).

Pruning guard: before starting the re-trace loop, verify that
provider.earliest_full_block() (or equivalent) is ≤ last + 1. If the required
historical state has been pruned, log a fatal error with a clear message explaining
that Firehose requires an archive node (or that pruning must be configured to retain
state back to last_written). Do not silently skip blocks.


5. Acceptance criteria

  • --firehose.emission-mode auto is the default; blocking matches previous behaviour exactly.
  • ShutdownHandle::drain() is called on graceful shutdown before process exit.
  • On startup with a gap, firehose_re_trace_range re-emits the missing blocks before sync resumes.
  • If pruned state prevents re-trace, the node exits with a clear fatal error.
  • No changes to the FIRE BLOCK line format or tracer event semantics.

Created from VS Code.

Copilot AI and others added 2 commits April 30, 2026 21:05
… detection

- Create `crates/firehose/` crate (`reth-firehose`) with:
  - `FirehoseArgs`: clap CLI args for `--firehose.emission-mode`, `--firehose.channel-capacity`, `--firehose.live-threshold`, `--firehose.cursor-path`
  - `init_tracer()`: one-shot global tracer initialisation returning `ShutdownHandle`
  - `check_gap_and_re_trace()`: detects gaps between cursor file and execution stage checkpoint, re-emits missing blocks
  - `firehose_re_trace_range()`: pruning guard + per-block re-execution loop (full EVM inspector integration noted as TODO)
- Patch `rbase64` (transitive dep of `firehose-tracer`) to remove its `#[global_allocator]` declaration that conflicts with reth's jemalloc allocator
- Add `firehose-tracer = "5.1.0"` and `reth-firehose` to workspace `Cargo.toml`
- Modify `bin/reth/src/main.rs` to use `FirehoseArgs` as CLI extension type and wire the `on_component_initialized` startup hook that: initialises the tracer, checks for gaps, and registers `ShutdownHandle::drain()` with `spawn_with_graceful_shutdown_signal`

Agent-Logs-Url: https://github.com/streamingfast/reth/sessions/088879c2-d333-44f1-83d2-83a9ae603230

Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com>
- Fix incorrect pruning guard: remove flawed `best_block_number > first` check that
  would have triggered spurious fatal errors on any synced node; pruning detection
  now happens naturally during block execution with context-enriched error message
- Remove unused lock acquisition in `re_trace_single_block`; tracer init check uses
  `let _ =` pattern without holding the lock
- Improve `read_cursor` docstring to explicitly document that file corruption is
  treated the same as a missing file
- Clarify `init_tracer` panic documentation: explain that double-init is a
  programming error (not just "development artifact") to prevent silent data loss
- Fix misleading "automatically generated" comment in vendor/rbase64/Cargo.toml
- Remove unused `error` tracing import

Agent-Logs-Url: https://github.com/streamingfast/reth/sessions/088879c2-d333-44f1-83d2-83a9ae603230

Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com>
Copilot AI changed the title [WIP] Expose EmissionMode via CLI and update dependency feat(firehose): async emission, shutdown drain, and startup gap detection Apr 30, 2026
Copilot AI requested a review from maoueh April 30, 2026 21:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants