Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions src/forge_loop/runner/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@
# work that the loop has decided not to keep grinding on.
NEEDS_HUMAN_LABEL = "loop:needs-human"

# Label applied to PRs whose critic returned a BLOCK (sev1) verdict during
# the ping-pong protocol (issue #110). Distinct from NEEDS_HUMAN_LABEL,
# which fires on iteration-cap exhaustion (#111). Operators reading the
# PR list can tell the two failure modes apart at a glance.
NEEDS_REVIEW_LABEL = "loop:needs-review"


def resume_kwargs_for(
store: WorkerSessionStore,
Expand Down Expand Up @@ -245,6 +251,199 @@ def enforce_critic_iteration_cap(
return False


def format_critic_followup_prompt(report: Any) -> str:
"""Serialise critic findings into the next worker prompt VERBATIM.

Issue #110 contract: "The critic's comments get serialised into the
next worker prompt verbatim — no summarisation." So we render every
finding's full message + file/line + severity/category tag with NO
truncation, NO paraphrase, NO re-ordering. The worker sees exactly
what the critic wrote.

The leading instruction line tells the resumed SDK session that
these are the changes-requested findings to address. Because the
session resumes via ``resume=<sdk_session_id>``, the model already
has the prior worker context loaded (warm prompt cache) — we only
need to deliver the new feedback.
"""
findings = list(getattr(report, "findings", []) or [])
header = (
"The critic returned REQUEST_CHANGES on your PR. "
"Address every finding below verbatim, then push a follow-up commit."
)
if not findings:
# Defensive: a REQUEST_CHANGES with zero findings is unusual but
# legal. Pass the raw report text through so the worker sees the
# critic's own words rather than an empty prompt.
raw = str(getattr(report, "raw", "")).strip()
body = raw or "(critic provided no findings text)"
return f"{header}\n\nCritic report:\n{body}"

lines: list[str] = [header, "", "Critic findings:"]
for f in findings:
loc = ""
if getattr(f, "file", None):
loc = f.file
if getattr(f, "line", None):
loc = f"{loc}:{f.line}"
loc = f" ({loc})"
lines.append(f"- [{f.severity}/{f.category}]{loc} {f.message}")
return "\n".join(lines)


def handle_critic_verdict(
*,
store: WorkerSessionStore,
session_id: str,
report: Any,
pr_url: str | None,
gh: Any = _gh,
repo: str | None = None,
emit: Any = None,
dispatch_revision: Any = None,
) -> str:
"""Apply a critic verdict to the persistent-worker FSM (issue #110).

Routing table (matches the issue's acceptance criteria):

* ``approve`` → AWAITING_CRITIC → MERGED. Auto-merge has already
been queued by the gh layer; the FSM edge records that the loop
considers this session complete. Returns ``"merged"``.

* ``request_changes`` → AWAITING_CRITIC → REVISING. Bumps
``critic_iterations`` via ``store.increment_iterations`` (counter
first so a crash between bump-and-transition still observes the
intended attempt count), then dispatches a follow-up worker via
the ``dispatch_revision`` callback. The callback is given the
resumed-session kwargs (``resume=<sdk_session_id>``) and the
verbatim critic-comments prompt — NO summarisation per the
ticket. ``dispatch_revision=None`` skips the dispatch step (used
by unit tests that only want to verify the FSM edge + bump).
Returns ``"revising"``.

* ``block`` → AWAITING_CRITIC → ABANDONED. The session is over;
a human takes it from here. Labels the PR ``loop:needs-review``
so operators can triage. Returns ``"abandoned"``.

* any other verdict (error / unknown) → no-op, returns ``"noop"``.
The caller MUST be able to keep the session in AWAITING_CRITIC
so the next tick can re-run the critic on a fresh log.

Pre-condition: ``session.state == AWAITING_CRITIC``. Any other
state raises :class:`InvalidTransition` so caller bugs surface
loudly instead of silently corrupting the FSM.

Iteration-cap enforcement (the >= max_critic_iterations gate) is
NOT handled here — that's issue #111 / :func:`enforce_critic_iteration_cap`.
This function is the verdict→state router; the cap is a separate
policy layer the runner stacks on top.
"""
sess = store.get(session_id)
if sess is None:
raise KeyError(f"unknown session_id: {session_id}")
if sess.state != WorkerState.AWAITING_CRITIC:
raise InvalidTransition(sess.state, WorkerState.REVISING)

overall = str(getattr(report, "overall", "")).lower()

if overall == "approve":
store.transition_to(
session_id,
WorkerState.MERGED,
reason="critic approved",
)
if emit is not None:
with contextlib.suppress(Exception):
emit(
"critic_verdict_merged",
issue=sess.issue,
session_id=session_id,
pr=pr_url,
)
return "merged"

if overall == "block":
reason = "critic blocked (sev1)"
store.transition_to(session_id, WorkerState.ABANDONED, reason=reason)
if pr_url:
try:
gh.add_pr_label(pr_url, [NEEDS_REVIEW_LABEL], repo=repo)
except Exception as ex_: # noqa: BLE001 — best-effort
if emit is not None:
with contextlib.suppress(Exception):
emit(
"critic_block_label_failed",
issue=sess.issue,
err=str(ex_)[:200],
)
if emit is not None:
with contextlib.suppress(Exception):
emit(
"critic_verdict_blocked",
issue=sess.issue,
session_id=session_id,
pr=pr_url,
)
return "abandoned"

if overall == "request_changes":
# Bump BEFORE transitioning so a crash mid-edge leaves the row
# in AWAITING_CRITIC with the new count — the next tick will
# re-route correctly.
new_count = store.increment_iterations(session_id)
store.transition_to(
session_id,
WorkerState.REVISING,
reason="critic requested changes",
)
# Build the verbatim follow-up prompt. This is what the spec
# calls "the critic's comments serialised into the next worker
# prompt verbatim".
followup_prompt = format_critic_followup_prompt(report)
resume_kw = resume_kwargs_for(store, session_id)
if emit is not None:
with contextlib.suppress(Exception):
emit(
"critic_verdict_revising",
issue=sess.issue,
session_id=session_id,
iterations=new_count,
pr=pr_url,
resumed=bool(resume_kw),
)
if dispatch_revision is not None:
# Best-effort — a dispatch failure must not undo the FSM
# edge. The next tick can re-attempt from REVISING.
try:
dispatch_revision(
session=store.get(session_id),
prompt=followup_prompt,
resume_kwargs=resume_kw,
)
except Exception as ex_: # noqa: BLE001 — boundary
if emit is not None:
with contextlib.suppress(Exception):
emit(
"critic_revision_dispatch_failed",
issue=sess.issue,
session_id=session_id,
err=str(ex_)[:300],
)
return "revising"

# Unknown / error verdict — leave the row in AWAITING_CRITIC so
# the next tick can retry the critic on a fresh log.
if emit is not None:
with contextlib.suppress(Exception):
emit(
"critic_verdict_unknown",
issue=sess.issue,
session_id=session_id,
overall=overall,
)
return "noop"


def _sev_counts(outcome: Any) -> dict[str, int]:
"""Tally sev1/sev2/sev3 from a CriticOutcome.report. Safe on None."""
report = getattr(outcome, "report", None)
Expand Down
Loading
Loading