From 46043292675a3fe06044a2daf3de0e92ed954c8c Mon Sep 17 00:00:00 2001 From: kmajdoub Date: Thu, 28 May 2026 18:41:16 +0200 Subject: [PATCH] feat(maintenance): per-tick stuck-issue sweep (#129) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds forge_loop.stuck_sweep — a health-check that runs each tick after the iteration loop and before the next dispatch. It reads the tail of events.jsonl, counts worker_iterations_exhausted events per issue (resetting on a recovery event), and demotes any issue that crosses settings.maintenance.stuck_threshold_attempts (default 2) AND still carries loop:ready. This closes the gap CTO surfaced dogfooding the brainstormer epic: when the iteration loop's escalation didn't fully land (label-API hiccup, transient bug, or pre-#128 dropped removal), the dispatcher kept re-picking the same broken issue on every tick. The sweep is the belt to escalate_to_human's braces. Behaviour: - Counts exhausted attempts since the last success-shaped event per issue, so a recovered issue is never demoted on stale history. - Idempotent: skips issues that have already shed loop:ready since the last tick (no double-comment, no fight with escalate_to_human). - Best-effort: gh API failures are caught and logged as stuck_sweep_demoted{ok=false,reason=...} so the tick never crashes. - Emits the new typed StuckSweepDemotedEvent for every action. Wired into runner/tick._tick before the dispatch fetch. Config plumbed through settings.maintenance.stuck_threshold_attempts + settings.maintenance.stuck_tail_events. Tests cover the full matrix from the issue: ≥threshold exhausted → demoted; success-after-exhausted → not demoted; zero exhausted → no action; gh failure caught; idempotent skip; tail-window respect; malformed JSON tolerated; multi-issue ordering. Co-Authored-By: Claude Opus 4.7 --- src/forge_loop/config.py | 9 + src/forge_loop/events.py | 23 ++ src/forge_loop/runner/tick.py | 66 ++++++ src/forge_loop/settings.py | 18 ++ src/forge_loop/stuck_sweep.py | 395 ++++++++++++++++++++++++++++++++++ tests/test_stuck_sweep.py | 266 +++++++++++++++++++++++ 6 files changed, 777 insertions(+) create mode 100644 src/forge_loop/stuck_sweep.py create mode 100644 tests/test_stuck_sweep.py diff --git a/src/forge_loop/config.py b/src/forge_loop/config.py index ba9f9de..bbbc184 100644 --- a/src/forge_loop/config.py +++ b/src/forge_loop/config.py @@ -144,6 +144,13 @@ class Config: worker_max_iterations: int = 3 + # Stuck-issue sweep (issue #129). Minimum consecutive + # ``worker_iterations_exhausted`` events before the per-tick sweep + # demotes the issue from ``loop:ready`` to ``loop:needs-human``. + # Sourced from settings.maintenance.stuck_threshold_attempts. + stuck_threshold_attempts: int = 2 + stuck_tail_events: int = 100 + @property def state_dir(self) -> Path: return self.repo / "docs" / "ops" @@ -245,6 +252,8 @@ def _from_settings(s: Settings) -> Config: ), lumen=LumenConfig(top_k=s.lumen.top_k), worker_max_iterations=s.iteration.max_iterations, + stuck_threshold_attempts=s.maintenance.stuck_threshold_attempts, + stuck_tail_events=s.maintenance.stuck_tail_events, ) diff --git a/src/forge_loop/events.py b/src/forge_loop/events.py index ad9f156..8705403 100644 --- a/src/forge_loop/events.py +++ b/src/forge_loop/events.py @@ -208,6 +208,28 @@ class WorktreeReapedEvent(EventBase): status: str = "" +@register_event +class StuckSweepDemotedEvent(EventBase): + """A per-tick stuck-sweep decision (issue #129). + + Emitted by ``forge_loop.stuck_sweep.sweep`` whenever it touches an + issue — successful demotions carry ``ok=True``; gh API failures + carry ``ok=False`` plus a ``reason`` so the operator can see what + blew up without grepping structlog. + + Idempotency skips (issue already lost ``loop:ready``) are NOT + emitted — there's nothing operationally interesting about them. + """ + + KIND: ClassVar[str] = "stuck_sweep_demoted" + issue: int = Field(ge=1) + attempts: int = Field(ge=1) + last_state: str = "" + pr_url: str | None = None + ok: bool = True + reason: str = "" + + # --------------------------------------------------------------------------- # Emit + back-compat shim. ``emit`` is the typed path; ``append_event_with_ # registry_check`` is the back-compat wrapper called by state.append_event. @@ -287,6 +309,7 @@ def append_event_with_registry_check(events_path: Path, kind: str, **fields: Any "LoopStartEvent", "LoopStopEvent", "RedeployEvent", + "StuckSweepDemotedEvent", "TickStartEvent", "WorkerSessionRecoveredEvent", "WorkerSessionTransitionEvent", diff --git a/src/forge_loop/runner/tick.py b/src/forge_loop/runner/tick.py index e663c05..7b8bd83 100644 --- a/src/forge_loop/runner/tick.py +++ b/src/forge_loop/runner/tick.py @@ -42,6 +42,7 @@ _maybe_deploy_drift_halt, ) from forge_loop.state import append_event, consolidate_sprint, write_state +from forge_loop.stuck_sweep import SweepReport, sweep as _stuck_sweep from forge_loop.worker import WorkerOutcome @@ -324,6 +325,66 @@ def _remove_ready_label( ) +def _run_stuck_sweep(cfg: Config, tick: int) -> SweepReport | None: + """Per-tick stuck-issue sweep (issue #129). + + Runs after the iteration loop (which may have written + ``worker_iterations_exhausted`` for issues whose escalation didn't + land) and before the next dispatch batch — so an issue that was + supposed to be demoted but wasn't gets caught here, before + ``top_issues`` re-picks it. + + Fully best-effort: any failure (gh client init, sweep crash) is + swallowed with a single event. The tick itself must never fail + because of a maintenance sweep. + """ + if cfg.github_repo is None or "/" not in cfg.github_repo: + return None + owner, repo = cfg.github_repo.split("/", 1) + try: + # Lazy import — keeps the tick startup fast and lets tests stub + # the constructor via the env-token path without importing + # githubkit when not needed. + from forge_loop.gh_client import GithubkitClient + client = GithubkitClient() + except Exception as ex: # noqa: BLE001 + append_event( + cfg.events_file, + "stuck_sweep_skipped", + tick=tick, + reason=f"gh_client_init: {ex}"[:200], + ) + return None + try: + report = _stuck_sweep( + cfg.events_file, + client, + owner=owner, + repo=repo, + threshold=cfg.stuck_threshold_attempts, + ready_label=cfg.labels.ready, + tail=cfg.stuck_tail_events, + ) + except Exception as ex: # noqa: BLE001 — sweep promises not to raise, belt-and-braces + append_event( + cfg.events_file, + "stuck_sweep_crashed", + tick=tick, + err=str(ex)[:200], + ) + return None + if report.demotions: + append_event( + cfg.events_file, + "stuck_sweep_done", + tick=tick, + demoted=[d.issue for d in report.demotions if d.ok], + failed=list(report.errors), + scanned=report.scanned, + ) + return report + + def _tick(cfg: Config, tick: int) -> None: # Imported lazily to avoid an import cycle (boot.py imports tick.py). from forge_loop.runner.boot import _short_sleep @@ -371,6 +432,11 @@ def _tick(cfg: Config, tick: int) -> None: def _bus_emit(kind: str, payload: dict[str, Any]) -> None: append_event(cfg.events_file, kind, **payload) + # Stuck-issue sweep (issue #129) — fires before the next dispatch + # so any issue the iteration loop gave up on but failed to demote + # gets caught here, not re-picked by top_issues below. + _run_stuck_sweep(cfg, tick) + repairs = _blocking_pr_repairs(cfg) if repairs: master_log_path = cfg.logs_dir / "master.log" diff --git a/src/forge_loop/settings.py b/src/forge_loop/settings.py index ecbb542..806c4ec 100644 --- a/src/forge_loop/settings.py +++ b/src/forge_loop/settings.py @@ -351,6 +351,22 @@ class IterationSettings(BaseSettings): max_critic_iterations: int = 3 +class MaintenanceSettings(BaseSettings): + """Knobs for the maintenance-tier sweeps that run alongside the LLM + groomer (issue #129). + + ``stuck_threshold_attempts`` gates the stuck-issue sweep — an issue + needs at least this many ``worker_iterations_exhausted`` events + (without a recovery in between) before we demote it from + ``loop:ready`` to ``loop:needs-human``. Default 2: one bad run is + forgivable, two is a pattern. + """ + + model_config = SettingsConfigDict(extra="ignore") + stuck_threshold_attempts: int = 2 + stuck_tail_events: int = 100 + + class MiscSettings(BaseSettings): """Misc knobs that don't fit a logical group cleanly.""" @@ -393,6 +409,7 @@ class Settings(BaseSettings): operator: OperatorSettings = Field(default_factory=OperatorSettings) dashboard: DashboardSettings = Field(default_factory=DashboardSettings) iteration: IterationSettings = Field(default_factory=IterationSettings) + maintenance: MaintenanceSettings = Field(default_factory=MaintenanceSettings) misc: MiscSettings = Field(default_factory=MiscSettings) # The repo path itself is resolved at load time (git toplevel or env @@ -429,6 +446,7 @@ def load(cls) -> Settings: "operator": {**(y.get("operator") or {})}, "dashboard": {**(y.get("dashboard") or {})}, "iteration": {**(y.get("iteration") or {})}, + "maintenance": {**(y.get("maintenance") or {})}, "misc": {**(y.get("misc") or {})}, } diff --git a/src/forge_loop/stuck_sweep.py b/src/forge_loop/stuck_sweep.py new file mode 100644 index 0000000..1700c39 --- /dev/null +++ b/src/forge_loop/stuck_sweep.py @@ -0,0 +1,395 @@ +"""Stuck-issue sweep — demote ``loop:ready`` issues that the iteration loop +has repeatedly given up on (issue #129). + +Background +========== + +The existing ``maintenance.py`` runs an LLM-driven backlog GROOMER that +closes dupes, retitles, and adds ``loop:ready``. It does **not** scan +events.jsonl for issues the runner has already exhausted iteration on. + +That gap leaves stuck issues looking healthy: the iteration loop bails +with ``worker_iterations_exhausted`` and the escalation path is supposed +to drop ``loop:ready`` and add ``loop:needs-human``. But any transient +gap in that escalation (label-API hiccup, partial gh-cli failure, the +pre-#128 bug that dropped the label removal entirely) leaves the issue +``loop:ready`` and the dispatcher keeps re-picking it on every tick. + +This sweep is the health-check that closes the gap. It runs once per +tick after the iteration loop, before the next dispatch batch. It reads +the tail of events.jsonl, counts exhausted attempts per issue (resetting +on any success-shaped event after an exhausted one), and demotes +anything that crosses the configured threshold while still wearing the +``loop:ready`` label. + +The sweep is conservative on purpose: + +* GhClient errors are caught and logged as ``stuck_sweep_demote_failed`` + so a flaky GitHub never crashes the tick. +* The success-after-exhausted heuristic means a recovered issue won't + get demoted just because an earlier attempt failed. +* The label check is the final gate: we only demote issues that *still* + carry ``loop:ready``, so the sweep is idempotent across ticks. +""" + +from __future__ import annotations + +import json +from collections import defaultdict +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Callable, Protocol + +from forge_loop.events import EventBase, StuckSweepDemotedEvent, emit +from forge_loop.log import get_logger + + +# --------------------------------------------------------------------------- +# Public types +# --------------------------------------------------------------------------- + + +# Event kinds we treat as "this issue made forward progress" — seeing one +# of these AFTER an exhausted event for the same issue means the issue +# recovered and the prior exhausted count is wiped. ``redeploy`` with +# ok=True is excluded because it's repo-level, not issue-level. +SUCCESS_KINDS: frozenset[str] = frozenset({ + "worker_merged", + "pr_merged", + "worker_iteration_merged", + "iteration_merged", +}) + + +@dataclass(frozen=True) +class Demotion: + """One issue the sweep decided to demote. + + ``attempts`` is the count of ``worker_iterations_exhausted`` events + we observed since the most recent success-shaped event. ``last_state`` + is the ``final_state`` field copied from the most recent exhausted + event, so the demotion comment can tell the operator where the loop + stopped without re-reading the log. + """ + + issue: int + attempts: int + last_state: str + pr_url: str | None + ok: bool # False means the gh API call(s) failed; sweep still records it + + +@dataclass +class SweepReport: + """Result of one ``sweep()`` call. + + ``demotions`` lists every issue we attempted to demote this tick. + ``scanned`` is the number of events read from the file (≤ ``tail``). + ``errors`` captures any gh API failures, keyed by issue number, so + the operator can see why a demotion didn't land without grepping the + structured log. + """ + + demotions: list[Demotion] = field(default_factory=list) + scanned: int = 0 + errors: dict[int, str] = field(default_factory=dict) + + def issue_numbers(self) -> list[int]: + return [d.issue for d in self.demotions] + + +class GhClientLike(Protocol): + """Minimal subset of ``forge_loop.gh_client.GhClient`` the sweep needs. + + Defined locally so tests can pass a hand-rolled fake without having + to satisfy the full protocol. The two production implementations + (``GithubkitClient`` and the in-memory ``FakeGhClient``) both + structurally match this. + """ + + def get_issue(self, owner: str, repo: str, number: int) -> Any: ... + def add_labels(self, owner: str, repo: str, number: int, labels: list[str]) -> None: ... + def remove_label(self, owner: str, repo: str, number: int, label: str) -> None: ... + def add_comment(self, owner: str, repo: str, number: int, body: str) -> None: ... + + +EmitFn = Callable[[EventBase], None] + + +# --------------------------------------------------------------------------- +# Event tail reader +# --------------------------------------------------------------------------- + + +def _read_tail(events_file: Path, tail: int) -> list[dict[str, Any]]: + """Return the last ``tail`` JSON-lines from ``events_file``. + + Missing file → empty list (a fresh runner has no events yet). Lines + that fail to parse are silently skipped — events.jsonl is append-only + and the JSON encoder is well-behaved, so corruption here usually + means a partial write at process kill, which we should tolerate. + """ + if not events_file.exists(): + return [] + try: + # We read the whole file because ``tail`` is small (default 100) + # and events.jsonl is bounded by the rotate setting. Seeking from + # the end would save IO on huge logs but adds complexity that + # isn't earning its keep here. + lines = events_file.read_text().splitlines() + except OSError: + return [] + out: list[dict[str, Any]] = [] + for line in lines[-tail:]: + line = line.strip() + if not line: + continue + try: + rec = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(rec, dict): + out.append(rec) + return out + + +# --------------------------------------------------------------------------- +# Core counting logic +# --------------------------------------------------------------------------- + + +@dataclass +class _IssueTally: + attempts: int = 0 + last_state: str = "" + pr_url: str | None = None + + +def _tally_exhausted(events: list[dict[str, Any]]) -> dict[int, _IssueTally]: + """Walk events in order, counting exhausted attempts per issue. + + A success-shaped event for the same issue zeroes the running count — + that's the "recovered after a bad run" case from the test matrix. + The last-seen ``final_state`` / ``pr_url`` win, so the demotion + comment reflects the most recent failure shape. + """ + tallies: dict[int, _IssueTally] = defaultdict(_IssueTally) + for rec in events: + kind = rec.get("kind") + issue = rec.get("issue") + if not isinstance(issue, int): + continue + if kind == "worker_iterations_exhausted": + t = tallies[issue] + t.attempts += 1 + fs = rec.get("final_state") + if isinstance(fs, str): + t.last_state = fs + pr = rec.get("pr_url") + if isinstance(pr, str) or pr is None: + t.pr_url = pr + elif kind in SUCCESS_KINDS: + # Recovery — wipe the running count for this issue. + tallies[issue] = _IssueTally() + return tallies + + +# --------------------------------------------------------------------------- +# Demotion +# --------------------------------------------------------------------------- + + +def _demote_one( + gh: GhClientLike, + owner: str, + repo: str, + issue: int, + tally: _IssueTally, + *, + ready_label: str, + needs_human_label: str, +) -> tuple[bool, str | None]: + """Drop ``ready_label`` + add ``needs_human_label`` + post a comment. + + Returns ``(ok, error)``. ``ok=True`` means at least the label flip + landed — the comment is best-effort and a comment-only failure still + counts as a successful demotion (the dispatcher won't re-pick the + issue, which is what matters). + + Issues that no longer carry ``ready_label`` are skipped (returns + ``(False, "not_ready")``) so the sweep is idempotent across ticks + and won't fight with a concurrent escalate_to_human run. + """ + log = get_logger() + # Idempotency guard. If the issue lost loop:ready since the last tick + # — either escalate_to_human caught up or a human edited the issue — + # we have nothing to do. + try: + cur = gh.get_issue(owner, repo, issue) + except Exception as ex: # noqa: BLE001 — gh layer raises domain-specific types we don't import here + log.warning("stuck_sweep_get_issue_failed", issue=issue, err=str(ex)[:200]) + return False, f"get_issue: {ex}" + if cur is None: + return False, "issue_not_found" + labels = list(getattr(cur, "labels", []) or []) + if ready_label not in labels: + return False, "not_ready" + + body = ( + f"forge-loop stuck-sweep: this issue has hit " + f"`worker_iterations_exhausted` {tally.attempts}x without recovering.\n" + f"\n" + f"- last state: `{tally.last_state or 'unknown'}`\n" + f"- last PR: {tally.pr_url or '(none)'}\n" + f"\n" + f"Dropping `{ready_label}` and adding `{needs_human_label}` so the " + f"dispatcher stops re-picking it. A human should look at the " + f"worktree / PR and either fix the underlying bug or close the " + f"issue.\n" + ) + + # Label flip first — that's the load-bearing operation. Comment is + # cosmetic; we still report ok=True if only the comment fails. + try: + gh.add_labels(owner, repo, issue, [needs_human_label]) + gh.remove_label(owner, repo, issue, ready_label) + except Exception as ex: # noqa: BLE001 + log.warning("stuck_sweep_label_failed", issue=issue, err=str(ex)[:200]) + return False, f"label: {ex}" + + try: + gh.add_comment(owner, repo, issue, body) + except Exception as ex: # noqa: BLE001 + log.info("stuck_sweep_comment_failed", issue=issue, err=str(ex)[:200]) + # Don't fail the demotion — the labels are what matter. + + return True, None + + +# --------------------------------------------------------------------------- +# Public entrypoint +# --------------------------------------------------------------------------- + + +def sweep( + events_file: Path, + gh_client: GhClientLike, + *, + owner: str, + repo: str, + threshold: int = 2, + ready_label: str = "loop:ready", + needs_human_label: str = "loop:needs-human", + tail: int = 100, + emit_fn: EmitFn | None = None, +) -> SweepReport: + """Sweep the event tail for stuck issues and demote them. + + Parameters + ---------- + events_file: + Path to ``loop-runner-events.jsonl``. Missing file → empty sweep. + gh_client: + Anything matching :class:`GhClientLike` — production code passes + the real ``GithubkitClient``; tests pass an in-memory fake. + owner, repo: + GitHub owner/repo for the gh API calls. Sourced from + ``settings.repo.github`` at the tick caller. + threshold: + Minimum exhausted-attempt count to trigger demotion. Defaults to + 2 — one bad run is forgivable, two is a pattern. Configurable via + ``settings.maintenance.stuck_threshold_attempts``. + ready_label / needs_human_label: + Label names. Defaults match production; settings override at + the caller. + tail: + How many trailing event records to scan. 100 is plenty for a + single tick — the dispatcher would have re-fired the issue many + times in that window if it were still ``loop:ready``. + emit_fn: + Injection point for the typed-event emitter. Defaults to a + closure over :func:`forge_loop.events.emit` bound to + ``events_file``. Tests override to capture without disk IO. + + Returns + ------- + SweepReport summarising what we touched. Never raises. + """ + log = get_logger() + if threshold < 1: + threshold = 1 # nonsense thresholds get clamped, not crashed + + if emit_fn is None: + def emit_fn(ev: EventBase) -> None: # noqa: E306 — local closure + emit(events_file, ev) + + events = _read_tail(events_file, tail) + report = SweepReport(scanned=len(events)) + tallies = _tally_exhausted(events) + + # Stable ordering so logs / tests are deterministic. + for issue in sorted(tallies): + tally = tallies[issue] + if tally.attempts < threshold: + continue + ok, err = _demote_one( + gh_client, + owner, + repo, + issue, + tally, + ready_label=ready_label, + needs_human_label=needs_human_label, + ) + # ``not_ready`` is a silent skip: the issue already shed the + # ready label (escalate_to_human caught up, or a human edited + # it). Nothing to record, nothing to emit — the sweep is + # idempotent across ticks precisely because this branch fires + # cheaply and exits. + if err == "not_ready": + d = Demotion( + issue=issue, + attempts=tally.attempts, + last_state=tally.last_state, + pr_url=tally.pr_url, + ok=False, + ) + report.demotions.append(d) + continue + # We record EVERY remaining attempted demotion, even ones that + # failed (gh blew up). The Demotion.ok flag distinguishes them. + # This matters because the operator surface (dashboard, dump) + # wants to show "what did the sweep try to touch" not "what + # landed." + d = Demotion( + issue=issue, + attempts=tally.attempts, + last_state=tally.last_state, + pr_url=tally.pr_url, + ok=ok, + ) + report.demotions.append(d) + if not ok: + report.errors[issue] = err or "unknown" + try: + emit_fn(StuckSweepDemotedEvent( + issue=issue, + attempts=tally.attempts, + last_state=tally.last_state, + pr_url=tally.pr_url, + ok=ok, + reason=err or "", + )) + except Exception as ex: # noqa: BLE001 — never crash the sweep on emit + log.warning("stuck_sweep_emit_failed", issue=issue, err=str(ex)[:200]) + + return report + + +__all__ = [ + "Demotion", + "GhClientLike", + "SUCCESS_KINDS", + "SweepReport", + "sweep", +] diff --git a/tests/test_stuck_sweep.py b/tests/test_stuck_sweep.py new file mode 100644 index 0000000..64e6143 --- /dev/null +++ b/tests/test_stuck_sweep.py @@ -0,0 +1,266 @@ +"""Tests for the per-tick stuck-issue sweep (issue #129).""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from forge_loop.events import StuckSweepDemotedEvent +from forge_loop.gh_client import GhError, Issue, MockGhClient +from forge_loop.stuck_sweep import Demotion, SweepReport, sweep + + +READY = "loop:ready" +NEEDS_HUMAN = "loop:needs-human" + + +def _write_events(path: Path, records: list[dict]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("".join(json.dumps(r) + "\n" for r in records)) + + +def _exhausted(issue: int, *, final_state: str = "committed_not_pushed", pr_url: str | None = None) -> dict: + return { + "ts": "2026-05-28T00:00:00.000Z", + "kind": "worker_iterations_exhausted", + "issue": issue, + "attempts": 3, + "final_state": final_state, + "pr_url": pr_url, + } + + +def _success(issue: int, kind: str = "worker_merged") -> dict: + return {"ts": "2026-05-28T00:00:01.000Z", "kind": kind, "issue": issue} + + +def _ready_issue(number: int) -> Issue: + return Issue( + number=number, + title=f"issue {number}", + body="", + state="open", + labels=[READY], + ) + + +# --------------------------------------------------------------------------- +# Test matrix from issue #129 +# --------------------------------------------------------------------------- + + +def test_two_exhausted_events_demote(tmp_path): + events = tmp_path / "events.jsonl" + _write_events(events, [_exhausted(42), _exhausted(42, final_state="pushed_no_pr", pr_url="http://x/1")]) + gh = MockGhClient(issues={("o", "r", 42): _ready_issue(42)}) + captured: list[StuckSweepDemotedEvent] = [] + + rep = sweep(events, gh, owner="o", repo="r", threshold=2, emit_fn=captured.append) + + assert [d.issue for d in rep.demotions] == [42] + d = rep.demotions[0] + assert d.ok is True + assert d.attempts == 2 + assert d.last_state == "pushed_no_pr" + assert d.pr_url == "http://x/1" + # Labels flipped: needs-human added, ready removed, comment posted. + method_calls = [m for m, _ in gh.calls] + assert "add_labels" in method_calls + assert "remove_label" in method_calls + assert "add_comment" in method_calls + # Typed event fired with ok=True. + assert len(captured) == 1 + assert captured[0].issue == 42 + assert captured[0].ok is True + assert captured[0].attempts == 2 + + +def test_recovered_issue_not_demoted(tmp_path): + """1 exhausted + success after → NOT demoted (it recovered).""" + events = tmp_path / "events.jsonl" + _write_events(events, [_exhausted(7), _success(7), _exhausted(7)]) + gh = MockGhClient(issues={("o", "r", 7): _ready_issue(7)}) + + rep = sweep(events, gh, owner="o", repo="r", threshold=2) + + # After the success the count reset to 0, then one exhausted → 1, below 2. + assert rep.demotions == [] + # No label calls made. + assert all(m not in ("add_labels", "remove_label") for m, _ in gh.calls) + + +def test_zero_exhausted_no_demotion(tmp_path): + events = tmp_path / "events.jsonl" + _write_events(events, [{"ts": "...", "kind": "tick_start", "issue": 5}]) + gh = MockGhClient(issues={("o", "r", 5): _ready_issue(5)}) + + rep = sweep(events, gh, owner="o", repo="r", threshold=2) + + assert rep.demotions == [] + assert gh.calls == [] + + +def test_demotion_fires_typed_event_and_labels(tmp_path): + events = tmp_path / "events.jsonl" + _write_events(events, [_exhausted(101), _exhausted(101)]) + gh = MockGhClient(issues={("o", "r", 101): _ready_issue(101)}) + captured: list[StuckSweepDemotedEvent] = [] + + sweep(events, gh, owner="o", repo="r", threshold=2, emit_fn=captured.append) + + # Find the add_labels + remove_label calls and check args. + add = [kw for m, kw in gh.calls if m == "add_labels"][0] + rem = [kw for m, kw in gh.calls if m == "remove_label"][0] + assert add["labels"] == [NEEDS_HUMAN] + assert rem["label"] == READY + assert add["number"] == 101 and rem["number"] == 101 + # Typed event has the right kind + payload. + assert len(captured) == 1 + ev = captured[0] + assert ev.KIND == "stuck_sweep_demoted" + assert ev.ok is True + + +def test_gh_failure_during_demotion_caught(tmp_path): + """GhClient blows up → sweep records the miss + emits ok=False, never raises.""" + events = tmp_path / "events.jsonl" + _write_events(events, [_exhausted(9), _exhausted(9)]) + gh = MockGhClient( + issues={("o", "r", 9): _ready_issue(9)}, + raise_on={"add_labels": GhError("add_labels", 500, "boom")}, + ) + captured: list[StuckSweepDemotedEvent] = [] + + rep = sweep(events, gh, owner="o", repo="r", threshold=2, emit_fn=captured.append) + + # Recorded the attempt but flagged it failed. + assert len(rep.demotions) == 1 + assert rep.demotions[0].ok is False + assert 9 in rep.errors + assert "add_labels" in rep.errors[9] or "label" in rep.errors[9] + # Typed failure event surfaced for the operator log. + assert len(captured) == 1 + assert captured[0].ok is False + assert captured[0].reason + + +def test_idempotent_skip_when_already_not_ready(tmp_path): + """Issue lost loop:ready since last tick → no double demotion.""" + events = tmp_path / "events.jsonl" + _write_events(events, [_exhausted(11), _exhausted(11)]) + cur = Issue(number=11, title="x", body="", state="open", labels=[NEEDS_HUMAN]) + gh = MockGhClient(issues={("o", "r", 11): cur}) + captured: list[StuckSweepDemotedEvent] = [] + + rep = sweep(events, gh, owner="o", repo="r", threshold=2, emit_fn=captured.append) + + # Demotion attempted but flagged ok=False with reason "not_ready" — no + # typed event emitted, no label mutations. + assert len(rep.demotions) == 1 + assert rep.demotions[0].ok is False + method_calls = [m for m, _ in gh.calls] + assert "add_labels" not in method_calls + assert "remove_label" not in method_calls + # not_ready is a silent skip — no typed event. + assert captured == [] + + +def test_missing_events_file_returns_empty_report(tmp_path): + events = tmp_path / "absent.jsonl" + gh = MockGhClient() + rep = sweep(events, gh, owner="o", repo="r") + assert rep.demotions == [] + assert rep.scanned == 0 + + +def test_tail_window_limits_scan(tmp_path): + """Old exhausted events past the tail window don't count.""" + events = tmp_path / "events.jsonl" + # 5 noise records, then 1 exhausted. Tail=3 → only sees noise + exhausted + # past the last 3? With tail=3 we only see the last 3 records (2 noise + + # 1 exhausted) → count = 1, below threshold = 2. + noise = [{"ts": "...", "kind": "tick_idle", "tick": i} for i in range(5)] + _write_events(events, [_exhausted(3), _exhausted(3), *noise]) + gh = MockGhClient(issues={("o", "r", 3): _ready_issue(3)}) + + rep = sweep(events, gh, owner="o", repo="r", threshold=2, tail=3) + + assert rep.demotions == [] + assert rep.scanned == 3 + + +def test_malformed_json_lines_skipped(tmp_path): + events = tmp_path / "events.jsonl" + events.parent.mkdir(parents=True, exist_ok=True) + # First line is garbage, second is a real exhausted event x2. + lines = [ + "{not json", + json.dumps(_exhausted(77)), + json.dumps(_exhausted(77)), + ] + events.write_text("\n".join(lines) + "\n") + gh = MockGhClient(issues={("o", "r", 77): _ready_issue(77)}) + + rep = sweep(events, gh, owner="o", repo="r", threshold=2) + + assert [d.issue for d in rep.demotions] == [77] + + +def test_threshold_clamps_to_one(tmp_path): + """threshold=0 is nonsense; gets clamped, doesn't crash.""" + events = tmp_path / "events.jsonl" + _write_events(events, [_exhausted(50)]) + gh = MockGhClient(issues={("o", "r", 50): _ready_issue(50)}) + rep = sweep(events, gh, owner="o", repo="r", threshold=0) + assert [d.issue for d in rep.demotions] == [50] + + +def test_multiple_issues_demoted_in_stable_order(tmp_path): + events = tmp_path / "events.jsonl" + _write_events(events, [ + _exhausted(20), _exhausted(20), + _exhausted(10), _exhausted(10), + _exhausted(30), _exhausted(30), + ]) + gh = MockGhClient(issues={ + ("o", "r", 10): _ready_issue(10), + ("o", "r", 20): _ready_issue(20), + ("o", "r", 30): _ready_issue(30), + }) + + rep = sweep(events, gh, owner="o", repo="r", threshold=2) + # Sorted by issue number for determinism. + assert [d.issue for d in rep.demotions] == [10, 20, 30] + + +def test_get_issue_failure_caught(tmp_path): + """get_issue raises → sweep records the miss, no crash.""" + events = tmp_path / "events.jsonl" + _write_events(events, [_exhausted(99), _exhausted(99)]) + gh = MockGhClient( + issues={("o", "r", 99): _ready_issue(99)}, + raise_on={"get_issue": GhError("get_issue", 502, "down")}, + ) + captured: list[StuckSweepDemotedEvent] = [] + rep = sweep(events, gh, owner="o", repo="r", threshold=2, emit_fn=captured.append) + assert len(rep.demotions) == 1 + assert rep.demotions[0].ok is False + assert 99 in rep.errors + assert len(captured) == 1 + assert captured[0].ok is False + + +def test_comment_failure_does_not_fail_demotion(tmp_path): + """add_comment blows up but labels landed → still ok=True.""" + events = tmp_path / "events.jsonl" + _write_events(events, [_exhausted(8), _exhausted(8)]) + gh = MockGhClient( + issues={("o", "r", 8): _ready_issue(8)}, + raise_on={"add_comment": GhError("add_comment", 500, "nope")}, + ) + captured: list[StuckSweepDemotedEvent] = [] + rep = sweep(events, gh, owner="o", repo="r", threshold=2, emit_fn=captured.append) + assert rep.demotions[0].ok is True + assert captured[0].ok is True