From 5c5ee92be354e3cc7d4de76aa55750653073d37b Mon Sep 17 00:00:00 2001 From: Ray Walker Date: Sat, 16 May 2026 15:15:11 +1000 Subject: [PATCH] test: eliminate flaky time.sleep() tests with deterministic coordination Replaces 26 flaky time.sleep() calls across 5 test files with deterministic synchronization primitives: - ThreadGate: named sync points for thread coordination (new helper) - TimingHelper.wait_for_condition: poll observable state instead of hoping sleep durations are sufficient - collector.flush(): use existing drain API instead of sleeping - time-machine: replace patch("time.time") for thread-safe clock control (test_l1_swr.py) Fixes the test_version_mismatch_aborts_refresh_concurrent flake that blocks the release-please PR (#112). --- tests/critical/conftest.py | 1 + .../test_backpressure_load_control.py | 48 +++++-- tests/unit/test_l1_swr.py | 129 ++++++++++-------- tests/unit/test_load_control.py | 39 ++++-- tests/unit/test_metrics_collection.py | 31 ++--- tests/utils/timing_helper.py | 61 ++++++++- 6 files changed, 200 insertions(+), 109 deletions(-) diff --git a/tests/critical/conftest.py b/tests/critical/conftest.py index eced810..2b3babd 100644 --- a/tests/critical/conftest.py +++ b/tests/critical/conftest.py @@ -12,6 +12,7 @@ def pytest_runtest_setup(item): or "memcached_backend" in item.nodeid or "secure_env_fallback" in item.nodeid or "local_cache_works" in item.nodeid + or "backpressure_load_control" in item.nodeid ) if skip_redis: # Remove autouse redis fixtures for tests that don't need Redis diff --git a/tests/critical/test_backpressure_load_control.py b/tests/critical/test_backpressure_load_control.py index 755f1bc..030d03c 100644 --- a/tests/critical/test_backpressure_load_control.py +++ b/tests/critical/test_backpressure_load_control.py @@ -16,6 +16,7 @@ import pytest from cachekit.reliability.load_control import BackpressureController +from tests.utils.timing_helper import ThreadGate, TimingHelper pytestmark = pytest.mark.critical @@ -38,7 +39,7 @@ def slow_operation(): active_count["value"] += 1 max_observed["value"] = max(max_observed["value"], active_count["value"]) - time.sleep(0.1) # Hold permit for 100ms + time.sleep(0.1) # Hold permit for 100ms (simulates real work) with lock: active_count["value"] -= 1 @@ -61,14 +62,17 @@ def test_request_rejected_when_queue_full(self): timeout=10.0, # Long timeout (we won't wait) ) + gate = ThreadGate() + # Block the single execution slot - lock = threading.Lock() - lock.acquire() # Held until we release it + execution_lock = threading.Lock() + execution_lock.acquire() # Held until we release it def blocking_operation(): """Operation that blocks until lock is released.""" with controller.acquire(): - with lock: # This will block + gate.signal("acquired") + with execution_lock: # This will block pass def quick_operation(): @@ -79,7 +83,7 @@ def quick_operation(): # Start the blocking operation in a thread blocking_thread = threading.Thread(target=blocking_operation) blocking_thread.start() - time.sleep(0.05) # Let it acquire the execution permit + gate.wait("acquired") # Now the execution slot is occupied, queue should start filling # Try to launch more operations than queue_size @@ -103,7 +107,7 @@ def try_operation(): t.join(timeout=1.0) # Release the blocking operation - lock.release() + execution_lock.release() blocking_thread.join(timeout=1.0) # Should have rejected the operations beyond queue capacity @@ -117,19 +121,22 @@ def test_fast_fail_when_overloaded(self): timeout=0.01, # Very short timeout for fast fail ) + gate = ThreadGate() + # Occupy the execution slot execution_lock = threading.Lock() execution_lock.acquire() def blocking_op(): with controller.acquire(): + gate.signal("acquired") with execution_lock: pass # Start blocking operation thread = threading.Thread(target=blocking_op) thread.start() - time.sleep(0.05) # Let it acquire + gate.wait("acquired") # Now queue is full and execution is blocked # New request should fail fast @@ -176,7 +183,12 @@ def queued_operation(): for t in threads: t.start() - time.sleep(0.1) # Let them queue up + # Wait for threads to queue up (deterministic polling) + TimingHelper.wait_for_condition( + lambda: controller.queue_depth > 0, + timeout=2.0, + message="Should have queued requests", + ) # Queue depth should be >0 (threads are waiting) current_depth = controller.queue_depth @@ -199,18 +211,21 @@ def test_rejected_count_increments_correctly(self): timeout=0.01, ) + gate = ThreadGate() + # Block the execution slot execution_lock = threading.Lock() execution_lock.acquire() def blocking_op(): with controller.acquire(): + gate.signal("acquired") with execution_lock: pass thread = threading.Thread(target=blocking_op) thread.start() - time.sleep(0.05) + gate.wait("acquired") initial_rejected = controller.rejected_count @@ -326,13 +341,15 @@ def queued_op(): for t in threads: t.start() - time.sleep(0.1) # Let them queue + # Wait for threads to queue (deterministic polling) + TimingHelper.wait_for_condition( + lambda: controller.queue_depth > 0, + timeout=2.0, + message="Should have queued requests", + ) stats = controller.get_stats() assert stats["queue_depth"] > 0, "Should show queued requests" - # Health should be False if queue_depth >= 80% of queue_size - # 90 operations, 10 concurrent = 80 in queue -> 80% threshold - # This might be True or False depending on exact timing # Cleanup execution_lock.release() @@ -402,18 +419,21 @@ def test_reset_stats_clears_rejected_count(self): """CRITICAL: reset_stats() properly clears monitoring counters.""" controller = BackpressureController(max_concurrent=1, queue_size=1, timeout=0.01) + gate = ThreadGate() + # Block execution execution_lock = threading.Lock() execution_lock.acquire() def blocking_op(): with controller.acquire(): + gate.signal("acquired") with execution_lock: pass thread = threading.Thread(target=blocking_op) thread.start() - time.sleep(0.05) + gate.wait("acquired") # Generate rejections from cachekit.backends.errors import BackendError diff --git a/tests/unit/test_l1_swr.py b/tests/unit/test_l1_swr.py index 823a863..bb868eb 100644 --- a/tests/unit/test_l1_swr.py +++ b/tests/unit/test_l1_swr.py @@ -1,10 +1,9 @@ """Unit tests for L1Cache stale-while-revalidate (SWR) functionality.""" import threading -import time -from unittest.mock import patch import pytest +import time_machine from cachekit.config.nested import L1CacheConfig from cachekit.l1_cache import L1Cache @@ -41,14 +40,13 @@ def test_stale_entry_triggers_refresh(self): value = b"test_value" ttl = 100.0 - # Mock time to simulate staleness - with patch("time.time") as mock_time: + with time_machine.travel(0, tick=False) as traveller: # Time at put - mock_time.return_value = 1000.0 + traveller.move_to(1000.0) cache.put(key, value, redis_ttl=ttl) # Time at get (60s later, past threshold of ~50s even with jitter) - mock_time.return_value = 1060.0 + traveller.move_to(1060.0) hit, val, needs_refresh, version = cache.get_with_swr(key, ttl) assert hit is True @@ -65,13 +63,13 @@ def test_refreshing_prevents_duplicate_refresh(self): value = b"test_value" ttl = 100.0 - with patch("time.time") as mock_time: + with time_machine.travel(0, tick=False) as traveller: # Put at time 1000 - mock_time.return_value = 1000.0 + traveller.move_to(1000.0) cache.put(key, value, redis_ttl=ttl) # First get at 1060 (triggers refresh) - mock_time.return_value = 1060.0 + traveller.move_to(1060.0) hit1, val1, needs_refresh1, version1 = cache.get_with_swr(key, ttl) assert needs_refresh1 is True @@ -90,20 +88,20 @@ def test_complete_refresh_updates_entry(self): new_value = b"new_value" ttl = 100.0 - with patch("time.time") as mock_time: + with time_machine.travel(0, tick=False) as traveller: # Put old value - mock_time.return_value = 1000.0 + traveller.move_to(1000.0) cache.put(key, old_value, redis_ttl=ttl) # Trigger refresh - mock_time.return_value = 1060.0 + traveller.move_to(1060.0) hit, val, needs_refresh, version = cache.get_with_swr(key, ttl) assert needs_refresh is True assert version == 0 # Complete refresh - mock_time.return_value = 1065.0 - success = cache.complete_refresh(key, version, new_value, mock_time.return_value) + traveller.move_to(1065.0) + success = cache.complete_refresh(key, version, new_value, 1065.0) assert success is True # Verify new value is cached @@ -129,14 +127,14 @@ def test_jitter_varies_threshold(self): cache.clear() test_key = f"{key}_{i}" - with patch("time.time") as mock_time: + with time_machine.travel(0, tick=False) as traveller: # Put at time 1000 - mock_time.return_value = 1000.0 + traveller.move_to(1000.0) cache.put(test_key, value, redis_ttl=ttl) # Test at different elapsed times to find threshold for elapsed in range(40, 61): # 40-60s (around 50s ±10%) - mock_time.return_value = 1000.0 + elapsed + traveller.move_to(1000.0 + elapsed) hit, val, needs_refresh, version = cache.get_with_swr(test_key, ttl) if needs_refresh: @@ -164,13 +162,13 @@ def test_cancel_refresh_clears_flag(self): value = b"test_value" ttl = 100.0 - with patch("time.time") as mock_time: + with time_machine.travel(0, tick=False) as traveller: # Put at time 1000 - mock_time.return_value = 1000.0 + traveller.move_to(1000.0) cache.put(key, value, redis_ttl=ttl) # Trigger refresh - mock_time.return_value = 1060.0 + traveller.move_to(1060.0) hit, val, needs_refresh, version = cache.get_with_swr(key, ttl) assert needs_refresh is True @@ -186,9 +184,10 @@ def test_version_mismatch_aborts_refresh_concurrent(self): """Test that version mismatch prevents stale refresh under concurrent invalidation. CRITICAL RACE CONDITION TEST: - Thread 1: Triggers SWR refresh, sleeps 100ms, attempts complete_refresh() - Thread 2: Invalidates key after 50ms - Expected: complete_refresh() returns False (version mismatch), entry does NOT exist + Uses threading.Event for deterministic ordering (no sleep-based timing): + 1. Refresher triggers SWR refresh, signals ready, waits for invalidation + 2. Invalidator waits for ready signal, invalidates key, signals done + 3. Refresher attempts complete_refresh() — must see version mismatch """ config = L1CacheConfig(swr_enabled=True, swr_threshold_ratio=0.5) cache = L1Cache(max_memory_mb=10, config=config) @@ -198,54 +197,64 @@ def test_version_mismatch_aborts_refresh_concurrent(self): new_value = b"new_value" ttl = 100.0 + # Deterministic synchronization — no timing assumptions + refresh_triggered = threading.Event() + invalidation_done = threading.Event() + # Shared state refresh_result = [None] thread_errors = [] def refresher_thread(): - """Thread that triggers refresh and attempts to complete it after delay.""" + """Thread that triggers refresh, waits for invalidation, then completes.""" try: - with patch("time.time") as mock_time: - # Trigger refresh - mock_time.return_value = 1060.0 - hit, val, needs_refresh, version = cache.get_with_swr(key, ttl) + # Trigger refresh (time is frozen at 1060.0 by time-machine) + hit, val, needs_refresh, version = cache.get_with_swr(key, ttl) - if needs_refresh: - # Simulate slow refresh (100ms) - time.sleep(0.1) + if needs_refresh: + # Signal: refresh is in progress, invalidator can proceed + refresh_triggered.set() + + # Wait for invalidation to complete before attempting refresh + invalidation_done.wait(timeout=2.0) - # Try to complete refresh - mock_time.return_value = 1065.0 - success = cache.complete_refresh(key, version, new_value, mock_time.return_value) - refresh_result[0] = success + # Try to complete refresh — should fail (version mismatch) + success = cache.complete_refresh(key, version, new_value, 1065.0) + refresh_result[0] = success except Exception as e: thread_errors.append(e) def invalidator_thread(): - """Thread that invalidates key during refresh.""" + """Thread that invalidates key after refresh is triggered.""" try: - # Wait 50ms (midway through refresh) - time.sleep(0.05) + # Wait until refresher has triggered SWR + refresh_triggered.wait(timeout=2.0) - # Invalidate key + # Invalidate key (increments version) cache.invalidate_by_key(key) + + # Signal: invalidation complete, refresher can proceed + invalidation_done.set() except Exception as e: thread_errors.append(e) - # Setup: Put initial value - with patch("time.time") as mock_time: - mock_time.return_value = 1000.0 + # Setup: Put initial value and advance clock past SWR threshold + with time_machine.travel(0, tick=False) as traveller: + traveller.move_to(1000.0) cache.put(key, old_value, redis_ttl=ttl) - # Start concurrent threads - t1 = threading.Thread(target=refresher_thread) - t2 = threading.Thread(target=invalidator_thread) + # Advance past SWR threshold so get_with_swr triggers refresh + traveller.move_to(1060.0) + + # Start concurrent threads + t1 = threading.Thread(target=refresher_thread) + t2 = threading.Thread(target=invalidator_thread) - t1.start() - t2.start() + t1.start() + t2.start() - t1.join(timeout=1.0) - t2.join(timeout=1.0) + t1.join(timeout=5.0) + t2.join(timeout=5.0) # Verify no thread errors assert len(thread_errors) == 0, f"Thread errors occurred: {thread_errors}" @@ -267,13 +276,13 @@ def test_swr_disabled_never_triggers_refresh(self): value = b"test_value" ttl = 100.0 - with patch("time.time") as mock_time: + with time_machine.travel(0, tick=False) as traveller: # Put at time 1000 - mock_time.return_value = 1000.0 + traveller.move_to(1000.0) cache.put(key, value, redis_ttl=ttl) # Get at 1060 (way past threshold) - mock_time.return_value = 1060.0 + traveller.move_to(1060.0) hit, val, needs_refresh, version = cache.get_with_swr(key, ttl) assert hit is True @@ -289,13 +298,13 @@ def test_expired_entry_not_returned(self): value = b"test_value" ttl = 100.0 - with patch("time.time") as mock_time: + with time_machine.travel(0, tick=False) as traveller: # Put at time 1000 - mock_time.return_value = 1000.0 + traveller.move_to(1000.0) cache.put(key, value, redis_ttl=ttl) # Get at 1101 (past expiry) - mock_time.return_value = 1101.0 + traveller.move_to(1101.0) hit, val, needs_refresh, version = cache.get_with_swr(key, ttl) assert hit is False @@ -312,13 +321,13 @@ def test_complete_refresh_with_evicted_entry(self): new_value = b"new_value" ttl = 100.0 - with patch("time.time") as mock_time: + with time_machine.travel(0, tick=False) as traveller: # Put entry - mock_time.return_value = 1000.0 + traveller.move_to(1000.0) cache.put(key, value, redis_ttl=ttl) # Trigger refresh - mock_time.return_value = 1060.0 + traveller.move_to(1060.0) hit, val, needs_refresh, version = cache.get_with_swr(key, ttl) assert needs_refresh is True @@ -326,8 +335,8 @@ def test_complete_refresh_with_evicted_entry(self): cache.invalidate_by_key(key) # Try to complete refresh (entry no longer exists) - mock_time.return_value = 1065.0 - success = cache.complete_refresh(key, version, new_value, mock_time.return_value) + traveller.move_to(1065.0) + success = cache.complete_refresh(key, version, new_value, 1065.0) # Should return False (entry was evicted, even though version might match) # NOTE: Current implementation returns False for evicted entries diff --git a/tests/unit/test_load_control.py b/tests/unit/test_load_control.py index c089350..707b311 100644 --- a/tests/unit/test_load_control.py +++ b/tests/unit/test_load_control.py @@ -8,6 +8,7 @@ import pytest from cachekit.reliability.load_control import BackpressureController +from tests.utils.timing_helper import ThreadGate, TimingHelper class TestBackpressureController: @@ -51,17 +52,19 @@ def test_queue_depth_limiting(self): """Test that requests are rejected when queue is full.""" controller = BackpressureController(max_concurrent=1, queue_size=2, timeout=0.1) + gate = ThreadGate() + release = threading.Event() + # Fill the semaphore with a long-running operation def blocking_operation(): with controller.acquire(): - time.sleep(0.2) # Hold the permit + gate.signal("acquired") + release.wait(timeout=5.0) # Start background thread to hold the semaphore thread = threading.Thread(target=blocking_operation) thread.start() - - # Give time for thread to acquire semaphore - time.sleep(0.05) + gate.wait("acquired") # These requests should be able to join the queue exceptions = [] @@ -82,7 +85,12 @@ def queue_request(request_id): queue_threads.append(t) t.start() - time.sleep(0.05) # Let them enter queue + # Wait for queue to fill (threads are waiting on the semaphore) + TimingHelper.wait_for_condition( + lambda: controller.queue_depth >= 2, + timeout=2.0, + message="Queue should have 2 entries", + ) # This request should be rejected (queue full) from cachekit.backends.errors import BackendError @@ -95,6 +103,7 @@ def queue_request(request_id): assert controller.rejected_count >= 1 # Clean up + release.set() thread.join() for t in queue_threads: t.join() @@ -103,16 +112,18 @@ def test_semaphore_timeout(self): """Test that requests timeout when unable to acquire semaphore.""" controller = BackpressureController(max_concurrent=1, timeout=0.05) + gate = ThreadGate() + release = threading.Event() + # Hold the semaphore with a blocking operation def blocking_operation(): with controller.acquire(): - time.sleep(0.2) + gate.signal("acquired") + release.wait(timeout=5.0) thread = threading.Thread(target=blocking_operation) thread.start() - - # Give time for thread to acquire semaphore - time.sleep(0.02) + gate.wait("acquired") # This should timeout from cachekit.backends.errors import BackendError @@ -125,6 +136,7 @@ def blocking_operation(): assert controller.rejected_count == 1 # Clean up + release.set() thread.join() def test_context_manager_exception_handling(self): @@ -333,14 +345,18 @@ def test_very_small_timeout(self): """Test behavior with very small timeouts.""" controller = BackpressureController(max_concurrent=1, timeout=0.001) + gate = ThreadGate() + release = threading.Event() + # Hold the semaphore def blocking_operation(): with controller.acquire(): - time.sleep(0.1) + gate.signal("acquired") + release.wait(timeout=5.0) thread = threading.Thread(target=blocking_operation) thread.start() - time.sleep(0.01) # Ensure semaphore is held + gate.wait("acquired") # This should timeout very quickly from cachekit.backends.errors import BackendError @@ -353,4 +369,5 @@ def blocking_operation(): elapsed = time.time() - start_time assert elapsed < 0.2 # Should timeout quickly (generous for CI jitter) + release.set() thread.join() diff --git a/tests/unit/test_metrics_collection.py b/tests/unit/test_metrics_collection.py index a012677..6c89865 100644 --- a/tests/unit/test_metrics_collection.py +++ b/tests/unit/test_metrics_collection.py @@ -1,7 +1,6 @@ """Unit tests for metrics collection infrastructure.""" import threading -import time from unittest.mock import patch from cachekit.reliability.metrics_collection import ( @@ -71,8 +70,7 @@ def test_record_counter_metric(self): collector.record_counter("test_counter", {"label1": "value1"}, 2.0) collector.record_counter("test_counter", {"label1": "value2"}, 1.0) - # Allow time for processing - time.sleep(0.1) + collector.flush() stats = collector.get_stats() assert stats["processed_count"] >= 2 @@ -88,8 +86,7 @@ def test_record_histogram_metric(self): collector.record_histogram("test_histogram", 0.025, {"operation": "get"}) collector.record_histogram("test_histogram", 0.15, {"operation": "set"}) - # Allow time for processing - time.sleep(0.1) + collector.flush() stats = collector.get_stats() assert stats["processed_count"] >= 2 @@ -104,8 +101,7 @@ def test_record_gauge_metric(self): collector.record_gauge("test_gauge", 0.75, {"type": "utilization"}) collector.record_gauge("test_gauge", 0.85, {"type": "utilization"}) - # Allow time for processing - time.sleep(0.1) + collector.flush() stats = collector.get_stats() assert stats["processed_count"] >= 2 @@ -121,8 +117,7 @@ def test_queue_overflow_handling(self): for i in range(20): collector.record_counter("overflow_test", {"id": str(i)}) - # Allow time for processing - time.sleep(0.2) + collector.flush() stats = collector.get_stats() # Some metrics should have been dropped @@ -202,8 +197,7 @@ def record_metrics(thread_id: int): for thread in threads: thread.join() - # Allow time for processing - time.sleep(0.2) + collector.flush() stats = collector.get_stats() # Should have processed 50 metrics (5 threads × 10 metrics each) @@ -223,8 +217,7 @@ def test_error_handling_in_worker_thread(self): # Record a valid metric after the malformed one collector.record_counter("valid_metric", {"test": "value"}) - # Allow time for processing - time.sleep(0.2) + collector.flush() # Worker should still be alive despite the error assert collector._worker_thread.is_alive() @@ -274,8 +267,7 @@ def test_prometheus_counter_integration(self, mock_counter): # Record a cache operation metric collector.record_counter("redis_cache_operations_total", {"operation": "get", "status": "hit"}, 1.0) - # Allow time for processing - time.sleep(0.1) + collector.flush() # Should have called the Prometheus metric mock_counter.labels.assert_called_with(operation="get", status="hit") @@ -291,8 +283,7 @@ def test_prometheus_histogram_integration(self, mock_histogram): # Record a latency metric collector.record_histogram("redis_cache_operation_duration_seconds", 0.025, {"operation": "set"}) - # Allow time for processing - time.sleep(0.1) + collector.flush() # Should have called the Prometheus metric mock_histogram.labels.assert_called_with(operation="set") @@ -308,8 +299,7 @@ def test_prometheus_gauge_integration(self, mock_gauge): # Record a circuit breaker state metric collector.record_gauge("redis_circuit_breaker_state", 1.0, {"namespace": "test"}) - # Allow time for processing - time.sleep(0.1) + collector.flush() # Should have called the Prometheus metric mock_gauge.labels.assert_called_with(namespace="test") @@ -325,8 +315,7 @@ def test_unknown_metric_name_handling(self): # Record a metric with unknown name collector.record_counter("unknown_metric_name", {"test": "value"}, 1.0) - # Allow time for processing - time.sleep(0.1) + collector.flush() # Should log debug message about unknown metric mock_logger.debug.assert_called() diff --git a/tests/utils/timing_helper.py b/tests/utils/timing_helper.py index 10c3061..7e9cd21 100644 --- a/tests/utils/timing_helper.py +++ b/tests/utils/timing_helper.py @@ -1,7 +1,8 @@ -"""Timing utilities for deterministic test polling (no flakiness).""" +"""Timing utilities for deterministic test coordination (no flakiness).""" from __future__ import annotations +import threading import time from typing import Callable @@ -13,7 +14,7 @@ class TimingHelper: def wait_for_condition( condition_func: Callable[[], bool], timeout: float = 5.0, - interval: float = 0.1, + interval: float = 0.01, message: str = "Condition not met", ) -> None: """Wait for condition with timeout (deterministic, no flakiness). @@ -21,7 +22,7 @@ def wait_for_condition( Args: condition_func: Function that returns True when condition is met. timeout: Maximum time to wait in seconds (default: 5.0). - interval: Time to sleep between checks in seconds (default: 0.1). + interval: Time to sleep between checks in seconds (default: 0.01). message: Error message to raise on timeout. Raises: @@ -33,3 +34,57 @@ def wait_for_condition( return time.sleep(interval) raise TimeoutError(f"{message} after {timeout}s") + + +class ThreadGate: + """Named synchronization points for deterministic thread coordination. + + Replaces flaky ``time.sleep()`` ordering with explicit signals between + threads. Each gate is a named ``threading.Event`` created on first use. + + Example:: + + gate = ThreadGate() + + def worker(): + with controller.acquire(): + gate.signal("acquired") # tell main thread we hold the permit + with blocking_lock: # block until main thread releases + pass + + thread = threading.Thread(target=worker) + thread.start() + gate.wait("acquired") # deterministic — no sleep needed + + Args: + timeout: Default timeout for all ``wait()`` calls (seconds). + """ + + def __init__(self, timeout: float = 5.0) -> None: + self._events: dict[str, threading.Event] = {} + self._lock = threading.Lock() + self._timeout = timeout + + def _get_event(self, name: str) -> threading.Event: + with self._lock: + if name not in self._events: + self._events[name] = threading.Event() + return self._events[name] + + def signal(self, name: str) -> None: + """Signal that a named checkpoint has been reached.""" + self._get_event(name).set() + + def wait(self, name: str, timeout: float | None = None) -> None: + """Block until the named checkpoint is signaled. + + Args: + name: Gate name to wait on. + timeout: Override the default timeout (seconds). + + Raises: + TimeoutError: If the gate is not signaled within *timeout*. + """ + t = timeout if timeout is not None else self._timeout + if not self._get_event(name).wait(timeout=t): + raise TimeoutError(f"ThreadGate '{name}' not signaled within {t}s")