From 65ca87c2937e79691b391a7ee0dc682f10f6463d Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 2 Apr 2026 19:52:39 +0200 Subject: [PATCH 1/7] feat: replace TxPoller polling with SSE streaming Switch TxPoller from 1s timer-based polling to SSE streaming for real-time transaction delivery. The new lifecycle: 1. Full fetch of all transactions at startup 2. SSE stream for real-time new transaction delivery 3. Full refetch on each block environment change Adds exponential backoff (1s-30s) on SSE reconnection to prevent tight loops when the endpoint is unavailable. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 177 ++++++++++++++++++++++++++------------ Cargo.toml | 2 +- src/tasks/cache/system.rs | 2 +- src/tasks/cache/tx.rs | 171 ++++++++++++++++++++++-------------- tests/tx_poller_test.rs | 13 ++- 5 files changed, 237 insertions(+), 128 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93d1ce79..19502c2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1455,9 +1455,9 @@ checksum = "4858a9d740c5007a9069007c3b4e91152d0506f13c1b31dd49051fd537656156" [[package]] name = "async-compression" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1" +checksum = "e79b3f8a79cccc2898f31920fc69f304859b3bd567490f75ebf51ae1c792a9ac" dependencies = [ "compression-codecs", "compression-core", @@ -2539,9 +2539,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.60" +version = "1.2.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43c5703da9466b66a946814e1adf53ea2c90f10063b86290cc9eb67ce3478a20" +checksum = "d16d90359e986641506914ba71350897565610e87ce0ad9e6f28569db3dd5c6d" dependencies = [ "find-msvc-tools", "jobserver", @@ -2781,9 +2781,9 @@ dependencies = [ [[package]] name = "compression-codecs" -version = "0.4.37" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7" +checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf" dependencies = [ "brotli", "compression-core", @@ -2795,9 +2795,9 @@ dependencies = [ [[package]] name = "compression-core" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" +checksum = "cc14f565cf027a105f7a44ccf9e5b424348421a1d8952a8fc9d499d313107789" [[package]] name = "concat-kdf" @@ -2914,9 +2914,9 @@ dependencies = [ [[package]] name = "crc-catalog" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +checksum = "217698eaf96b4a3f0bc4f3662aaa55bdf913cd54d7204591faa790070c6d0853" [[package]] name = "crc32fast" @@ -3217,15 +3217,15 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" +checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8" [[package]] name = "data-encoding-macro" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8142a83c17aa9461d637e649271eae18bf2edd00e91f2e105df36c3c16355bdb" +checksum = "3259c913752a86488b501ed8680446a5ed2d5aeac6e596cb23ba3800768ea32c" dependencies = [ "data-encoding", "data-encoding-macro-internal", @@ -3233,9 +3233,9 @@ dependencies = [ [[package]] name = "data-encoding-macro-internal" -version = "0.1.17" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" +checksum = "ccc2776f0c61eca1ca32528f85548abd1a4be8fb53d1b21c013e4f18da1e7090" dependencies = [ "data-encoding", "syn 2.0.117", @@ -3780,6 +3780,28 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + +[[package]] +name = "evmap" +version = "11.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8874945f036109c72242964c1174cf99434e30cfa45bf45fedc983f50046f8" +dependencies = [ + "hashbag", + "left-right", + "smallvec", +] + [[package]] name = "eyre" version = "0.6.12" @@ -4146,6 +4168,21 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42012b0f064e01aa58b545fe3727f90f7dd4020f4a3ea735b50344965f5a57e9" +[[package]] +name = "generator" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows-link", + "windows-result", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -4345,6 +4382,12 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbag" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7040a10f52cba493ddb09926e15d10a9d8a28043708a405931fe4c6f19fac064" + [[package]] name = "hashbrown" version = "0.12.3" @@ -5167,9 +5210,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.95" +version = "0.3.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" +checksum = "a1840c94c045fbcf8ba2812c95db44499f7c64910a912551aaaa541decebcacf" dependencies = [ "cfg-if", "futures-util", @@ -5456,11 +5499,22 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" +[[package]] +name = "left-right" +version = "0.11.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f0c21e4c8ff95f487fb34e6f9182875f42c84cef966d29216bf115d9bba835a" +dependencies = [ + "crossbeam-utils", + "loom", + "slab", +] + [[package]] name = "libc" -version = "0.2.185" +version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] name = "libgit2-sys" @@ -5619,6 +5673,19 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber 0.3.23", +] + [[package]] name = "lru" version = "0.16.4" @@ -5743,12 +5810,12 @@ dependencies = [ [[package]] name = "metrics" -version = "0.24.3" +version = "0.24.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +checksum = "b7cd3e9eb685089c784f5769b1197d348c7274bc20d4e1349650f63b91b6d0af" dependencies = [ - "ahash", "portable-atomic", + "rapidhash", ] [[package]] @@ -5764,11 +5831,12 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.18.1" +version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3589659543c04c7dc5526ec858591015b87cd8746583b51b48ef4353f99dbcda" +checksum = "5c0ca2990f7f78a72c4000ddce186db7d1b700477426563ee851c95ea3c0d0c4" dependencies = [ "base64 0.22.1", + "evmap", "http-body-util", "hyper", "hyper-rustls", @@ -5802,9 +5870,9 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.20.1" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" +checksum = "55ff5c12b797ebf094dc7c1d87e905efc0329cba332f96d51db03875441012b5" dependencies = [ "crossbeam-epoch", "crossbeam-utils", @@ -5813,6 +5881,7 @@ dependencies = [ "quanta", "rand 0.9.4", "rand_xoshiro", + "rapidhash", "sketches-ddsketch", ] @@ -5937,11 +6006,10 @@ dependencies = [ [[package]] name = "multihash" -version = "0.19.4" +version = "0.19.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ace881e3f514092ce9efbcb8f413d0ad9763860b828981c2de51ddc666936c" +checksum = "577c63b00ad74d57e8c9aa870b5fccebf2fd64a308a5aee9f1bb88e4aea19447" dependencies = [ - "no_std_io2", "unsigned-varint", ] @@ -5962,15 +6030,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "no_std_io2" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a3564ce7035b1e4778d8cb6cacebb5d766b5e8fe5a75b9e441e33fb61a872c6" -dependencies = [ - "memchr", -] - [[package]] name = "nom" version = "7.1.3" @@ -10473,9 +10532,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.38" +version = "0.23.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" +checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" dependencies = [ "aws-lc-rs", "log", @@ -10501,9 +10560,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.14.0" +version = "1.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" +checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9" dependencies = [ "web-time", "zeroize", @@ -10631,6 +10690,12 @@ dependencies = [ "hashbrown 0.13.2", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -11189,9 +11254,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba06dc92c72b877f042ba141c7ee553c68aab1c6e077a3ba1adb2eae17fdaff4" dependencies = [ "alloy", + "eventsource-stream", "futures-util", "reqwest", "serde", + "serde_json", "signet-bundle", "signet-constants", "signet-types", @@ -12500,9 +12567,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.118" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" +checksum = "df52b6d9b87e0c74c9edfa1eb2d9bf85e5d63515474513aa50fa181b3c4f5db1" dependencies = [ "cfg-if", "once_cell", @@ -12513,9 +12580,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.68" +version = "0.4.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f371d383f2fb139252e0bfac3b81b265689bf45b6874af544ffa4c975ac1ebf8" +checksum = "af934872acec734c2d80e6617bbb5ff4f12b052dd8e6332b0817bce889516084" dependencies = [ "js-sys", "wasm-bindgen", @@ -12523,9 +12590,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.118" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" +checksum = "78b1041f495fb322e64aca85f5756b2172e35cd459376e67f2a6c9dffcedb103" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -12533,9 +12600,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.118" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" +checksum = "9dcd0ff20416988a18ac686d4d4d0f6aae9ebf08a389ff5d29012b05af2a1b41" dependencies = [ "bumpalo", "proc-macro2", @@ -12546,9 +12613,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.118" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" +checksum = "49757b3c82ebf16c57d69365a142940b384176c24df52a087fb748e2085359ea" dependencies = [ "unicode-ident", ] @@ -12616,9 +12683,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.95" +version = "0.3.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" +checksum = "2eadbac71025cd7b0834f20d1fe8472e8495821b4e9801eb0a60bd1f19827602" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 73b80026..b0600b84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ name = "zenith-builder-example" path = "bin/builder.rs" [dependencies] -init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] } +init4-bin-base = { version = "0.18.0", features = ["perms", "aws", "pylon", "sse"] } signet-constants = { version = "0.16.0-rc.17" } signet-sim = { version = "0.16.0-rc.17" } diff --git a/src/tasks/cache/system.rs b/src/tasks/cache/system.rs index 81223a36..89698a4d 100644 --- a/src/tasks/cache/system.rs +++ b/src/tasks/cache/system.rs @@ -23,7 +23,7 @@ impl CacheTasks { /// [`CacheTask`], [`TxPoller`], and [`BundlePoller`] internally and yields their [`JoinHandle`]s. pub fn spawn(&self) -> CacheSystem { // Tx Poller pulls transactions from the cache - let tx_poller = TxPoller::new(); + let tx_poller = TxPoller::new(self.block_env.clone()); let (tx_receiver, tx_poller) = tx_poller.spawn(); // Bundle Poller pulls bundles from the cache diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 25a481d9..5459edab 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -1,59 +1,46 @@ //! Transaction service responsible for fetching and sending transactions to the simulator. -use crate::config::BuilderConfig; +use crate::{config::BuilderConfig, tasks::env::SimEnv}; use alloy::{ consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, providers::Provider, }; -use futures_util::{TryFutureExt, TryStreamExt}; +use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use init4_bin_base::deps::metrics::{counter, histogram}; use signet_tx_cache::{TxCache, TxCacheError}; -use std::time::Duration; -use tokio::{sync::mpsc, task::JoinHandle, time}; -use tracing::{Instrument, debug, debug_span, trace, trace_span}; +use std::{pin::Pin, time::Duration}; +use tokio::{sync::{mpsc, watch}, task::JoinHandle, time}; +use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; -/// Poll interval for the transaction poller in milliseconds. -const POLL_INTERVAL_MS: u64 = 1000; +type SseStream = Pin> + Send>>; /// Implements a poller for the block builder to pull transactions from the /// transaction pool. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct TxPoller { /// Config values from the Builder. config: &'static BuilderConfig, /// Client for the tx cache. tx_cache: TxCache, - /// Defines the interval at which the service should poll the cache. - poll_interval_ms: u64, -} - -impl Default for TxPoller { - fn default() -> Self { - Self::new() - } + /// Receiver for block environment updates, used to trigger refetches. + envs: watch::Receiver>, } -/// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool -/// and sends them into the provided channel sender. +/// [`TxPoller`] fetches transactions from the transaction pool on startup +/// and on each block environment change, and subscribes to an SSE stream +/// for real-time delivery of new transactions in between. impl TxPoller { - /// Returns a new [`TxPoller`] with the given config. - /// * Defaults to 1000ms poll interval (1s). - pub fn new() -> Self { - Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) - } + const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); + const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); - /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds. - pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { + /// Returns a new [`TxPoller`] with the given block environment receiver. + pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); let tx_cache = TxCache::new(config.tx_pool_url.clone()); - Self { config, tx_cache, poll_interval_ms } - } - - /// Returns the poll duration as a [`Duration`]. - const fn poll_duration(&self) -> Duration { - Duration::from_millis(self.poll_interval_ms) + Self { config, tx_cache, envs } } - // Spawn a tokio task to check the nonce of a transaction before sending - // it to the cachetask via the outbound channel. + /// Spawn a tokio task to check the nonce of a transaction before sending + /// it to the cachetask via the outbound channel. fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender) { tokio::spawn(async move { let span = debug_span!("check_nonce", tx_id = %tx.tx_hash()); @@ -95,46 +82,104 @@ impl TxPoller { }); } - /// Polls the transaction cache for transactions, paginating through all available pages. - pub async fn check_tx_cache(&self) -> Result, TxCacheError> { - self.tx_cache.stream_transactions().try_collect().await + /// Fetches all transactions from the cache, forwarding each to nonce + /// checking before it reaches the [`CacheTask`]. + async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { + let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); + + counter!("signet.builder.cache.tx_poll_count").increment(1); + if let Ok(transactions) = self + .tx_cache + .stream_transactions() + .try_collect::>() + .inspect_err(|error| { + counter!("signet.builder.cache.tx_poll_errors").increment(1); + debug!(%error, "Error fetching transactions"); + }) + .instrument(span.clone()) + .await + { + let _guard = span.entered(); + histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64); + trace!(count = transactions.len(), "found transactions"); + for tx in transactions { + self.spawn_check_nonce(tx, outbound.clone()); + } + } } - async fn task_future(self, outbound: mpsc::UnboundedSender) { - loop { - let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url); - - // Check this here to avoid making the web request if we know - // we don't need the results. - if outbound.is_closed() { - span.in_scope(|| trace!("No receivers left, shutting down")); - break; + /// Opens an SSE subscription to the transaction feed. Returns an empty + /// stream on connection failure so the caller can handle reconnection + /// uniformly. + async fn subscribe(&self) -> SseStream { + match self.tx_cache.subscribe_transactions().await { + Ok(stream) => { + debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established"); + Box::pin(stream) + } + Err(error) => { + warn!(%error, "Failed to open SSE transaction subscription"); + Box::pin(futures_util::stream::empty()) } + } + } - crate::metrics::inc_tx_poll_count(); - if let Ok(transactions) = self - .check_tx_cache() - .inspect_err(|error| { - crate::metrics::inc_tx_poll_errors(); - debug!(%error, "Error fetching transactions"); - }) - .instrument(span.clone()) - .await - { - let _guard = span.entered(); - crate::metrics::record_txs_fetched(transactions.len()); - trace!(count = transactions.len(), "found transactions"); - for tx in transactions.into_iter() { + /// Reconnects the SSE stream with backoff. Performs a full refetch to + /// cover any items missed while disconnected. + async fn reconnect( + &self, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + ) -> SseStream { + time::sleep(*backoff).await; + *backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF); + self.full_fetch(outbound).await; + self.subscribe().await + } + + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + // Initial full fetch of all transactions currently in the cache. + self.full_fetch(&outbound).await; + + // Open the SSE stream for real-time delivery of new transactions. + let mut sse_stream = self.subscribe().await; + let mut backoff = Self::INITIAL_RECONNECT_BACKOFF; + + loop { + tokio::select! { + item = sse_stream.next() => { + let Some(result) = item else { + warn!("SSE transaction stream ended, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + continue; + }; + let Ok(tx) = result else { + warn!(error = %result.unwrap_err(), "SSE transaction stream error, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + continue; + }; + backoff = Self::INITIAL_RECONNECT_BACKOFF; + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } self.spawn_check_nonce(tx, outbound.clone()); } + res = self.envs.changed() => { + if res.is_err() { + debug!("Block env channel closed, shutting down"); + break; + } + trace!("Block env changed, refetching all transactions"); + self.full_fetch(&outbound).await; + } } - - time::sleep(self.poll_duration()).await; } } - /// Spawns a task that continuously polls the cache for transactions and sends any it finds to - /// its sender. + /// Spawns a task that fetches all current transactions, then subscribes + /// to the SSE feed for real-time updates, refetching on each new block + /// environment. pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(self.task_future(outbound)); diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index d607d8c6..40ef31c6 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -1,11 +1,10 @@ #![cfg(feature = "test-utils")] use alloy::{primitives::U256, signers::local::PrivateKeySigner}; -use builder::{ - tasks::cache::TxPoller, - test_utils::{new_signed_tx, setup_logging, setup_test_config}, -}; +use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config}; use eyre::{Ok, Result}; +use futures_util::TryStreamExt; +use signet_tx_cache::TxCache; #[tokio::test] async fn test_tx_roundtrip() -> Result<()> { @@ -15,11 +14,9 @@ async fn test_tx_roundtrip() -> Result<()> { // Post a transaction to the cache post_tx().await?; - // Create a new poller - let poller = TxPoller::new(); - // Fetch transactions from the pool - let transactions = poller.check_tx_cache().await?; + let tx_cache = TxCache::new(builder::config().tx_pool_url.clone()); + let transactions: Vec<_> = tx_cache.stream_transactions().try_collect().await?; // Ensure at least one transaction exists assert!(!transactions.is_empty()); From 7d0d80d2d62f3504f2a8269920f0abd55ae19199 Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 2 Apr 2026 20:03:00 +0200 Subject: [PATCH 2/7] fix: rustfmt and rustdoc CI failures Expand tokio import for nightly rustfmt, remove unresolved `CacheTask` rustdoc link. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/tasks/cache/tx.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 5459edab..d92ed251 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -8,7 +8,11 @@ use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use init4_bin_base::deps::metrics::{counter, histogram}; use signet_tx_cache::{TxCache, TxCacheError}; use std::{pin::Pin, time::Duration}; -use tokio::{sync::{mpsc, watch}, task::JoinHandle, time}; +use tokio::{ + sync::{mpsc, watch}, + task::JoinHandle, + time, +}; use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; type SseStream = Pin> + Send>>; @@ -83,7 +87,7 @@ impl TxPoller { } /// Fetches all transactions from the cache, forwarding each to nonce - /// checking before it reaches the [`CacheTask`]. + /// checking before it reaches the cache task. async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); From 1bef5f572cfc92de98724f44380088f1e6b9b3ed Mon Sep 17 00:00:00 2001 From: evalir Date: Wed, 15 Apr 2026 18:35:35 +0200 Subject: [PATCH 3/7] fix: make TxPoller reconnect responsive to block env changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Race the backoff sleep against envs.changed() so a block env change arriving during reconnect cuts the sleep short, instead of buffering up to 30s while the simulator operates on a stale cache. Also replace the nested let-else + unwrap_err in the SSE arm with a single match — no behavior change, drops the double-unwrap. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/tasks/cache/tx.rs | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index d92ed251..06701657 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -131,11 +131,16 @@ impl TxPoller { /// Reconnects the SSE stream with backoff. Performs a full refetch to /// cover any items missed while disconnected. async fn reconnect( - &self, + &mut self, outbound: &mpsc::UnboundedSender, backoff: &mut Duration, ) -> SseStream { - time::sleep(*backoff).await; + tokio::select! { + _ = time::sleep(*backoff) => {} + // Break the sleep early on block env change or channel close — + // full_fetch below serves the same purpose the env arm would have. + _ = self.envs.changed() => {} + } *backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF); self.full_fetch(outbound).await; self.subscribe().await @@ -152,22 +157,24 @@ impl TxPoller { loop { tokio::select! { item = sse_stream.next() => { - let Some(result) = item else { - warn!("SSE transaction stream ended, reconnecting"); - sse_stream = self.reconnect(&outbound, &mut backoff).await; - continue; - }; - let Ok(tx) = result else { - warn!(error = %result.unwrap_err(), "SSE transaction stream error, reconnecting"); - sse_stream = self.reconnect(&outbound, &mut backoff).await; - continue; - }; - backoff = Self::INITIAL_RECONNECT_BACKOFF; - if outbound.is_closed() { - trace!("No receivers left, shutting down"); - break; + match item { + Some(Ok(tx)) => { + backoff = Self::INITIAL_RECONNECT_BACKOFF; + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + self.spawn_check_nonce(tx, outbound.clone()); + } + Some(Err(error)) => { + warn!(%error, "SSE transaction stream error, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + } + None => { + warn!("SSE transaction stream ended, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + } } - self.spawn_check_nonce(tx, outbound.clone()); } res = self.envs.changed() => { if res.is_err() { From 015b51d1d78ad3998d9e0ad9fc2444d273e41e26 Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 23 Apr 2026 21:04:27 +0200 Subject: [PATCH 4/7] refactor: apply PR #259 review feedback Addresses the obvious nits from prestwich's review on src/tasks/cache/tx.rs: - Move backoff constants to module level (was assoc consts) - Use crate::metrics::inc_*/record_* helpers instead of bare counter!/histogram! macros (matches #263's metrics module) - Rewrite subscribe() as a combinator chain over the Result - Bias the select! in reconnect() so env changes preempt the backoff sleep, with an inline rationale - Run full_fetch and subscribe concurrently via tokio::join! in reconnect() - Extract handle_sse_item helper to flatten the nested match inside task_future's SSE select arm Deferred to a follow-up (need design decisions): - Split full_fetch into fetch + dispatch with better name - Replace inline backoff with backon + permanence criterion - Rename SDK stream_transactions/subscribe_transactions Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tasks/cache/tx.rs | 104 +++++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 42 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 06701657..7f9728d4 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -5,9 +5,8 @@ use alloy::{ providers::Provider, }; use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; -use init4_bin_base::deps::metrics::{counter, histogram}; use signet_tx_cache::{TxCache, TxCacheError}; -use std::{pin::Pin, time::Duration}; +use std::{ops::ControlFlow, pin::Pin, time::Duration}; use tokio::{ sync::{mpsc, watch}, task::JoinHandle, @@ -17,6 +16,9 @@ use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; type SseStream = Pin> + Send>>; +const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); +const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); + /// Implements a poller for the block builder to pull transactions from the /// transaction pool. #[derive(Debug)] @@ -33,9 +35,6 @@ pub struct TxPoller { /// and on each block environment change, and subscribes to an SSE stream /// for real-time delivery of new transactions in between. impl TxPoller { - const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); - const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); - /// Returns a new [`TxPoller`] with the given block environment receiver. pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); @@ -91,20 +90,20 @@ impl TxPoller { async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); - counter!("signet.builder.cache.tx_poll_count").increment(1); + crate::metrics::inc_tx_poll_count(); if let Ok(transactions) = self .tx_cache .stream_transactions() .try_collect::>() .inspect_err(|error| { - counter!("signet.builder.cache.tx_poll_errors").increment(1); + crate::metrics::inc_tx_poll_errors(); debug!(%error, "Error fetching transactions"); }) .instrument(span.clone()) .await { let _guard = span.entered(); - histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64); + crate::metrics::record_txs_fetched(transactions.len()); trace!(count = transactions.len(), "found transactions"); for tx in transactions { self.spawn_check_nonce(tx, outbound.clone()); @@ -116,16 +115,13 @@ impl TxPoller { /// stream on connection failure so the caller can handle reconnection /// uniformly. async fn subscribe(&self) -> SseStream { - match self.tx_cache.subscribe_transactions().await { - Ok(stream) => { - debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established"); - Box::pin(stream) - } - Err(error) => { - warn!(%error, "Failed to open SSE transaction subscription"); - Box::pin(futures_util::stream::empty()) - } - } + self.tx_cache + .subscribe_transactions() + .await + .inspect(|_| debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established")) + .inspect_err(|error| warn!(%error, "Failed to open SSE transaction subscription")) + .map(|s| Box::pin(s) as SseStream) + .unwrap_or_else(|_| Box::pin(futures_util::stream::empty())) } /// Reconnects the SSE stream with backoff. Performs a full refetch to @@ -136,14 +132,49 @@ impl TxPoller { backoff: &mut Duration, ) -> SseStream { tokio::select! { - _ = time::sleep(*backoff) => {} - // Break the sleep early on block env change or channel close — - // full_fetch below serves the same purpose the env arm would have. + // Biased: a block env change wins over the backoff sleep. An env + // change triggers a full refetch below anyway, which supersedes the + // sleep-then-reconnect path — so there's no point waiting out the + // backoff. + biased; _ = self.envs.changed() => {} + _ = time::sleep(*backoff) => {} + } + *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); + let (_, stream) = tokio::join!(self.full_fetch(outbound), self.subscribe()); + stream + } + + /// Processes a single item yielded by the SSE stream: dispatches the tx + /// for nonce checking on success, or reconnects on error / stream end. + /// Returns `Break` when the outbound channel has closed and the task + /// should shut down. + async fn handle_sse_item( + &mut self, + item: Option>, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + stream: &mut SseStream, + ) -> ControlFlow<()> { + match item { + Some(Ok(tx)) => { + *backoff = INITIAL_RECONNECT_BACKOFF; + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + return ControlFlow::Break(()); + } + self.spawn_check_nonce(tx, outbound.clone()); + } + Some(Err(error)) => { + warn!(%error, "SSE transaction stream error, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; + } + None => { + warn!("SSE transaction stream ended, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; + } } - *backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF); - self.full_fetch(outbound).await; - self.subscribe().await + ControlFlow::Continue(()) } async fn task_future(mut self, outbound: mpsc::UnboundedSender) { @@ -152,28 +183,17 @@ impl TxPoller { // Open the SSE stream for real-time delivery of new transactions. let mut sse_stream = self.subscribe().await; - let mut backoff = Self::INITIAL_RECONNECT_BACKOFF; + let mut backoff = INITIAL_RECONNECT_BACKOFF; loop { tokio::select! { item = sse_stream.next() => { - match item { - Some(Ok(tx)) => { - backoff = Self::INITIAL_RECONNECT_BACKOFF; - if outbound.is_closed() { - trace!("No receivers left, shutting down"); - break; - } - self.spawn_check_nonce(tx, outbound.clone()); - } - Some(Err(error)) => { - warn!(%error, "SSE transaction stream error, reconnecting"); - sse_stream = self.reconnect(&outbound, &mut backoff).await; - } - None => { - warn!("SSE transaction stream ended, reconnecting"); - sse_stream = self.reconnect(&outbound, &mut backoff).await; - } + if self + .handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream) + .await + .is_break() + { + break; } } res = self.envs.changed() => { From 0fea37774037b7a1c087617d898ef7bf3246bb36 Mon Sep 17 00:00:00 2001 From: evalir Date: Mon, 27 Apr 2026 13:33:05 +0200 Subject: [PATCH 5/7] refactor: split fetch from dispatch in TxPoller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address PR #259 nit #2: full_fetch was named like a pure data accessor but actually dispatched nonce-check tasks per tx. Restore check_tx_cache as a private pure-fetch helper returning Result, _>, and rename full_fetch to fetch_and_dispatch — its name now matches what it does. The orchestrator uses let-else over the fetch result to drop a level of indentation. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tasks/cache/tx.rs | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 7f9728d4..8c175070 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -85,29 +85,35 @@ impl TxPoller { }); } - /// Fetches all transactions from the cache, forwarding each to nonce - /// checking before it reaches the cache task. - async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { - let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); + /// Pulls every transaction currently in the cache, paginating until the + /// stream is exhausted. Pure fetch — no metrics, no dispatch. + async fn check_tx_cache(&self) -> Result, TxCacheError> { + self.tx_cache.stream_transactions().try_collect().await + } + + /// Fetches all transactions from the cache and dispatches each one to + /// a nonce-check task. Records poll metrics around the fetch. + async fn fetch_and_dispatch(&self, outbound: &mpsc::UnboundedSender) { + let span = trace_span!("TxPoller::fetch_and_dispatch", url = %self.config.tx_pool_url); crate::metrics::inc_tx_poll_count(); - if let Ok(transactions) = self - .tx_cache - .stream_transactions() - .try_collect::>() + let Ok(transactions) = self + .check_tx_cache() .inspect_err(|error| { crate::metrics::inc_tx_poll_errors(); debug!(%error, "Error fetching transactions"); }) .instrument(span.clone()) .await - { - let _guard = span.entered(); - crate::metrics::record_txs_fetched(transactions.len()); - trace!(count = transactions.len(), "found transactions"); - for tx in transactions { - self.spawn_check_nonce(tx, outbound.clone()); - } + else { + return; + }; + + let _guard = span.entered(); + crate::metrics::record_txs_fetched(transactions.len()); + trace!(count = transactions.len(), "found transactions"); + for tx in transactions { + self.spawn_check_nonce(tx, outbound.clone()); } } @@ -141,7 +147,7 @@ impl TxPoller { _ = time::sleep(*backoff) => {} } *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); - let (_, stream) = tokio::join!(self.full_fetch(outbound), self.subscribe()); + let (_, stream) = tokio::join!(self.fetch_and_dispatch(outbound), self.subscribe()); stream } @@ -179,7 +185,7 @@ impl TxPoller { async fn task_future(mut self, outbound: mpsc::UnboundedSender) { // Initial full fetch of all transactions currently in the cache. - self.full_fetch(&outbound).await; + self.fetch_and_dispatch(&outbound).await; // Open the SSE stream for real-time delivery of new transactions. let mut sse_stream = self.subscribe().await; @@ -202,7 +208,7 @@ impl TxPoller { break; } trace!("Block env changed, refetching all transactions"); - self.full_fetch(&outbound).await; + self.fetch_and_dispatch(&outbound).await; } } } From ddf5548f102d9b0f65b2f9808af05f85d9f2c760 Mon Sep 17 00:00:00 2001 From: evalir Date: Tue, 28 Apr 2026 17:54:03 +0200 Subject: [PATCH 6/7] refactor: apply fraser's PR #259 review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move struct doc to reflect SSE behavior (was still describing the old polling implementation); drop the redundant impl-block doc. - Run initial fetch + SSE subscribe concurrently in task_future via tokio::join!, mirroring the reconnect path. - Bump "Block env changed" log from trace to debug — env changes are infrequent and worth seeing in normal debug output. - Add a sse_reconnect_attempts counter; increment once per reconnect call. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/metrics.rs | 9 +++++++++ src/tasks/cache/tx.rs | 21 ++++++++++----------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 14c57dbd..ac9b8f01 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -33,6 +33,9 @@ const TX_POLL_ERRORS_HELP: &str = "Transaction cache poll errors."; const TXS_FETCHED: &str = "signet.builder.cache.txs_fetched"; const TXS_FETCHED_HELP: &str = "Transactions fetched per poll cycle."; +const SSE_RECONNECT_ATTEMPTS: &str = "signet.builder.cache.sse_reconnect_attempts"; +const SSE_RECONNECT_ATTEMPTS_HELP: &str = "SSE transaction stream reconnect attempts."; + const BUNDLE_POLL_COUNT: &str = "signet.builder.cache.bundle_poll_count"; const BUNDLE_POLL_COUNT_HELP: &str = "Bundle cache poll attempts."; @@ -148,6 +151,7 @@ static DESCRIPTIONS: LazyLock<()> = LazyLock::new(|| { describe_counter!(TX_POLL_COUNT, TX_POLL_COUNT_HELP); describe_counter!(TX_POLL_ERRORS, TX_POLL_ERRORS_HELP); describe_histogram!(TXS_FETCHED, TXS_FETCHED_HELP); + describe_counter!(SSE_RECONNECT_ATTEMPTS, SSE_RECONNECT_ATTEMPTS_HELP); describe_counter!(BUNDLE_POLL_COUNT, BUNDLE_POLL_COUNT_HELP); describe_counter!(BUNDLE_POLL_ERRORS, BUNDLE_POLL_ERRORS_HELP); describe_histogram!(BUNDLES_FETCHED, BUNDLES_FETCHED_HELP); @@ -234,6 +238,11 @@ pub(crate) fn record_txs_fetched(count: usize) { histogram!(TXS_FETCHED).record(count as f64); } +/// Increment the SSE reconnect attempts counter. +pub(crate) fn inc_sse_reconnect_attempts() { + counter!(SSE_RECONNECT_ATTEMPTS).increment(1); +} + /// Increment the bundle poll attempt counter. pub(crate) fn inc_bundle_poll_count() { counter!(BUNDLE_POLL_COUNT).increment(1); diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 8c175070..015126bf 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -19,8 +19,9 @@ type SseStream = Pin> + S const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); -/// Implements a poller for the block builder to pull transactions from the -/// transaction pool. +/// Fetches transactions from the transaction pool on startup and on each +/// block environment change, and subscribes to an SSE stream for real-time +/// delivery of new transactions in between. #[derive(Debug)] pub struct TxPoller { /// Config values from the Builder. @@ -31,9 +32,6 @@ pub struct TxPoller { envs: watch::Receiver>, } -/// [`TxPoller`] fetches transactions from the transaction pool on startup -/// and on each block environment change, and subscribes to an SSE stream -/// for real-time delivery of new transactions in between. impl TxPoller { /// Returns a new [`TxPoller`] with the given block environment receiver. pub fn new(envs: watch::Receiver>) -> Self { @@ -137,6 +135,7 @@ impl TxPoller { outbound: &mpsc::UnboundedSender, backoff: &mut Duration, ) -> SseStream { + crate::metrics::inc_sse_reconnect_attempts(); tokio::select! { // Biased: a block env change wins over the backoff sleep. An env // change triggers a full refetch below anyway, which supersedes the @@ -184,11 +183,11 @@ impl TxPoller { } async fn task_future(mut self, outbound: mpsc::UnboundedSender) { - // Initial full fetch of all transactions currently in the cache. - self.fetch_and_dispatch(&outbound).await; - - // Open the SSE stream for real-time delivery of new transactions. - let mut sse_stream = self.subscribe().await; + // Initial full fetch of all currently-cached transactions, plus SSE + // subscription for real-time delivery, run concurrently — symmetric + // with the reconnect path. + let (_, mut sse_stream) = + tokio::join!(self.fetch_and_dispatch(&outbound), self.subscribe()); let mut backoff = INITIAL_RECONNECT_BACKOFF; loop { @@ -207,7 +206,7 @@ impl TxPoller { debug!("Block env channel closed, shutting down"); break; } - trace!("Block env changed, refetching all transactions"); + debug!("Block env changed, refetching all transactions"); self.fetch_and_dispatch(&outbound).await; } } From 6dfe101ca8c96d0bfb38ba6576ea2184ec3d12a7 Mon Sep 17 00:00:00 2001 From: evalir Date: Wed, 29 Apr 2026 12:00:38 +0200 Subject: [PATCH 7/7] refactor: hold reconnect backoff on TxPoller Move the SSE reconnect backoff from a `&mut Duration` plumbed through `reconnect` and `handle_sse_item` to a field on `TxPoller`. The state was already specific to the running task; carrying it on self drops two parameters and one local in `task_future`. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tasks/cache/tx.rs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 015126bf..99939876 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -30,6 +30,10 @@ pub struct TxPoller { tx_cache: TxCache, /// Receiver for block environment updates, used to trigger refetches. envs: watch::Receiver>, + /// SSE reconnect backoff. Doubles on each reconnect (capped at + /// `MAX_RECONNECT_BACKOFF`) and resets to `INITIAL_RECONNECT_BACKOFF` + /// on each successfully received tx. + backoff: Duration, } impl TxPoller { @@ -37,7 +41,7 @@ impl TxPoller { pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); let tx_cache = TxCache::new(config.tx_pool_url.clone()); - Self { config, tx_cache, envs } + Self { config, tx_cache, envs, backoff: INITIAL_RECONNECT_BACKOFF } } /// Spawn a tokio task to check the nonce of a transaction before sending @@ -130,11 +134,7 @@ impl TxPoller { /// Reconnects the SSE stream with backoff. Performs a full refetch to /// cover any items missed while disconnected. - async fn reconnect( - &mut self, - outbound: &mpsc::UnboundedSender, - backoff: &mut Duration, - ) -> SseStream { + async fn reconnect(&mut self, outbound: &mpsc::UnboundedSender) -> SseStream { crate::metrics::inc_sse_reconnect_attempts(); tokio::select! { // Biased: a block env change wins over the backoff sleep. An env @@ -143,9 +143,9 @@ impl TxPoller { // backoff. biased; _ = self.envs.changed() => {} - _ = time::sleep(*backoff) => {} + _ = time::sleep(self.backoff) => {} } - *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); + self.backoff = (self.backoff * 2).min(MAX_RECONNECT_BACKOFF); let (_, stream) = tokio::join!(self.fetch_and_dispatch(outbound), self.subscribe()); stream } @@ -158,12 +158,11 @@ impl TxPoller { &mut self, item: Option>, outbound: &mpsc::UnboundedSender, - backoff: &mut Duration, stream: &mut SseStream, ) -> ControlFlow<()> { match item { Some(Ok(tx)) => { - *backoff = INITIAL_RECONNECT_BACKOFF; + self.backoff = INITIAL_RECONNECT_BACKOFF; if outbound.is_closed() { trace!("No receivers left, shutting down"); return ControlFlow::Break(()); @@ -172,11 +171,11 @@ impl TxPoller { } Some(Err(error)) => { warn!(%error, "SSE transaction stream error, reconnecting"); - *stream = self.reconnect(outbound, backoff).await; + *stream = self.reconnect(outbound).await; } None => { warn!("SSE transaction stream ended, reconnecting"); - *stream = self.reconnect(outbound, backoff).await; + *stream = self.reconnect(outbound).await; } } ControlFlow::Continue(()) @@ -188,13 +187,12 @@ impl TxPoller { // with the reconnect path. let (_, mut sse_stream) = tokio::join!(self.fetch_and_dispatch(&outbound), self.subscribe()); - let mut backoff = INITIAL_RECONNECT_BACKOFF; loop { tokio::select! { item = sse_stream.next() => { if self - .handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream) + .handle_sse_item(item, &outbound, &mut sse_stream) .await .is_break() {