Skip to content
Open
4 changes: 4 additions & 0 deletions packages/dashmate/docker-compose.rate_limiter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ services:
- GRPC_MAX_CONNECTION_AGE=1h
- GRPC_MAX_CONNECTION_AGE_GRACE=10m
- GRPC_PORT=8081
# Emit RateLimit-Limit / RateLimit-Remaining / RateLimit-Reset response
# headers so rs-dapi-client can read the exact reset window and ban the
# node for that duration instead of the exponential health-ban ladder.
- LIMIT_RESPONSE_HEADERS_ENABLED=true

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like I suggested the wrong direction. I mean, the ban system is correct, but exposing headers that are disabled by default and enabling them via env is risky. Could you please investigate what consequences and why it's disabled by default?

expose:
- 8081
profiles:
Expand Down
204 changes: 202 additions & 2 deletions packages/rs-dapi-client/src/address_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,22 @@ impl AddressStatus {
self.ban_reason = reason;
}

/// Ban the address for an exact `period` (server-advertised), bypassing the
/// exponential ladder used by [`AddressStatus::ban_with_reason`].
///
/// The ban window is flat (not exponential), but `ban_count` is raised to
/// `max(ban_count, 1)` so that `is_banned()` and `ban_info()` correctly
/// report the node as banned. Side-effect: a previously-clean node
/// (ban_count 0) enters the ladder at floor 1, meaning its *next* genuine
/// health failure via [`AddressStatus::ban_with_reason`] uses
/// `60 s × e¹ ≈ 163 s` rather than the first-rung `60 s × e⁰ = 60 s`.
/// The counter resets to 0 on [`AddressStatus::unban`].
pub fn ban_for(&mut self, period: Duration, reason: Option<String>) {
self.banned_until = Some(chrono::Utc::now() + period);
self.ban_count = self.ban_count.max(1);
self.ban_reason = reason;
Comment on lines +116 to +119

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Blocking: Rate-limit ban can shorten an active longer ban

ban_for always replaces banned_until with now + period. Because request completions can arrive out of order for the same shared AddressList, a later ResourceExhausted response with a short ratelimit-reset value can shorten a longer health-ban or longer prior rate-limit ban that was already applied. That reintroduces the address before the existing ban window expires, so genuine failure bans are no longer preserved under overlapping requests.

Suggested change
pub fn ban_for(&mut self, period: Duration, reason: Option<String>) {
self.banned_until = Some(chrono::Utc::now() + period);
self.ban_count = self.ban_count.max(1);
self.ban_reason = reason;
pub fn ban_for(&mut self, period: Duration, reason: Option<String>) {
let advertised_until = chrono::Utc::now() + period;
if self
.banned_until
.map(|current_until| current_until < advertised_until)
.unwrap_or(true)
{
self.banned_until = Some(advertised_until);
self.ban_reason = reason;
}
self.ban_count = self.ban_count.max(1);
}

source: ['codex']

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved in this update — Rate-limit ban can shorten an active longer ban no longer present.

Auto-resolved by the review system based on the latest commit diff. If you believe this was closed in error, reopen the thread.

}

/// Check if [Address] is banned.
pub fn is_banned(&self) -> bool {
self.ban_count > 0
Expand Down Expand Up @@ -182,6 +198,23 @@ impl AddressList {
true
}

/// Ban the address for an exact `period` (server-advertised); delegates to
/// [`AddressStatus::ban_for`] — see that method for the full contract
/// including the `ban_count` floor and ladder side-effect.
///
/// Returns `false` if the address is not in the list.
pub fn ban_for(&self, address: &Address, period: Duration, reason: Option<String>) -> bool {
let mut guard = self.addresses.write().unwrap();

let Some(status) = guard.get_mut(address) else {
return false;
};

status.ban_for(period, reason);

true
}

/// Clears address' ban record
/// Returns false if the address is not in the list.
pub fn unban(&self, address: &Address) -> bool {
Expand Down Expand Up @@ -237,12 +270,16 @@ impl AddressList {
self.add(Address::try_from(uri).expect("valid uri"))
}

/// Randomly select a not banned address.
/// Randomly select a not-banned address.
///
/// An address is considered live when it has never been banned or when its
/// ban period has already expired.
pub fn get_live_address(&self) -> Option<Address> {
// TODO(low): module-wide `.read()/.write().unwrap()` panics on a
// poisoned lock; adopt poison-tolerant locking consistently (SEC-003).
let guard = self.addresses.read().unwrap();

let mut rng = SmallRng::from_entropy();

let now = chrono::Utc::now();

guard
Expand Down Expand Up @@ -755,4 +792,167 @@ mod tests {
let list = AddressList::new();
assert!(list.ban_info().is_empty());
}

#[test]
fn test_address_status_ban_for_sets_exact_window_and_min_ban_count() {
let mut status = AddressStatus::default();
assert_eq!(status.ban_count, 0);
assert!(status.banned_until.is_none());

let before = chrono::Utc::now();
status.ban_for(Duration::from_secs(45), Some("rate limited".into()));
let after = chrono::Utc::now();

// ban_count must be at least 1 so is_banned() / ban_info().banned are consistent.
assert_eq!(status.ban_count, 1, "ban_for sets ban_count to max(0,1)=1");

// banned_until should be roughly now + 45 s.
let until = status.banned_until.expect("banned_until must be set");
let lower = (until - before).num_milliseconds() as f64 / 1000.0;
let upper = (until - after).num_milliseconds() as f64 / 1000.0;
assert!(
lower >= 44.9,
"banned_until lower bound too short: {lower}s"
);
assert!(upper <= 45.1, "banned_until upper bound too long: {upper}s");
assert_eq!(status.ban_reason.as_deref(), Some("rate limited"));
}

/// `ban_for` on a fresh node (ban_count = 0) raises ban_count to 1 (the
/// ladder floor). That means the *next* genuine health ban will escalate
/// from position 1 (~163 s) instead of position 0 (~60 s). This pins the
/// documented side-effect so regressions are caught.
#[test]
fn test_ban_for_raises_fresh_node_to_ladder_floor() {
let mut status = AddressStatus::default();
assert_eq!(status.ban_count, 0, "starts clean");

// Rate-limit ban on a never-before-banned node.
status.ban_for(Duration::from_secs(10), Some("rl".into()));
assert_eq!(
status.ban_count, 1,
"ban_for must raise ban_count 0 → 1 (ladder floor)"
);

// Subsequent genuine health failure must escalate from the floor (1),
// yielding ~60 s × e^1 ≈ 163 s, NOT the first-rung ~60 s × e^0 = 60 s.
let base = Duration::from_secs(60);
let before = chrono::Utc::now();
status.ban_with_reason(&base, None); // ban_count 1 → 2; window = 60s × e^1
let after = chrono::Utc::now();
assert_eq!(status.ban_count, 2);

let until = status.banned_until.expect("banned_until set");
let lo = (until - before).num_milliseconds() as f64 / 1000.0;
let hi = (until - after).num_milliseconds() as f64 / 1000.0;
let expected = 60.0_f64 * std::f64::consts::E; // ≈ 163 s
assert!(
lo >= expected - 0.5,
"window lower {lo:.1}s < expected {expected:.1}s (should escalate from floor 1)"
);
assert!(
hi <= expected + 0.5,
"window upper {hi:.1}s > expected {expected:.1}s"
);
}

#[test]
fn test_address_status_ban_for_does_not_inflate_existing_ban_count() {
// A node already health-banned (ban_count = 3) gets rate-limited.
// ban_count must stay at 3, not grow to 4.
let mut status = AddressStatus::default();
let base = Duration::from_secs(60);
status.ban_with_reason(&base, None); // → 1
status.ban_with_reason(&base, None); // → 2
status.ban_with_reason(&base, None); // → 3
status.ban_for(Duration::from_secs(30), Some("rl".into()));
assert_eq!(
status.ban_count, 3,
"ban_for must not inflate ban_count above its existing value"
);
}

#[test]
fn test_address_list_ban_for_returns_false_for_unknown() {
let list = AddressList::new();
let addr: Address = "http://127.0.0.1:3000".parse().unwrap();
assert!(!list.ban_for(&addr, Duration::from_secs(5), None));
}

#[test]
fn test_address_list_ban_for_bans_known_address() {
let mut list = AddressList::new();
let addr: Address = "http://127.0.0.1:3000".parse().unwrap();
list.add(addr.clone());

assert!(list.ban_for(&addr, Duration::from_secs(60), Some("rl".into())));
// The address must now be hidden from get_live_address.
assert!(list.get_live_address().is_none());
// ban_count is 1 (ban_for sets max(0,1)).
let info = list.ban_info();
assert_eq!(info.len(), 1);
assert!(info[0].banned);
assert_eq!(info[0].ban_count, 1);
}

/// After `ban_for`'s window expires the address re-enters rotation via
/// `get_live_address`. We verify both directions: the node is hidden during
/// an active window, and becomes live once the window passes.
///
/// A zero-duration window means `banned_until = Utc::now()` at call time;
/// `get_live_address` samples `Utc::now()` fresh, so at least one clock tick
/// separates the two calls and `banned_until < new_now` holds.
#[test]
fn test_ban_for_address_re_enters_rotation_after_window_expires() {
let mut list = AddressList::new();
let addr: Address = "http://127.0.0.1:3000".parse().unwrap();
list.add(addr.clone());

// Active 300-second window → node hidden.
assert!(list.ban_for(&addr, Duration::from_secs(300), Some("rl".into())));
assert!(
list.get_live_address().is_none(),
"node must be hidden during active ban window"
);

// Re-ban with a zero window (already expired when get_live_address runs).
assert!(list.ban_for(&addr, Duration::ZERO, None));
assert!(
list.get_live_address().is_some(),
"address must re-enter rotation after ban_for window expires"
);
}

/// Invariant 1 at the ladder source: the exponential ban window is
/// `base × e^ban_count`, `ban_count` incrementing on each ban. This pins the
/// exact formula independently of the `update_address_ban_status` entrypoint.
#[test]
fn test_ban_ladder_windows_match_exponential_formula() {
let mut status = AddressStatus::default();
let base_secs = 60.0_f64;
let base = Duration::from_secs(60);

for n in 0..3usize {
// coefficient uses ban_count BEFORE this ban (== n here).
let before = chrono::Utc::now();
status.ban(&base);
let after = chrono::Utc::now();

assert_eq!(status.ban_count, n + 1, "ban_count must increment");
let period = base_secs * (n as f64).exp();
let banned_until = status.banned_until.expect("banned_until is set");
let lower = (banned_until - before).num_milliseconds() as f64 / 1000.0;
let upper = (banned_until - after).num_milliseconds() as f64 / 1000.0;
assert!(
lower >= period - 0.05,
"ban #{} window lower bound {lower}s < expected {period}s",
n + 1
);
assert!(
upper <= period + 0.05,
"ban #{} window upper bound {upper}s > expected {period}s",
n + 1
);
}
}
}
Loading
Loading