Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ dev = [
"mypy==2.1.0",
"ruff>=0.15.16",
"respx==0.23.1",
"time-machine>=2.19.0,<3.0.0",
"time-machine>=2.19.0",
"ty>=0.0.25",
"zizmor>=1.24.1",
]
Expand Down
12 changes: 7 additions & 5 deletions tests/test_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import pytest
import respx
from httpx import Response
from time_machine import TimeMachineFixture

from fastapi_cloud_cli.utils.api import (
STREAM_LOGS_MAX_RETRIES,
Expand Down Expand Up @@ -365,13 +364,12 @@ def test_stream_build_logs_connection_closed_without_complete_failed_or_timeout(
def test_stream_build_logs_retry_timeout(
logs_route: respx.Route,
client: APIClient,
time_machine: TimeMachineFixture,
deployment_id: str,
) -> None:
time_machine.move_to("2025-11-01 13:00:00", tick=False)
clock = [0.0] # Container with a value to use as a result of time.monotonic() mock

def responses(request: httpx.Request, route: respx.Route) -> Response:
time_machine.shift(timedelta(hours=1))
clock[0] += timedelta(hours=1).total_seconds() # Simulate time passing

return Response(
200,
Expand All @@ -382,7 +380,11 @@ def responses(request: httpx.Request, route: respx.Route) -> Response:

logs_route.mock(side_effect=responses)

with patch("time.sleep"), pytest.raises(TimeoutError, match="timed out"):
with (
patch("time.monotonic", side_effect=lambda: clock[0]),
patch("time.sleep"),
pytest.raises(TimeoutError, match="timed out"),
):
list(client.stream_build_logs(deployment_id))


Expand Down
53 changes: 33 additions & 20 deletions tests/test_cli_deploy.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import itertools
import json
import random
import re
import string
from datetime import timedelta
from pathlib import Path
from typing import TypedDict
from unittest.mock import patch
from unittest.mock import call, patch

import httpx
import pytest
import respx
import typer
from httpx import Response
from rich_toolkit.progress import Progress
from time_machine import TimeMachineFixture
from typer.testing import CliRunner, Result

from fastapi_cloud_cli.cli import app
from fastapi_cloud_cli.commands.deploy import wait
from fastapi_cloud_cli.config import Settings
from fastapi_cloud_cli.utils.api import StreamLogError, TooManyRetriesError
from tests.conftest import ConfiguredApp
Expand Down Expand Up @@ -1434,17 +1434,11 @@ def test_shows_error_message_on_build_log_http_error(


@pytest.mark.respx
@patch(
"fastapi_cloud_cli.commands.deploy.wait.WAITING_MESSAGES",
[("⏳", "short wait message")],
)
def test_short_wait_messages(
logged_in_cli: None,
tmp_path: Path,
respx_mock: respx.MockRouter,
time_machine: TimeMachineFixture,
) -> None:
time_machine.move_to("2025-11-01 13:00:00", tick=False)
app_data = _get_random_app()
team_data = _get_random_team()
app_id = app_data["id"]
Expand Down Expand Up @@ -1473,9 +1467,13 @@ def test_short_wait_messages(
)
)

# Each build-log request advances the fake monotonic clock so the elapsed
# time determines which message pool is used.
clock = [0.0]

def build_logs_handler(request: httpx.Request, route: respx.Route) -> Response:
if route.call_count <= 2:
time_machine.shift(timedelta(seconds=3))
clock[0] += 3
return Response(
200,
content=build_logs_response(
Expand Down Expand Up @@ -1503,26 +1501,28 @@ def build_logs_handler(request: httpx.Request, route: respx.Route) -> Response:
return_value=Response(200, json={**deployment_data, "status": "success"})
)

with changing_dir(tmp_path), patch("time.sleep"):
with (
changing_dir(tmp_path),
patch("time.sleep"),
patch("time.monotonic", side_effect=lambda: clock[0]),
patch.object(wait, "cycle", wraps=itertools.cycle) as cycle_spy,
):
result = runner.invoke(app, ["deploy"])

assert result.exit_code == 0
assert "Ready the chicken!" in result.output

# This is a short wait, so LONG_WAIT_MESSAGES should not be accessed by the
# `cycle` function.
assert call(wait.LONG_WAIT_MESSAGES) not in cycle_spy.call_args_list


@pytest.mark.respx
@patch(
"fastapi_cloud_cli.commands.deploy.wait.LONG_WAIT_MESSAGES",
[("⏳", "long wait message")],
)
def test_long_wait_messages(
logged_in_cli: None,
tmp_path: Path,
respx_mock: respx.MockRouter,
time_machine: TimeMachineFixture,
) -> None:
time_machine.move_to("2025-11-01 13:00:00", tick=False)

app_data = _get_random_app()
team_data = _get_random_team()
app_id = app_data["id"]
Expand Down Expand Up @@ -1551,9 +1551,13 @@ def test_long_wait_messages(
)
)

# Each build-log request advances the fake monotonic clock so the elapsed
# time determines which message pool is used.
clock = [0.0]

def build_logs_handler(request: httpx.Request, route: respx.Route) -> Response:
if route.call_count <= 2:
time_machine.shift(timedelta(seconds=35))
clock[0] += 35
return Response(
200,
content=build_logs_response(
Expand Down Expand Up @@ -1581,12 +1585,21 @@ def build_logs_handler(request: httpx.Request, route: respx.Route) -> Response:
return_value=Response(200, json={**deployment_data, "status": "success"})
)

with changing_dir(tmp_path), patch("time.sleep"):
with (
changing_dir(tmp_path),
patch("time.sleep"),
patch("time.monotonic", side_effect=lambda: clock[0]),
patch.object(wait, "cycle", wraps=itertools.cycle) as cycle_spy,
):
result = runner.invoke(app, ["deploy"])

assert result.exit_code == 0
assert "Ready the chicken!" in result.output

# This is a long wait, so LONG_WAIT_MESSAGES should be accessed by the `cycle`
# function.
assert call(wait.LONG_WAIT_MESSAGES) in cycle_spy.call_args_list


@pytest.mark.respx
def test_calls_upload_cancelled_when_user_interrupts(
Expand Down
Loading
Loading