diff --git a/Cargo.lock b/Cargo.lock index eb21d26..fcd6537 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2758,7 +2758,7 @@ dependencies = [ [[package]] name = "jito-protos" -version = "0.2.13" +version = "0.2.14" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", @@ -2769,7 +2769,7 @@ dependencies = [ [[package]] name = "jito-shredstream-proxy" -version = "0.2.13" +version = "0.2.14" dependencies = [ "ahash 0.8.11", "arc-swap", diff --git a/Cargo.toml b/Cargo.toml index 27d977f..912abc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["examples", "jito_protos", "proxy"] resolver = "2" [workspace.package] -version = "0.2.13" +version = "0.2.14" description = "Fast path to receive shreds from Jito, forwarding to local consumers. See https://docs.jito.wtf/lowlatencytxnfeed/ for details." authors = ["Jito Team "] homepage = "https://jito.wtf/" diff --git a/proxy/src/forwarder.rs b/proxy/src/forwarder.rs index b31f3b7..6db28da 100644 --- a/proxy/src/forwarder.rs +++ b/proxy/src/forwarder.rs @@ -55,6 +55,7 @@ pub fn start_forwarder_threads( src_port: u16, maybe_multicast_socket: Option>, num_threads: Option, + coalesce: Duration, deduper: Arc>>, should_reconstruct_shreds: bool, entry_sender: Arc>, @@ -148,7 +149,7 @@ pub fn start_forwarder_threads( packet_sender, recycler.clone(), forward_stats.clone(), - Duration::default(), + coalesce, false, None, false, diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 9b50a08..04aece6 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -187,6 +187,13 @@ struct CommonArgs { #[arg(long, env)] num_threads: Option, + /// Time in milliseconds to coalesce incoming packets into a batch before forwarding. + /// `0` (default) forwards each batch as soon as it is received for lowest latency. + /// A higher value waits up to this long to build larger batches, reducing CPU usage + /// at the cost of added latency. + #[arg(long, env, default_value_t = 0)] + coalesce_ms: u64, + /// Address to bind prometheus metrics server to. If not provided, prometheus server is disabled. #[arg(long, env)] prometheus_bind_addr: Option, @@ -356,6 +363,7 @@ fn main() -> Result<(), ShredstreamProxyError> { args.src_bind_port, maybe_multicast_socket, args.num_threads, + Duration::from_millis(args.coalesce_ms), deduper.clone(), args.grpc_service_port.is_some(), entry_sender.clone(),