diff --git a/CHANGELOG.md b/CHANGELOG.md index a4a8780..ab3ebc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,54 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **Secops sweeps now remember operator decisions across days.** The + `automation_decisions` table had a schema but no producer or consumer + — every 6am sweep re-asked about the same Dependabot PRs the operator + had already answered, drowning Telegram. Wired both halves: + 1. When the BLOCKED loop in `run_secops_all` (or the out-of-band + `resume_secops_from_pending` sweeper) receives an answer, regex + extracts every PR# from the question and records one row per PR# + keyed on `(repo, "dependabot_pr", "#N")` with the operator's + verbatim answer. PR# regex handles both `PR #60` and bare `#60` + forms, dedupes, and tolerates questions with no PR# (e.g. + CodeQL-only) by writing nothing. + 2. Before building each sweep's prompt, `run_secops_all` pulls the + last 30 days of decisions for that repo (namespaced to + `operation="dependabot_pr"` so e.g. CodeQL-suppression decisions + don't leak into the Dependabot prompt) and threads them via + `ctx.extra['prior_decisions']` into `_build_prompt`, which renders + a `## Prior operator decisions (last 30 days)` block listing each + decision verbatim alongside the original question snippet so the + agent can detect a force-pushed PR that swapped the version bump + under the same PR number. The block carries instructions to act + on prior answers unless circumstances have materially changed + (different version bump, CI state flipped). Without this the + persistence layer was dead weight. + +### Fixed + +- **Secops handles "no CI configured" without inventing options.** + Repos with no `.github/workflows/*` previously confused the agent + into freelance suggestions ("Want me to batch into a review PR?") + because the "auto-merge with passing CI" gate couldn't fire. Prompt + now explicitly directs: treat ALL Dependabot PRs as ASK regardless + of tier, and signal BLOCKED ONCE with a single consolidated question + per repo (not one per PR). +- **Secops escalates pre-existing CI failures instead of stalling PRs.** + When CI is failing on a check unrelated to the PR's package (e.g. + `pip-audit` flagging a transitive CVE in `idna` while the PR bumps + `boto3`), patch PRs were sitting stuck for weeks because the agent + reported the failure and moved on. Prompt now requires the agent to + signal BLOCKED with a one-line question naming the underlying issue + and a root-cause hint from `gh run view --log-failed`. +- **Secops scope discipline.** The prompt now enforces "exactly three + legitimate exits" per open PR (merge / leave / BLOCKED) and + explicitly forbids batching PRs into review PRs, opening tracking + issues, or performing manual reviews. Cuts off the freelance + options observed in production sweeps. + ## [0.5.0] - 2026-05-11 Minor release. Three changes on the secops path; together they take the diff --git a/src/ctrlrelay/cli.py b/src/ctrlrelay/cli.py index 70bd749..26db842 100644 --- a/src/ctrlrelay/cli.py +++ b/src/ctrlrelay/cli.py @@ -1631,6 +1631,7 @@ async def _run_pending_resume_sweeper() -> None: if sweep_repo_cfg is not None else None ), + question=row.get("question"), ) elif pipeline_name == "dev": # Dev resume needs the repo's branch template diff --git a/src/ctrlrelay/core/state.py b/src/ctrlrelay/core/state.py index a35d677..0d5c826 100644 --- a/src/ctrlrelay/core/state.py +++ b/src/ctrlrelay/core/state.py @@ -386,6 +386,80 @@ def mark_pending_resume_resumed(self, session_id: str) -> bool: self._conn.commit() return cursor.rowcount > 0 + # Automation decisions (operator answers persisted across sweeps) + + def record_automation_decision( + self, + *, + repo: str, + operation: str, + item_id: str, + decision: str, + decided_by: str = "operator", + context: str | None = None, + policy: str = "", + ) -> None: + """Record an operator decision so future sweeps can avoid + re-asking the same question. + + ``operation`` namespaces the decision domain + (e.g. ``"dependabot_pr"`` or ``"codeql_alert"``); ``item_id`` + is the specific item the decision applies to (e.g. ``"#60"`` + for PR #60). ``decision`` is the operator's verbatim answer — + the agent that reads this back interprets it; we don't try to + normalize to yes/no here because Telegram replies are free-form. + """ + self._conn.execute( + """INSERT INTO automation_decisions + (repo, operation, policy, item_id, decision, + decided_by, decided_at, context) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + (repo, operation, policy, item_id, decision, + decided_by, int(time.time()), context), + ) + self._conn.commit() + + def list_recent_automation_decisions( + self, + repo: str, + *, + operation: str | None = None, + since_ts: int | None = None, + limit: int = 100, + ) -> list[dict[str, Any]]: + """Recent operator decisions for this repo, newest first. + + ``operation`` namespaces the read so a Dependabot-PR consumer + only sees ``dependabot_pr`` rows, not e.g. ``codeql_alert`` + decisions also tracked in the same repo. Without this filter, + a "suppress this CVE" answer for a CodeQL alert would render + into the Dependabot prompt as a prior-PR decision and the + agent could act on it. Defaults to ``None`` (all operations) + to preserve the original behaviour for callers that want a + full audit view. + + ``since_ts`` (unix seconds) filters to decisions made + at-or-after that point — used by the secops prompt builder to + inject a 30-day rolling window into the agent's context so the + agent can act on prior decisions instead of asking again. + """ + clauses = ["repo = ?"] + params: list[Any] = [repo] + if operation is not None: + clauses.append("operation = ?") + params.append(operation) + if since_ts is not None: + clauses.append("decided_at >= ?") + params.append(since_ts) + sql = ( + "SELECT * FROM automation_decisions WHERE " + + " AND ".join(clauses) + + " ORDER BY decided_at DESC LIMIT ?" + ) + params.append(limit) + rows = self._conn.execute(sql, tuple(params)).fetchall() + return [dict(row) for row in rows] + # PR watches (durable, cross-restart) def add_pr_watch( diff --git a/src/ctrlrelay/pipelines/secops.py b/src/ctrlrelay/pipelines/secops.py index 1e2ecf5..567e0df 100644 --- a/src/ctrlrelay/pipelines/secops.py +++ b/src/ctrlrelay/pipelines/secops.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import re import time import uuid from dataclasses import dataclass @@ -23,6 +24,72 @@ DEFAULT_MAX_BLOCKED_ROUNDS = 5 +# 30-day rolling window of operator decisions injected into each +# sweep's prompt. Long enough to cover weekly cadences and a vacation; +# short enough that a stale "approve" from months ago can't auto-merge +# a PR whose risk profile may have shifted (new transitive deps, +# changed maintainer, etc.). +DECISION_RECALL_SECONDS = 30 * 86400 + +# Captures both "PR #60" and "#60" forms the agent uses interchangeably +# in BLOCKED questions. The negative-lookahead on `#0` avoids matching +# CVE-2026-... style identifiers; PR numbers are always >= 1. +_PR_NUM_RE = re.compile(r"(?:PR\s*)?#(\d+)(?!\d)", re.IGNORECASE) + + +def _extract_pr_numbers(question: str) -> list[str]: + """Pull deduplicated PR numbers out of a BLOCKED question so we + can record one operator-decision row per PR. Order-preserving so + a multi-PR question like 'approve #60, #61, #62?' records the + decision against each in the order the agent listed them.""" + seen: set[str] = set() + out: list[str] = [] + for n in _PR_NUM_RE.findall(question or ""): + if n in seen: + continue + seen.add(n) + out.append(n) + return out + + +def _record_decisions_from_answer( + state_db: StateDB, + *, + repo: str, + question: str, + answer: str, + session_id: str, +) -> None: + """Best-effort persistence of an operator's verbatim Telegram + answer against every PR# mentioned in the question. Failure here + is logged but does not break the pipeline — the answer has + already been acted on by the agent; losing the persistence layer + just means the next sweep will re-ask, which is the pre-fix + status quo.""" + pr_numbers = _extract_pr_numbers(question) + if not pr_numbers: + return + for pr_num in pr_numbers: + try: + state_db.record_automation_decision( + repo=repo, + operation="dependabot_pr", + item_id=f"#{pr_num}", + decision=answer, + decided_by="operator", + context=(question or "")[:500], + ) + except Exception as e: + log_event( + _logger, + "secops.decision.record_failed", + session_id=session_id, + repo=repo, + pr_number=pr_num, + error_type=type(e).__name__, + error=str(e)[:200], + ) + def _question_for_persist(session_id: str, result: PipelineResult) -> str: """Return a non-empty question string for pending_resumes storage. @@ -61,6 +128,7 @@ async def run(self, ctx: PipelineContext) -> PipelineResult: session_id=ctx.session_id, state_file=ctx.state_file, automation=ctx.extra.get("automation"), + prior_decisions=ctx.extra.get("prior_decisions"), ) result = await self._spawn(ctx, prompt, resume=False) @@ -127,6 +195,7 @@ def _build_prompt( session_id: str = "", state_file: Path | None = None, automation: Any = None, + prior_decisions: list[dict[str, Any]] | None = None, ) -> str: """Build the secops prompt. @@ -135,7 +204,13 @@ def _build_prompt( When provided, the prompt explicitly tells the agent which tiers auto-merge vs ask vs never-merge. When ``None``, falls back to the default policy (patch=auto, minor=ask, never-major) so - existing callers and the schema defaults stay coherent.""" + existing callers and the schema defaults stay coherent. + + ``prior_decisions`` is a list of recent automation_decisions + rows for this repo. When present, they're rendered as a + "Prior operator decisions" block so the agent can act on a + past answer instead of asking again about the same PR. + """ state_file_path = str(state_file) if state_file else "/tmp/state.json" # Translate per-repo policy into prompt directives. Use defaults @@ -180,6 +255,64 @@ def _policy_directive(tier: str, policy: str) -> str: ] ) + # Render the rolling window of prior operator decisions for this + # repo. The agent reads the verbatim Telegram answer and decides + # whether the prior call still applies — we don't try to + # normalize "yes"/"approve"/"merge it" to a canonical decision + # because that classification is exactly what the agent does + # best, and getting it wrong silently auto-merges PRs. + # + # We render the stored ``context`` (question snippet captured + # at decision time) alongside item_id + answer so the agent + # can compare prior bump/CI state with the PR it's looking at + # right now. Without context, the "only re-ASK if circumstances + # materially changed" rule below is unenforceable — the agent + # can't detect a force-pushed PR that swapped the version bump + # under the same PR number. + if prior_decisions: + lines = ["## Prior operator decisions (last 30 days)", ""] + for d in prior_decisions: + ts = d.get("decided_at") or 0 + date = ( + time.strftime("%Y-%m-%d", time.gmtime(int(ts))) + if ts else "unknown-date" + ) + item = d.get("item_id") or "?" + ans = (d.get("decision") or "").strip().replace("\n", " ") + # Cap each answer so a pathological Telegram reply + # doesn't bloat the prompt past the model's context. + if len(ans) > 160: + ans = ans[:157] + "..." + ctx = (d.get("context") or "").strip().replace("\n", " ") + # Same cap rationale as answers; the prompt has many + # entries and at-scale this is the bigger context risk. + if len(ctx) > 240: + ctx = ctx[:237] + "..." + if ctx: + lines.append( + f'- {date} {item}: operator said "{ans}" ' + f"(prior question: {ctx})" + ) + else: + lines.append( + f'- {date} {item}: operator said "{ans}"' + ) + lines.append("") + lines.append( + "**Use these to avoid re-asking.** If a Dependabot PR " + "matches an item above and circumstances are unchanged " + "(same package, version range still applies, CI state " + "still the same — compare the current PR against the " + "prior-question snippet above), act per the prior " + "decision without signalling BLOCKED. Only re-ASK if " + "the situation has materially changed (different " + "version bump, CI flipped red, new conflicts) — and " + "quote what changed in the question." + ) + prior_decisions_block = "\n".join(lines) + "\n\n" + else: + prior_decisions_block = "" + return f"""Execute security operations for repository {repo}. 1. Check Dependabot alerts: @@ -202,6 +335,42 @@ def _policy_directive(tier: str, policy: str) -> str: BLOCKED to ask for guidance. {dependabot_policy_block} + +{prior_decisions_block}**Edge cases that override the policy ladder above:** + +- **No CI configured** (repo has no `.github/workflows/*.yml`): the + "passing CI" gate cannot be evaluated, so an "auto" policy cannot + fire safely. Treat ALL open Dependabot PRs as ASK regardless of + their tier — signal BLOCKED ONCE for the whole repo with a single + consolidated question naming every open PR, e.g.: "Repo has no CI; + approve merging: #N (pkg X -> Y, patch), #M (pkg A -> B, minor)?" + Do not ask one BLOCKED per PR — operators get one consolidated + question per repo per sweep. + +- **CI failing on a check UNRELATED to the PR's own package** (e.g. + pip-audit fails on a transitive CVE in `idna` while the PR bumps + `boto3`): do NOT silently report this and move on — that's how + patch PRs end up stuck for weeks. Signal BLOCKED with a one-line + question naming the underlying issue: "Patch PRs blocked by + pre-existing pip-audit failure on idna CVE-2026-... in lockfile. + Fix idna >= 3.15 first, then re-evaluate #117 and #119?" Include + the failing check name and a one-line root-cause hint extracted + from `gh run view --log-failed`. + +**Scope discipline:** + +You have exactly three legitimate exits for each open PR: + 1. Merge it (per policy or per prior decision). + 2. Leave it open (per policy or per prior decision). + 3. Signal BLOCKED with a one-line policy question. + +Do NOT propose alternative actions outside this ladder. In +particular, do NOT offer to: batch multiple Dependabot PRs into a +single review PR, open a tracking issue for the operator, perform a +manual code review of the bump, or any other freelance option. If +the right action isn't clear from policy + prior decisions + edge +cases above, ASK with a one-line question — don't invent. + - **PR authored by `$OPERATOR` touching ONLY `.github/dependabot.yml`** (additive ecosystem entries, no other files changed): MAYBE auto-merge — but only after diff validation. These are the @@ -355,13 +524,38 @@ async def run_secops_all( state_file = worktree_path / ".ctrlrelay" / "state.json" state_file.parent.mkdir(parents=True, exist_ok=True) + # Pull the 30-day rolling window of prior operator decisions + # so the prompt can include them. Best-effort: if the lookup + # fails for any reason, fall through with an empty list — + # losing this just means the next sweep re-asks, which is the + # pre-fix status quo, not a regression. + try: + prior_decisions = state_db.list_recent_automation_decisions( + repo, + operation="dependabot_pr", + since_ts=int(time.time()) - DECISION_RECALL_SECONDS, + ) + except Exception as e: + log_event( + _logger, + "secops.prior_decisions.lookup_failed", + session_id=session_id, + repo=repo, + error_type=type(e).__name__, + error=str(e)[:200], + ) + prior_decisions = [] + ctx = PipelineContext( session_id=session_id, repo=repo, worktree_path=worktree_path, context_path=context_path, state_file=state_file, - extra={"automation": getattr(repo_config, "automation", None)}, + extra={ + "automation": getattr(repo_config, "automation", None), + "prior_decisions": prior_decisions, + }, ) state_db.execute( @@ -415,6 +609,13 @@ async def run_secops_all( outputs=result.outputs, ) break + _record_decisions_from_answer( + state_db, + repo=repo, + question=question, + answer=answer, + session_id=session_id, + ) rounds += 1 result = await pipeline.resume(ctx, answer) @@ -600,6 +801,7 @@ async def resume_secops_from_pending( transport: Transport | None, contexts_dir: Path, automation: Any = None, + question: str | None = None, ) -> PipelineResult: """Resume a BLOCKED secops session using an answer that arrived via Telegram after the original session had already torn down. @@ -609,7 +811,21 @@ async def resume_secops_from_pending( row in ``sessions`` (status='blocked') and a ``pending_resumes`` row ready for ``mark_pending_resume_resumed``. Caller is responsible for flipping the pending_resumes row after this returns. + + ``question`` is the original BLOCKED question (carried in the + pending_resumes row). When supplied, the operator answer is + persisted against any PR# the question mentioned so the next + sweep skips re-asking. Optional for backwards compat with older + callers that pre-date the persistence work. """ + if question: + _record_decisions_from_answer( + state_db, + repo=repo, + question=question, + answer=answer, + session_id=session_id, + ) pipeline = SecopsPipeline( dispatcher=dispatcher, github=github, diff --git a/tests/test_secops_pipeline.py b/tests/test_secops_pipeline.py index ce7e823..a30210f 100644 --- a/tests/test_secops_pipeline.py +++ b/tests/test_secops_pipeline.py @@ -945,3 +945,488 @@ async def hang_forever(ctx): # noqa: ARG001 # Assert: the per-repo lock was released. assert mock_db.release_lock.called + + +class TestPRNumberExtraction: + """The persistence layer keys decisions by PR#, so the regex that + pulls PR numbers out of free-form BLOCKED questions has to handle + the variants the agent emits in practice (see real + pending_resumes rows in production).""" + + def test_extracts_pr_hash_form(self) -> None: + from ctrlrelay.pipelines.secops import _extract_pr_numbers + + q = "Dependabot PR #60 is a MAJOR bump. Approve merge?" + assert _extract_pr_numbers(q) == ["60"] + + def test_extracts_bare_hash_form(self) -> None: + from ctrlrelay.pipelines.secops import _extract_pr_numbers + + q = "approve which of: #15, #14, #13?" + assert _extract_pr_numbers(q) == ["15", "14", "13"] + + def test_extracts_mixed_forms_and_dedupes(self) -> None: + from ctrlrelay.pipelines.secops import _extract_pr_numbers + + q = "PR #293 (major); PR #290 (major). Also #293 has alert." + assert _extract_pr_numbers(q) == ["293", "290"] + + def test_returns_empty_when_no_pr_mentioned(self) -> None: + """Edge-case questions about a repo with no PRs (e.g. CodeQL + alert decisions) shouldn't produce phantom rows.""" + from ctrlrelay.pipelines.secops import _extract_pr_numbers + + q = "Open CodeQL alert on injection sink. Suppress or fix?" + assert _extract_pr_numbers(q) == [] + + +class TestSecopsDecisionPersistence: + """When the operator answers a BLOCKED question, the pipeline must + persist that answer against every PR# mentioned in the question so + the next sweep skips re-asking. Without this, the daily 6am cron + surfaces the same "approve PR #60?" question every day even after + the operator has answered.""" + + @pytest.mark.asyncio + async def test_blocked_loop_records_decisions_per_pr( + self, tmp_path: Path + ) -> None: + from ctrlrelay.core.checkpoint import CheckpointStatus + from ctrlrelay.core.dispatcher import SessionResult + from ctrlrelay.pipelines.secops import run_secops_all + + blocked_state = MagicMock() + blocked_state.status = CheckpointStatus.BLOCKED_NEEDS_INPUT + blocked_state.question = "Approve majors PR #60 and PR #61?" + blocked_state.summary = None + blocked_state.outputs = {} + blocked_state.error = None + + done_state = MagicMock() + done_state.status = CheckpointStatus.DONE + done_state.summary = "Merged" + done_state.outputs = {} + done_state.error = None + + mock_dispatcher = AsyncMock() + mock_dispatcher.spawn_session.side_effect = [ + SessionResult(session_id="s", exit_code=0, state=blocked_state), + SessionResult(session_id="s", exit_code=0, state=done_state), + ] + + mock_worktree = AsyncMock() + mock_worktree.create_worktree.return_value = tmp_path / "worktree" + mock_worktree.ensure_bare_repo.return_value = tmp_path / "bare" + + mock_db = MagicMock() + mock_db.acquire_lock.return_value = True + mock_db.list_recent_automation_decisions.return_value = [] + + mock_transport = AsyncMock() + mock_transport.ask.return_value = "yes merge both" + + repo = MagicMock() + repo.name = "owner/repo" + + await run_secops_all( + repos=[repo], + dispatcher=mock_dispatcher, + github=MagicMock(), + worktree=mock_worktree, + dashboard=None, + state_db=mock_db, + transport=mock_transport, + contexts_dir=tmp_path / "contexts", + ) + + # One row per PR# in the question, both with the verbatim + # operator answer. + calls = mock_db.record_automation_decision.call_args_list + item_ids = sorted(c.kwargs["item_id"] for c in calls) + assert item_ids == ["#60", "#61"] + for c in calls: + assert c.kwargs["decision"] == "yes merge both" + assert c.kwargs["repo"] == "owner/repo" + assert c.kwargs["operation"] == "dependabot_pr" + + @pytest.mark.asyncio + async def test_no_decision_written_when_question_lacks_pr_number( + self, tmp_path: Path + ) -> None: + """A BLOCKED question about a CodeQL alert (no PR#) must not + produce a phantom row keyed on an empty item_id.""" + from ctrlrelay.core.checkpoint import CheckpointStatus + from ctrlrelay.core.dispatcher import SessionResult + from ctrlrelay.pipelines.secops import run_secops_all + + blocked_state = MagicMock() + blocked_state.status = CheckpointStatus.BLOCKED_NEEDS_INPUT + blocked_state.question = "Open CodeQL alert: suppress or fix?" + blocked_state.summary = None + blocked_state.outputs = {} + blocked_state.error = None + + done_state = MagicMock() + done_state.status = CheckpointStatus.DONE + done_state.summary = "Done" + done_state.outputs = {} + done_state.error = None + + mock_dispatcher = AsyncMock() + mock_dispatcher.spawn_session.side_effect = [ + SessionResult(session_id="s", exit_code=0, state=blocked_state), + SessionResult(session_id="s", exit_code=0, state=done_state), + ] + + mock_worktree = AsyncMock() + mock_worktree.create_worktree.return_value = tmp_path / "worktree" + mock_worktree.ensure_bare_repo.return_value = tmp_path / "bare" + + mock_db = MagicMock() + mock_db.acquire_lock.return_value = True + mock_db.list_recent_automation_decisions.return_value = [] + + mock_transport = AsyncMock() + mock_transport.ask.return_value = "suppress for now" + + repo = MagicMock() + repo.name = "owner/repo" + + await run_secops_all( + repos=[repo], + dispatcher=mock_dispatcher, + github=MagicMock(), + worktree=mock_worktree, + dashboard=None, + state_db=mock_db, + transport=mock_transport, + contexts_dir=tmp_path / "contexts", + ) + + mock_db.record_automation_decision.assert_not_called() + + @pytest.mark.asyncio + async def test_resume_from_pending_records_decisions( + self, tmp_path: Path + ) -> None: + """resume_secops_from_pending runs from the sweeper (out-of-band + Telegram reply, original session torn down). It must also + capture the operator's answer so the next 6am sweep doesn't + re-ask. The question text is carried from the pending_resumes + row.""" + from ctrlrelay.core.checkpoint import CheckpointStatus + from ctrlrelay.core.dispatcher import SessionResult + from ctrlrelay.pipelines.secops import resume_secops_from_pending + + done_state = MagicMock() + done_state.status = CheckpointStatus.DONE + done_state.summary = "Resumed and merged" + done_state.outputs = {} + done_state.error = None + + mock_dispatcher = AsyncMock() + mock_dispatcher.spawn_session.return_value = SessionResult( + session_id="s", exit_code=0, state=done_state, + ) + + mock_worktree = AsyncMock() + mock_worktree.create_worktree.return_value = tmp_path / "worktree" + mock_worktree.ensure_bare_repo.return_value = tmp_path / "bare" + + mock_db = MagicMock() + mock_db.acquire_lock.return_value = True + + await resume_secops_from_pending( + session_id="s", + repo="owner/repo", + answer="merge 60 only", + dispatcher=mock_dispatcher, + github=MagicMock(), + worktree=mock_worktree, + dashboard=None, + state_db=mock_db, + transport=None, + contexts_dir=tmp_path / "contexts", + question="Approve PR #60 and PR #61?", + ) + + calls = mock_db.record_automation_decision.call_args_list + item_ids = sorted(c.kwargs["item_id"] for c in calls) + assert item_ids == ["#60", "#61"] + for c in calls: + assert c.kwargs["decision"] == "merge 60 only" + + +class TestSecopsPriorDecisionsInPrompt: + """The prompt must inject the rolling window of prior operator + decisions for this repo so the agent can act on them instead of + re-asking. Without injection, persistence is dead weight.""" + + def test_prompt_includes_prior_decisions_block(self) -> None: + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prior = [ + { + "decided_at": 1779445200, + "item_id": "#60", + "decision": "yes, merge it", + }, + { + "decided_at": 1779358800, + "item_id": "#25", + "decision": "skip for now", + }, + ] + prompt = pipeline._build_prompt( + repo="o/r", session_id="s1", prior_decisions=prior, + ) + + assert "Prior operator decisions" in prompt + assert '#60: operator said "yes, merge it"' in prompt + assert '#25: operator said "skip for now"' in prompt + # Must instruct the agent to act on prior decisions, not just + # echo them back as decoration. + assert "avoid re-asking" in prompt.lower() + assert "materially changed" in prompt.lower() + + def test_prompt_omits_block_when_no_prior_decisions(self) -> None: + """When the table is empty (fresh repo, or window expired), the + block must be omitted entirely — not rendered as an empty + header that would confuse the agent.""" + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prompt = pipeline._build_prompt(repo="o/r", session_id="s1") + assert "Prior operator decisions" not in prompt + + @pytest.mark.asyncio + async def test_run_secops_all_threads_prior_decisions_into_prompt( + self, tmp_path: Path + ) -> None: + """run_secops_all must query state_db for prior decisions and + thread them through ctx.extra so _build_prompt picks them up.""" + from ctrlrelay.core.checkpoint import CheckpointStatus + from ctrlrelay.core.dispatcher import SessionResult + from ctrlrelay.pipelines.secops import run_secops_all + + captured_prompts: list[str] = [] + + async def fake_spawn(**kwargs): + captured_prompts.append(kwargs["prompt"]) + state = MagicMock() + state.status = CheckpointStatus.DONE + state.summary = "Done" + state.outputs = {} + state.error = None + return SessionResult( + session_id=kwargs["session_id"], exit_code=0, state=state, + ) + + mock_dispatcher = AsyncMock() + mock_dispatcher.spawn_session.side_effect = fake_spawn + + mock_worktree = AsyncMock() + mock_worktree.create_worktree.return_value = tmp_path / "worktree" + mock_worktree.ensure_bare_repo.return_value = tmp_path / "bare" + + mock_db = MagicMock() + mock_db.acquire_lock.return_value = True + mock_db.release_lock.return_value = True + mock_db.list_recent_automation_decisions.return_value = [ + { + "decided_at": 1779445200, + "item_id": "#42", + "decision": "approved last week", + } + ] + + repo = MagicMock() + repo.name = "owner/repo" + + await run_secops_all( + repos=[repo], + dispatcher=mock_dispatcher, + github=MagicMock(), + worktree=mock_worktree, + dashboard=None, + state_db=mock_db, + transport=None, + contexts_dir=tmp_path / "contexts", + ) + + assert len(captured_prompts) == 1 + assert "#42" in captured_prompts[0] + assert "approved last week" in captured_prompts[0] + # Lookup must use a since_ts (30-day window), not unbounded. + mock_db.list_recent_automation_decisions.assert_called_once() + assert ( + mock_db.list_recent_automation_decisions.call_args.kwargs.get( + "since_ts" + ) + is not None + ) + # Lookup must be namespaced to dependabot_pr — otherwise a + # `codeql_alert` decision in the same repo would render into + # this Dependabot prompt as a prior PR decision. + assert ( + mock_db.list_recent_automation_decisions.call_args.kwargs.get( + "operation" + ) + == "dependabot_pr" + ) + + def test_prompt_renders_context_snippet_for_stale_check(self) -> None: + """The prompt tells the agent to act on a prior decision only + when circumstances haven't materially changed (different + version bump, CI flipped). To enforce that, the rendered entry + must include the stored context (the original question + snippet) so the agent can compare prior bump/CI state with + the current PR. Without context, the rule is unenforceable.""" + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prior = [ + { + "decided_at": 1779445200, + "item_id": "#60", + "decision": "yes merge it", + "context": "PR #60 actions/upload-artifact 4->7 (CI green)", + }, + ] + prompt = pipeline._build_prompt( + repo="o/r", session_id="s1", prior_decisions=prior, + ) + # Question snippet must appear so the agent can detect a + # force-pushed PR that swapped the version under #60. + assert "actions/upload-artifact 4->7" in prompt + # And the rule must explicitly point the agent at the snippet + # so it knows to compare, not just decoration. + assert "prior-question snippet" in prompt.lower() + + def test_prompt_handles_missing_context_gracefully(self) -> None: + """Older rows (written before context was stored) lack the + snippet. The render must not produce 'prior question: ' with + an empty tail, or worse a literal 'None' — degrade to the + item+answer-only form.""" + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prior = [ + { + "decided_at": 1779445200, + "item_id": "#60", + "decision": "merge", + "context": None, + }, + ] + prompt = pipeline._build_prompt( + repo="o/r", session_id="s1", prior_decisions=prior, + ) + assert "#60" in prompt + assert '"merge"' in prompt + assert "prior question: " not in prompt + assert "None" not in prompt.split("Prior operator decisions")[1].split( + "**Use these" + )[0] + + +class TestSecopsEdgeCasePromptDirectives: + """The prompt must cover the two edge cases the agent reliably + fumbles in production: (1) repos with no CI configured, where + the 'auto-merge with passing CI' gate cannot be evaluated; and + (2) CI failing on a check unrelated to the PR's package, where + patch PRs would otherwise sit stuck for weeks.""" + + def test_prompt_covers_no_ci_configured_case(self) -> None: + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prompt = pipeline._build_prompt(repo="o/r", session_id="s1") + # Must call out the missing-workflows signal explicitly so the + # agent can detect the case (not guess it). + assert ".github/workflows" in prompt + assert "No CI configured" in prompt or "no CI" in prompt.lower() + # Must instruct one consolidated question, not one per PR. + assert "consolidated" in prompt.lower() + + def test_prompt_covers_unrelated_ci_failure_case(self) -> None: + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prompt = pipeline._build_prompt(repo="o/r", session_id="s1") + # Must instruct the agent to surface the pre-existing failure, + # not silently leave patches stuck. + assert "unrelated" in prompt.lower() + assert "pre-existing" in prompt.lower() or "stuck" in prompt.lower() + # Must reference the diagnostic command the agent runs to extract + # the root cause, otherwise it's just vague prose. + assert "gh run view" in prompt or "log-failed" in prompt.lower() + + def test_prompt_enforces_scope_discipline(self) -> None: + """Production saw the agent invent options like 'batch into a + single review PR' that were never in the policy ladder. Prompt + must explicitly forbid this.""" + from ctrlrelay.pipelines.secops import SecopsPipeline + + pipeline = SecopsPipeline( + dispatcher=MagicMock(), + github=MagicMock(), + worktree=MagicMock(), + dashboard=None, + state_db=MagicMock(), + transport=None, + ) + prompt = pipeline._build_prompt(repo="o/r", session_id="s1") + # Must name the freelance options the agent has been observed + # offering so the prohibition is concrete, not abstract. + assert "batch" in prompt.lower() + # Must establish the three-exit ladder so the agent has a clear + # mental model of legitimate actions. + assert ( + "three legitimate exits" in prompt.lower() + or "three exits" in prompt.lower() + ) + diff --git a/tests/test_state.py b/tests/test_state.py index 4eb3df9..41e1a13 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -592,3 +592,132 @@ def test_mark_resumed_is_noop_on_refreshed_unanswered_row( assert db.mark_pending_resume_resumed("s1") is True assert db.list_pending_resumes_to_execute() == [] db.close() + + +class TestAutomationDecisions: + """automation_decisions persists operator answers across sweeps so + the secops agent doesn't re-ask about the same Dependabot PR every + day. Decisions are scoped per-repo and the recall window is a + rolling 30 days (filter applied at read-time).""" + + def test_record_and_list_decisions(self, tmp_path: Path) -> None: + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="owner/repo", + operation="dependabot_pr", + item_id="#60", + decision="yes, merge it", + context="PR #60 actions/upload-artifact 4->7", + ) + rows = db.list_recent_automation_decisions("owner/repo") + assert len(rows) == 1 + assert rows[0]["repo"] == "owner/repo" + assert rows[0]["operation"] == "dependabot_pr" + assert rows[0]["item_id"] == "#60" + assert rows[0]["decision"] == "yes, merge it" + assert rows[0]["decided_by"] == "operator" + assert rows[0]["decided_at"] is not None + db.close() + + def test_list_is_scoped_per_repo(self, tmp_path: Path) -> None: + """A decision recorded for repo A must not leak into repo B's + prompt — otherwise an "approve" for repo A's upload-artifact + bump would silently authorize the same bump in unrelated repos.""" + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="owner/repo-a", operation="dependabot_pr", + item_id="#1", decision="approve", + ) + db.record_automation_decision( + repo="owner/repo-b", operation="dependabot_pr", + item_id="#1", decision="reject", + ) + assert len(db.list_recent_automation_decisions("owner/repo-a")) == 1 + assert ( + db.list_recent_automation_decisions("owner/repo-a")[0][ + "decision" + ] + == "approve" + ) + assert len(db.list_recent_automation_decisions("owner/repo-b")) == 1 + assert ( + db.list_recent_automation_decisions("owner/repo-b")[0][ + "decision" + ] + == "reject" + ) + db.close() + + def test_list_filters_by_since_ts(self, tmp_path: Path) -> None: + """30-day recall is implemented via since_ts at the read site, + not by deleting old rows. A decision older than the window + must be returned by an unbounded query but excluded when + since_ts is set just after its timestamp.""" + import time as _time + + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="o/r", operation="dependabot_pr", + item_id="#1", decision="old answer", + ) + # Cutoff one second in the future — should exclude the row + # written above (decided_at == now). + cutoff = int(_time.time()) + 1 + rows = db.list_recent_automation_decisions("o/r", since_ts=cutoff) + assert rows == [] + # Same query without the cutoff still finds it. + rows = db.list_recent_automation_decisions("o/r") + assert len(rows) == 1 + db.close() + + def test_list_orders_newest_first(self, tmp_path: Path) -> None: + """The prompt-rendering site relies on newest-first ordering so + the most recent decision wins when the operator changed their + mind on the same PR.""" + import time as _time + + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="o/r", operation="dependabot_pr", + item_id="#1", decision="first", + ) + # Force a tick so decided_at differs even on fast machines. + _time.sleep(1) + db.record_automation_decision( + repo="o/r", operation="dependabot_pr", + item_id="#1", decision="second", + ) + rows = db.list_recent_automation_decisions("o/r") + assert [r["decision"] for r in rows] == ["second", "first"] + db.close() + + def test_list_filters_by_operation(self, tmp_path: Path) -> None: + """The schema namespaces decisions by `operation` so e.g. a + `codeql_alert` suppression doesn't leak into a Dependabot-PR + prompt as a prior PR decision. The reader must honour the + namespace — otherwise a 'suppress this CVE' answer would + render as 'merge this PR'.""" + db = StateDB(tmp_path / "state.db") + db.record_automation_decision( + repo="o/r", operation="dependabot_pr", + item_id="#10", decision="merge", + ) + db.record_automation_decision( + repo="o/r", operation="codeql_alert", + item_id="alert-42", decision="suppress until next quarter", + ) + dep_rows = db.list_recent_automation_decisions( + "o/r", operation="dependabot_pr", + ) + assert len(dep_rows) == 1 + assert dep_rows[0]["item_id"] == "#10" + cq_rows = db.list_recent_automation_decisions( + "o/r", operation="codeql_alert", + ) + assert len(cq_rows) == 1 + assert cq_rows[0]["item_id"] == "alert-42" + # Default (no operation filter) returns the full audit view — + # used by ops debugging / future audit UI, not the prompt. + all_rows = db.list_recent_automation_decisions("o/r") + assert len(all_rows) == 2 + db.close()