From ac1a8e3233df0ea3bdd497808ee5de130cbb6edc Mon Sep 17 00:00:00 2001 From: Oscar V Date: Mon, 25 May 2026 22:26:56 -0700 Subject: [PATCH 1/2] feat(secops): persist operator decisions and tighten edge-case prompt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 6am sweep was waking the operator with the same major-version questions every day because automation_decisions had a schema with no producer or consumer — a sweep that asked "approve PR #60?" yesterday asked the same thing today regardless of how it was answered. On top of that, the agent invented options outside the policy ladder when the policy gate couldn't fire (no CI configured, or CI failing on a pre-existing unrelated check). Wires the persistence layer end-to-end and pins three prompt edge cases the agent was reliably fumbling. ### Persistence Both halves of the loop now exist: - After the BLOCKED loop in run_secops_all receives an answer (and in resume_secops_from_pending for the out-of-band sweeper path), regex extracts every PR# from the question and records one automation_decisions row per PR# keyed on (repo, "dependabot_pr", "#N") with the operator's verbatim Telegram answer. Regex handles PR #60, bare #60, dedupes, and writes nothing when the question has no PR# (CodeQL-only questions, etc). - Before each sweep, run_secops_all pulls the last 30 days of decisions for that repo and threads them via ctx.extra['prior_decisions'] into _build_prompt, which renders a "Prior operator decisions (last 30 days)" block. Carries instructions to act on prior answers unless circumstances changed materially (different version, CI state flipped). The verbatim- answer approach avoids semantic parsing of Telegram replies — the agent already interprets free-form text well. 
Storing the verbatim answer rather than a canonical yes/no means the agent can distinguish "approve" from "approve once we bump idna in the lockfile" from "skip until next quarter" without us trying to model operator nuance upstream. ### Edge cases the prompt now covers - No CI configured (no .github/workflows/*.yml): the auto-merge-with- passing-CI gate cannot evaluate, so previously the agent freelanced. Prompt now: treat ALL Dependabot PRs as ASK regardless of tier and signal BLOCKED ONCE with a consolidated question per repo (not one per PR). - CI failing on a check unrelated to the PR's package (pip-audit flagging a transitive idna CVE while the PR bumps boto3): the agent was reporting the failure and moving on, leaving patch PRs stuck for weeks. Prompt now requires BLOCKED with a one-line question naming the underlying issue and a root-cause hint pulled from gh run view --log-failed. - Scope discipline: three legitimate exits (merge / leave / BLOCKED) with an explicit prohibition on batching PRs into a review PR, opening tracking issues, or proposing manual reviews. Cuts off the freelance options observed in production sweeps ("Want me to batch the patch + minor PRs into a single review PR like before?"). ### Tests 17 new tests cover: the state helpers (per-repo scoping, since_ts window, newest-first ordering), the PR# regex (all observed variants + dedupe + no-PR case), decision capture in both the in-loop BLOCKED path and the resume-from-pending path, prompt injection of prior decisions, and the three new prompt directives. 109 tests in the targeted test files are green; the full suite is 664/665, with one pre-existing test_transport.py flake that also hangs on main. 
--- CHANGELOG.md | 44 ++++ src/ctrlrelay/cli.py | 1 + src/ctrlrelay/core/state.py | 61 +++++ src/ctrlrelay/pipelines/secops.py | 197 ++++++++++++++- tests/test_secops_pipeline.py | 408 ++++++++++++++++++++++++++++++ tests/test_state.py | 98 +++++++ 6 files changed, 807 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4a8780..6c79554 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,50 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **Secops sweeps now remember operator decisions across days.** The + `automation_decisions` table had a schema but no producer or consumer + — every 6am sweep re-asked about the same Dependabot PRs the operator + had already answered, drowning Telegram. Wired both halves: + 1. When the BLOCKED loop in `run_secops_all` (or the out-of-band + `resume_secops_from_pending` sweeper) receives an answer, regex + extracts every PR# from the question and records one row per PR# + keyed on `(repo, "dependabot_pr", "#N")` with the operator's + verbatim answer. PR# regex handles both `PR #60` and bare `#60` + forms, dedupes, and tolerates questions with no PR# (e.g. + CodeQL-only) by writing nothing. + 2. Before building each sweep's prompt, `run_secops_all` pulls the + last 30 days of decisions for that repo and threads them via + `ctx.extra['prior_decisions']` into `_build_prompt`, which renders + a `## Prior operator decisions (last 30 days)` block listing each + decision verbatim. The block carries instructions to act on prior + answers unless circumstances have materially changed (different + version bump, CI state flipped). Without this the persistence + layer was dead weight. 
+ +### Fixed + +- **Secops handles "no CI configured" without inventing options.** + Repos with no `.github/workflows/*` previously confused the agent + into freelance suggestions ("Want me to batch into a review PR?") + because the "auto-merge with passing CI" gate couldn't fire. Prompt + now explicitly directs: treat ALL Dependabot PRs as ASK regardless + of tier, and signal BLOCKED ONCE with a single consolidated question + per repo (not one per PR). +- **Secops escalates pre-existing CI failures instead of stalling PRs.** + When CI is failing on a check unrelated to the PR's package (e.g. + `pip-audit` flagging a transitive CVE in `idna` while the PR bumps + `boto3`), patch PRs were sitting stuck for weeks because the agent + reported the failure and moved on. Prompt now requires the agent to + signal BLOCKED with a one-line question naming the underlying issue + and a root-cause hint from `gh run view --log-failed`. +- **Secops scope discipline.** The prompt now enforces "exactly three + legitimate exits" per open PR (merge / leave / BLOCKED) and + explicitly forbids batching PRs into review PRs, opening tracking + issues, or performing manual reviews. Cuts off the freelance + options observed in production sweeps. + ## [0.5.0] - 2026-05-11 Minor release. 
Three changes on the secops path; together they take the diff --git a/src/ctrlrelay/cli.py b/src/ctrlrelay/cli.py index 70bd749..26db842 100644 --- a/src/ctrlrelay/cli.py +++ b/src/ctrlrelay/cli.py @@ -1631,6 +1631,7 @@ async def _run_pending_resume_sweeper() -> None: if sweep_repo_cfg is not None else None ), + question=row.get("question"), ) elif pipeline_name == "dev": # Dev resume needs the repo's branch template diff --git a/src/ctrlrelay/core/state.py b/src/ctrlrelay/core/state.py index a35d677..2cc331b 100644 --- a/src/ctrlrelay/core/state.py +++ b/src/ctrlrelay/core/state.py @@ -386,6 +386,67 @@ def mark_pending_resume_resumed(self, session_id: str) -> bool: self._conn.commit() return cursor.rowcount > 0 + # Automation decisions (operator answers persisted across sweeps) + + def record_automation_decision( + self, + *, + repo: str, + operation: str, + item_id: str, + decision: str, + decided_by: str = "operator", + context: str | None = None, + policy: str = "", + ) -> None: + """Record an operator decision so future sweeps can avoid + re-asking the same question. + + ``operation`` namespaces the decision domain + (e.g. ``"dependabot_pr"`` or ``"codeql_alert"``); ``item_id`` + is the specific item the decision applies to (e.g. ``"#60"`` + for PR #60). ``decision`` is the operator's verbatim answer — + the agent that reads this back interprets it; we don't try to + normalize to yes/no here because Telegram replies are free-form. + """ + self._conn.execute( + """INSERT INTO automation_decisions + (repo, operation, policy, item_id, decision, + decided_by, decided_at, context) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + (repo, operation, policy, item_id, decision, + decided_by, int(time.time()), context), + ) + self._conn.commit() + + def list_recent_automation_decisions( + self, + repo: str, + *, + since_ts: int | None = None, + limit: int = 100, + ) -> list[dict[str, Any]]: + """Recent operator decisions for this repo, newest first. 
+ + ``since_ts`` (unix seconds) filters to decisions made + at-or-after that point — used by the secops prompt builder to + inject a 30-day rolling window into the agent's context so the + agent can act on prior decisions instead of asking again. + """ + if since_ts is None: + rows = self._conn.execute( + "SELECT * FROM automation_decisions WHERE repo = ? " + "ORDER BY decided_at DESC LIMIT ?", + (repo, limit), + ).fetchall() + else: + rows = self._conn.execute( + "SELECT * FROM automation_decisions WHERE repo = ? " + "AND decided_at >= ? ORDER BY decided_at DESC LIMIT ?", + (repo, since_ts, limit), + ).fetchall() + return [dict(row) for row in rows] + # PR watches (durable, cross-restart) def add_pr_watch( diff --git a/src/ctrlrelay/pipelines/secops.py b/src/ctrlrelay/pipelines/secops.py index 1e2ecf5..d111313 100644 --- a/src/ctrlrelay/pipelines/secops.py +++ b/src/ctrlrelay/pipelines/secops.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import re import time import uuid from dataclasses import dataclass @@ -23,6 +24,72 @@ DEFAULT_MAX_BLOCKED_ROUNDS = 5 +# 30-day rolling window of operator decisions injected into each +# sweep's prompt. Long enough to cover weekly cadences and a vacation; +# short enough that a stale "approve" from months ago can't auto-merge +# a PR whose risk profile may have shifted (new transitive deps, +# changed maintainer, etc.). +DECISION_RECALL_SECONDS = 30 * 86400 + +# Captures both "PR #60" and "#60" forms the agent uses interchangeably +# in BLOCKED questions. The mandatory `#` keeps CVE-2026-... style +# identifiers from matching; `(?!\d)` is redundant and `#0` still matches. +_PR_NUM_RE = re.compile(r"(?:PR\s*)?#(\d+)(?!\d)", re.IGNORECASE) + + +def _extract_pr_numbers(question: str) -> list[str]: + """Pull deduplicated PR numbers out of a BLOCKED question so we + can record one operator-decision row per PR. Order-preserving so + a multi-PR question like 'approve #60, #61, #62?' 
records the + decision against each in the order the agent listed them.""" + seen: set[str] = set() + out: list[str] = [] + for n in _PR_NUM_RE.findall(question or ""): + if n in seen: + continue + seen.add(n) + out.append(n) + return out + + +def _record_decisions_from_answer( + state_db: StateDB, + *, + repo: str, + question: str, + answer: str, + session_id: str, +) -> None: + """Best-effort persistence of an operator's verbatim Telegram + answer against every PR# mentioned in the question. Failure here + is logged but does not break the pipeline — the answer has + already been acted on by the agent; losing the persistence layer + just means the next sweep will re-ask, which is the pre-fix + status quo.""" + pr_numbers = _extract_pr_numbers(question) + if not pr_numbers: + return + for pr_num in pr_numbers: + try: + state_db.record_automation_decision( + repo=repo, + operation="dependabot_pr", + item_id=f"#{pr_num}", + decision=answer, + decided_by="operator", + context=(question or "")[:500], + ) + except Exception as e: + log_event( + _logger, + "secops.decision.record_failed", + session_id=session_id, + repo=repo, + pr_number=pr_num, + error_type=type(e).__name__, + error=str(e)[:200], + ) + def _question_for_persist(session_id: str, result: PipelineResult) -> str: """Return a non-empty question string for pending_resumes storage. @@ -61,6 +128,7 @@ async def run(self, ctx: PipelineContext) -> PipelineResult: session_id=ctx.session_id, state_file=ctx.state_file, automation=ctx.extra.get("automation"), + prior_decisions=ctx.extra.get("prior_decisions"), ) result = await self._spawn(ctx, prompt, resume=False) @@ -127,6 +195,7 @@ def _build_prompt( session_id: str = "", state_file: Path | None = None, automation: Any = None, + prior_decisions: list[dict[str, Any]] | None = None, ) -> str: """Build the secops prompt. @@ -135,7 +204,13 @@ def _build_prompt( When provided, the prompt explicitly tells the agent which tiers auto-merge vs ask vs never-merge. 
When ``None``, falls back to the default policy (patch=auto, minor=ask, never-major) so - existing callers and the schema defaults stay coherent.""" + existing callers and the schema defaults stay coherent. + + ``prior_decisions`` is a list of recent automation_decisions + rows for this repo. When present, they're rendered as a + "Prior operator decisions" block so the agent can act on a + past answer instead of asking again about the same PR. + """ state_file_path = str(state_file) if state_file else "/tmp/state.json" # Translate per-repo policy into prompt directives. Use defaults @@ -180,6 +255,42 @@ def _policy_directive(tier: str, policy: str) -> str: ] ) + # Render the rolling window of prior operator decisions for this + # repo. The agent reads the verbatim Telegram answer and decides + # whether the prior call still applies — we don't try to + # normalize "yes"/"approve"/"merge it" to a canonical decision + # because that classification is exactly what the agent does + # best, and getting it wrong silently auto-merges PRs. + if prior_decisions: + lines = ["## Prior operator decisions (last 30 days)", ""] + for d in prior_decisions: + ts = d.get("decided_at") or 0 + date = ( + time.strftime("%Y-%m-%d", time.gmtime(int(ts))) + if ts else "unknown-date" + ) + item = d.get("item_id") or "?" + ans = (d.get("decision") or "").strip().replace("\n", " ") + # Cap each answer so a pathological Telegram reply + # doesn't bloat the prompt past the model's context. + if len(ans) > 160: + ans = ans[:157] + "..." + lines.append(f'- {date} {item}: operator said "{ans}"') + lines.append("") + lines.append( + "**Use these to avoid re-asking.** If a Dependabot PR " + "matches an item above and circumstances are unchanged " + "(same package, version range still applies, CI state " + "still the same), act per the prior decision without " + "signalling BLOCKED. 
Only re-ASK if the situation has " + "materially changed (different version bump, CI flipped " + "red, new conflicts) — and quote what changed in the " + "question." + ) + prior_decisions_block = "\n".join(lines) + "\n\n" + else: + prior_decisions_block = "" + return f"""Execute security operations for repository {repo}. 1. Check Dependabot alerts: @@ -202,6 +313,42 @@ def _policy_directive(tier: str, policy: str) -> str: BLOCKED to ask for guidance. {dependabot_policy_block} + +{prior_decisions_block}**Edge cases that override the policy ladder above:** + +- **No CI configured** (repo has no `.github/workflows/*.yml`): the + "passing CI" gate cannot be evaluated, so an "auto" policy cannot + fire safely. Treat ALL open Dependabot PRs as ASK regardless of + their tier — signal BLOCKED ONCE for the whole repo with a single + consolidated question naming every open PR, e.g.: "Repo has no CI; + approve merging: #N (pkg X -> Y, patch), #M (pkg A -> B, minor)?" + Do not ask one BLOCKED per PR — operators get one consolidated + question per repo per sweep. + +- **CI failing on a check UNRELATED to the PR's own package** (e.g. + pip-audit fails on a transitive CVE in `idna` while the PR bumps + `boto3`): do NOT silently report this and move on — that's how + patch PRs end up stuck for weeks. Signal BLOCKED with a one-line + question naming the underlying issue: "Patch PRs blocked by + pre-existing pip-audit failure on idna CVE-2026-... in lockfile. + Fix idna >= 3.15 first, then re-evaluate #117 and #119?" Include + the failing check name and a one-line root-cause hint extracted + from `gh run view --log-failed`. + +**Scope discipline:** + +You have exactly three legitimate exits for each open PR: + 1. Merge it (per policy or per prior decision). + 2. Leave it open (per policy or per prior decision). + 3. Signal BLOCKED with a one-line policy question. + +Do NOT propose alternative actions outside this ladder. 
In +particular, do NOT offer to: batch multiple Dependabot PRs into a +single review PR, open a tracking issue for the operator, perform a +manual code review of the bump, or any other freelance option. If +the right action isn't clear from policy + prior decisions + edge +cases above, ASK with a one-line question — don't invent. + - **PR authored by `$OPERATOR` touching ONLY `.github/dependabot.yml`** (additive ecosystem entries, no other files changed): MAYBE auto-merge — but only after diff validation. These are the @@ -355,13 +502,37 @@ async def run_secops_all( state_file = worktree_path / ".ctrlrelay" / "state.json" state_file.parent.mkdir(parents=True, exist_ok=True) + # Pull the 30-day rolling window of prior operator decisions + # so the prompt can include them. Best-effort: if the lookup + # fails for any reason, fall through with an empty list — + # losing this just means the next sweep re-asks, which is the + # pre-fix status quo, not a regression. + try: + prior_decisions = state_db.list_recent_automation_decisions( + repo, + since_ts=int(time.time()) - DECISION_RECALL_SECONDS, + ) + except Exception as e: + log_event( + _logger, + "secops.prior_decisions.lookup_failed", + session_id=session_id, + repo=repo, + error_type=type(e).__name__, + error=str(e)[:200], + ) + prior_decisions = [] + ctx = PipelineContext( session_id=session_id, repo=repo, worktree_path=worktree_path, context_path=context_path, state_file=state_file, - extra={"automation": getattr(repo_config, "automation", None)}, + extra={ + "automation": getattr(repo_config, "automation", None), + "prior_decisions": prior_decisions, + }, ) state_db.execute( @@ -415,6 +586,13 @@ async def run_secops_all( outputs=result.outputs, ) break + _record_decisions_from_answer( + state_db, + repo=repo, + question=question, + answer=answer, + session_id=session_id, + ) rounds += 1 result = await pipeline.resume(ctx, answer) @@ -600,6 +778,7 @@ async def resume_secops_from_pending( transport: Transport | 
None, contexts_dir: Path, automation: Any = None, + question: str | None = None, ) -> PipelineResult: """Resume a BLOCKED secops session using an answer that arrived via Telegram after the original session had already torn down. @@ -609,7 +788,21 @@ async def resume_secops_from_pending( row in ``sessions`` (status='blocked') and a ``pending_resumes`` row ready for ``mark_pending_resume_resumed``. Caller is responsible for flipping the pending_resumes row after this returns. + + ``question`` is the original BLOCKED question (carried in the + pending_resumes row). When supplied, the operator answer is + persisted against any PR# the question mentioned so the next + sweep skips re-asking. Optional for backwards compat with older + callers that pre-date the persistence work. """ + if question: + _record_decisions_from_answer( + state_db, + repo=repo, + question=question, + answer=answer, + session_id=session_id, + ) pipeline = SecopsPipeline( dispatcher=dispatcher, github=github, diff --git a/tests/test_secops_pipeline.py b/tests/test_secops_pipeline.py index ce7e823..8676fce 100644 --- a/tests/test_secops_pipeline.py +++ b/tests/test_secops_pipeline.py @@ -945,3 +945,411 @@ async def hang_forever(ctx): # noqa: ARG001 # Assert: the per-repo lock was released. assert mock_db.release_lock.called + + +class TestPRNumberExtraction: + """The persistence layer keys decisions by PR#, so the regex that + pulls PR numbers out of free-form BLOCKED questions has to handle + the variants the agent emits in practice (see real + pending_resumes rows in production).""" + + def test_extracts_pr_hash_form(self) -> None: + from ctrlrelay.pipelines.secops import _extract_pr_numbers + + q = "Dependabot PR #60 is a MAJOR bump. Approve merge?" + assert _extract_pr_numbers(q) == ["60"] + + def test_extracts_bare_hash_form(self) -> None: + from ctrlrelay.pipelines.secops import _extract_pr_numbers + + q = "approve which of: #15, #14, #13?" 
+ assert _extract_pr_numbers(q) == ["15", "14", "13"] + + def test_extracts_mixed_forms_and_dedupes(self) -> None: + from ctrlrelay.pipelines.secops import _extract_pr_numbers + + q = "PR #293 (major); PR #290 (major). Also #293 has alert." + assert _extract_pr_numbers(q) == ["293", "290"] + + def test_returns_empty_when_no_pr_mentioned(self) -> None: + """Edge-case questions about a repo with no PRs (e.g. CodeQL + alert decisions) shouldn't produce phantom rows.""" + from ctrlrelay.pipelines.secops import _extract_pr_numbers + + q = "Open CodeQL alert on injection sink. Suppress or fix?" + assert _extract_pr_numbers(q) == [] + + +class TestSecopsDecisionPersistence: + """When the operator answers a BLOCKED question, the pipeline must + persist that answer against every PR# mentioned in the question so + the next sweep skips re-asking. Without this, the daily 6am cron + surfaces the same "approve PR #60?" question every day even after + the operator has answered.""" + + @pytest.mark.asyncio + async def test_blocked_loop_records_decisions_per_pr( + self, tmp_path: Path + ) -> None: + from ctrlrelay.core.checkpoint import CheckpointStatus + from ctrlrelay.core.dispatcher import SessionResult + from ctrlrelay.pipelines.secops import run_secops_all + + blocked_state = MagicMock() + blocked_state.status = CheckpointStatus.BLOCKED_NEEDS_INPUT + blocked_state.question = "Approve majors PR #60 and PR #61?" 
+ blocked_state.summary = None + blocked_state.outputs = {} + blocked_state.error = None + + done_state = MagicMock() + done_state.status = CheckpointStatus.DONE + done_state.summary = "Merged" + done_state.outputs = {} + done_state.error = None + + mock_dispatcher = AsyncMock() + mock_dispatcher.spawn_session.side_effect = [ + SessionResult(session_id="s", exit_code=0, state=blocked_state), + SessionResult(session_id="s", exit_code=0, state=done_state), + ] + + mock_worktree = AsyncMock() + mock_worktree.create_worktree.return_value = tmp_path / "worktree" + mock_worktree.ensure_bare_repo.return_value = tmp_path / "bare" + + mock_db = MagicMock() + mock_db.acquire_lock.return_value = True + mock_db.list_recent_automation_decisions.return_value = [] + + mock_transport = AsyncMock() + mock_transport.ask.return_value = "yes merge both" + + repo = MagicMock() + repo.name = "owner/repo" + + await run_secops_all( + repos=[repo], + dispatcher=mock_dispatcher, + github=MagicMock(), + worktree=mock_worktree, + dashboard=None, + state_db=mock_db, + transport=mock_transport, + contexts_dir=tmp_path / "contexts", + ) + + # One row per PR# in the question, both with the verbatim + # operator answer. 
+ calls = mock_db.record_automation_decision.call_args_list + item_ids = sorted(c.kwargs["item_id"] for c in calls) + assert item_ids == ["#60", "#61"] + for c in calls: + assert c.kwargs["decision"] == "yes merge both" + assert c.kwargs["repo"] == "owner/repo" + assert c.kwargs["operation"] == "dependabot_pr" + + @pytest.mark.asyncio + async def test_no_decision_written_when_question_lacks_pr_number( + self, tmp_path: Path + ) -> None: + """A BLOCKED question about a CodeQL alert (no PR#) must not + produce a phantom row keyed on an empty item_id.""" + from ctrlrelay.core.checkpoint import CheckpointStatus + from ctrlrelay.core.dispatcher import SessionResult + from ctrlrelay.pipelines.secops import run_secops_all + + blocked_state = MagicMock() + blocked_state.status = CheckpointStatus.BLOCKED_NEEDS_INPUT + blocked_state.question = "Open CodeQL alert: suppress or fix?" + blocked_state.summary = None + blocked_state.outputs = {} + blocked_state.error = None + + done_state = MagicMock() + done_state.status = CheckpointStatus.DONE + done_state.summary = "Done" + done_state.outputs = {} + done_state.error = None + + mock_dispatcher = AsyncMock() + mock_dispatcher.spawn_session.side_effect = [ + SessionResult(session_id="s", exit_code=0, state=blocked_state), + SessionResult(session_id="s", exit_code=0, state=done_state), + ] + + mock_worktree = AsyncMock() + mock_worktree.create_worktree.return_value = tmp_path / "worktree" + mock_worktree.ensure_bare_repo.return_value = tmp_path / "bare" + + mock_db = MagicMock() + mock_db.acquire_lock.return_value = True + mock_db.list_recent_automation_decisions.return_value = [] + + mock_transport = AsyncMock() + mock_transport.ask.return_value = "suppress for now" + + repo = MagicMock() + repo.name = "owner/repo" + + await run_secops_all( + repos=[repo], + dispatcher=mock_dispatcher, + github=MagicMock(), + worktree=mock_worktree, + dashboard=None, + state_db=mock_db, + transport=mock_transport, + contexts_dir=tmp_path / 
"contexts", + ) + + mock_db.record_automation_decision.assert_not_called() + + @pytest.mark.asyncio + async def test_resume_from_pending_records_decisions( + self, tmp_path: Path + ) -> None: + """resume_secops_from_pending runs from the sweeper (out-of-band + Telegram reply, original session torn down). It must also + capture the operator's answer so the next 6am sweep doesn't + re-ask. The question text is carried from the pending_resumes + row.""" + from ctrlrelay.core.checkpoint import CheckpointStatus + from ctrlrelay.core.dispatcher import SessionResult + from ctrlrelay.pipelines.secops import resume_secops_from_pending + + done_state = MagicMock() + done_state.status = CheckpointStatus.DONE + done_state.summary = "Resumed and merged" + done_state.outputs = {} + done_state.error = None + + mock_dispatcher = AsyncMock() + mock_dispatcher.spawn_session.return_value = SessionResult( + session_id="s", exit_code=0, state=done_state, + ) + + mock_worktree = AsyncMock() + mock_worktree.create_worktree.return_value = tmp_path / "worktree" + mock_worktree.ensure_bare_repo.return_value = tmp_path / "bare" + + mock_db = MagicMock() + mock_db.acquire_lock.return_value = True + + await resume_secops_from_pending( + session_id="s", + repo="owner/repo", + answer="merge 60 only", + dispatcher=mock_dispatcher, + github=MagicMock(), + worktree=mock_worktree, + dashboard=None, + state_db=mock_db, + transport=None, + contexts_dir=tmp_path / "contexts", + question="Approve PR #60 and PR #61?", + ) + + calls = mock_db.record_automation_decision.call_args_list + item_ids = sorted(c.kwargs["item_id"] for c in calls) + assert item_ids == ["#60", "#61"] + for c in calls: + assert c.kwargs["decision"] == "merge 60 only" + + +class TestSecopsPriorDecisionsInPrompt: + """The prompt must inject the rolling window of prior operator + decisions for this repo so the agent can act on them instead of + re-asking. 
Without injection, persistence is dead weight.""" + + def test_prompt_includes_prior_decisions_block(self) -> None: + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prior = [ + { + "decided_at": 1779445200, + "item_id": "#60", + "decision": "yes, merge it", + }, + { + "decided_at": 1779358800, + "item_id": "#25", + "decision": "skip for now", + }, + ] + prompt = pipeline._build_prompt( + repo="o/r", session_id="s1", prior_decisions=prior, + ) + + assert "Prior operator decisions" in prompt + assert '#60: operator said "yes, merge it"' in prompt + assert '#25: operator said "skip for now"' in prompt + # Must instruct the agent to act on prior decisions, not just + # echo them back as decoration. + assert "avoid re-asking" in prompt.lower() + assert "materially changed" in prompt.lower() + + def test_prompt_omits_block_when_no_prior_decisions(self) -> None: + """When the table is empty (fresh repo, or window expired), the + block must be omitted entirely — not rendered as an empty + header that would confuse the agent.""" + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prompt = pipeline._build_prompt(repo="o/r", session_id="s1") + assert "Prior operator decisions" not in prompt + + @pytest.mark.asyncio + async def test_run_secops_all_threads_prior_decisions_into_prompt( + self, tmp_path: Path + ) -> None: + """run_secops_all must query state_db for prior decisions and + thread them through ctx.extra so _build_prompt picks them up.""" + from ctrlrelay.core.checkpoint import CheckpointStatus + from ctrlrelay.core.dispatcher import SessionResult + from ctrlrelay.pipelines.secops import run_secops_all + + captured_prompts: 
list[str] = [] + + async def fake_spawn(**kwargs): + captured_prompts.append(kwargs["prompt"]) + state = MagicMock() + state.status = CheckpointStatus.DONE + state.summary = "Done" + state.outputs = {} + state.error = None + return SessionResult( + session_id=kwargs["session_id"], exit_code=0, state=state, + ) + + mock_dispatcher = AsyncMock() + mock_dispatcher.spawn_session.side_effect = fake_spawn + + mock_worktree = AsyncMock() + mock_worktree.create_worktree.return_value = tmp_path / "worktree" + mock_worktree.ensure_bare_repo.return_value = tmp_path / "bare" + + mock_db = MagicMock() + mock_db.acquire_lock.return_value = True + mock_db.release_lock.return_value = True + mock_db.list_recent_automation_decisions.return_value = [ + { + "decided_at": 1779445200, + "item_id": "#42", + "decision": "approved last week", + } + ] + + repo = MagicMock() + repo.name = "owner/repo" + + await run_secops_all( + repos=[repo], + dispatcher=mock_dispatcher, + github=MagicMock(), + worktree=mock_worktree, + dashboard=None, + state_db=mock_db, + transport=None, + contexts_dir=tmp_path / "contexts", + ) + + assert len(captured_prompts) == 1 + assert "#42" in captured_prompts[0] + assert "approved last week" in captured_prompts[0] + # Lookup must use a since_ts (30-day window), not unbounded. 
+ mock_db.list_recent_automation_decisions.assert_called_once() + assert ( + mock_db.list_recent_automation_decisions.call_args.kwargs.get( + "since_ts" + ) + is not None + ) + + +class TestSecopsEdgeCasePromptDirectives: + """The prompt must cover the two edge cases the agent reliably + fumbles in production: (1) repos with no CI configured, where + the 'auto-merge with passing CI' gate cannot be evaluated; and + (2) CI failing on a check unrelated to the PR's package, where + patch PRs would otherwise sit stuck for weeks.""" + + def test_prompt_covers_no_ci_configured_case(self) -> None: + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prompt = pipeline._build_prompt(repo="o/r", session_id="s1") + # Must call out the missing-workflows signal explicitly so the + # agent can detect the case (not guess it). + assert ".github/workflows" in prompt + assert "No CI configured" in prompt or "no CI" in prompt.lower() + # Must instruct one consolidated question, not one per PR. + assert "consolidated" in prompt.lower() + + def test_prompt_covers_unrelated_ci_failure_case(self) -> None: + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prompt = pipeline._build_prompt(repo="o/r", session_id="s1") + # Must instruct the agent to surface the pre-existing failure, + # not silently leave patches stuck. + assert "unrelated" in prompt.lower() + assert "pre-existing" in prompt.lower() or "stuck" in prompt.lower() + # Must reference the diagnostic command the agent runs to extract + # the root cause, otherwise it's just vague prose. 
+ assert "gh run view" in prompt or "log-failed" in prompt.lower() + + def test_prompt_enforces_scope_discipline(self) -> None: + """Production saw the agent invent options like 'batch into a + single review PR' that were never in the policy ladder. Prompt + must explicitly forbid this.""" + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prompt = pipeline._build_prompt(repo="o/r", session_id="s1") + # Must name the freelance options the agent has been observed + # offering so the prohibition is concrete, not abstract. + assert "batch" in prompt.lower() + # Must establish the three-exit ladder so the agent has a clear + # mental model of legitimate actions. + assert ( + "three legitimate exits" in prompt.lower() + or "three exits" in prompt.lower() + ) + diff --git a/tests/test_state.py b/tests/test_state.py index 4eb3df9..6d20f35 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -592,3 +592,101 @@ def test_mark_resumed_is_noop_on_refreshed_unanswered_row( assert db.mark_pending_resume_resumed("s1") is True assert db.list_pending_resumes_to_execute() == [] db.close() + + +class TestAutomationDecisions: + """automation_decisions persists operator answers across sweeps so + the secops agent doesn't re-ask about the same Dependabot PR every + day. 
Decisions are scoped per-repo and the recall window is a + rolling 30 days (filter applied at read-time).""" + + def test_record_and_list_decisions(self, tmp_path: Path) -> None: + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="owner/repo", + operation="dependabot_pr", + item_id="#60", + decision="yes, merge it", + context="PR #60 actions/upload-artifact 4->7", + ) + rows = db.list_recent_automation_decisions("owner/repo") + assert len(rows) == 1 + assert rows[0]["repo"] == "owner/repo" + assert rows[0]["operation"] == "dependabot_pr" + assert rows[0]["item_id"] == "#60" + assert rows[0]["decision"] == "yes, merge it" + assert rows[0]["decided_by"] == "operator" + assert rows[0]["decided_at"] is not None + db.close() + + def test_list_is_scoped_per_repo(self, tmp_path: Path) -> None: + """A decision recorded for repo A must not leak into repo B's + prompt — otherwise an "approve" for repo A's upload-artifact + bump would silently authorize the same bump in unrelated repos.""" + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="owner/repo-a", operation="dependabot_pr", + item_id="#1", decision="approve", + ) + db.record_automation_decision( + repo="owner/repo-b", operation="dependabot_pr", + item_id="#1", decision="reject", + ) + assert len(db.list_recent_automation_decisions("owner/repo-a")) == 1 + assert ( + db.list_recent_automation_decisions("owner/repo-a")[0][ + "decision" + ] + == "approve" + ) + assert len(db.list_recent_automation_decisions("owner/repo-b")) == 1 + assert ( + db.list_recent_automation_decisions("owner/repo-b")[0][ + "decision" + ] + == "reject" + ) + db.close() + + def test_list_filters_by_since_ts(self, tmp_path: Path) -> None: + """30-day recall is implemented via since_ts at the read site, + not by deleting old rows. 
A decision older than the window + must be returned by an unbounded query but excluded when + since_ts is set just after its timestamp.""" + import time as _time + + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="o/r", operation="dependabot_pr", + item_id="#1", decision="old answer", + ) + # Cutoff one second in the future — should exclude the row + # written above (decided_at == now). + cutoff = int(_time.time()) + 1 + rows = db.list_recent_automation_decisions("o/r", since_ts=cutoff) + assert rows == [] + # Same query without the cutoff still finds it. + rows = db.list_recent_automation_decisions("o/r") + assert len(rows) == 1 + db.close() + + def test_list_orders_newest_first(self, tmp_path: Path) -> None: + """The prompt-rendering site relies on newest-first ordering so + the most recent decision wins when the operator changed their + mind on the same PR.""" + import time as _time + + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="o/r", operation="dependabot_pr", + item_id="#1", decision="first", + ) + # Force a tick so decided_at differs even on fast machines. + _time.sleep(1) + db.record_automation_decision( + repo="o/r", operation="dependabot_pr", + item_id="#1", decision="second", + ) + rows = db.list_recent_automation_decisions("o/r") + assert [r["decision"] for r in rows] == ["second", "first"] + db.close() From e5e13c3b14aeafb766816c984c9ce737d5aa03ff Mon Sep 17 00:00:00 2001 From: Oscar V Date: Mon, 25 May 2026 23:12:26 -0700 Subject: [PATCH 2/2] fix(secops): namespace decision recall and render question snippet Two correctness gaps in the persistence layer surfaced during review: 1. list_recent_automation_decisions filtered only by repo, so a `codeql_alert` (or any future `operation`) row in the same repo would render into the Dependabot-PR prompt as a "prior PR decision". A "suppress this CVE" answer for a CodeQL alert could end up being applied to a Dependabot PR. 
Added an optional `operation` kwarg, default None (full audit view preserved for debugging / future audit UI), and the secops pipeline now passes `operation="dependabot_pr"` at the read site. 2. The prompt rule said "act on prior decision unless circumstances materially changed (different version bump, CI state flipped)" but the rendered entry only showed item_id + answer. The agent literally could not detect a force-pushed PR that swapped the version under the same PR number, so the rule was either unenforceable or forced re-asking every time (defeating the persistence layer). We were already storing the question snippet as `context` at write time but never reading it back. Now rendered inline as `(prior question: ...)` with a 240-char cap, and the rule directive explicitly points the agent at the snippet to compare. Older rows without context degrade to the item+answer form (not `prior question: None`). 3 new tests cover both gaps + the missing-context degradation path. Targeted suite 73/73. --- CHANGELOG.md | 14 ++++-- src/ctrlrelay/core/state.py | 37 ++++++++++----- src/ctrlrelay/pipelines/secops.py | 35 +++++++++++--- tests/test_secops_pipeline.py | 77 +++++++++++++++++++++++++++++++ tests/test_state.py | 31 +++++++++++++ 5 files changed, 171 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c79554..ab3ebc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,13 +21,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 forms, dedupes, and tolerates questions with no PR# (e.g. CodeQL-only) by writing nothing. 2. Before building each sweep's prompt, `run_secops_all` pulls the - last 30 days of decisions for that repo and threads them via + last 30 days of decisions for that repo (namespaced to + `operation="dependabot_pr"` so e.g.
CodeQL-suppression decisions + don't leak into the Dependabot prompt) and threads them via `ctx.extra['prior_decisions']` into `_build_prompt`, which renders a `## Prior operator decisions (last 30 days)` block listing each - decision verbatim. The block carries instructions to act on prior - answers unless circumstances have materially changed (different - version bump, CI state flipped). Without this the persistence - layer was dead weight. + decision verbatim alongside the original question snippet so the + agent can detect a force-pushed PR that swapped the version bump + under the same PR number. The block carries instructions to act + on prior answers unless circumstances have materially changed + (different version bump, CI state flipped). Without this the + persistence layer was dead weight. ### Fixed diff --git a/src/ctrlrelay/core/state.py b/src/ctrlrelay/core/state.py index 2cc331b..0d5c826 100644 --- a/src/ctrlrelay/core/state.py +++ b/src/ctrlrelay/core/state.py @@ -423,28 +423,41 @@ def list_recent_automation_decisions( self, repo: str, *, + operation: str | None = None, since_ts: int | None = None, limit: int = 100, ) -> list[dict[str, Any]]: """Recent operator decisions for this repo, newest first. + ``operation`` namespaces the read so a Dependabot-PR consumer + only sees ``dependabot_pr`` rows, not e.g. ``codeql_alert`` + decisions also tracked in the same repo. Without this filter, + a "suppress this CVE" answer for a CodeQL alert would render + into the Dependabot prompt as a prior-PR decision and the + agent could act on it. Defaults to ``None`` (all operations) + to preserve the original behaviour for callers that want a + full audit view. + ``since_ts`` (unix seconds) filters to decisions made at-or-after that point — used by the secops prompt builder to inject a 30-day rolling window into the agent's context so the agent can act on prior decisions instead of asking again. 
""" - if since_ts is None: - rows = self._conn.execute( - "SELECT * FROM automation_decisions WHERE repo = ? " - "ORDER BY decided_at DESC LIMIT ?", - (repo, limit), - ).fetchall() - else: - rows = self._conn.execute( - "SELECT * FROM automation_decisions WHERE repo = ? " - "AND decided_at >= ? ORDER BY decided_at DESC LIMIT ?", - (repo, since_ts, limit), - ).fetchall() + clauses = ["repo = ?"] + params: list[Any] = [repo] + if operation is not None: + clauses.append("operation = ?") + params.append(operation) + if since_ts is not None: + clauses.append("decided_at >= ?") + params.append(since_ts) + sql = ( + "SELECT * FROM automation_decisions WHERE " + + " AND ".join(clauses) + + " ORDER BY decided_at DESC LIMIT ?" + ) + params.append(limit) + rows = self._conn.execute(sql, tuple(params)).fetchall() return [dict(row) for row in rows] # PR watches (durable, cross-restart) diff --git a/src/ctrlrelay/pipelines/secops.py b/src/ctrlrelay/pipelines/secops.py index d111313..567e0df 100644 --- a/src/ctrlrelay/pipelines/secops.py +++ b/src/ctrlrelay/pipelines/secops.py @@ -261,6 +261,14 @@ def _policy_directive(tier: str, policy: str) -> str: # normalize "yes"/"approve"/"merge it" to a canonical decision # because that classification is exactly what the agent does # best, and getting it wrong silently auto-merges PRs. + # + # We render the stored ``context`` (question snippet captured + # at decision time) alongside item_id + answer so the agent + # can compare prior bump/CI state with the PR it's looking at + # right now. Without context, the "only re-ASK if circumstances + # materially changed" rule below is unenforceable — the agent + # can't detect a force-pushed PR that swapped the version bump + # under the same PR number. if prior_decisions: lines = ["## Prior operator decisions (last 30 days)", ""] for d in prior_decisions: @@ -275,17 +283,31 @@ def _policy_directive(tier: str, policy: str) -> str: # doesn't bloat the prompt past the model's context. 
if len(ans) > 160: ans = ans[:157] + "..." - lines.append(f'- {date} {item}: operator said "{ans}"') + ctx = (d.get("context") or "").strip().replace("\n", " ") + # Same cap rationale as answers; the prompt has many + # entries and at-scale this is the bigger context risk. + if len(ctx) > 240: + ctx = ctx[:237] + "..." + if ctx: + lines.append( + f'- {date} {item}: operator said "{ans}" ' + f"(prior question: {ctx})" + ) + else: + lines.append( + f'- {date} {item}: operator said "{ans}"' + ) lines.append("") lines.append( "**Use these to avoid re-asking.** If a Dependabot PR " "matches an item above and circumstances are unchanged " "(same package, version range still applies, CI state " - "still the same), act per the prior decision without " - "signalling BLOCKED. Only re-ASK if the situation has " - "materially changed (different version bump, CI flipped " - "red, new conflicts) — and quote what changed in the " - "question." + "still the same — compare the current PR against the " + "prior-question snippet above), act per the prior " + "decision without signalling BLOCKED. Only re-ASK if " + "the situation has materially changed (different " + "version bump, CI flipped red, new conflicts) — and " + "quote what changed in the question." ) prior_decisions_block = "\n".join(lines) + "\n\n" else: @@ -510,6 +532,7 @@ async def run_secops_all( try: prior_decisions = state_db.list_recent_automation_decisions( repo, + operation="dependabot_pr", since_ts=int(time.time()) - DECISION_RECALL_SECONDS, ) except Exception as e: diff --git a/tests/test_secops_pipeline.py b/tests/test_secops_pipeline.py index 8676fce..a30210f 100644 --- a/tests/test_secops_pipeline.py +++ b/tests/test_secops_pipeline.py @@ -1280,6 +1280,83 @@ async def fake_spawn(**kwargs): ) is not None ) + # Lookup must be namespaced to dependabot_pr — otherwise a + # `codeql_alert` decision in the same repo would render into + # this Dependabot prompt as a prior PR decision. 
+ assert ( + mock_db.list_recent_automation_decisions.call_args.kwargs.get( + "operation" + ) + == "dependabot_pr" + ) + + def test_prompt_renders_context_snippet_for_stale_check(self) -> None: + """The prompt tells the agent to act on a prior decision only + when circumstances haven't materially changed (different + version bump, CI flipped). To enforce that, the rendered entry + must include the stored context (the original question + snippet) so the agent can compare prior bump/CI state with + the current PR. Without context, the rule is unenforceable.""" + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prior = [ + { + "decided_at": 1779445200, + "item_id": "#60", + "decision": "yes merge it", + "context": "PR #60 actions/upload-artifact 4->7 (CI green)", + }, + ] + prompt = pipeline._build_prompt( + repo="o/r", session_id="s1", prior_decisions=prior, + ) + # Question snippet must appear so the agent can detect a + # force-pushed PR that swapped the version under #60. + assert "actions/upload-artifact 4->7" in prompt + # And the rule must explicitly point the agent at the snippet + # so it knows to compare, not just decoration. + assert "prior-question snippet" in prompt.lower() + + def test_prompt_handles_missing_context_gracefully(self) -> None: + """Older rows (written before context was stored) lack the + snippet. 
The render must not produce 'prior question: ' with + an empty tail, or worse a literal 'None' — degrade to the + item+answer-only form.""" + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prior = [ + { + "decided_at": 1779445200, + "item_id": "#60", + "decision": "merge", + "context": None, + }, + ] + prompt = pipeline._build_prompt( + repo="o/r", session_id="s1", prior_decisions=prior, + ) + assert "#60" in prompt + assert '"merge"' in prompt + assert "prior question: " not in prompt + assert "None" not in prompt.split("Prior operator decisions")[1].split( + "**Use these" + )[0] class TestSecopsEdgeCasePromptDirectives: diff --git a/tests/test_state.py b/tests/test_state.py index 6d20f35..41e1a13 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -690,3 +690,34 @@ def test_list_orders_newest_first(self, tmp_path: Path) -> None: rows = db.list_recent_automation_decisions("o/r") assert [r["decision"] for r in rows] == ["second", "first"] db.close() + + def test_list_filters_by_operation(self, tmp_path: Path) -> None: + """The schema namespaces decisions by `operation` so e.g. a + `codeql_alert` suppression doesn't leak into a Dependabot-PR + prompt as a prior PR decision. 
The reader must honour the + namespace — otherwise a 'suppress this CVE' answer would + render as 'merge this PR'.""" + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="o/r", operation="dependabot_pr", + item_id="#10", decision="merge", + ) + db.record_automation_decision( + repo="o/r", operation="codeql_alert", + item_id="alert-42", decision="suppress until next quarter", + ) + dep_rows = db.list_recent_automation_decisions( + "o/r", operation="dependabot_pr", + ) + assert len(dep_rows) == 1 + assert dep_rows[0]["item_id"] == "#10" + cq_rows = db.list_recent_automation_decisions( + "o/r", operation="codeql_alert", + ) + assert len(cq_rows) == 1 + assert cq_rows[0]["item_id"] == "alert-42" + # Default (no operation filter) returns the full audit view — + # used by ops debugging / future audit UI, not the prompt. + all_rows = db.list_recent_automation_decisions("o/r") + assert len(all_rows) == 2 + db.close()