From df51f77cd75deb4003391b82f34ead01a74812bc Mon Sep 17 00:00:00 2001 From: kmajdoub Date: Thu, 28 May 2026 19:11:26 +0200 Subject: [PATCH] feat(persistent-worker): critic ping-pong protocol (#110) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Routes a typed CriticReport into the AWAITING_CRITIC outbound edge: - approve -> MERGED (auto-merge already in flight) - request_changes -> REVISING (counter bumps via store.increment_iterations, follow-up worker dispatched with the SDK session_id resumed and the critic comments serialised verbatim) - block (sev1) -> ABANDONED + PR labelled loop:needs-review The verbatim contract — "no summarisation" — is delivered by format_critic_followup_prompt(): every Finding's message, file/line, and severity/category tag is rendered into the next prompt as-is, so the resumed SDK worker sees exactly what the critic wrote. dispatch_revision is a callback so unit tests can capture the prompt and resume kwargs without actually spinning up an SDK session; the runner wires the real dispatcher in a follow-up. A dispatch crash is caught at the boundary so the FSM edge + counter bump are never undone — the next tick can retry from REVISING. Iteration-cap enforcement (#111) stays in enforce_critic_iteration_cap and is layered on top by the runner; this handler is the pure verdict→state router. Tests cover the four acceptance cases plus adversarial paths: unknown verdict no-ops, gh label failure can't undo ABANDONED, dispatch crash can't undo REVISING, wrong starting state raises InvalidTransition. Co-Authored-By: Claude Opus 4.7 --- src/forge_loop/runner/dispatch.py | 199 +++++++++++++++ tests/test_critic_ping_pong.py | 404 ++++++++++++++++++++++++++++++ 2 files changed, 603 insertions(+) create mode 100644 tests/test_critic_ping_pong.py diff --git a/src/forge_loop/runner/dispatch.py b/src/forge_loop/runner/dispatch.py index a7ed9bd..494ae61 100644 --- a/src/forge_loop/runner/dispatch.py +++ b/src/forge_loop/runner/dispatch.py @@ -34,6 +34,12 @@ # work that the loop has decided not to keep grinding on. NEEDS_HUMAN_LABEL = "loop:needs-human" +# Label applied to PRs whose critic returned a BLOCK (sev1) verdict during +# the ping-pong protocol (issue #110). Distinct from NEEDS_HUMAN_LABEL, +# which fires on iteration-cap exhaustion (#111). Operators reading the +# PR list can tell the two failure modes apart at a glance. +NEEDS_REVIEW_LABEL = "loop:needs-review" + def resume_kwargs_for( store: WorkerSessionStore, @@ -245,6 +251,199 @@ def enforce_critic_iteration_cap( return False +def format_critic_followup_prompt(report: Any) -> str: + """Serialise critic findings into the next worker prompt VERBATIM. + + Issue #110 contract: "The critic's comments get serialised into the + next worker prompt verbatim — no summarisation." So we render every + finding's full message + file/line + severity/category tag with NO + truncation, NO paraphrase, NO re-ordering. The worker sees exactly + what the critic wrote. + + The leading instruction line tells the resumed SDK session that + these are the changes-requested findings to address. Because the + session resumes via ``resume=``, the model already + has the prior worker context loaded (warm prompt cache) — we only + need to deliver the new feedback. + """ + findings = list(getattr(report, "findings", []) or []) + header = ( + "The critic returned REQUEST_CHANGES on your PR. " + "Address every finding below verbatim, then push a follow-up commit." + ) + if not findings: + # Defensive: a REQUEST_CHANGES with zero findings is unusual but + # legal. Pass the raw report text through so the worker sees the + # critic's own words rather than an empty prompt. + raw = str(getattr(report, "raw", "")).strip() + body = raw or "(critic provided no findings text)" + return f"{header}\n\nCritic report:\n{body}" + + lines: list[str] = [header, "", "Critic findings:"] + for f in findings: + loc = "" + if getattr(f, "file", None): + loc = f.file + if getattr(f, "line", None): + loc = f"{loc}:{f.line}" + loc = f" ({loc})" + lines.append(f"- [{f.severity}/{f.category}]{loc} {f.message}") + return "\n".join(lines) + + +def handle_critic_verdict( + *, + store: WorkerSessionStore, + session_id: str, + report: Any, + pr_url: str | None, + gh: Any = _gh, + repo: str | None = None, + emit: Any = None, + dispatch_revision: Any = None, +) -> str: + """Apply a critic verdict to the persistent-worker FSM (issue #110). + + Routing table (matches the issue's acceptance criteria): + + * ``approve`` → AWAITING_CRITIC → MERGED. Auto-merge has already + been queued by the gh layer; the FSM edge records that the loop + considers this session complete. Returns ``"merged"``. + + * ``request_changes`` → AWAITING_CRITIC → REVISING. Bumps + ``critic_iterations`` via ``store.increment_iterations`` (counter + first so a crash between bump-and-transition still observes the + intended attempt count), then dispatches a follow-up worker via + the ``dispatch_revision`` callback. The callback is given the + resumed-session kwargs (``resume=``) and the + verbatim critic-comments prompt — NO summarisation per the + ticket. ``dispatch_revision=None`` skips the dispatch step (used + by unit tests that only want to verify the FSM edge + bump). + Returns ``"revising"``. + + * ``block`` → AWAITING_CRITIC → ABANDONED. The session is over; + a human takes it from here. Labels the PR ``loop:needs-review`` + so operators can triage. Returns ``"abandoned"``. + + * any other verdict (error / unknown) → no-op, returns ``"noop"``. + The caller MUST be able to keep the session in AWAITING_CRITIC + so the next tick can re-run the critic on a fresh log. + + Pre-condition: ``session.state == AWAITING_CRITIC``. Any other + state raises :class:`InvalidTransition` so caller bugs surface + loudly instead of silently corrupting the FSM. + + Iteration-cap enforcement (the >= max_critic_iterations gate) is + NOT handled here — that's issue #111 / :func:`enforce_critic_iteration_cap`. + This function is the verdict→state router; the cap is a separate + policy layer the runner stacks on top. + """ + sess = store.get(session_id) + if sess is None: + raise KeyError(f"unknown session_id: {session_id}") + if sess.state != WorkerState.AWAITING_CRITIC: + raise InvalidTransition(sess.state, WorkerState.REVISING) + + overall = str(getattr(report, "overall", "")).lower() + + if overall == "approve": + store.transition_to( + session_id, + WorkerState.MERGED, + reason="critic approved", + ) + if emit is not None: + with contextlib.suppress(Exception): + emit( + "critic_verdict_merged", + issue=sess.issue, + session_id=session_id, + pr=pr_url, + ) + return "merged" + + if overall == "block": + reason = "critic blocked (sev1)" + store.transition_to(session_id, WorkerState.ABANDONED, reason=reason) + if pr_url: + try: + gh.add_pr_label(pr_url, [NEEDS_REVIEW_LABEL], repo=repo) + except Exception as ex_: # noqa: BLE001 — best-effort + if emit is not None: + with contextlib.suppress(Exception): + emit( + "critic_block_label_failed", + issue=sess.issue, + err=str(ex_)[:200], + ) + if emit is not None: + with contextlib.suppress(Exception): + emit( + "critic_verdict_blocked", + issue=sess.issue, + session_id=session_id, + pr=pr_url, + ) + return "abandoned" + + if overall == "request_changes": + # Bump BEFORE transitioning so a crash mid-edge leaves the row + # in AWAITING_CRITIC with the new count — the next tick will + # re-route correctly. + new_count = store.increment_iterations(session_id) + store.transition_to( + session_id, + WorkerState.REVISING, + reason="critic requested changes", + ) + # Build the verbatim follow-up prompt. This is what the spec + # calls "the critic's comments serialised into the next worker + # prompt verbatim". + followup_prompt = format_critic_followup_prompt(report) + resume_kw = resume_kwargs_for(store, session_id) + if emit is not None: + with contextlib.suppress(Exception): + emit( + "critic_verdict_revising", + issue=sess.issue, + session_id=session_id, + iterations=new_count, + pr=pr_url, + resumed=bool(resume_kw), + ) + if dispatch_revision is not None: + # Best-effort — a dispatch failure must not undo the FSM + # edge. The next tick can re-attempt from REVISING. + try: + dispatch_revision( + session=store.get(session_id), + prompt=followup_prompt, + resume_kwargs=resume_kw, + ) + except Exception as ex_: # noqa: BLE001 — boundary + if emit is not None: + with contextlib.suppress(Exception): + emit( + "critic_revision_dispatch_failed", + issue=sess.issue, + session_id=session_id, + err=str(ex_)[:300], + ) + return "revising" + + # Unknown / error verdict — leave the row in AWAITING_CRITIC so + # the next tick can retry the critic on a fresh log. + if emit is not None: + with contextlib.suppress(Exception): + emit( + "critic_verdict_unknown", + issue=sess.issue, + session_id=session_id, + overall=overall, + ) + return "noop" + + def _sev_counts(outcome: Any) -> dict[str, int]: """Tally sev1/sev2/sev3 from a CriticOutcome.report. Safe on None.""" report = getattr(outcome, "report", None) diff --git a/tests/test_critic_ping_pong.py b/tests/test_critic_ping_pong.py new file mode 100644 index 0000000..8609525 --- /dev/null +++ b/tests/test_critic_ping_pong.py @@ -0,0 +1,404 @@ +"""Tests for the critic ping-pong protocol (issue #110). + +Acceptance matrix (from the issue body): + +* APPROVE -> AWAITING_CRITIC -> MERGED. +* REQUEST_CHANGES -> AWAITING_CRITIC -> REVISING (loop) + + ``critic_iterations`` bumps via ``store.increment_iterations``. +* BLOCK (sev1) -> AWAITING_CRITIC -> ABANDONED + PR labelled + ``loop:needs-review``. +* The next dispatch prompt contains the critic comments VERBATIM — + no summarisation, no truncation. + +Adversarial coverage: + +* unknown/error verdict is a no-op (caller retries the critic). +* an exception from ``gh.add_pr_label`` MUST NOT undo the ABANDONED + transition (FSM is the source of truth). +* an exception from ``dispatch_revision`` MUST NOT undo the REVISING + transition or the counter bump. +* calling the handler with a session that is not in AWAITING_CRITIC + raises ``InvalidTransition`` so caller bugs surface loudly. +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from forge_loop.critic import CriticReport, Finding +from forge_loop.runner.dispatch import ( + NEEDS_REVIEW_LABEL, + format_critic_followup_prompt, + handle_critic_verdict, +) +from forge_loop.worker_sessions import WorkerSessionStore +from forge_loop.worker_state import InvalidTransition, WorkerState + +# --------------------------------------------------------------------------- +# Stubs / helpers +# --------------------------------------------------------------------------- + + +class _StubGh: + def __init__(self, *, raise_on_label: bool = False) -> None: + self.labels: list[tuple[str, list[str], str | None]] = [] + self._raise_on_label = raise_on_label + + def add_pr_label( + self, + pr: str, + labels: list[str], + repo: str | None = None, + ) -> bool: + if self._raise_on_label: + raise RuntimeError("simulated gh failure") + self.labels.append((pr, list(labels), repo)) + return True + + +def _events() -> tuple[list[tuple[str, dict[str, Any]]], Any]: + sink: list[tuple[str, dict[str, Any]]] = [] + + def emit(kind: str, **kw: Any) -> None: + sink.append((kind, kw)) + + return sink, emit + + +def _seed_awaiting( + store: WorkerSessionStore, + *, + sdk_session_id: str | None = "sdk-abc-123", +) -> str: + sess = store.create(issue=110, branch="loop/110", worktree_path="/tmp/wt-loop-110") + store.transition_to(sess.session_id, WorkerState.RUNNING) + store.transition_to(sess.session_id, WorkerState.AWAITING_CRITIC, reason="pr opened") + if sdk_session_id: + store.set_sdk_session_id(sess.session_id, sdk_session_id) + store.set_pr_url(sess.session_id, "https://github.com/o/r/pull/110") + return sess.session_id + + +def _report( + overall: str, + *, + findings: list[Finding] | None = None, + raw: str = "", +) -> CriticReport: + return CriticReport(overall=overall, findings=findings or [], raw=raw) + + +# --------------------------------------------------------------------------- +# APPROVE -> MERGED +# --------------------------------------------------------------------------- + + +def test_approve_transitions_to_merged() -> None: + store = WorkerSessionStore(":memory:") + sid = _seed_awaiting(store) + events, emit = _events() + + result = handle_critic_verdict( + store=store, + session_id=sid, + report=_report("approve"), + pr_url="https://github.com/o/r/pull/110", + gh=_StubGh(), + emit=emit, + ) + + assert result == "merged" + sess = store.get(sid) + assert sess is not None + assert sess.state == WorkerState.MERGED + assert sess.last_transition_reason == "critic approved" + assert any(k == "critic_verdict_merged" for k, _ in events) + + +# --------------------------------------------------------------------------- +# REQUEST_CHANGES -> REVISING (+ bump + dispatch w/ verbatim prompt) +# --------------------------------------------------------------------------- + + +def test_request_changes_transitions_to_revising_and_bumps_counter() -> None: + store = WorkerSessionStore(":memory:") + sid = _seed_awaiting(store) + events, emit = _events() + + result = handle_critic_verdict( + store=store, + session_id=sid, + report=_report( + "request_changes", + findings=[ + Finding( + severity="sev2", + category="correctness", + file="src/foo.py", + line=42, + message="off-by-one in loop bound", + ), + ], + ), + pr_url="https://github.com/o/r/pull/110", + gh=_StubGh(), + emit=emit, + ) + + assert result == "revising" + sess = store.get(sid) + assert sess is not None + assert sess.state == WorkerState.REVISING + assert sess.critic_iterations == 1 + assert sess.last_transition_reason == "critic requested changes" + kinds = [k for k, _ in events] + assert "critic_verdict_revising" in kinds + + +def test_request_changes_dispatches_followup_with_verbatim_critic_comments() -> None: + """The acceptance contract: the next dispatch prompt MUST contain + the critic comments verbatim — no summarisation.""" + store = WorkerSessionStore(":memory:") + sid = _seed_awaiting(store, sdk_session_id="sdk-xyz") + captured: dict[str, Any] = {} + + findings = [ + Finding( + severity="sev2", + category="correctness", + file="src/foo.py", + line=42, + message="precise verbatim message ALPHA", + ), + Finding( + severity="sev3", + category="style", + file=None, + line=None, + message="precise verbatim message BETA", + ), + ] + + def _dispatch(*, session: Any, prompt: str, resume_kwargs: dict[str, str]) -> None: + captured["session"] = session + captured["prompt"] = prompt + captured["resume_kwargs"] = resume_kwargs + + handle_critic_verdict( + store=store, + session_id=sid, + report=_report("request_changes", findings=findings), + pr_url="https://github.com/o/r/pull/110", + gh=_StubGh(), + dispatch_revision=_dispatch, + ) + + # Every finding's message MUST appear verbatim in the follow-up prompt. + assert "precise verbatim message ALPHA" in captured["prompt"] + assert "precise verbatim message BETA" in captured["prompt"] + # File/line and severity tags are also present. + assert "src/foo.py:42" in captured["prompt"] + assert "[sev2/correctness]" in captured["prompt"] + assert "[sev3/style]" in captured["prompt"] + # The session resumes the SDK session id so the prompt cache is warm. + assert captured["resume_kwargs"] == {"resume": "sdk-xyz"} + # The session handed to the dispatcher is the REVISING row. + assert captured["session"].state == WorkerState.REVISING + + +def test_format_followup_prompt_preserves_findings_verbatim() -> None: + """Pure prompt-builder unit test — no FSM involvement.""" + report = _report( + "request_changes", + findings=[ + Finding( + severity="sev1", + category="security", + file="x.py", + line=7, + message="CWE-89 raw SQL concat — fix before merge", + ), + ], + ) + out = format_critic_followup_prompt(report) + assert "CWE-89 raw SQL concat — fix before merge" in out + assert "[sev1/security]" in out + assert "x.py:7" in out + + +def test_format_followup_prompt_handles_empty_findings_with_raw() -> None: + """Adversarial: REQUEST_CHANGES with no findings — fall back to raw.""" + report = _report("request_changes", findings=[], raw="the critic said this") + out = format_critic_followup_prompt(report) + assert "the critic said this" in out + + +# --------------------------------------------------------------------------- +# BLOCK -> ABANDONED + label loop:needs-review +# --------------------------------------------------------------------------- + + +def test_block_transitions_to_abandoned_and_labels_pr() -> None: + store = WorkerSessionStore(":memory:") + sid = _seed_awaiting(store) + gh = _StubGh() + events, emit = _events() + + result = handle_critic_verdict( + store=store, + session_id=sid, + report=_report( + "block", + findings=[ + Finding( + severity="sev1", + category="security", + file="x.py", + line=1, + message="SQL injection", + ), + ], + ), + pr_url="https://github.com/o/r/pull/110", + gh=gh, + repo="o/r", + emit=emit, + ) + + assert result == "abandoned" + sess = store.get(sid) + assert sess is not None + assert sess.state == WorkerState.ABANDONED + assert sess.last_transition_reason == "critic blocked (sev1)" + # PR was labelled per spec. + assert gh.labels == [ + ("https://github.com/o/r/pull/110", [NEEDS_REVIEW_LABEL], "o/r"), + ] + assert any(k == "critic_verdict_blocked" for k, _ in events) + + +def test_block_label_failure_does_not_undo_abandoned_transition() -> None: + """Robustness — a transient gh failure MUST NOT corrupt the FSM.""" + store = WorkerSessionStore(":memory:") + sid = _seed_awaiting(store) + gh = _StubGh(raise_on_label=True) + events, emit = _events() + + result = handle_critic_verdict( + store=store, + session_id=sid, + report=_report("block"), + pr_url="https://github.com/o/r/pull/110", + gh=gh, + emit=emit, + ) + + assert result == "abandoned" + assert store.get(sid).state == WorkerState.ABANDONED + assert any(k == "critic_block_label_failed" for k, _ in events) + + +def test_block_without_pr_url_skips_label_call() -> None: + """Adversarial: BLOCK on a session whose PR URL was never set.""" + store = WorkerSessionStore(":memory:") + sid = _seed_awaiting(store) + gh = _StubGh() + result = handle_critic_verdict( + store=store, + session_id=sid, + report=_report("block"), + pr_url=None, + gh=gh, + ) + assert result == "abandoned" + assert gh.labels == [] + + +# --------------------------------------------------------------------------- +# Adversarial / sad paths +# --------------------------------------------------------------------------- + + +def test_unknown_verdict_is_a_noop() -> None: + store = WorkerSessionStore(":memory:") + sid = _seed_awaiting(store) + events, emit = _events() + result = handle_critic_verdict( + store=store, + session_id=sid, + report=_report("error"), + pr_url=None, + gh=_StubGh(), + emit=emit, + ) + assert result == "noop" + # Session must remain in AWAITING_CRITIC so the next tick can retry. + assert store.get(sid).state == WorkerState.AWAITING_CRITIC + assert any(k == "critic_verdict_unknown" for k, _ in events) + + +def test_wrong_starting_state_raises_invalid_transition() -> None: + """Calling the handler outside AWAITING_CRITIC is a caller bug.""" + store = WorkerSessionStore(":memory:") + sess = store.create(issue=110, branch="loop/110") + # session is in DISPATCHED, not AWAITING_CRITIC + with pytest.raises(InvalidTransition): + handle_critic_verdict( + store=store, + session_id=sess.session_id, + report=_report("approve"), + pr_url=None, + gh=_StubGh(), + ) + + +def test_unknown_session_raises_keyerror() -> None: + store = WorkerSessionStore(":memory:") + with pytest.raises(KeyError): + handle_critic_verdict( + store=store, + session_id="does-not-exist", + report=_report("approve"), + pr_url=None, + gh=_StubGh(), + ) + + +def test_dispatch_failure_does_not_undo_revising_transition() -> None: + """Robustness — a dispatch_revision crash MUST leave the FSM in + REVISING with the counter bumped. The next tick can retry.""" + store = WorkerSessionStore(":memory:") + sid = _seed_awaiting(store) + events, emit = _events() + + def _boom(**kw: Any) -> None: + raise RuntimeError("simulated dispatch crash") + + result = handle_critic_verdict( + store=store, + session_id=sid, + report=_report( + "request_changes", + findings=[ + Finding( + severity="sev2", + category="tests", + file=None, + line=None, + message="add a regression test", + ), + ], + ), + pr_url="https://github.com/o/r/pull/110", + gh=_StubGh(), + emit=emit, + dispatch_revision=_boom, + ) + + assert result == "revising" + sess = store.get(sid) + assert sess.state == WorkerState.REVISING + assert sess.critic_iterations == 1 + assert any(k == "critic_revision_dispatch_failed" for k, _ in events)