Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 109 additions & 27 deletions src/hyperping/_async_streaming_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from hyperping._protocols import _AsyncClientProtocol
from hyperping._utils import parse_list, unwrap_list, validate_id
from hyperping.endpoints import Endpoint
from hyperping.exceptions import HyperpingAPIError, HyperpingAuthError, HyperpingRateLimitError
from hyperping.models import IncidentUpdate, Monitor
from hyperping.models._alert_models import Alert, AlertType

Expand All @@ -32,7 +33,12 @@ class AsyncStreamingMixin(_AsyncClientProtocol):

async def get_incident(self, incident_id: str) -> Incident: ...

async def stream_alerts(self, *, poll_interval: float = 30.0) -> AsyncIterator[Alert]:
async def stream_alerts(
self,
*,
poll_interval: float = 30.0,
max_errors: int | None = 3,
) -> AsyncIterator[Alert]:
"""Stream monitor state-transition events.

Polls ``GET /v1/monitors`` at *poll_interval* seconds. Yields an
Expand All @@ -42,12 +48,27 @@ async def stream_alerts(self, *, poll_interval: float = 30.0) -> AsyncIterator[A
Rate-limit note: at the default 30-second interval this uses
2 requests/min, roughly 0.67% of the 300 req/min account limit.

The *max_errors* parameter operates one level above the per-request
retry budget in :attr:`retry_config`. It tolerates N consecutive
poll-cycle failures after :meth:`_request` has already exhausted
its own retry budget, preserving the baseline state across transient
outages. :class:`~hyperping.exceptions.HyperpingAuthError` and
:class:`~hyperping.exceptions.HyperpingRateLimitError` always
bypass this tolerance and re-raise immediately.

Args:
poll_interval: Seconds between polls. Defaults to 30.0.
poll_interval: Seconds between successful polls. Defaults to 30.0.
max_errors: Maximum consecutive poll-cycle failures to tolerate
before re-raising. ``None`` means unlimited. Defaults to 3.

Yields:
:class:`~hyperping.models.Alert` on each monitor state transition.

Raises:
HyperpingAuthError: If authentication fails (bypasses tolerance).
HyperpingRateLimitError: If rate-limited (bypasses tolerance).
HyperpingAPIError: After *max_errors* consecutive failures.

Note:
This implementation is provisional. The ``Alert`` model fields are
inferred from the monitors list endpoint. When Hyperping ships a
Expand All @@ -56,30 +77,56 @@ async def stream_alerts(self, *, poll_interval: float = 30.0) -> AsyncIterator[A
to this method's signature.
"""
baseline: dict[str, bool] = {}
consecutive_errors = 0

while True:
response = await self._request("GET", Endpoint.MONITORS)
monitors = parse_list(unwrap_list(response, "monitors"), Monitor, "monitor")

for monitor in monitors:
uuid = monitor.uuid
down = monitor.down

if uuid in baseline and baseline[uuid] != down:
alert_type = AlertType.DOWN if down else AlertType.UP
yield Alert(
monitor_uuid=uuid,
monitor_name=monitor.name,
type=alert_type,
timestamp=datetime.now(UTC).isoformat(),
)

baseline[uuid] = down
try:
response = await self._request("GET", Endpoint.MONITORS)
monitors = parse_list(unwrap_list(response, "monitors"), Monitor, "monitor")

for monitor in monitors:
uuid = monitor.uuid
down = monitor.down

if uuid in baseline and baseline[uuid] != down:
alert_type = AlertType.DOWN if down else AlertType.UP
yield Alert(
monitor_uuid=uuid,
monitor_name=monitor.name,
type=alert_type,
timestamp=datetime.now(UTC).isoformat(),
)

baseline[uuid] = down

consecutive_errors = 0
except (HyperpingAuthError, HyperpingRateLimitError):
raise
except HyperpingAPIError:
consecutive_errors += 1
logger.warning(
"stream_alerts: poll cycle failed (consecutive=%d)",
consecutive_errors,
exc_info=True,
)
if max_errors is not None and consecutive_errors >= max_errors:
raise
delay = min(
self.retry_config.initial_delay
* self.retry_config.backoff_factor ** (consecutive_errors - 1),
self.retry_config.max_delay,
)
await asyncio.sleep(delay)
continue

await asyncio.sleep(poll_interval)

async def stream_incident_updates(
self, incident_uuid: str, *, poll_interval: float = 30.0
self,
incident_uuid: str,
*,
poll_interval: float = 30.0,
max_errors: int | None = 3,
) -> AsyncIterator[IncidentUpdate]:
"""Stream new updates for an incident.

Expand All @@ -88,26 +135,61 @@ async def stream_incident_updates(
by update UUID. All updates present on the first poll are yielded
immediately; only new updates are yielded on subsequent polls.

The *max_errors* parameter operates one level above the per-request
retry budget in :attr:`retry_config`. It tolerates N consecutive
poll-cycle failures after :meth:`_request` has already exhausted
its own retry budget, preserving the ``seen`` set across transient
outages. :class:`~hyperping.exceptions.HyperpingAuthError` and
:class:`~hyperping.exceptions.HyperpingRateLimitError` always
bypass this tolerance and re-raise immediately.

Args:
incident_uuid: UUID of the incident to watch.
poll_interval: Seconds between polls. Defaults to 30.0.
poll_interval: Seconds between successful polls. Defaults to 30.0.
max_errors: Maximum consecutive poll-cycle failures to tolerate
before re-raising. ``None`` means unlimited. Defaults to 3.

Yields:
:class:`~hyperping.models.IncidentUpdate` for each new update.

Raises:
ValueError: If *incident_uuid* contains unsafe characters.
HyperpingNotFoundError: If the incident does not exist (first poll).
HyperpingAuthError: If authentication fails (bypasses tolerance).
HyperpingRateLimitError: If rate-limited (bypasses tolerance).
HyperpingAPIError: After *max_errors* consecutive failures.
"""
validate_id(incident_uuid, "incident_uuid")
seen: set[str] = set()
consecutive_errors = 0

while True:
incident = await self.get_incident(incident_uuid)

for update in incident.updates:
if update.uuid not in seen:
seen.add(update.uuid)
yield update
try:
incident = await self.get_incident(incident_uuid)

for update in incident.updates:
if update.uuid not in seen:
seen.add(update.uuid)
yield update

consecutive_errors = 0
except (HyperpingAuthError, HyperpingRateLimitError):
raise
except HyperpingAPIError:
consecutive_errors += 1
logger.warning(
"stream_incident_updates: poll cycle failed (consecutive=%d)",
consecutive_errors,
exc_info=True,
)
if max_errors is not None and consecutive_errors >= max_errors:
raise
delay = min(
self.retry_config.initial_delay
* self.retry_config.backoff_factor ** (consecutive_errors - 1),
self.retry_config.max_delay,
)
await asyncio.sleep(delay)
continue

await asyncio.sleep(poll_interval)
7 changes: 6 additions & 1 deletion src/hyperping/_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

from __future__ import annotations

from typing import Any
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from hyperping.client import RetryConfig


class _ClientProtocol:
Expand Down Expand Up @@ -45,6 +48,8 @@ class _AsyncClientProtocol:
:class:`~hyperping._async_client.AsyncHyperpingClient`.
"""

retry_config: RetryConfig

async def _request(
self,
method: str,
Expand Down
Loading