From 7add8242c13a5beccc9dac0db238a38abae0f728 Mon Sep 17 00:00:00 2001 From: Khaled Salhab Date: Sun, 14 Jun 2026 09:16:13 +0300 Subject: [PATCH 1/2] test(streaming): add T1-T9 error-recovery tests for max_errors parameter (#b854ab) Covers: budget tolerance, budget exhaustion, auth/rate-limit bypass, counter reset on success, infinite tolerance (max_errors=None), and exponential backoff verification for both stream_alerts and stream_incident_updates. --- tests/unit/test_streaming.py | 282 ++++++++++++++++++++++++++++++++++- 1 file changed, 281 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index 96f39dd..7b4f1fe 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -10,7 +10,12 @@ from hyperping._async_client import AsyncHyperpingClient from hyperping.client import RetryConfig -from hyperping.exceptions import HyperpingNotFoundError +from hyperping.exceptions import ( + HyperpingAPIError, + HyperpingAuthError, + HyperpingNotFoundError, + HyperpingRateLimitError, +) from hyperping.models import Incident, IncidentUpdate, LocalizedText from hyperping.models._alert_models import Alert, AlertType @@ -472,3 +477,278 @@ async def capturing_sleep(interval: float) -> None: ) assert recorded_intervals[0] == 15.0 + + +# --------------------------------------------------------------------------- +# Error-recovery tests (max_errors parameter, #b854ab) +# --------------------------------------------------------------------------- + + +class TestStreamErrorRecovery: + async def test_stream_alerts_tolerates_errors_within_budget( + self, async_client: AsyncHyperpingClient + ) -> None: + """A transient API error within the max_errors budget does not raise; baseline preserved.""" + poll1 = [_monitor_dict("mon_1", "Test", down=False)] + poll3 = [_monitor_dict("mon_1", "Test", down=True)] + + with ( + patch.object( + async_client, + "_request", + new=AsyncMock(side_effect=[poll1, HyperpingAPIError("transient"), poll3]), + ), + patch( + "asyncio.sleep", + new=AsyncMock(side_effect=[None, None, asyncio.CancelledError()]), + ), + ): + results = await _drain( + async_client.stream_alerts(poll_interval=0.0, max_errors=3), + until_cancelled=True, + ) + + assert len(results) == 1 + assert results[0].type == AlertType.DOWN + assert results[0].monitor_uuid == "mon_1" + + async def test_stream_alerts_raises_after_exceeding_error_budget( + self, async_client: AsyncHyperpingClient + ) -> None: + """Three consecutive errors with max_errors=3 re-raises the last exception.""" + poll1 = [_monitor_dict("mon_1", "Test", down=False)] + err = HyperpingAPIError("persistent failure") + + with ( + patch.object( + async_client, + "_request", + new=AsyncMock(side_effect=[poll1, err, err, err]), + ), + patch("asyncio.sleep", new=AsyncMock()), + ): + with pytest.raises(HyperpingAPIError): + async for _ in async_client.stream_alerts(poll_interval=0.0, max_errors=3): + pass + + async def test_stream_alerts_reraises_auth_error_immediately( + self, async_client: AsyncHyperpingClient + ) -> None: + """HyperpingAuthError bypasses tolerance and re-raises immediately.""" + poll1 = [_monitor_dict("mon_1", "Test", down=False)] + + with ( + patch.object( + async_client, + "_request", + new=AsyncMock( + side_effect=[ + poll1, + HyperpingAuthError("unauthorized", status_code=401), + ] + ), + ), + patch("asyncio.sleep", new=AsyncMock()), + ): + with pytest.raises(HyperpingAuthError): + async for _ in async_client.stream_alerts(poll_interval=0.0, max_errors=3): + pass + + async def test_stream_alerts_reraises_rate_limit_error_immediately( + self, async_client: AsyncHyperpingClient + ) -> None: + """HyperpingRateLimitError bypasses tolerance and re-raises immediately.""" + poll1 = [_monitor_dict("mon_1", "Test", down=False)] + + with ( + patch.object( + async_client, + "_request", + new=AsyncMock( + side_effect=[ + poll1, + HyperpingRateLimitError("rate limited", status_code=429), + ] + ), + ), + patch("asyncio.sleep", new=AsyncMock()), + ): + with pytest.raises(HyperpingRateLimitError): + async for _ in async_client.stream_alerts(poll_interval=0.0, max_errors=3): + pass + + async def test_stream_alerts_error_counter_resets_on_success( + self, async_client: AsyncHyperpingClient + ) -> None: + """Error counter resets after a successful poll; recovery poll alerts are yielded.""" + monitor_up = [_monitor_dict("mon_1", "Test", down=False)] + monitor_down = [_monitor_dict("mon_1", "Test", down=True)] + err = HyperpingAPIError("transient") + + with ( + patch.object( + async_client, + "_request", + new=AsyncMock( + side_effect=[ + monitor_up, # poll 1: baseline (consecutive_errors stays 0) + err, # poll 2: fail (consecutive_errors=1) + monitor_up, # poll 3: success, no change (consecutive_errors=0) + err, # poll 4: fail (consecutive_errors=1) + err, # poll 5: fail (consecutive_errors=2) + monitor_down, # poll 6: success, state change -> Alert(DOWN) + ] + ), + ), + patch( + "asyncio.sleep", + new=AsyncMock(side_effect=[None, None, None, None, None, asyncio.CancelledError()]), + ), + ): + results = await _drain( + async_client.stream_alerts(poll_interval=0.0, max_errors=3), + until_cancelled=True, + ) + + assert len(results) == 1 + assert results[0].type == AlertType.DOWN + + async def test_stream_alerts_max_errors_none_tolerates_infinite_errors( + self, async_client: AsyncHyperpingClient + ) -> None: + """max_errors=None allows unlimited consecutive errors; generator recovers on success.""" + monitor_up = [_monitor_dict("mon_1", "Test", down=False)] + monitor_down = [_monitor_dict("mon_1", "Test", down=True)] + err = HyperpingAPIError("persistent") + + with ( + patch.object( + async_client, + "_request", + new=AsyncMock(side_effect=[monitor_up] + [err] * 10 + [monitor_down]), + ), + patch( + "asyncio.sleep", + new=AsyncMock(side_effect=[None] * 11 + [asyncio.CancelledError()]), + ), + ): + results = await _drain( + async_client.stream_alerts(poll_interval=0.0, max_errors=None), + until_cancelled=True, + ) + + assert len(results) == 1 + assert results[0].type == AlertType.DOWN + + async def test_stream_incident_updates_tolerates_errors_within_budget( + self, async_client: AsyncHyperpingClient + ) -> None: + """A transient error within budget is tolerated; seen updates are not re-yielded.""" + upd1 = { + "uuid": "upd_1", + "date": "2026-01-01T00:00:00Z", + "text": "first", + "type": "investigating", + } + upd2 = { + "uuid": "upd_2", + "date": "2026-01-01T01:00:00Z", + "text": "second", + "type": "resolved", + } + incident_v1 = _incident("inci_1", [upd1]) + incident_v2 = _incident("inci_1", [upd1, upd2]) + + with ( + patch.object( + async_client, + "get_incident", + new=AsyncMock( + side_effect=[ + incident_v1, + HyperpingAPIError("transient"), + incident_v2, + ] + ), + ), + patch( + "asyncio.sleep", + new=AsyncMock(side_effect=[None, None, asyncio.CancelledError()]), + ), + ): + results = await _drain( + async_client.stream_incident_updates("inci_1", poll_interval=0.0, max_errors=3), + until_cancelled=True, + ) + + uuids = [r.uuid for r in results] + assert "upd_2" in uuids + assert uuids.count("upd_1") == 1 + + async def test_stream_incident_updates_raises_after_exceeding_error_budget( + self, async_client: AsyncHyperpingClient + ) -> None: + """Three consecutive errors with max_errors=3 re-raises the exception.""" + incident_v1 = _incident("inci_1", []) + err = HyperpingAPIError("persistent") + + with ( + patch.object( + async_client, + "get_incident", + new=AsyncMock(side_effect=[incident_v1, err, err, err]), + ), + patch("asyncio.sleep", new=AsyncMock()), + ): + with pytest.raises(HyperpingAPIError): + async for _ in async_client.stream_incident_updates( + "inci_1", poll_interval=0.0, max_errors=3 + ): + pass + + async def test_stream_alerts_backoff_sleep_between_failures( + self, async_client: AsyncHyperpingClient + ) -> None: + """Backoff delays increase exponentially between failures and are capped at max_delay.""" + client = AsyncHyperpingClient( + api_key="sk_test_key", + retry_config=RetryConfig( + max_retries=0, initial_delay=1.0, backoff_factor=2.0, max_delay=3.0 + ), + ) + try: + monitor_up = [_monitor_dict("mon_1", "Test", down=False)] + err = HyperpingAPIError("transient") + recorded_sleeps: list[float] = [] + call_count = 0 + + async def tracking_sleep(delay: float) -> None: + nonlocal call_count + call_count += 1 + recorded_sleeps.append(delay) + if call_count >= 4: + raise asyncio.CancelledError + + with ( + patch.object( + client, + "_request", + new=AsyncMock(side_effect=[monitor_up, err, err, err, err]), + ), + patch("asyncio.sleep", new=tracking_sleep), + ): + await _drain( + client.stream_alerts(poll_interval=0.0, max_errors=10), + until_cancelled=True, + ) + finally: + await client.close() + + # sleep[0]: poll_interval after poll 1 success + # sleep[1]: 1st failure backoff = initial_delay * factor^0 = 1.0 + # sleep[2]: 2nd failure backoff = initial_delay * factor^1 = 2.0 + # sleep[3]: 3rd failure backoff = min(initial_delay * factor^2, max_delay) = 3.0 (capped) + assert recorded_sleeps[0] == 0.0 + assert recorded_sleeps[1] == 1.0 + assert recorded_sleeps[2] == 2.0 + assert recorded_sleeps[3] == 3.0 From 3363b5fc30bcc763acc1e5e9fc260b53392af9f4 Mon Sep 17 00:00:00 2001 From: Khaled Salhab Date: Sun, 14 Jun 2026 09:16:20 +0300 Subject: [PATCH 2/2] feat(streaming): add max_errors error-recovery to stream_alerts and stream_incident_updates (#b854ab) Both generators now accept max_errors (default 3) to tolerate N consecutive poll-cycle failures with exponential backoff before re-raising. Backoff uses retry_config.initial_delay/backoff_factor/max_delay for consistency with the per-request retry layer. HyperpingAuthError and HyperpingRateLimitError bypass tolerance and re-raise immediately. Adds retry_config attribute declaration to _AsyncClientProtocol so mypy can verify access. --- src/hyperping/_async_streaming_mixin.py | 136 +++++++++++++++++++----- src/hyperping/_protocols.py | 7 +- 2 files changed, 115 insertions(+), 28 deletions(-) diff --git a/src/hyperping/_async_streaming_mixin.py b/src/hyperping/_async_streaming_mixin.py index 476115b..d975f65 100644 --- a/src/hyperping/_async_streaming_mixin.py +++ b/src/hyperping/_async_streaming_mixin.py @@ -16,6 +16,7 @@ from hyperping._protocols import _AsyncClientProtocol from hyperping._utils import parse_list, unwrap_list, validate_id from hyperping.endpoints import Endpoint +from hyperping.exceptions import HyperpingAPIError, HyperpingAuthError, HyperpingRateLimitError from hyperping.models import IncidentUpdate, Monitor from hyperping.models._alert_models import Alert, AlertType @@ -32,7 +33,12 @@ class AsyncStreamingMixin(_AsyncClientProtocol): async def get_incident(self, incident_id: str) -> Incident: ... - async def stream_alerts(self, *, poll_interval: float = 30.0) -> AsyncIterator[Alert]: + async def stream_alerts( + self, + *, + poll_interval: float = 30.0, + max_errors: int | None = 3, + ) -> AsyncIterator[Alert]: """Stream monitor state-transition events. Polls ``GET /v1/monitors`` at *poll_interval* seconds. Yields an @@ -42,12 +48,27 @@ async def stream_alerts(self, *, poll_interval: float = 30.0) -> AsyncIterator[A Rate-limit note: at the default 30-second interval this uses 2 requests/min, roughly 0.67% of the 300 req/min account limit. + The *max_errors* parameter operates one level above the per-request + retry budget in :attr:`retry_config`. It tolerates N consecutive + poll-cycle failures after :meth:`_request` has already exhausted + its own retry budget, preserving the baseline state across transient + outages. :class:`~hyperping.exceptions.HyperpingAuthError` and + :class:`~hyperping.exceptions.HyperpingRateLimitError` always + bypass this tolerance and re-raise immediately. + Args: - poll_interval: Seconds between polls. Defaults to 30.0. + poll_interval: Seconds between successful polls. Defaults to 30.0. + max_errors: Maximum consecutive poll-cycle failures to tolerate + before re-raising. ``None`` means unlimited. Defaults to 3. Yields: :class:`~hyperping.models.Alert` on each monitor state transition. + Raises: + HyperpingAuthError: If authentication fails (bypasses tolerance). + HyperpingRateLimitError: If rate-limited (bypasses tolerance). + HyperpingAPIError: After *max_errors* consecutive failures. + Note: This implementation is provisional. The ``Alert`` model fields are inferred from the monitors list endpoint. When Hyperping ships a @@ -56,30 +77,56 @@ async def stream_alerts(self, *, poll_interval: float = 30.0) -> AsyncIterator[A to this method's signature. """ baseline: dict[str, bool] = {} + consecutive_errors = 0 while True: - response = await self._request("GET", Endpoint.MONITORS) - monitors = parse_list(unwrap_list(response, "monitors"), Monitor, "monitor") - - for monitor in monitors: - uuid = monitor.uuid - down = monitor.down - - if uuid in baseline and baseline[uuid] != down: - alert_type = AlertType.DOWN if down else AlertType.UP - yield Alert( - monitor_uuid=uuid, - monitor_name=monitor.name, - type=alert_type, - timestamp=datetime.now(UTC).isoformat(), - ) - - baseline[uuid] = down + try: + response = await self._request("GET", Endpoint.MONITORS) + monitors = parse_list(unwrap_list(response, "monitors"), Monitor, "monitor") + + for monitor in monitors: + uuid = monitor.uuid + down = monitor.down + + if uuid in baseline and baseline[uuid] != down: + alert_type = AlertType.DOWN if down else AlertType.UP + yield Alert( + monitor_uuid=uuid, + monitor_name=monitor.name, + type=alert_type, + timestamp=datetime.now(UTC).isoformat(), + ) + + baseline[uuid] = down + + consecutive_errors = 0 + except (HyperpingAuthError, HyperpingRateLimitError): + raise + except HyperpingAPIError: + consecutive_errors += 1 + logger.warning( + "stream_alerts: poll cycle failed (consecutive=%d)", + consecutive_errors, + exc_info=True, + ) + if max_errors is not None and consecutive_errors >= max_errors: + raise + delay = min( + self.retry_config.initial_delay + * self.retry_config.backoff_factor ** (consecutive_errors - 1), + self.retry_config.max_delay, + ) + await asyncio.sleep(delay) + continue await asyncio.sleep(poll_interval) async def stream_incident_updates( - self, incident_uuid: str, *, poll_interval: float = 30.0 + self, + incident_uuid: str, + *, + poll_interval: float = 30.0, + max_errors: int | None = 3, ) -> AsyncIterator[IncidentUpdate]: """Stream new updates for an incident. @@ -88,9 +135,19 @@ async def stream_incident_updates( by update UUID. All updates present on the first poll are yielded immediately; only new updates are yielded on subsequent polls. + The *max_errors* parameter operates one level above the per-request + retry budget in :attr:`retry_config`. It tolerates N consecutive + poll-cycle failures after :meth:`_request` has already exhausted + its own retry budget, preserving the ``seen`` set across transient + outages. :class:`~hyperping.exceptions.HyperpingAuthError` and + :class:`~hyperping.exceptions.HyperpingRateLimitError` always + bypass this tolerance and re-raise immediately. + Args: incident_uuid: UUID of the incident to watch. - poll_interval: Seconds between polls. Defaults to 30.0. + poll_interval: Seconds between successful polls. Defaults to 30.0. + max_errors: Maximum consecutive poll-cycle failures to tolerate + before re-raising. ``None`` means unlimited. Defaults to 3. Yields: :class:`~hyperping.models.IncidentUpdate` for each new update. @@ -98,16 +155,41 @@ async def stream_incident_updates( Raises: ValueError: If *incident_uuid* contains unsafe characters. HyperpingNotFoundError: If the incident does not exist (first poll). + HyperpingAuthError: If authentication fails (bypasses tolerance). + HyperpingRateLimitError: If rate-limited (bypasses tolerance). + HyperpingAPIError: After *max_errors* consecutive failures. """ validate_id(incident_uuid, "incident_uuid") seen: set[str] = set() + consecutive_errors = 0 while True: - incident = await self.get_incident(incident_uuid) - - for update in incident.updates: - if update.uuid not in seen: - seen.add(update.uuid) - yield update + try: + incident = await self.get_incident(incident_uuid) + + for update in incident.updates: + if update.uuid not in seen: + seen.add(update.uuid) + yield update + + consecutive_errors = 0 + except (HyperpingAuthError, HyperpingRateLimitError): + raise + except HyperpingAPIError: + consecutive_errors += 1 + logger.warning( + "stream_incident_updates: poll cycle failed (consecutive=%d)", + consecutive_errors, + exc_info=True, + ) + if max_errors is not None and consecutive_errors >= max_errors: + raise + delay = min( + self.retry_config.initial_delay + * self.retry_config.backoff_factor ** (consecutive_errors - 1), + self.retry_config.max_delay, + ) + await asyncio.sleep(delay) + continue await asyncio.sleep(poll_interval) diff --git a/src/hyperping/_protocols.py b/src/hyperping/_protocols.py index 6266b76..86850f3 100644 --- a/src/hyperping/_protocols.py +++ b/src/hyperping/_protocols.py @@ -8,7 +8,10 @@ from __future__ import annotations -from typing import Any +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from hyperping.client import RetryConfig class _ClientProtocol: @@ -45,6 +48,8 @@ class _AsyncClientProtocol: :class:`~hyperping._async_client.AsyncHyperpingClient`. """ + retry_config: RetryConfig + async def _request( self, method: str,