Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions tests/unit/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,63 @@ async def capturing_sleep(interval: float) -> None:

assert recorded_intervals[0] == 5.0

async def test_stream_alerts_empty_monitors_yields_nothing(
self, async_client: AsyncHyperpingClient
) -> None:
"""Empty monitors list on first poll yields nothing."""
with (
patch.object(async_client, "_request", new=AsyncMock(return_value=[])),
patch("asyncio.sleep", new=AsyncMock(side_effect=asyncio.CancelledError)),
):
results = await _drain(
async_client.stream_alerts(poll_interval=0.0), until_cancelled=True
)

assert results == []

async def test_stream_alerts_empty_monitors_sleep_still_called(
self, async_client: AsyncHyperpingClient
) -> None:
"""asyncio.sleep is still called when the monitors list is empty."""
recorded_intervals: list[float] = []

async def capturing_sleep(interval: float) -> None:
recorded_intervals.append(interval)
raise asyncio.CancelledError

with (
patch.object(async_client, "_request", new=AsyncMock(return_value=[])),
patch("asyncio.sleep", new=capturing_sleep),
):
await _drain(async_client.stream_alerts(poll_interval=0.0), until_cancelled=True)

assert len(recorded_intervals) == 1

async def test_stream_alerts_empty_then_monitors_appear_no_transition(
self, async_client: AsyncHyperpingClient
) -> None:
"""Monitors appearing after an empty first poll set the baseline without yielding."""
poll1: list = []
poll2 = [_monitor_dict("mon_1", "Test", down=False)]

sleep_calls = 0

async def controlled_sleep(_interval: float) -> None:
nonlocal sleep_calls
sleep_calls += 1
if sleep_calls >= 2:
raise asyncio.CancelledError

with (
patch.object(async_client, "_request", new=AsyncMock(side_effect=[poll1, poll2])),
patch("asyncio.sleep", new=controlled_sleep),
):
results = await _drain(
async_client.stream_alerts(poll_interval=0.0), until_cancelled=True
)

assert results == []


# ---------------------------------------------------------------------------
# stream_incident_updates tests
Expand Down