diff --git a/Cargo.lock b/Cargo.lock index c868afab..bee67dc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3780,6 +3780,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "evmap" version = "11.0.0" @@ -11243,9 +11254,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba06dc92c72b877f042ba141c7ee553c68aab1c6e077a3ba1adb2eae17fdaff4" dependencies = [ "alloy", + "eventsource-stream", "futures-util", "reqwest", "serde", + "serde_json", "signet-bundle", "signet-constants", "signet-types", diff --git a/Cargo.toml b/Cargo.toml index 79bdf6f0..0a26fe1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ name = "zenith-builder-example" path = "bin/builder.rs" [dependencies] -init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] } +init4-bin-base = { version = "0.18.0", features = ["perms", "aws", "pylon", "sse"] } signet-constants = { version = "0.16.0-rc.17" } signet-sim = { version = "0.16.0-rc.17" } diff --git a/src/metrics.rs b/src/metrics.rs index 14c57dbd..ac9b8f01 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -33,6 +33,9 @@ const TX_POLL_ERRORS_HELP: &str = "Transaction cache poll errors."; const TXS_FETCHED: &str = "signet.builder.cache.txs_fetched"; const TXS_FETCHED_HELP: &str = "Transactions fetched per poll cycle."; +const SSE_RECONNECT_ATTEMPTS: &str = "signet.builder.cache.sse_reconnect_attempts"; +const SSE_RECONNECT_ATTEMPTS_HELP: &str = "SSE transaction stream reconnect attempts."; + const BUNDLE_POLL_COUNT: &str = "signet.builder.cache.bundle_poll_count"; const BUNDLE_POLL_COUNT_HELP: &str = "Bundle cache poll attempts."; @@ -148,6 +151,7 @@ static DESCRIPTIONS: LazyLock<()> = LazyLock::new(|| { describe_counter!(TX_POLL_COUNT, TX_POLL_COUNT_HELP); describe_counter!(TX_POLL_ERRORS, TX_POLL_ERRORS_HELP); describe_histogram!(TXS_FETCHED, TXS_FETCHED_HELP); + describe_counter!(SSE_RECONNECT_ATTEMPTS, SSE_RECONNECT_ATTEMPTS_HELP); describe_counter!(BUNDLE_POLL_COUNT, BUNDLE_POLL_COUNT_HELP); describe_counter!(BUNDLE_POLL_ERRORS, BUNDLE_POLL_ERRORS_HELP); describe_histogram!(BUNDLES_FETCHED, BUNDLES_FETCHED_HELP); @@ -234,6 +238,11 @@ pub(crate) fn record_txs_fetched(count: usize) { histogram!(TXS_FETCHED).record(count as f64); } +/// Increment the SSE reconnect attempts counter. +pub(crate) fn inc_sse_reconnect_attempts() { + counter!(SSE_RECONNECT_ATTEMPTS).increment(1); +} + /// Increment the bundle poll attempt counter. pub(crate) fn inc_bundle_poll_count() { counter!(BUNDLE_POLL_COUNT).increment(1); diff --git a/src/tasks/cache/system.rs b/src/tasks/cache/system.rs index 81223a36..89698a4d 100644 --- a/src/tasks/cache/system.rs +++ b/src/tasks/cache/system.rs @@ -23,7 +23,7 @@ impl CacheTasks { /// [`CacheTask`], [`TxPoller`], and [`BundlePoller`] internally and yields their [`JoinHandle`]s. pub fn spawn(&self) -> CacheSystem { // Tx Poller pulls transactions from the cache - let tx_poller = TxPoller::new(); + let tx_poller = TxPoller::new(self.block_env.clone()); let (tx_receiver, tx_poller) = tx_poller.spawn(); // Bundle Poller pulls bundles from the cache diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 25a481d9..99939876 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -1,59 +1,51 @@ //! Transaction service responsible for fetching and sending transactions to the simulator. -use crate::config::BuilderConfig; +use crate::{config::BuilderConfig, tasks::env::SimEnv}; use alloy::{ consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, providers::Provider, }; -use futures_util::{TryFutureExt, TryStreamExt}; +use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use signet_tx_cache::{TxCache, TxCacheError}; -use std::time::Duration; -use tokio::{sync::mpsc, task::JoinHandle, time}; -use tracing::{Instrument, debug, debug_span, trace, trace_span}; +use std::{ops::ControlFlow, pin::Pin, time::Duration}; +use tokio::{ + sync::{mpsc, watch}, + task::JoinHandle, + time, +}; +use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; + +type SseStream = Pin> + Send>>; -/// Poll interval for the transaction poller in milliseconds. -const POLL_INTERVAL_MS: u64 = 1000; +const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); +const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); -/// Implements a poller for the block builder to pull transactions from the -/// transaction pool. -#[derive(Debug, Clone)] +/// Fetches transactions from the transaction pool on startup and on each +/// block environment change, and subscribes to an SSE stream for real-time +/// delivery of new transactions in between. +#[derive(Debug)] pub struct TxPoller { /// Config values from the Builder. config: &'static BuilderConfig, /// Client for the tx cache. tx_cache: TxCache, - /// Defines the interval at which the service should poll the cache. - poll_interval_ms: u64, -} - -impl Default for TxPoller { - fn default() -> Self { - Self::new() - } + /// Receiver for block environment updates, used to trigger refetches. + envs: watch::Receiver>, + /// SSE reconnect backoff. Doubles on each reconnect (capped at + /// `MAX_RECONNECT_BACKOFF`) and resets to `INITIAL_RECONNECT_BACKOFF` + /// on each successfully received tx. + backoff: Duration, } -/// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool -/// and sends them into the provided channel sender. impl TxPoller { - /// Returns a new [`TxPoller`] with the given config. - /// * Defaults to 1000ms poll interval (1s). - pub fn new() -> Self { - Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) - } - - /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds. - pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { + /// Returns a new [`TxPoller`] with the given block environment receiver. + pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); let tx_cache = TxCache::new(config.tx_pool_url.clone()); - Self { config, tx_cache, poll_interval_ms } + Self { config, tx_cache, envs, backoff: INITIAL_RECONNECT_BACKOFF } } - /// Returns the poll duration as a [`Duration`]. - const fn poll_duration(&self) -> Duration { - Duration::from_millis(self.poll_interval_ms) - } - - // Spawn a tokio task to check the nonce of a transaction before sending - // it to the cachetask via the outbound channel. + /// Spawn a tokio task to check the nonce of a transaction before sending + /// it to the cachetask via the outbound channel. fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender) { tokio::spawn(async move { let span = debug_span!("check_nonce", tx_id = %tx.tx_hash()); @@ -95,46 +87,133 @@ impl TxPoller { }); } - /// Polls the transaction cache for transactions, paginating through all available pages. - pub async fn check_tx_cache(&self) -> Result, TxCacheError> { + /// Pulls every transaction currently in the cache, paginating until the + /// stream is exhausted. Pure fetch — no metrics, no dispatch. + async fn check_tx_cache(&self) -> Result, TxCacheError> { self.tx_cache.stream_transactions().try_collect().await } - async fn task_future(self, outbound: mpsc::UnboundedSender) { - loop { - let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url); + /// Fetches all transactions from the cache and dispatches each one to + /// a nonce-check task. Records poll metrics around the fetch. + async fn fetch_and_dispatch(&self, outbound: &mpsc::UnboundedSender) { + let span = trace_span!("TxPoller::fetch_and_dispatch", url = %self.config.tx_pool_url); + + crate::metrics::inc_tx_poll_count(); + let Ok(transactions) = self + .check_tx_cache() + .inspect_err(|error| { + crate::metrics::inc_tx_poll_errors(); + debug!(%error, "Error fetching transactions"); + }) + .instrument(span.clone()) + .await + else { + return; + }; + + let _guard = span.entered(); + crate::metrics::record_txs_fetched(transactions.len()); + trace!(count = transactions.len(), "found transactions"); + for tx in transactions { + self.spawn_check_nonce(tx, outbound.clone()); + } + } - // Check this here to avoid making the web request if we know - // we don't need the results. - if outbound.is_closed() { - span.in_scope(|| trace!("No receivers left, shutting down")); - break; - } + /// Opens an SSE subscription to the transaction feed. Returns an empty + /// stream on connection failure so the caller can handle reconnection + /// uniformly. + async fn subscribe(&self) -> SseStream { + self.tx_cache + .subscribe_transactions() + .await + .inspect(|_| debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established")) + .inspect_err(|error| warn!(%error, "Failed to open SSE transaction subscription")) + .map(|s| Box::pin(s) as SseStream) + .unwrap_or_else(|_| Box::pin(futures_util::stream::empty())) + } - crate::metrics::inc_tx_poll_count(); - if let Ok(transactions) = self - .check_tx_cache() - .inspect_err(|error| { - crate::metrics::inc_tx_poll_errors(); - debug!(%error, "Error fetching transactions"); - }) - .instrument(span.clone()) - .await - { - let _guard = span.entered(); - crate::metrics::record_txs_fetched(transactions.len()); - trace!(count = transactions.len(), "found transactions"); - for tx in transactions.into_iter() { - self.spawn_check_nonce(tx, outbound.clone()); + /// Reconnects the SSE stream with backoff. Performs a full refetch to + /// cover any items missed while disconnected. + async fn reconnect(&mut self, outbound: &mpsc::UnboundedSender) -> SseStream { + crate::metrics::inc_sse_reconnect_attempts(); + tokio::select! { + // Biased: a block env change wins over the backoff sleep. An env + // change triggers a full refetch below anyway, which supersedes the + // sleep-then-reconnect path — so there's no point waiting out the + // backoff. + biased; + _ = self.envs.changed() => {} + _ = time::sleep(self.backoff) => {} + } + self.backoff = (self.backoff * 2).min(MAX_RECONNECT_BACKOFF); + let (_, stream) = tokio::join!(self.fetch_and_dispatch(outbound), self.subscribe()); + stream + } + + /// Processes a single item yielded by the SSE stream: dispatches the tx + /// for nonce checking on success, or reconnects on error / stream end. + /// Returns `Break` when the outbound channel has closed and the task + /// should shut down. + async fn handle_sse_item( + &mut self, + item: Option>, + outbound: &mpsc::UnboundedSender, + stream: &mut SseStream, + ) -> ControlFlow<()> { + match item { + Some(Ok(tx)) => { + self.backoff = INITIAL_RECONNECT_BACKOFF; + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + return ControlFlow::Break(()); } + self.spawn_check_nonce(tx, outbound.clone()); + } + Some(Err(error)) => { + warn!(%error, "SSE transaction stream error, reconnecting"); + *stream = self.reconnect(outbound).await; + } + None => { + warn!("SSE transaction stream ended, reconnecting"); + *stream = self.reconnect(outbound).await; } + } + ControlFlow::Continue(()) + } - time::sleep(self.poll_duration()).await; + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + // Initial full fetch of all currently-cached transactions, plus SSE + // subscription for real-time delivery, run concurrently — symmetric + // with the reconnect path. + let (_, mut sse_stream) = + tokio::join!(self.fetch_and_dispatch(&outbound), self.subscribe()); + + loop { + tokio::select! { + item = sse_stream.next() => { + if self + .handle_sse_item(item, &outbound, &mut sse_stream) + .await + .is_break() + { + break; + } + } + res = self.envs.changed() => { + if res.is_err() { + debug!("Block env channel closed, shutting down"); + break; + } + debug!("Block env changed, refetching all transactions"); + self.fetch_and_dispatch(&outbound).await; + } + } } } - /// Spawns a task that continuously polls the cache for transactions and sends any it finds to - /// its sender. + /// Spawns a task that fetches all current transactions, then subscribes + /// to the SSE feed for real-time updates, refetching on each new block + /// environment. pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(self.task_future(outbound)); diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index d607d8c6..40ef31c6 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -1,11 +1,10 @@ #![cfg(feature = "test-utils")] use alloy::{primitives::U256, signers::local::PrivateKeySigner}; -use builder::{ - tasks::cache::TxPoller, - test_utils::{new_signed_tx, setup_logging, setup_test_config}, -}; +use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config}; use eyre::{Ok, Result}; +use futures_util::TryStreamExt; +use signet_tx_cache::TxCache; #[tokio::test] async fn test_tx_roundtrip() -> Result<()> { @@ -15,11 +14,9 @@ async fn test_tx_roundtrip() -> Result<()> { // Post a transaction to the cache post_tx().await?; - // Create a new poller - let poller = TxPoller::new(); - // Fetch transactions from the pool - let transactions = poller.check_tx_cache().await?; + let tx_cache = TxCache::new(builder::config().tx_pool_url.clone()); + let transactions: Vec<_> = tx_cache.stream_transactions().try_collect().await?; // Ensure at least one transaction exists assert!(!transactions.is_empty());