diff --git a/packages/dashmate/docker-compose.rate_limiter.yml b/packages/dashmate/docker-compose.rate_limiter.yml index d652b7fa22..b75836f6ed 100644 --- a/packages/dashmate/docker-compose.rate_limiter.yml +++ b/packages/dashmate/docker-compose.rate_limiter.yml @@ -42,6 +42,10 @@ services: - GRPC_MAX_CONNECTION_AGE=1h - GRPC_MAX_CONNECTION_AGE_GRACE=10m - GRPC_PORT=8081 + # Emit RateLimit-Limit / RateLimit-Remaining / RateLimit-Reset response + # headers so rs-dapi-client can read the exact reset window and ban the + # node for that duration instead of the exponential health-ban ladder. + - LIMIT_RESPONSE_HEADERS_ENABLED=true expose: - 8081 profiles: diff --git a/packages/rs-dapi-client/src/address_list.rs b/packages/rs-dapi-client/src/address_list.rs index fc1d734ccf..2b17a27212 100644 --- a/packages/rs-dapi-client/src/address_list.rs +++ b/packages/rs-dapi-client/src/address_list.rs @@ -103,6 +103,22 @@ impl AddressStatus { self.ban_reason = reason; } + /// Ban the address for an exact `period` (server-advertised), bypassing the + /// exponential ladder used by [`AddressStatus::ban_with_reason`]. + /// + /// The ban window is flat (not exponential), but `ban_count` is raised to + /// `max(ban_count, 1)` so that `is_banned()` and `ban_info()` correctly + /// report the node as banned. Side-effect: a previously-clean node + /// (ban_count 0) enters the ladder at floor 1, meaning its *next* genuine + /// health failure via [`AddressStatus::ban_with_reason`] uses + /// `60 s × e¹ ≈ 163 s` rather than the first-rung `60 s × e⁰ = 60 s`. + /// The counter resets to 0 on [`AddressStatus::unban`]. + pub fn ban_for(&mut self, period: Duration, reason: Option) { + self.banned_until = Some(chrono::Utc::now() + period); + self.ban_count = self.ban_count.max(1); + self.ban_reason = reason; + } + /// Check if [Address] is banned. pub fn is_banned(&self) -> bool { self.ban_count > 0 @@ -182,6 +198,23 @@ impl AddressList { true } + /// Ban the address for an exact `period` (server-advertised); delegates to + /// [`AddressStatus::ban_for`] — see that method for the full contract + /// including the `ban_count` floor and ladder side-effect. + /// + /// Returns `false` if the address is not in the list. + pub fn ban_for(&self, address: &Address, period: Duration, reason: Option) -> bool { + let mut guard = self.addresses.write().unwrap(); + + let Some(status) = guard.get_mut(address) else { + return false; + }; + + status.ban_for(period, reason); + + true + } + /// Clears address' ban record /// Returns false if the address is not in the list. pub fn unban(&self, address: &Address) -> bool { @@ -237,12 +270,16 @@ impl AddressList { self.add(Address::try_from(uri).expect("valid uri")) } - /// Randomly select a not banned address. + /// Randomly select a not-banned address. + /// + /// An address is considered live when it has never been banned or when its + /// ban period has already expired. pub fn get_live_address(&self) -> Option
{ + // TODO(low): module-wide `.read()/.write().unwrap()` panics on a + // poisoned lock; adopt poison-tolerant locking consistently (SEC-003). let guard = self.addresses.read().unwrap(); let mut rng = SmallRng::from_entropy(); - let now = chrono::Utc::now(); guard @@ -755,4 +792,167 @@ mod tests { let list = AddressList::new(); assert!(list.ban_info().is_empty()); } + + #[test] + fn test_address_status_ban_for_sets_exact_window_and_min_ban_count() { + let mut status = AddressStatus::default(); + assert_eq!(status.ban_count, 0); + assert!(status.banned_until.is_none()); + + let before = chrono::Utc::now(); + status.ban_for(Duration::from_secs(45), Some("rate limited".into())); + let after = chrono::Utc::now(); + + // ban_count must be at least 1 so is_banned() / ban_info().banned are consistent. + assert_eq!(status.ban_count, 1, "ban_for sets ban_count to max(0,1)=1"); + + // banned_until should be roughly now + 45 s. + let until = status.banned_until.expect("banned_until must be set"); + let lower = (until - before).num_milliseconds() as f64 / 1000.0; + let upper = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lower >= 44.9, + "banned_until lower bound too short: {lower}s" + ); + assert!(upper <= 45.1, "banned_until upper bound too long: {upper}s"); + assert_eq!(status.ban_reason.as_deref(), Some("rate limited")); + } + + /// `ban_for` on a fresh node (ban_count = 0) raises ban_count to 1 (the + /// ladder floor). That means the *next* genuine health ban will escalate + /// from position 1 (~163 s) instead of position 0 (~60 s). This pins the + /// documented side-effect so regressions are caught. + #[test] + fn test_ban_for_raises_fresh_node_to_ladder_floor() { + let mut status = AddressStatus::default(); + assert_eq!(status.ban_count, 0, "starts clean"); + + // Rate-limit ban on a never-before-banned node. + status.ban_for(Duration::from_secs(10), Some("rl".into())); + assert_eq!( + status.ban_count, 1, + "ban_for must raise ban_count 0 → 1 (ladder floor)" + ); + + // Subsequent genuine health failure must escalate from the floor (1), + // yielding ~60 s × e^1 ≈ 163 s, NOT the first-rung ~60 s × e^0 = 60 s. + let base = Duration::from_secs(60); + let before = chrono::Utc::now(); + status.ban_with_reason(&base, None); // ban_count 1 → 2; window = 60s × e^1 + let after = chrono::Utc::now(); + assert_eq!(status.ban_count, 2); + + let until = status.banned_until.expect("banned_until set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + let expected = 60.0_f64 * std::f64::consts::E; // ≈ 163 s + assert!( + lo >= expected - 0.5, + "window lower {lo:.1}s < expected {expected:.1}s (should escalate from floor 1)" + ); + assert!( + hi <= expected + 0.5, + "window upper {hi:.1}s > expected {expected:.1}s" + ); + } + + #[test] + fn test_address_status_ban_for_does_not_inflate_existing_ban_count() { + // A node already health-banned (ban_count = 3) gets rate-limited. + // ban_count must stay at 3, not grow to 4. + let mut status = AddressStatus::default(); + let base = Duration::from_secs(60); + status.ban_with_reason(&base, None); // → 1 + status.ban_with_reason(&base, None); // → 2 + status.ban_with_reason(&base, None); // → 3 + status.ban_for(Duration::from_secs(30), Some("rl".into())); + assert_eq!( + status.ban_count, 3, + "ban_for must not inflate ban_count above its existing value" + ); + } + + #[test] + fn test_address_list_ban_for_returns_false_for_unknown() { + let list = AddressList::new(); + let addr: Address = "http://127.0.0.1:3000".parse().unwrap(); + assert!(!list.ban_for(&addr, Duration::from_secs(5), None)); + } + + #[test] + fn test_address_list_ban_for_bans_known_address() { + let mut list = AddressList::new(); + let addr: Address = "http://127.0.0.1:3000".parse().unwrap(); + list.add(addr.clone()); + + assert!(list.ban_for(&addr, Duration::from_secs(60), Some("rl".into()))); + // The address must now be hidden from get_live_address. + assert!(list.get_live_address().is_none()); + // ban_count is 1 (ban_for sets max(0,1)). + let info = list.ban_info(); + assert_eq!(info.len(), 1); + assert!(info[0].banned); + assert_eq!(info[0].ban_count, 1); + } + + /// After `ban_for`'s window expires the address re-enters rotation via + /// `get_live_address`. We verify both directions: the node is hidden during + /// an active window, and becomes live once the window passes. + /// + /// A zero-duration window means `banned_until = Utc::now()` at call time; + /// `get_live_address` samples `Utc::now()` fresh, so at least one clock tick + /// separates the two calls and `banned_until < new_now` holds. + #[test] + fn test_ban_for_address_re_enters_rotation_after_window_expires() { + let mut list = AddressList::new(); + let addr: Address = "http://127.0.0.1:3000".parse().unwrap(); + list.add(addr.clone()); + + // Active 300-second window → node hidden. + assert!(list.ban_for(&addr, Duration::from_secs(300), Some("rl".into()))); + assert!( + list.get_live_address().is_none(), + "node must be hidden during active ban window" + ); + + // Re-ban with a zero window (already expired when get_live_address runs). + assert!(list.ban_for(&addr, Duration::ZERO, None)); + assert!( + list.get_live_address().is_some(), + "address must re-enter rotation after ban_for window expires" + ); + } + + /// Invariant 1 at the ladder source: the exponential ban window is + /// `base × e^ban_count`, `ban_count` incrementing on each ban. This pins the + /// exact formula independently of the `update_address_ban_status` entrypoint. + #[test] + fn test_ban_ladder_windows_match_exponential_formula() { + let mut status = AddressStatus::default(); + let base_secs = 60.0_f64; + let base = Duration::from_secs(60); + + for n in 0..3usize { + // coefficient uses ban_count BEFORE this ban (== n here). + let before = chrono::Utc::now(); + status.ban(&base); + let after = chrono::Utc::now(); + + assert_eq!(status.ban_count, n + 1, "ban_count must increment"); + let period = base_secs * (n as f64).exp(); + let banned_until = status.banned_until.expect("banned_until is set"); + let lower = (banned_until - before).num_milliseconds() as f64 / 1000.0; + let upper = (banned_until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lower >= period - 0.05, + "ban #{} window lower bound {lower}s < expected {period}s", + n + 1 + ); + assert!( + upper <= period + 0.05, + "ban #{} window upper bound {upper}s > expected {period}s", + n + 1 + ); + } + } } diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 72ccf31c37..1d0138747f 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -18,6 +18,17 @@ use crate::{ RequestSettings, }; +/// Intended minimum for the Envoy-advertised `RateLimit-Reset` ban duration. +/// Note: the `> 0` filter applied before the clamp already rejects 0 → `None`, +/// so this constant never actively clamps the lower bound — it documents intent +/// (the smallest meaningful reset is 1 s) and acts as the `.clamp(MIN, MAX)` +/// lower argument for clarity. +pub(crate) const MIN_RATE_LIMIT_BAN_SECS: u64 = 1; +/// Ceiling for the Envoy-advertised `RateLimit-Reset` ban duration. +/// Prevents a misconfigured or hostile header from parking a healthy node for +/// an unreasonably long time. +pub(crate) const MAX_RATE_LIMIT_BAN_SECS: u64 = 600; + /// General DAPI request error type. #[derive(Debug, thiserror::Error, Clone)] #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] @@ -67,6 +78,13 @@ impl CanRetry for DapiClientError { DapiClientError::NoAvailableAddresses | DapiClientError::NoAvailableAddressesToRetry(_) ) } + + fn rate_limit_ban_duration(&self) -> Option { + match self { + DapiClientError::Transport(te) => te.rate_limit_ban_duration(), + _ => None, + } + } } /// Serialization of [DapiClientError]. @@ -187,7 +205,25 @@ pub fn update_address_ban_status( if error.can_retry() { if let Some(address) = error.address.as_ref() { if applied_settings.ban_failed_address { - if address_list.ban_with_reason(address, Some(error.to_string())) { + let reason = Some(error.to_string()); + let banned = match error.rate_limit_ban_duration() { + // Envoy advertised a reset window: ban for exactly that period. + // ban_count is set to max(ban_count,1) so diagnostics see the node + // as banned, but the exponential ladder is not inflated. + Some(period) => { + tracing::debug!( + ?address, + ban_secs = period.as_secs(), + "rate-limited (ResourceExhausted): banning {address} \ + for {}s (from RateLimit-Reset header)", + period.as_secs() + ); + address_list.ban_for(address, period, reason) + } + // No rate-limit hint: normal exponential health-ban ladder. + None => address_list.ban_with_reason(address, reason), + }; + if banned { tracing::warn!( ?address, ?error, @@ -276,6 +312,150 @@ mod tests { assert!(!err.can_retry()); } + /// `rate_limit_ban_duration` returns `Some` only when the `ratelimit-reset` + /// header is present and positive on a `ResourceExhausted` response, and + /// the value is clamped to `[MIN_RATE_LIMIT_BAN_SECS, MAX_RATE_LIMIT_BAN_SECS]`. + #[test] + fn test_rate_limit_ban_duration_header_parse() { + use dapi_grpc::tonic::metadata::MetadataValue; + + // Helper: build a ResourceExhausted status with a ratelimit-reset header. + let make_rl_status = |header: Option<&str>| -> dapi_grpc::tonic::Status { + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + if let Some(v) = header { + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from(v).unwrap()); + } + status + }; + + // Normal header value: returned clamped. + let s = make_rl_status(Some("45")); + let dur = TransportError::Grpc(s).rate_limit_ban_duration(); + assert_eq!(dur, Some(Duration::from_secs(45))); + + // Value above MAX → clamped to MAX. + let s = make_rl_status(Some("9999")); + let dur = TransportError::Grpc(s).rate_limit_ban_duration(); + assert_eq!(dur, Some(Duration::from_secs(MAX_RATE_LIMIT_BAN_SECS))); + + // Clamp edge: exactly MIN (1) → 1 s (passes through unchanged). + let s = make_rl_status(Some("1")); + assert_eq!( + TransportError::Grpc(s).rate_limit_ban_duration(), + Some(Duration::from_secs(1)) + ); + + // Clamp edge: exactly MAX (600) → 600 s (not clamped). + let s = make_rl_status(Some("600")); + assert_eq!( + TransportError::Grpc(s).rate_limit_ban_duration(), + Some(Duration::from_secs(600)) + ); + + // One above MAX (601) → clamped to 600 s. + let s = make_rl_status(Some("601")); + assert_eq!( + TransportError::Grpc(s).rate_limit_ban_duration(), + Some(Duration::from_secs(600)) + ); + + // Value below MIN (0) → filtered to None before clamp. + let s = make_rl_status(Some("0")); + assert!(TransportError::Grpc(s).rate_limit_ban_duration().is_none()); + + // Non-numeric → None. + let s = make_rl_status(Some("garbage")); + assert!(TransportError::Grpc(s).rate_limit_ban_duration().is_none()); + + // Header absent → None. + let s = make_rl_status(None); + assert!(TransportError::Grpc(s).rate_limit_ban_duration().is_none()); + + // Non-ResourceExhausted code → None regardless of header. + let mut unavail = dapi_grpc::tonic::Status::unavailable("down"); + unavail + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap()); + assert!(TransportError::Grpc(unavail) + .rate_limit_ban_duration() + .is_none()); + } + + /// When `ResourceExhausted` carries a valid `ratelimit-reset` header, + /// `update_address_ban_status` calls `ban_for` (exact period, no ladder + /// inflation); when the header is absent it falls through to `ban_with_reason` + /// (normal exponential ladder). + #[test] + fn test_update_address_ban_status_rate_limit_ban_path() { + use dapi_grpc::tonic::metadata::MetadataValue; + + let mut address_list = AddressList::new(); + let addr = mock_address(); + address_list.add(addr.clone()); + + // Build a ResourceExhausted status with ratelimit-reset: 45. + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc(status)), + retries: 0, + address: Some(addr.clone()), + }); + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &make_applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + // Node is banned for ~45 s. + assert!(entry.banned, "rate-limited node must be banned"); + assert_eq!(entry.ban_count, 1, "ban_count must be 1 after ban_for"); + let until = entry.banned_until.expect("banned_until set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lo >= 44.9 && hi <= 45.1, + "ban window must be ~45 s, got lo={lo} hi={hi}" + ); + } + + /// When `ResourceExhausted` has NO `ratelimit-reset` header, + /// `update_address_ban_status` must fall back to the normal `ban_with_reason` + /// ladder (not produce a zero-second or panic ban). + #[test] + fn test_update_address_ban_status_rate_limit_no_header_uses_ladder() { + let mut address_list = AddressList::new(); + let addr = mock_address(); + address_list.add(addr.clone()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc( + dapi_grpc::tonic::Status::resource_exhausted("429"), + )), + retries: 0, + address: Some(addr.clone()), + }); + update_address_ban_status(&address_list, &result, &make_applied_settings(true)); + + // The ban ladder is invoked: first ban → ban_count = 1, window = 60 s. + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + assert!( + entry.banned, + "node must be banned on ResourceExhausted without header" + ); + assert_eq!( + entry.ban_count, 1, + "first health-ladder ban → ban_count = 1" + ); + } + #[cfg(feature = "mocks")] #[test] fn test_can_retry_mock_error() { diff --git a/packages/rs-dapi-client/src/executor.rs b/packages/rs-dapi-client/src/executor.rs index 963383742a..cdab8c6db1 100644 --- a/packages/rs-dapi-client/src/executor.rs +++ b/packages/rs-dapi-client/src/executor.rs @@ -83,6 +83,10 @@ impl CanRetry for ExecutionError { fn is_no_available_addresses(&self) -> bool { self.inner.is_no_available_addresses() } + + fn rate_limit_ban_duration(&self) -> Option { + self.inner.rate_limit_ban_duration() + } } /// Request execution response. diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index b31d2b35a2..2f77b2f7fc 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -95,6 +95,15 @@ pub trait CanRetry { false } + /// If this error is a gRPC `ResourceExhausted` (Envoy rate-limit) that + /// carries a `RateLimit-Reset` metadata header, returns the server-advertised + /// ban duration (clamped to a safe range). Returns `None` for all other + /// errors and for rate-limit errors that carry no usable header (the caller + /// falls back to the normal exponential ban ladder in that case). + fn rate_limit_ban_duration(&self) -> Option { + None + } + /// Get boolean flag that indicates if the error is retryable. /// /// Deprecated in favor of [CanRetry::can_retry]. diff --git a/packages/rs-dapi-client/src/transport.rs b/packages/rs-dapi-client/src/transport.rs index f488aeaffa..bea0968fd2 100644 --- a/packages/rs-dapi-client/src/transport.rs +++ b/packages/rs-dapi-client/src/transport.rs @@ -106,6 +106,12 @@ impl CanRetry for TransportError { TransportError::Grpc(status) => status.can_retry(), } } + + fn rate_limit_ban_duration(&self) -> Option { + match self { + TransportError::Grpc(status) => status.rate_limit_ban_duration(), + } + } } /// Serialization of [TransportError]. @@ -212,6 +218,65 @@ mod tests { assert!(!non_retryable.can_retry()); } + /// `rate_limit_ban_duration` returns `Some` only for `ResourceExhausted` with + /// a parseable positive `ratelimit-reset` header. Every other code returns + /// `None` regardless of headers. + #[test] + fn test_tonic_status_rate_limit_ban_duration() { + use dapi_grpc::tonic::metadata::MetadataValue; + + // ResourceExhausted + valid header → Some. + let mut status = dapi_grpc::tonic::Status::new(Code::ResourceExhausted, "429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap()); + assert_eq!( + status.rate_limit_ban_duration(), + Some(std::time::Duration::from_secs(30)) + ); + + // ResourceExhausted without header → None. + let no_header = dapi_grpc::tonic::Status::new(Code::ResourceExhausted, "429"); + assert!(no_header.rate_limit_ban_duration().is_none()); + + // Non-ResourceExhausted codes → None regardless. + for code in [ + Code::Ok, + Code::Unavailable, + Code::Internal, + Code::DeadlineExceeded, + ] { + let mut s = dapi_grpc::tonic::Status::new(code, "x"); + s.metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap()); + assert!( + s.rate_limit_ban_duration().is_none(), + "code {code:?} must return None" + ); + } + } + + #[test] + fn test_transport_error_rate_limit_ban_duration_delegates() { + use dapi_grpc::tonic::metadata::MetadataValue; + + let mut status = dapi_grpc::tonic::Status::new(Code::ResourceExhausted, "429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap()); + let rate_limited = TransportError::Grpc(status); + assert_eq!( + rate_limited.rate_limit_ban_duration(), + Some(std::time::Duration::from_secs(45)) + ); + // Still retryable — rate-limit ban duration doesn't affect can_retry. + assert!(rate_limited.can_retry()); + + let unavailable = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("down")); + assert!(unavailable.rate_limit_ban_duration().is_none()); + assert!(unavailable.can_retry()); + } + #[test] fn test_transport_error_clone() { let original = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test message")); diff --git a/packages/rs-dapi-client/src/transport/grpc.rs b/packages/rs-dapi-client/src/transport/grpc.rs index 68e0c5ae68..4253ea3d95 100644 --- a/packages/rs-dapi-client/src/transport/grpc.rs +++ b/packages/rs-dapi-client/src/transport/grpc.rs @@ -146,6 +146,39 @@ impl CanRetry for dapi_grpc::tonic::Status { | Unimplemented ) } + + /// Returns the Envoy-advertised ban duration for a `ResourceExhausted` + /// response, or `None` if this is not a rate-limit or carries no usable + /// `RateLimit-Reset` header. + /// + /// Envoy's global rate-limit filter emits `RateLimit-Reset: ` when + /// `LIMIT_RESPONSE_HEADERS_ENABLED=true` is set on the Lyft RLS container + /// (see `packages/dashmate/docker-compose.rate_limiter.yml`). The value is + /// the whole-second count until the per-IP window resets. + /// + /// Parse rules (adversarial-input safe): + /// * Non-`ResourceExhausted` code → `None`. + /// * Header absent, non-numeric, or `0` → `None` (caller uses normal ban + /// ladder). + /// * Valid positive integer → clamped to + /// [`[MIN_RATE_LIMIT_BAN_SECS, MAX_RATE_LIMIT_BAN_SECS]`] + /// (`dapi_client.rs`) and returned as `Some(Duration)`. + fn rate_limit_ban_duration(&self) -> Option { + use crate::dapi_client::{MAX_RATE_LIMIT_BAN_SECS, MIN_RATE_LIMIT_BAN_SECS}; + use dapi_grpc::tonic::Code; + if self.code() != Code::ResourceExhausted { + return None; + } + let secs = self + .metadata() + .get("ratelimit-reset") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.trim().parse::().ok()) + .filter(|&s| s > 0)?; + Some(std::time::Duration::from_secs( + secs.clamp(MIN_RATE_LIMIT_BAN_SECS, MAX_RATE_LIMIT_BAN_SECS), + )) + } } /// Macro to implement the `TransportRequest` trait for a given request type, response type, client type, and settings. diff --git a/packages/rs-dapi-client/tests/rate_limit_ban.rs b/packages/rs-dapi-client/tests/rate_limit_ban.rs new file mode 100644 index 0000000000..266db5044e --- /dev/null +++ b/packages/rs-dapi-client/tests/rate_limit_ban.rs @@ -0,0 +1,257 @@ +//! Integration test: `ResourceExhausted` with a `RateLimit-Reset` header causes +//! the node to be banned for that exact period (`ban_for`), while a missing +//! header falls back to the normal exponential health-ban ladder. + +use dapi_grpc::tonic::metadata::MetadataValue; +use rs_dapi_client::transport::{AppliedRequestSettings, TransportError}; +use rs_dapi_client::{ + update_address_ban_status, AddressList, CanRetry, DapiClientError, ExecutionError, + ExecutionResult, +}; +use std::time::Duration; + +fn make_address() -> rs_dapi_client::Address { + "http://127.0.0.1:3000".parse().expect("valid address") +} + +fn applied_settings(ban: bool) -> AppliedRequestSettings { + AppliedRequestSettings { + connect_timeout: None, + timeout: Duration::from_secs(10), + retries: 5, + ban_failed_address: ban, + max_decoding_message_size: None, + #[cfg(not(target_arch = "wasm32"))] + ca_certificate: None, + } +} + +/// `ResourceExhausted` + `ratelimit-reset: 45` → `ban_for` with a ~45s window. +/// `ban_count` must be set to at least 1 (diagnostics) but NOT escalated further. +#[test] +fn test_resource_exhausted_with_header_bans_for_advertised_period() { + let mut address_list = AddressList::new(); + let addr = make_address(); + address_list.add(addr.clone()); + + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc(status)), + retries: 0, + address: Some(addr.clone()), + }); + + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + assert!( + entry.banned, + "node must be banned after ResourceExhausted+header" + ); + assert_eq!(entry.ban_count, 1, "ban_for sets ban_count to max(0,1)=1"); + + // Ban window must be approximately 45 s. + let until = entry.banned_until.expect("banned_until must be set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lo >= 44.9 && hi <= 45.1, + "ban window must be ~45 s; got lo={lo:.2}s hi={hi:.2}s" + ); +} + +/// Large `ratelimit-reset` values are clamped to MAX_RATE_LIMIT_BAN_SECS (600). +#[test] +fn test_ratelimit_reset_clamped_to_max() { + let mut address_list = AddressList::new(); + let addr = make_address(); + address_list.add(addr.clone()); + + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("9999").unwrap()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc(status)), + retries: 0, + address: Some(addr.clone()), + }); + + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + let until = entry.banned_until.expect("banned_until set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + // Clamped at 600 s. + assert!( + lo >= 599.5 && hi <= 600.5, + "9999s must be clamped to 600s; got lo={lo:.2} hi={hi:.2}" + ); +} + +/// `ratelimit-reset: 0` or non-numeric → `None` → normal `ban_with_reason` ladder. +/// +/// The key assertion is the resulting `banned_until` window: the ladder's first +/// ban is `60 s × e^0 = 60 s`, **not** some header-derived value. Checking only +/// `ban_count == 1` would pass even if the wrong path (ban_for) were taken. +#[test] +fn test_zero_and_garbage_header_falls_back_to_ladder() { + // Default AddressList uses DEFAULT_BASE_BAN_PERIOD = 60 s. + // First ladder ban: coefficient = e^0 = 1.0, window = 60 s. + const EXPECTED_WINDOW_SECS: f64 = 60.0; + + for bad in &["0", "garbage", ""] { + let mut address_list = AddressList::new(); + let addr = make_address(); + address_list.add(addr.clone()); + + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + if !bad.is_empty() { + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from(*bad).unwrap()); + } + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc(status)), + retries: 0, + address: Some(addr.clone()), + }); + + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + assert!( + entry.banned, + "bad header '{bad}' must still result in a ban via the ladder" + ); + assert_eq!( + entry.ban_count, 1, + "ladder ban → ban_count = 1 for header '{bad}'" + ); + + // The ban window must be the exponential ladder's first rung (~60 s), + // NOT a header-derived value. This assertion fails if ban_for were + // mistakenly called instead of ban_with_reason. + let until = entry + .banned_until + .expect("banned_until set for header '{bad}'"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lo >= EXPECTED_WINDOW_SECS - 0.5, + "bad header '{bad}': ban window lower {lo:.1}s < expected ~{EXPECTED_WINDOW_SECS}s (ladder path)" + ); + assert!( + hi <= EXPECTED_WINDOW_SECS + 0.5, + "bad header '{bad}': ban window upper {hi:.1}s > expected ~{EXPECTED_WINDOW_SECS}s (should be ladder, not ban_for)" + ); + } +} + +/// Missing `ratelimit-reset` header → `None` → normal exponential health-ban ladder. +/// +/// Asserts the `banned_until` window is ~60 s (first ladder rung), NOT a +/// header-derived value. A mere `ban_count == 1` check would pass even if +/// `ban_for` were wrongly invoked (both paths yield ban_count 1 on first ban). +#[test] +fn test_missing_header_falls_back_to_ladder() { + let mut address_list = AddressList::new(); + let addr = make_address(); + address_list.add(addr.clone()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc( + dapi_grpc::tonic::Status::resource_exhausted("429"), + )), + retries: 0, + address: Some(addr.clone()), + }); + + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + assert!( + entry.banned, + "missing header must still result in a ladder ban" + ); + assert_eq!(entry.ban_count, 1, "first ladder ban → ban_count = 1"); + + // Window must be the first exponential rung: 60 s × e^0 = 60 s. + let until = entry.banned_until.expect("banned_until set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lo >= 59.5, + "ladder window lower {lo:.1}s < expected ~60 s (missing header must use ladder)" + ); + assert!( + hi <= 60.5, + "ladder window upper {hi:.1}s > expected ~60 s (should be ladder, not ban_for)" + ); +} + +/// `rate_limit_ban_duration` on `CanRetry` returns `Some` only for +/// `ResourceExhausted` with a parseable positive `ratelimit-reset`. +#[test] +fn test_rate_limit_ban_duration_trait_delegation() { + use rs_dapi_client::ExecutionError; + + // With header → Some(45s). + let mut s = dapi_grpc::tonic::Status::resource_exhausted("429"); + s.metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap()); + let te = TransportError::Grpc(s); + assert_eq!(te.rate_limit_ban_duration(), Some(Duration::from_secs(45))); + + // Unavailable → None. + let unavail = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("down")); + assert!(unavail.rate_limit_ban_duration().is_none()); + + // ResourceExhausted without header → None. + let re_no_header = TransportError::Grpc(dapi_grpc::tonic::Status::resource_exhausted("429")); + assert!(re_no_header.rate_limit_ban_duration().is_none()); + + // DapiClientError delegates. + let dce = DapiClientError::Transport(TransportError::Grpc({ + let mut s2 = dapi_grpc::tonic::Status::resource_exhausted("429"); + s2.metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap()); + s2 + })); + assert_eq!(dce.rate_limit_ban_duration(), Some(Duration::from_secs(30))); + + // ExecutionError delegates. + let ee: ExecutionError = ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc({ + let mut s3 = dapi_grpc::tonic::Status::resource_exhausted("429"); + s3.metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("20").unwrap()); + s3 + })), + retries: 0, + address: Some(make_address()), + }; + assert_eq!(ee.rate_limit_ban_duration(), Some(Duration::from_secs(20))); +}