Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion tests/test_event_recording.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import re
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "tools"))

Expand Down Expand Up @@ -77,4 +78,7 @@ def test_phase_transcript_does_not_truncate_existing_file(tmp_path, monkeypatch)

assert existing.read_text(encoding="utf-8") == "keep me\n"
assert transcript.path != existing
assert transcript.path.name.startswith("last-phase-1c-no-finding-attempt-1-")
assert re.fullmatch(
r"last-phase-1c-no-finding-attempt-1-\d{8}-\d{6}-pid\d+\.jsonl",
transcript.path.name,
)
99 changes: 99 additions & 0 deletions tests/test_findings_quality.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from __future__ import annotations

import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "tools"))


def _write_phase2_finding(path: Path, *, title: str, category: str, target_area: str, summary: str) -> None:
    """Write a fully populated Phase 2 finding document to *path*.

    The document is YAML front matter (fixed id ``CC-0099``) followed by a
    markdown body with one section per heading. Only ``title``, ``category``,
    ``target_area`` and the text under ``# Summary`` vary between callers;
    everything else is fixed fixture text. Missing parent directories are
    created.
    """
    front_matter = "".join((
        '---\n',
        'id: "CC-0099"\n',
        f'title: "{title}"\n',
        'status: "PENDING"\n',
        'severity: "MEDIUM"\n',
        'cvss_v4:\n vector: ""\n score: 0.0\n justification: ""\n',
        'confidence: "MEDIUM"\n',
        f'category: "{category}"\n',
        'cwe: ["CWE-287"]\n',
        'language: "c"\n',
        f'target_area: "{target_area}"\n',
        'files: ["scheduler/ipp.c"]\n',
        'symbols: ["ippReadIO"]\n',
        'entry_points: ["IPP request parser"]\n',
        'sources: ["network IPP request"]\n',
        'sinks: ["authorization decision"]\n',
        'trust_boundary: "remote client to scheduler"\n',
        'assets_at_risk: ["scheduler authorization state"]\n',
        'validation:\n status: "NOT_STARTED"\n methods: []\n evidence_dir: "itemdb/evidence/CC-0099"\n summary: ""\n',
        'exploitation:\n status: "NOT_STARTED"\n impact_demonstrated: ""\n exploit_type: ""\n severity_before: ""\n severity_after: ""\n artifacts_dir: "itemdb/evidence/CC-0099/exploits"\n summary: ""\n',
        'created_at: "2026-06-18"\n',
        'updated_at: "2026-06-18"\n',
        '---\n\n',
    ))

    # Markdown body: each heading is followed by its paragraph; the caller's
    # summary is the only variable paragraph.
    body = "".join((
        '# Summary\n\n',
        f'{summary}\n\n',
        '# Target context\n\n',
        'The CUPS scheduler accepts IPP requests from remote clients and maps request metadata into authorization state.\n\n',
        '# Affected code\n\n',
        'The affected path is `scheduler/ipp.c` in the IPP request parser and authorization handoff.\n\n',
        '# Vulnerability hypothesis\n\n',
        'A remote client may control the user identity attribute that reaches an authorization decision without canonicalization.\n\n',
        '# Source-to-sink reasoning\n\n',
        'The source is a network IPP request. The parser copies the identity attribute into scheduler request state. The sink is an authorization check that trusts that state.\n\n',
        '# Attackability / trigger conditions\n\n',
        'An unauthenticated remote client can send a crafted IPP request before authorization is evaluated.\n\n',
        '# Impact\n\n',
        'Successful exploitation could bypass authorization checks and perform scheduler operations as a more privileged user.\n\n',
        '# Validation plan\n\n',
        'Send crafted IPP requests with controlled identity attributes and compare the scheduler authorization outcome against a baseline request.\n\n',
        '# Counter-analysis\n\n',
        'Review parser normalization, authentication layers, and later authorization checks to determine whether attacker control is removed before the sink.\n\n',
        '# Validation result\n\n',
        'Pending.\n\n',
        '# Evidence\n\n',
        'Pending.\n',
    ))

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(front_matter + body, encoding="utf-8")


def test_phase2_quality_rejects_test_template_artifact(tmp_path: Path) -> None:
    """A finding that describes itself as a template test must be rejected.

    The fixture's title, category, target area and summary all say it is a
    test of the template system. The validator is expected to report both a
    "test/template artifact" error and a "not an actual target vulnerability"
    error.
    """
    from findings.quality import validate_phase2_finding_quality

    pending_dir = tmp_path / "itemdb" / "findings" / "PENDING"
    finding = pending_dir / "CC-0099-test-finding.md"
    fixture_fields = {
        "title": "Test finding to see template",
        "category": "Test",
        "target_area": "Testing",
        "summary": "This is a test finding created to verify the template system. It does not represent an actual vulnerability.",
    }
    _write_phase2_finding(finding, **fixture_fields)

    errors = validate_phase2_finding_quality(finding)

    # Each fragment must appear in at least one reported error.
    for fragment in ("test/template artifact", "not an actual target vulnerability"):
        assert any(fragment in error for error in errors), errors


def test_phase2_quality_rejects_case_insensitive_template_markers(tmp_path: Path) -> None:
    """Template guidance must be detected even when written in lower case.

    A realistic finding is written first. One sentence of its body is then
    replaced with an all-lower-case guidance phrase, and the validator is
    expected to report a "contains template guidance" error.
    (Presumably the canonical marker in the template is capitalised; confirm
    against the template if the marker list changes.)
    """
    from findings.quality import validate_phase2_finding_quality

    finding = tmp_path / "itemdb" / "findings" / "PENDING" / "CC-0099-case-variant.md"
    _write_phase2_finding(
        finding,
        title="IPP identity bypass in request parser",
        category="Auth",
        target_area="Scheduler",
        summary="A remote client may inject identity attributes.",
    )

    anchor = "A remote client may control the user identity attribute"
    content = finding.read_text(encoding="utf-8")
    # str.replace silently does nothing when the anchor is missing. Without
    # this check, a change to the fixture wording would surface only as a
    # confusing failure of the final assertion below.
    assert anchor in content, "fixture no longer contains the sentence this test rewrites"
    content = content.replace(
        anchor,
        "briefly describe the suspected vulnerability. the parser accepts unvalidated identity attributes.",
    )
    finding.write_text(content, encoding="utf-8")

    errors = validate_phase2_finding_quality(finding)

    assert any("contains template guidance" in error for error in errors), errors
15 changes: 15 additions & 0 deletions tests/test_gate_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,18 @@ def test_gate_phase_4_rejects_wrong_status(tmp_path, monkeypatch):
assert exit_code == 1
finally:
gates_module.ROOT = original_root


def test_gate_phase_3_no_pending_is_noop_success(tmp_path, capsys):
    """With an empty PENDING directory, the phase 3 gate succeeds as a no-op.

    Expects exit code 0 and the phrase "nothing to review" on stdout.
    """
    # Create the fixture tree before touching module state. If mkdir raised
    # after ROOT was swapped but outside the try/finally, the patched ROOT
    # would leak into later tests.
    (tmp_path / "itemdb" / "findings" / "PENDING").mkdir(parents=True)

    original_root = gates_module.ROOT
    gates_module.ROOT = tmp_path
    try:
        exit_code = gates_module.gate_phase_3()
    finally:
        # Always restore the module-level root, even if the gate raises.
        gates_module.ROOT = original_root

    out = capsys.readouterr().out
    assert exit_code == 0
    assert "nothing to review" in out
82 changes: 82 additions & 0 deletions tests/test_phase_artifacts_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,85 @@ def test_has_valid_threat_model_returns_false_when_missing(tmp_path: Path) -> No

with patch("phases.artifact_checks.ROOT", tmp_path):
assert not has_valid_threat_model()


def test_phase2_artifacts_accept_explicit_no_findings_summary(tmp_path: Path) -> None:
    """A phase 2 summary whose findings table has a single "None." row is accepted.

    Only the summary file exists under ``runs/``; the artifact check is
    expected to return an empty error list.
    """
    from phases.artifact_checks import check_phase_2_artifacts

    runs_dir = tmp_path / "runs"
    runs_dir.mkdir(parents=True)

    summary_path = runs_dir / "phase-2-summary-2026-06-16-120000.md"
    summary_text = "".join((
        "# Findings created\n\n",
        "| ID | Title | Path |\n",
        "|---|---|---|\n",
        "| - | None. | - |\n",
    ))
    summary_path.write_text(summary_text, encoding="utf-8")

    with patch("phases.artifact_checks.ROOT", tmp_path):
        reported = check_phase_2_artifacts()

    assert reported == []


def test_phase2_artifacts_reject_stub_finding(tmp_path: Path) -> None:
    """A finding listed in the summary but containing only stub content is rejected.

    The stub has placeholder front matter (empty lists, "unknown" values) and
    a body with just a "Pending." summary. The artifact check is expected to
    report a "not a complete Phase 2 finding" error.
    """
    from phases.artifact_checks import check_phase_2_artifacts

    runs_dir = tmp_path / "runs"
    pending_dir = tmp_path / "itemdb" / "findings" / "PENDING"
    for directory in (runs_dir, pending_dir):
        directory.mkdir(parents=True)

    summary_text = "".join((
        '# Findings created\n\n',
        '| ID | Title | Path |\n',
        '|---|---|---|\n',
        '| CC-0001 | Stub | itemdb/findings/PENDING/CC-0001-stub.md |\n',
    ))
    (runs_dir / "phase-2-summary-2026-06-16-120000.md").write_text(summary_text, encoding="utf-8")

    stub_text = "".join((
        '---\n',
        'id: "CC-0001"\n',
        'title: "Stub"\n',
        'status: "PENDING"\n',
        'severity: "MEDIUM"\n',
        'cvss_v4:\n vector: ""\n score: 0.0\n justification: ""\n',
        'confidence: "LOW"\ncategory: "Unclassified"\ncwe: []\nlanguage: "unknown"\ntarget_area: "unknown"\n',
        'files: []\nsymbols: []\nentry_points: []\nsources: []\nsinks: []\ntrust_boundary: "unknown"\nassets_at_risk: []\n',
        'validation:\n status: "NOT_STARTED"\n methods: []\n evidence_dir: "itemdb/evidence/CC-0001"\n summary: ""\n',
        'exploitation:\n status: "NOT_STARTED"\n impact_demonstrated: ""\n exploit_type: ""\n severity_before: ""\n severity_after: ""\n artifacts_dir: "itemdb/evidence/CC-0001/exploits"\n summary: ""\n',
        'created_at: "2026-06-16"\nupdated_at: "2026-06-16"\n---\n\n# Summary\n\nPending.\n',
    ))
    (pending_dir / "CC-0001-stub.md").write_text(stub_text, encoding="utf-8")

    with patch("phases.artifact_checks.ROOT", tmp_path):
        errors = check_phase_2_artifacts()

    assert any("not a complete Phase 2 finding" in error for error in errors), errors


def test_phase2_artifacts_report_all_quality_errors(tmp_path: Path, monkeypatch) -> None:
    """Every quality error from the validator must appear in the artifact check output.

    The quality validator is replaced with a stub returning seven distinct
    messages. The test then checks that each message occurs somewhere in the
    errors returned by ``check_phase_2_artifacts``.
    """
    from phases.artifact_checks import check_phase_2_artifacts
    from findings import quality as quality_mod

    runs_dir = tmp_path / "runs"
    pending_dir = tmp_path / "itemdb" / "findings" / "PENDING"
    for directory in (runs_dir, pending_dir):
        directory.mkdir(parents=True)

    summary_text = "".join((
        "# Findings created\n\n",
        "| ID | Title | Path |\n",
        "|---|---|---|\n",
        "| CC-0099 | Many | itemdb/findings/PENDING/CC-0099-many-errors.md |\n",
    ))
    (runs_dir / "phase-2-summary-2026-06-18-120000.md").write_text(summary_text, encoding="utf-8")
    (pending_dir / "CC-0099-many-errors.md").write_text("placeholder", encoding="utf-8")

    expected_messages = [f"artifact-error-{i}" for i in range(7)]

    def fake_validate(_path):
        # Return a fresh list on every call, as the validator stub should.
        return list(expected_messages)

    monkeypatch.setattr(quality_mod, "validate_phase2_finding_quality", fake_validate)

    with patch("phases.artifact_checks.ROOT", tmp_path):
        errors = check_phase_2_artifacts()

    joined = "\n".join(errors)
    for message in expected_messages:
        assert message in joined
132 changes: 132 additions & 0 deletions tests/test_phase_failure_state_reset.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,138 @@ def fake_resume_prompt(*_args, failure_details=None, **_kw):
assert captured == [["stale failure from previous attempt"], None]


def test_phase_mode_terminal_stop_missing_artifacts_auto_resumes(monkeypatch):
    """Phase mode re-runs with a resume prompt when the completion check fails once.

    Both attempts return a terminal result. The completion check is stubbed to
    report a missing phase-2 summary on the first call and success on the
    second. The test expects exit code 0, the initial prompt followed by the
    resume prompt, and the first failure list passed to the resume-prompt
    builder.
    """
    from codecome import harness as harness_mod
    from codecome import runner as runner_mod

    fake_transcript = harness_mod.ROOT / "tmp" / "fake.jsonl"
    missing_summary = "runs/phase-2-summary*.md was not updated during this run"

    queued_attempts = iter(
        [(0, "ses_test", _terminal_result(), fake_transcript) for _ in range(2)]
    )
    queued_completions = iter([(False, [missing_summary]), (True, [])])
    resume_failures: list[list[str] | None] = []
    seen_prompts: list[str] = []

    def fake_run_single_attempt(_args, _console, prompt, *_a, **_kw):
        seen_prompts.append(prompt)
        return next(queued_attempts)

    def fake_resume_prompt(*_args, failure_details=None, **_kw):
        resume_failures.append(failure_details)
        return "resume prompt"

    def fake_completion_check(*_a, **_kw):
        return next(queued_completions)

    monkeypatch.setattr(harness_mod, "ServerRunner", lambda: _FakeServerRunner())
    # Allow exactly one retry so the second queued attempt is consumed.
    monkeypatch.setenv("CODECOME_MAX_ITERATION_RETRIES", "1")
    monkeypatch.setattr(harness_mod, "load_prompt", lambda *_a, **_kw: "initial prompt")
    monkeypatch.setattr(harness_mod, "resolve_runtime_config", lambda _agent: _FakeRuntimeConfig())
    monkeypatch.setattr(harness_mod, "configure_rendering", lambda *_a, **_kw: None)
    monkeypatch.setattr(runner_mod, "_run_single_attempt", fake_run_single_attempt)
    monkeypatch.setattr(harness_mod, "check_phase_graceful_completion", fake_completion_check)
    monkeypatch.setattr(harness_mod, "build_phase_resume_prompt", fake_resume_prompt)

    from findings import checks_entry

    def fake_frontmatter_validation():
        return (0, "")

    monkeypatch.setattr(checks_entry, "run_frontmatter_validation", fake_frontmatter_validation)

    rc = harness_mod.run_phase_mode(_args())

    assert rc == 0
    assert seen_prompts == ["initial prompt", "resume prompt"]
    assert resume_failures == [[missing_summary]]


def test_phase_mode_final_failure_prints_gate_failures(monkeypatch, capsys):
    """With retries set to 0, a failed completion check returns 2 and prints the failures.

    The completion check is stubbed to always fail with one gate message. The
    test expects exit code 2 and stdout containing both "remaining gate
    failures" and the gate message.
    """
    from codecome import harness as harness_mod
    from codecome import runner as runner_mod

    fake_transcript = harness_mod.ROOT / "tmp" / "fake.jsonl"

    def fake_run_single_attempt(*_a, **_kw):
        return (0, "ses_test", _terminal_result(), fake_transcript)

    def fake_completion_check(*_a, **_kw):
        return (False, ["Invalid: CC-0015 still contains template guidance"])

    monkeypatch.setenv("CODECOME_MAX_ITERATION_RETRIES", "0")
    harness_stubs = {
        "ServerRunner": lambda: _FakeServerRunner(),
        "load_prompt": lambda *_a, **_kw: "initial prompt",
        "resolve_runtime_config": lambda _agent: _FakeRuntimeConfig(),
        "configure_rendering": lambda *_a, **_kw: None,
        "check_phase_graceful_completion": fake_completion_check,
    }
    for attribute, stub in harness_stubs.items():
        monkeypatch.setattr(harness_mod, attribute, stub)
    monkeypatch.setattr(runner_mod, "_run_single_attempt", fake_run_single_attempt)

    rc = harness_mod.run_phase_mode(_args())
    output = capsys.readouterr().out

    assert rc == 2
    for fragment in ("remaining gate failures", "CC-0015 still contains template guidance"):
        assert fragment in output


def _budget_result() -> RunResult:
    """Build a ``RunResult`` with one step finish and last finish reason ``"length"``.

    NOTE(review): the name suggests this models a run that stopped because
    its output budget ran out; confirm against how ``RunResult`` is
    interpreted by the harness.
    """
    fields = {
        "last_finish_reason": "length",
        "any_step_finish_seen": True,
        "step_finish_count": 1,
    }
    return RunResult(**fields)


def test_phase_mode_budget_exhaustion_auto_resumes(monkeypatch):
    """Phase mode re-runs with a resume prompt after a "length"-terminated attempt.

    Both attempts return the result built by ``_budget_result``. The completion
    check is stubbed to fail once with a missing phase-2 summary and then
    succeed. The test expects exit code 0, the initial prompt followed by the
    resume prompt, and the first failure list forwarded to the resume-prompt
    builder.
    """
    from codecome import harness as harness_mod
    from codecome import runner as runner_mod

    transcript_path = harness_mod.ROOT / "tmp" / "fake.jsonl"
    stale_summary_failure = ["runs/phase-2-summary*.md was not updated during this run"]

    first_attempt = (0, "ses_test", _budget_result(), transcript_path)
    second_attempt = (0, "ses_test", _budget_result(), transcript_path)
    attempt_queue = iter([first_attempt, second_attempt])
    completion_queue = iter([
        (False, list(stale_summary_failure)),
        (True, []),
    ])
    failure_details_seen: list[list[str] | None] = []
    prompts_sent: list[str] = []

    def fake_run_single_attempt(_args, _console, prompt, *_a, **_kw):
        prompts_sent.append(prompt)
        return next(attempt_queue)

    def fake_resume_prompt(*_args, failure_details=None, **_kw):
        failure_details_seen.append(failure_details)
        return "resume prompt"

    # One retry is allowed, so exactly two attempts are expected.
    monkeypatch.setenv("CODECOME_MAX_ITERATION_RETRIES", "1")
    harness_stubs = {
        "ServerRunner": lambda: _FakeServerRunner(),
        "load_prompt": lambda *_a, **_kw: "initial prompt",
        "resolve_runtime_config": lambda _agent: _FakeRuntimeConfig(),
        "configure_rendering": lambda *_a, **_kw: None,
        "check_phase_graceful_completion": lambda *_a, **_kw: next(completion_queue),
        "build_phase_resume_prompt": fake_resume_prompt,
    }
    for attribute, stub in harness_stubs.items():
        monkeypatch.setattr(harness_mod, attribute, stub)
    monkeypatch.setattr(runner_mod, "_run_single_attempt", fake_run_single_attempt)

    from findings import checks_entry
    monkeypatch.setattr(checks_entry, "run_frontmatter_validation", lambda: (0, ""))

    rc = harness_mod.run_phase_mode(_args())

    assert rc == 0
    assert prompts_sent == ["initial prompt", "resume prompt"]
    assert failure_details_seen == [stale_summary_failure]


def test_phase1_subphase_does_not_reuse_previous_attempt_failures(monkeypatch):
from codecome import phase_1 as p1

Expand Down
Loading
Loading