Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cloud_pipelines_backend/launchers/kubernetes_launchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,16 @@ def launcher_error_message(self) -> str | None:
launcher_error_message = f"Kubernetes error. Reason: {main_container_terminated_state.reason}, message: {main_container_terminated_state.message}"
return launcher_error_message

@property
def pending_diagnostics(self) -> str | None:
state = self._get_main_container_state()
waiting = state.waiting if state is not None else None
if waiting is None or not waiting.reason:
return None
if waiting.message:
return f"{waiting.reason}: {waiting.message}"
return waiting.reason

def to_dict(self) -> dict[str, Any]:
pod_dict = _serialize_kubernetes_object_to_compact_dict(self._debug_pod)
result = dict(
Expand Down
47 changes: 47 additions & 0 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
default_task_annotations: dict[str, Any] | None = None,
sleep_seconds_between_queue_sweeps: float = 1.0,
output_data_purge_duration: datetime.timedelta = None,
max_pending_duration: datetime.timedelta | None = None,
*,
# Internal/experimental:
_max_queue_batch_size: int = 1,
Expand All @@ -65,6 +66,7 @@ def __init__(
self._queued_executions_queue_idle = False
self._running_executions_queue_idle = False
self._output_data_purge_duration = output_data_purge_duration
self._max_pending_duration = max_pending_duration

self._max_queue_batch_size = _max_queue_batch_size
self._max_queue_batch_duration = _max_queue_batch_duration
Expand Down Expand Up @@ -765,6 +767,35 @@ def internal_process_one_running_execution(
reloaded_launched_container.status
)
if new_status == previous_status:
if (
new_status == launcher_interfaces.ContainerStatus.PENDING
and _pending_deadline_exceeded(
created_at=container_execution.created_at,
now=current_time,
max_pending_duration=self._max_pending_duration,
)
):
# The container never started (e.g. an unschedulable pod or a
# gcsfuse/image-pull boot wedge). Terminate it and let the outer
# exception handler mark it SYSTEM_ERROR and skip downstream.
pending_duration = current_time - container_execution.created_at
diagnostics = getattr(
reloaded_launched_container, "pending_diagnostics", None
)
try:
_retry(reloaded_launched_container.upload_log)
except Exception:
_logger.exception(
"Error uploading logs before pending-deadline termination."
)
reloaded_launched_container.terminate()
message = (
f"Task pending {pending_duration}, never started"
f" (deadline {self._max_pending_duration})."
)
if diagnostics:
message += f"\n{diagnostics}"
raise OrchestratorError(message)
_logger.info(f"Container execution remains in {new_status} state.")
return
_logger.info(
Expand Down Expand Up @@ -1055,6 +1086,22 @@ def _get_current_time() -> datetime.datetime:
return datetime.datetime.now(tz=datetime.timezone.utc)


def _pending_deadline_exceeded(
created_at: datetime.datetime | None,
now: datetime.datetime,
max_pending_duration: datetime.timedelta | None,
) -> bool:
"""Whether a still-pending container has outlived its start deadline.

Returns False when the watchdog is disabled (``max_pending_duration`` is
None) or the creation time is unknown, so legacy rows without
``created_at`` are never force-failed.
"""
if max_pending_duration is None or created_at is None:
return False
return (now - created_at) > max_pending_duration


def _generate_random_id() -> str:
import os
import time
Expand Down
140 changes: 140 additions & 0 deletions tests/test_pending_deadline_watchdog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Tests for the PENDING-deadline watchdog and kubelet reason surfacing.

Covers:
* ``_pending_deadline_exceeded`` boundary logic in the orchestrator.
* ``LaunchedKubernetesContainer.pending_diagnostics`` reason extraction.

The diagnostics tests build ``V1Pod`` fixtures directly, so they run offline
without a cluster.
"""

from __future__ import annotations

import datetime

from kubernetes import client as k8s_client_lib

from cloud_pipelines_backend import orchestrator_sql
from cloud_pipelines_backend.launchers import kubernetes_launchers

_NOW = datetime.datetime(2026, 6, 30, 12, 0, 0, tzinfo=datetime.timezone.utc)


def test_pending_deadline_disabled_when_duration_is_none():
    """A ``None`` duration disables the watchdog even for a days-old row."""
    six_days_ago = _NOW - datetime.timedelta(days=6)
    exceeded = orchestrator_sql._pending_deadline_exceeded(
        created_at=six_days_ago, now=_NOW, max_pending_duration=None
    )
    assert exceeded is False


def test_pending_deadline_skipped_when_created_at_unknown():
    """A missing creation time never trips the deadline."""
    half_hour = datetime.timedelta(minutes=30)
    exceeded = orchestrator_sql._pending_deadline_exceeded(
        created_at=None,
        now=_NOW,
        max_pending_duration=half_hour,
    )
    assert exceeded is False


def test_pending_deadline_not_exceeded_under_threshold():
    """Ten minutes of pending time is within a thirty-minute deadline."""
    ten_minutes_ago = _NOW - datetime.timedelta(minutes=10)
    half_hour = datetime.timedelta(minutes=30)
    exceeded = orchestrator_sql._pending_deadline_exceeded(
        created_at=ten_minutes_ago,
        now=_NOW,
        max_pending_duration=half_hour,
    )
    assert exceeded is False


def test_pending_deadline_exceeded_past_threshold():
    """Thirty-one minutes of pending time exceeds a thirty-minute deadline."""
    thirty_one_minutes_ago = _NOW - datetime.timedelta(minutes=31)
    half_hour = datetime.timedelta(minutes=30)
    exceeded = orchestrator_sql._pending_deadline_exceeded(
        created_at=thirty_one_minutes_ago,
        now=_NOW,
        max_pending_duration=half_hour,
    )
    assert exceeded is True


def test_pending_deadline_boundary_is_strict():
    """An age exactly equal to the deadline is not yet exceeded."""
    half_hour = datetime.timedelta(minutes=30)
    # Sitting precisely on the threshold must still count as within it.
    exactly_at_threshold = _NOW - half_hour
    exceeded = orchestrator_sql._pending_deadline_exceeded(
        created_at=exactly_at_threshold,
        now=_NOW,
        max_pending_duration=datetime.timedelta(minutes=30),
    )
    assert exceeded is False


def _make_launched_container(
    pod: k8s_client_lib.V1Pod,
) -> kubernetes_launchers.LaunchedKubernetesContainer:
    """Wrap ``pod`` in a launched-container object using fixed placeholder metadata."""
    container_cls = kubernetes_launchers.LaunchedKubernetesContainer
    constructor_kwargs = {
        "pod_name": "task-abc-rtdrr",
        "namespace": "oasis",
        "output_uris": {},
        "log_uri": "memory://log",
        "debug_pod": pod,
    }
    return container_cls(**constructor_kwargs)


def test_pending_diagnostics_surfaces_gcsfuse_mount_wedge():
    """The waiting reason and message of the main container both appear in the diagnostics."""
    # Build the fixture bottom-up: waiting state -> container status -> pod.
    waiting_state = k8s_client_lib.V1ContainerStateWaiting(
        reason="CreateContainerConfigError",
        message=(
            "MountVolume.SetUp failed for volume"
            ' "gcsfuse-prd-oasis-tmp": code = Unauthenticated'
            " desc = failed to prepare storage service"
        ),
    )
    main_status = k8s_client_lib.V1ContainerStatus(
        name="main",
        image="img",
        image_id="",
        ready=False,
        restart_count=0,
        state=k8s_client_lib.V1ContainerState(waiting=waiting_state),
    )
    pending_pod = k8s_client_lib.V1Pod(
        metadata=k8s_client_lib.V1ObjectMeta(name="task-abc-rtdrr"),
        status=k8s_client_lib.V1PodStatus(
            phase="Pending",
            container_statuses=[main_status],
        ),
    )

    launched = _make_launched_container(pending_pod)
    diagnostics = launched.pending_diagnostics

    assert diagnostics is not None
    assert "CreateContainerConfigError" in diagnostics
    assert "code = Unauthenticated" in diagnostics


def test_pending_diagnostics_returns_none_without_main_container_status():
    """A pending pod with no container statuses yields no diagnostics."""
    # Before the kubelet creates a container status (e.g. an unschedulable pod),
    # there is no main-container waiting reason to surface.
    pod_metadata = k8s_client_lib.V1ObjectMeta(name="task-abc-rtdrr")
    pod_status = k8s_client_lib.V1PodStatus(phase="Pending")
    pending_pod = k8s_client_lib.V1Pod(metadata=pod_metadata, status=pod_status)
    launched = _make_launched_container(pending_pod)
    assert launched.pending_diagnostics is None


def test_pending_diagnostics_returns_none_when_no_status():
    """A pod whose ``status`` is ``None`` yields no diagnostics."""
    pod_metadata = k8s_client_lib.V1ObjectMeta(name="task-abc-rtdrr")
    statusless_pod = k8s_client_lib.V1Pod(metadata=pod_metadata, status=None)
    launched = _make_launched_container(statusless_pod)
    assert launched.pending_diagnostics is None
Loading