Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ const TXS_FETCHED: &str = "signet.builder.cache.txs_fetched";
const TXS_FETCHED_HELP: &str = "Transactions fetched per poll cycle.";

const SSE_RECONNECT_ATTEMPTS: &str = "signet.builder.cache.sse_reconnect_attempts";
const SSE_RECONNECT_ATTEMPTS_HELP: &str = "SSE transaction stream reconnect attempts.";
const SSE_RECONNECT_ATTEMPTS_HELP: &str = "SSE stream reconnect attempts.";

const SSE_SUBSCRIBE_ERRORS: &str = "signet.builder.cache.sse_subscribe_errors";
const SSE_SUBSCRIBE_ERRORS_HELP: &str = "SSE stream subscription failures.";

const BUNDLE_POLL_COUNT: &str = "signet.builder.cache.bundle_poll_count";
const BUNDLE_POLL_COUNT_HELP: &str = "Bundle cache poll attempts.";
Expand Down Expand Up @@ -152,6 +155,7 @@ static DESCRIPTIONS: LazyLock<()> = LazyLock::new(|| {
describe_counter!(TX_POLL_ERRORS, TX_POLL_ERRORS_HELP);
describe_histogram!(TXS_FETCHED, TXS_FETCHED_HELP);
describe_counter!(SSE_RECONNECT_ATTEMPTS, SSE_RECONNECT_ATTEMPTS_HELP);
describe_counter!(SSE_SUBSCRIBE_ERRORS, SSE_SUBSCRIBE_ERRORS_HELP);
describe_counter!(BUNDLE_POLL_COUNT, BUNDLE_POLL_COUNT_HELP);
describe_counter!(BUNDLE_POLL_ERRORS, BUNDLE_POLL_ERRORS_HELP);
describe_histogram!(BUNDLES_FETCHED, BUNDLES_FETCHED_HELP);
Expand Down Expand Up @@ -243,6 +247,11 @@ pub(crate) fn inc_sse_reconnect_attempts() {
counter!(SSE_RECONNECT_ATTEMPTS).increment(1);
}

/// Bump the SSE subscription-failure counter by one.
pub(crate) fn inc_sse_subscribe_errors() {
    let subscribe_errors = counter!(SSE_SUBSCRIBE_ERRORS);
    subscribe_errors.increment(1);
}

/// Increment the bundle poll attempt counter.
pub(crate) fn inc_bundle_poll_count() {
counter!(BUNDLE_POLL_COUNT).increment(1);
Expand Down
266 changes: 196 additions & 70 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,87 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use futures_util::{TryFutureExt, TryStreamExt};
use crate::{config::BuilderConfig, tasks::env::SimEnv};
use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
use signet_sim::{ProviderStateSource, SimItemValidity, check_bundle_tx_list};
use signet_tx_cache::{TxCacheError, types::CachedBundle};
use std::{ops::ControlFlow, pin::Pin, time::Duration};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
sync::{mpsc, watch},
task::JoinHandle,
time::{self, Duration},
time,
};
use tracing::{Instrument, debug_span, trace, trace_span, warn};
use tracing::{Instrument, debug, debug_span, trace, warn};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
type SseStream = Pin<Box<dyn Stream<Item = Result<CachedBundle, BuilderTxCacheError>> + Send>>;

/// The BundlePoller polls the tx-pool for bundles.
const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1);
const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30);

/// The BundlePoller fetches bundles from the tx-pool on startup and on each
/// block environment change, and subscribes to an SSE stream for real-time
/// delivery of new bundles in between.
#[derive(Debug)]
pub struct BundlePoller {
/// The builder configuration values.
config: &'static BuilderConfig,

/// Client for the tx cache.
tx_cache: BuilderTxCache,

/// Defines the interval at which the bundler polls the tx-pool for bundles.
poll_interval_ms: u64,
/// Receiver for block environment updates, used to trigger refetches.
envs: watch::Receiver<Option<SimEnv>>,
}

impl Default for BundlePoller {
fn default() -> Self {
Self::new()
}
}

/// Implements a poller for the block builder to pull bundles from the tx-pool.
impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config.
pub fn new() -> Self {
Self::new_with_poll_interval_ms(POLL_INTERVAL_MS)
}

/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
/// Returns a new [`BundlePoller`] with the given block environment receiver.
pub fn new(envs: watch::Receiver<Option<SimEnv>>) -> Self {
let config = crate::config();
let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token());
Self { config, tx_cache, poll_interval_ms }
Self { config, tx_cache, envs }
}

/// Returns the poll duration as a [`Duration`].
const fn poll_duration(&self) -> Duration {
Duration::from_millis(self.poll_interval_ms)
/// Pulls every bundle currently in the cache, paginating until the stream
/// is exhausted. Pure fetch — no metrics, no forwarding.
async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> {
self.tx_cache.stream_bundles().try_collect().await
}

/// Fetches all bundles from the tx-cache, paginating through all available pages.
pub async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> {
self.tx_cache.stream_bundles().try_collect().await
/// Fetches all bundles from the cache and forwards each to the outbound
/// channel. Records poll metrics around the fetch.
async fn fetch_and_forward(&self, outbound: &mpsc::UnboundedSender<CachedBundle>) {
crate::metrics::inc_bundle_poll_count();
Comment thread
Fraser999 marked this conversation as resolved.
// NotOurSlot is expected whenever the builder isn't slot-permissioned;
// don't bump the error counter or warn.
let Ok(bundles) = self
.check_bundle_cache()
.inspect_err(|error| match error {
BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => {
trace!("Not our slot to fetch bundles");
}
_ => {
crate::metrics::inc_bundle_poll_errors();
warn!(%error, "Failed to fetch bundles from tx-cache");
}
})
.await
else {
return;
};

crate::metrics::record_bundles_fetched(bundles.len());
trace!(count = bundles.len(), "found bundles");
for bundle in bundles {
Self::spawn_check_bundle_nonces(bundle, outbound.clone());
}
}

/// Spawns a tokio task to check the validity of all host transactions in a
/// bundle before sending it to the cache task via the outbound channel.
///
/// Uses [`check_bundle_tx_list`] from `signet-sim` to validate host tx nonces
/// and balance against the host chain. Drops bundles that are not currently valid.
fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender<CachedBundle>) {
fn spawn_check_bundle_nonces(
bundle: CachedBundle,
outbound: mpsc::UnboundedSender<CachedBundle>,
) {
let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id);
tokio::spawn(async move {
let recovered = match bundle.bundle.try_to_recovered() {
Expand Down Expand Up @@ -114,55 +133,162 @@ impl BundlePoller {
});
}

async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
/// Opens the SSE bundle subscription on the tx-cache client.
///
/// On success, logs the tx-pool URL at debug level and returns the stream
/// boxed as an [`SseStream`]. On failure, returns `None` and leaves retry
/// scheduling to the caller, rather than handing back an empty stream
/// that would later be reported as "stream ended".
///
/// A `NotOurSlot` failure is only traced; any other failure bumps the SSE
/// subscribe error counter and is logged as a warning.
async fn subscribe(&self) -> Option<SseStream> {
    match self.tx_cache.subscribe_bundles().await {
        Ok(stream) => {
            debug!(url = %self.config.tx_pool_url, "SSE bundle subscription established");
            Some(Box::pin(stream) as SseStream)
        }
        Err(BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) => {
            trace!("Not our slot to subscribe to bundles");
            None
        }
        Err(error) => {
            crate::metrics::inc_sse_subscribe_errors();
            warn!(%error, "Failed to open SSE bundle subscription");
            None
        }
    }
}

/// Loops with exponential backoff until a fresh SSE stream is established
/// (returned as `Some`) or the task should shut down (returned as `None`).
///
/// Shutdown is signalled when either the outbound channel is closed or the
/// block environment channel is closed. Each attempt runs a full refetch
/// alongside the subscribe call to cover items missed while disconnected.
///
/// `backoff` is doubled after every wait, capped at
/// [`MAX_RECONNECT_BACKOFF`]; this function never resets it.
async fn reconnect(
    &mut self,
    outbound: &mpsc::UnboundedSender<CachedBundle>,
    backoff: &mut Duration,
) -> Option<SseStream> {
    loop {
        if outbound.is_closed() {
            return None;
        }
        crate::metrics::inc_sse_reconnect_attempts();
        tokio::select! {
            // Biased: a block env change wins over the backoff sleep. An env
            // change triggers a full refetch below anyway, which supersedes the
            // sleep-then-reconnect path — so there's no point waiting out the
            // backoff.
            biased;
            res = self.envs.changed() => {
                // Once the env sender is gone, `changed()` resolves with an
                // error immediately on every call. Ignoring it would make this
                // arm win every time, skip the backoff sleep, and retry the
                // tx-cache in a tight loop. Treat it as a shutdown signal,
                // matching the main task loop.
                if res.is_err() {
                    debug!("Block env channel closed, shutting down");
                    return None;
                }
            }
            _ = time::sleep(*backoff) => {}
        }
        *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF);
        let (_, stream) = tokio::join!(self.fetch_and_forward(outbound), self.subscribe());
        if let Some(stream) = stream {
            return Some(stream);
        }
    }
}

// Check this here to avoid making the web request if we know
// we don't need the results.
/// Runs the reconnect loop and, on success, replaces `stream` with the
/// newly established one and returns `Continue`. Returns `Break` when the
/// reconnect loop gave up without producing a stream, meaning the task
/// should shut down.
async fn try_reconnect(
    &mut self,
    outbound: &mpsc::UnboundedSender<CachedBundle>,
    backoff: &mut Duration,
    stream: &mut SseStream,
) -> ControlFlow<()> {
    let Some(fresh) = self.reconnect(outbound, backoff).await else {
        return ControlFlow::Break(());
    };
    *stream = fresh;
    ControlFlow::Continue(())
}

/// Returns `Break` when the outbound channel has closed and the task
/// should shut down.
async fn handle_sse_item(
&mut self,
item: Option<Result<CachedBundle, BuilderTxCacheError>>,
outbound: &mpsc::UnboundedSender<CachedBundle>,
backoff: &mut Duration,
stream: &mut SseStream,
) -> ControlFlow<()> {
match item {
Some(Ok(bundle)) => {
*backoff = INITIAL_RECONNECT_BACKOFF;
if outbound.is_closed() {
trace!("No receivers left, shutting down");
return ControlFlow::Break(());
}
Self::spawn_check_bundle_nonces(bundle, outbound.clone());
ControlFlow::Continue(())
}
Some(Err(error)) => {
warn!(%error, "SSE bundle stream interrupted, reconnecting");
self.try_reconnect(outbound, backoff, stream).await
}
None => {
warn!("SSE bundle stream ended, reconnecting");
Comment thread
Fraser999 marked this conversation as resolved.
self.try_reconnect(outbound, backoff, stream).await
}
}
Comment thread
Evalir marked this conversation as resolved.
}

async fn task_future(mut self, outbound: mpsc::UnboundedSender<CachedBundle>) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should span this and pass the span to the lifecycle methods as necessary

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and as I think about it more, we probably want to add at least the block number to the span's log fields.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Done in 13b25a4 — task_future now carries an #[instrument] span with url and block_number fields. block_number is recorded on startup and re-recorded on every env-change branch so the field tracks the current block. The lifecycle methods (fetch_and_forward, subscribe, reconnect, handle_sse_item) inherit this span automatically via async context propagation, so I dropped the redundant per-method trace_span!s.

let (_, sub) = tokio::join!(self.fetch_and_forward(&outbound), self.subscribe());
let mut backoff = INITIAL_RECONNECT_BACKOFF;
let mut sse_stream = match sub {
Some(s) => s,
None => match self.reconnect(&outbound, &mut backoff).await {
Some(s) => s,
None => return,
},
};

loop {
if outbound.is_closed() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is removed from this arm, but it's still called in the tx cache arm. Need to check outbound.is_closed somewhere in this arm. In its current state, the simulator could drop its receiver while the SSE is in the error or none branch and would repeat its reconnection loop forever.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Claude Code]

Done in 13b25a4 — added an if outbound.is_closed() { break; } guard at the top of task_future's select loop, plus an early outbound.is_closed() check at the top of reconnect's loop body. Together those break the forever-reconnect cycle: if the simulator drops its receiver while we're stuck in a stream error/None loop, the next iteration of either loop exits cleanly.

span.in_scope(|| trace!("No receivers left, shutting down"));
debug!("Outbound channel closed, shutting down");
break;
}

crate::metrics::inc_bundle_poll_count();
let Ok(bundles) = self
.check_bundle_cache()
.inspect_err(|error| match error {
BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => {
trace!("Not our slot to fetch bundles");
tokio::select! {
item = sse_stream.next() => {
if self
.handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream)
.await
.is_break()
{
break;
}
_ => {
crate::metrics::inc_bundle_poll_errors();
warn!(%error, "Failed to fetch bundles from tx-cache");
}
res = self.envs.changed() => {
if res.is_err() {
debug!("Block env channel closed, shutting down");
break;
}
})
.instrument(span.clone())
.await
else {
time::sleep(self.poll_duration()).await;
continue;
};

{
let _guard = span.entered();
crate::metrics::record_bundles_fetched(bundles.len());
trace!(count = bundles.len(), "fetched bundles from tx-cache");
for bundle in bundles {
Self::spawn_check_bundle_nonces(bundle, outbound.clone());
// Run the refetch under the BlockConstruction span built by
// EnvTask, so its sim.ru.number / sim.host.number fields
// attach to anything the refetch logs.
let span = self
.envs
.borrow()
.as_ref()
.map_or_else(tracing::Span::none, |env| env.clone_span());
async {
debug!("Block env changed, refetching all bundles");
self.fetch_and_forward(&outbound).await;
}
.instrument(span)
.await;
}
}

time::sleep(self.poll_duration()).await;
}
}

/// Spawns a task that sends bundles it finds to its channel sender.
pub fn spawn(self) -> (UnboundedReceiver<CachedBundle>, JoinHandle<()>) {
let (outbound, inbound) = unbounded_channel();

/// Spawns the task future and returns a receiver for bundles it finds.
pub fn spawn(self) -> (mpsc::UnboundedReceiver<CachedBundle>, JoinHandle<()>) {
let (outbound, inbound) = mpsc::unbounded_channel();
let jh = tokio::spawn(self.task_future(outbound));

(inbound, jh)
}
}
2 changes: 1 addition & 1 deletion src/tasks/cache/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl CacheTasks {
let (tx_receiver, tx_poller) = tx_poller.spawn();

// Bundle Poller pulls bundles from the cache
let bundle_poller = BundlePoller::new();
let bundle_poller = BundlePoller::new(self.block_env.clone());
let (bundle_receiver, bundle_poller) = bundle_poller.spawn();

// Set up the cache task
Expand Down
7 changes: 5 additions & 2 deletions tests/bundle_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

use builder::test_utils::{setup_logging, setup_test_config};
use eyre::Result;
use futures_util::TryStreamExt;
use init4_bin_base::perms::tx_cache::BuilderTxCache;

#[tokio::test]
async fn test_bundle_poller_roundtrip() -> Result<()> {
setup_logging();
setup_test_config();

let bundle_poller = builder::tasks::cache::BundlePoller::new();
let config = builder::config();
let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token());

let _ = bundle_poller.check_bundle_cache().await?;
let _bundles: Vec<_> = tx_cache.stream_bundles().try_collect().await?;

Ok(())
}