From 488d892271b09e9e7f385bca2c3c9c798a2cb804 Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Tue, 30 Jun 2026 12:23:51 -0700 Subject: [PATCH] feat: Orchestrator - Terminate PENDING-wedged executions with a deadline watchdog A pod that can never boot (e.g. a gcsfuse CSI-node mount wedge: MountVolume.SetUp failed ... code = Unauthenticated -> CreateContainerConfigError) stays in phase Pending forever. The orchestrator polls it indefinitely with no deadline, never terminating it or marking it SYSTEM_ERROR, so a run can sit stuck for days. PENDING-deadline watchdog: add an optional max_pending_duration to the orchestrator. In internal_process_one_running_execution, when a container is still PENDING past the deadline, terminate it and raise OrchestratorError; the existing outer handler marks it SYSTEM_ERROR, records the error, and skips downstream. The deadline check is a pure helper (_pending_deadline_exceeded) and defaults to disabled (None), so behavior is unchanged until a deployment opts in. Rows without created_at are never force-failed. Real kubelet reason in the error: add a pending_diagnostics property to the Kubernetes launcher that returns the main container's waiting reason and message (e.g. CreateContainerConfigError + the MountVolume.SetUp failure), so the SYSTEM_ERROR carries the real boot failure instead of a bare timeout. The orchestrator reads it via getattr, so no launcher interface change is needed. --- .../launchers/kubernetes_launchers.py | 10 ++ cloud_pipelines_backend/orchestrator_sql.py | 47 ++++++ tests/test_pending_deadline_watchdog.py | 140 ++++++++++++++++++ 3 files changed, 197 insertions(+) create mode 100644 tests/test_pending_deadline_watchdog.py diff --git a/cloud_pipelines_backend/launchers/kubernetes_launchers.py b/cloud_pipelines_backend/launchers/kubernetes_launchers.py index 25cd267b..7fad299c 100644 --- a/cloud_pipelines_backend/launchers/kubernetes_launchers.py +++ b/cloud_pipelines_backend/launchers/kubernetes_launchers.py @@ -875,6 +875,16 @@ def launcher_error_message(self) -> str | None: launcher_error_message = f"Kubernetes error. Reason: {main_container_terminated_state.reason}, message: {main_container_terminated_state.message}" return launcher_error_message + @property + def pending_diagnostics(self) -> str | None: + state = self._get_main_container_state() + waiting = state.waiting if state is not None else None + if waiting is None or not waiting.reason: + return None + if waiting.message: + return f"{waiting.reason}: {waiting.message}" + return waiting.reason + def to_dict(self) -> dict[str, Any]: pod_dict = _serialize_kubernetes_object_to_compact_dict(self._debug_pod) result = dict( diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index 1a81f456..256c2c20 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -50,6 +50,7 @@ def __init__( default_task_annotations: dict[str, Any] | None = None, sleep_seconds_between_queue_sweeps: float = 1.0, output_data_purge_duration: datetime.timedelta = None, + max_pending_duration: datetime.timedelta | None = None, *, # Internal/experimental: _max_queue_batch_size: int = 1, @@ -65,6 +66,7 @@ def __init__( self._queued_executions_queue_idle = False self._running_executions_queue_idle = False self._output_data_purge_duration = output_data_purge_duration + self._max_pending_duration = max_pending_duration self._max_queue_batch_size = _max_queue_batch_size self._max_queue_batch_duration = _max_queue_batch_duration @@ -765,6 +767,35 @@ def internal_process_one_running_execution( reloaded_launched_container.status ) if new_status == previous_status: + if ( + new_status == launcher_interfaces.ContainerStatus.PENDING + and _pending_deadline_exceeded( + created_at=container_execution.created_at, + now=current_time, + max_pending_duration=self._max_pending_duration, + ) + ): + # The container never started (e.g. an unschedulable pod or a + # gcsfuse/image-pull boot wedge). Terminate it and let the outer + # exception handler mark it SYSTEM_ERROR and skip downstream. + pending_duration = current_time - container_execution.created_at + diagnostics = getattr( + reloaded_launched_container, "pending_diagnostics", None + ) + try: + _retry(reloaded_launched_container.upload_log) + except Exception: + _logger.exception( + "Error uploading logs before pending-deadline termination." + ) + reloaded_launched_container.terminate() + message = ( + f"Task pending {pending_duration}, never started" + f" (deadline {self._max_pending_duration})." + ) + if diagnostics: + message += f"\n{diagnostics}" + raise OrchestratorError(message) _logger.info(f"Container execution remains in {new_status} state.") return _logger.info( @@ -1055,6 +1086,22 @@ def _get_current_time() -> datetime.datetime: return datetime.datetime.now(tz=datetime.timezone.utc) +def _pending_deadline_exceeded( + created_at: datetime.datetime | None, + now: datetime.datetime, + max_pending_duration: datetime.timedelta | None, +) -> bool: + """Whether a still-pending container has outlived its start deadline. + + Returns False when the watchdog is disabled (``max_pending_duration`` is + None) or the creation time is unknown, so legacy rows without + ``created_at`` are never force-failed. + """ + if max_pending_duration is None or created_at is None: + return False + return (now - created_at) > max_pending_duration + + def _generate_random_id() -> str: import os import time diff --git a/tests/test_pending_deadline_watchdog.py b/tests/test_pending_deadline_watchdog.py new file mode 100644 index 00000000..4465dd27 --- /dev/null +++ b/tests/test_pending_deadline_watchdog.py @@ -0,0 +1,140 @@ +"""Tests for the PENDING-deadline watchdog and kubelet reason surfacing. + +Covers: +* ``_pending_deadline_exceeded`` boundary logic in the orchestrator. +* ``LaunchedKubernetesContainer.pending_diagnostics`` reason extraction. + +The diagnostics tests build ``V1Pod`` fixtures directly, so they run offline +without a cluster. +""" + +from __future__ import annotations + +import datetime + +from kubernetes import client as k8s_client_lib + +from cloud_pipelines_backend import orchestrator_sql +from cloud_pipelines_backend.launchers import kubernetes_launchers + +_NOW = datetime.datetime(2026, 6, 30, 12, 0, 0, tzinfo=datetime.timezone.utc) + + +def test_pending_deadline_disabled_when_duration_is_none(): + created_at = _NOW - datetime.timedelta(days=6) + assert ( + orchestrator_sql._pending_deadline_exceeded( + created_at=created_at, now=_NOW, max_pending_duration=None + ) + is False + ) + + +def test_pending_deadline_skipped_when_created_at_unknown(): + assert ( + orchestrator_sql._pending_deadline_exceeded( + created_at=None, + now=_NOW, + max_pending_duration=datetime.timedelta(minutes=30), + ) + is False + ) + + +def test_pending_deadline_not_exceeded_under_threshold(): + created_at = _NOW - datetime.timedelta(minutes=10) + assert ( + orchestrator_sql._pending_deadline_exceeded( + created_at=created_at, + now=_NOW, + max_pending_duration=datetime.timedelta(minutes=30), + ) + is False + ) + + +def test_pending_deadline_exceeded_past_threshold(): + created_at = _NOW - datetime.timedelta(minutes=31) + assert ( + orchestrator_sql._pending_deadline_exceeded( + created_at=created_at, + now=_NOW, + max_pending_duration=datetime.timedelta(minutes=30), + ) + is True + ) + + +def test_pending_deadline_boundary_is_strict(): + # Exactly at the threshold is not yet exceeded. + created_at = _NOW - datetime.timedelta(minutes=30) + assert ( + orchestrator_sql._pending_deadline_exceeded( + created_at=created_at, + now=_NOW, + max_pending_duration=datetime.timedelta(minutes=30), + ) + is False + ) + + +def _make_launched_container( + pod: k8s_client_lib.V1Pod, +) -> kubernetes_launchers.LaunchedKubernetesContainer: + return kubernetes_launchers.LaunchedKubernetesContainer( + pod_name="task-abc-rtdrr", + namespace="oasis", + output_uris={}, + log_uri="memory://log", + debug_pod=pod, + ) + + +def test_pending_diagnostics_surfaces_gcsfuse_mount_wedge(): + pod = k8s_client_lib.V1Pod( + metadata=k8s_client_lib.V1ObjectMeta(name="task-abc-rtdrr"), + status=k8s_client_lib.V1PodStatus( + phase="Pending", + container_statuses=[ + k8s_client_lib.V1ContainerStatus( + name="main", + image="img", + image_id="", + ready=False, + restart_count=0, + state=k8s_client_lib.V1ContainerState( + waiting=k8s_client_lib.V1ContainerStateWaiting( + reason="CreateContainerConfigError", + message=( + "MountVolume.SetUp failed for volume" + ' "gcsfuse-prd-oasis-tmp": code = Unauthenticated' + " desc = failed to prepare storage service" + ), + ) + ), + ) + ], + ), + ) + diagnostics = _make_launched_container(pod).pending_diagnostics + assert diagnostics is not None + assert "CreateContainerConfigError" in diagnostics + assert "code = Unauthenticated" in diagnostics + + +def test_pending_diagnostics_returns_none_without_main_container_status(): + # Before the kubelet creates a container status (e.g. an unschedulable pod), + # there is no main-container waiting reason to surface. + pod = k8s_client_lib.V1Pod( + metadata=k8s_client_lib.V1ObjectMeta(name="task-abc-rtdrr"), + status=k8s_client_lib.V1PodStatus(phase="Pending"), + ) + assert _make_launched_container(pod).pending_diagnostics is None + + +def test_pending_diagnostics_returns_none_when_no_status(): + pod = k8s_client_lib.V1Pod( + metadata=k8s_client_lib.V1ObjectMeta(name="task-abc-rtdrr"), + status=None, + ) + assert _make_launched_container(pod).pending_diagnostics is None