From c506500db810d1be2f45d7ef2b5c5a6b382ce9fd Mon Sep 17 00:00:00 2001 From: MAJDOUB Khalid Date: Thu, 28 May 2026 00:59:56 +0200 Subject: [PATCH 1/2] Add Codex agent provider support --- README.md | 43 +++++- forge-loop.example.yaml | 20 +++ pyproject.toml | 2 +- src/forge_loop/agent_backend.py | 156 ++++++++++++++++++++ src/forge_loop/cli.py | 128 ++++++++++------ src/forge_loop/config.py | 98 ++++++++++--- src/forge_loop/critic.py | 51 +++++++ src/forge_loop/init.py | 40 ++++- src/forge_loop/multirepo/loader.py | 7 + src/forge_loop/po.py | 47 +++++- src/forge_loop/runner/_pipeline_driver.py | 68 ++++++--- src/forge_loop/runner/boot.py | 169 +++++++++++++++------- src/forge_loop/runner/dispatch.py | 75 +++++++--- src/forge_loop/runner/tick.py | 139 ++++++++++++------ src/forge_loop/worker.py | 134 ++++++++++++++--- tests/test_codex_backend.py | 38 +++++ tests/test_config.py | 87 +++++++++-- tests/test_critic_model.py | 44 +++++- tests/test_po_model.py | 50 ++++++- tests/test_worker.py | 157 +++++++++++++++----- 20 files changed, 1254 insertions(+), 299 deletions(-) create mode 100644 src/forge_loop/agent_backend.py create mode 100644 tests/test_codex_backend.py diff --git a/README.md b/README.md index be34166..44753c2 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # forge-loop -> Autonomous multi-worker dispatcher for Claude Code. +> Autonomous multi-worker dispatcher for Claude Code and Codex. > File an issue, label it `loop:ready`, walk away. forge-loop turns a Claude Code subscription into an unattended swarm of @@ -58,6 +58,7 @@ This isn't a "code generator". It is a **harness** — a runner that lets the op | Surface | Status | Notes | |---|---|---| | **SDK worker (Opus 4.7)** | **stable** | Native Anthropic SDK, typed event stream | +| **Codex provider** | beta | `codex exec` backend for worker / PO / critic roles | | **Typed critic (`CriticReport`)** | **stable** | sev1/sev2/sev3 findings, gates auto-merge on sev1 | | **Retry + fingerprint cooldown** | **stable** | Skips in-flight and cooldown duplicates | | **PO spec expander** | **stable** | Rewrites thin issue bodies into feature-grade specs | @@ -97,7 +98,9 @@ forge-loop run # foreground (Ctrl-C to stop) tmux new -d -s loop "forge-loop run" ``` -Prerequisites: `gh` authenticated, `git`, Python 3.11+, `claude` CLI signed in on a subscription plan. +Prerequisites: `gh` authenticated, `git`, Python 3.11+, and at least one +configured agent provider: `claude` CLI signed in on a subscription plan or +`codex` CLI signed in locally. --- @@ -126,6 +129,9 @@ labels: blocked: loop:blocked risk_gate: risk:high # auto-merge skipped; human review +agent: + provider: claude # claude or codex; role blocks can override + critic: enabled: true # typed-rubric review before merge @@ -134,22 +140,49 @@ attempts: worker: brief_template: .forge-loop/briefs/worker.md.tmpl + provider: claude model: claude-opus-4-7 thinking: medium po: brief_template: .forge-loop/briefs/po.md.tmpl + provider: claude model: claude-opus-4-7 thinking: high ``` +### Agent providers + +Claude remains the default and uses the Claude Agent SDK for workers. Codex is +available through the local `codex exec` CLI for worker, PO, and critic roles. +Set it globally: + +```yaml +agent: + provider: codex +``` + +or per role: + +```yaml +worker: + provider: codex + model: gpt-5-codex # optional; omit or set "" to use the Codex CLI default +po: + provider: claude +critic: + provider: codex +``` + ### Env-var overrides (highest priority) | Var | Effect | |---|---| | `LOOP_GH_REPO` | `owner/repo` — required | +| `LOOP_AGENT_PROVIDER` | Global provider: `claude` or `codex` | +| `LOOP_WORKER_PROVIDER` / `LOOP_PO_PROVIDER` / `LOOP_CRITIC_PROVIDER` | Per-role provider override | | `LOOP_WORKER_BRIEF` / `LOOP_PO_BRIEF` / `LOOP_CRITIC_BRIEF` | Path to a custom brief template | -| `LOOP_WORKER_MODEL` / `LOOP_PO_MODEL` / `LOOP_CRITIC_MODEL` | Per-role model override | +| `LOOP_WORKER_MODEL` / `LOOP_PO_MODEL` / `LOOP_CRITIC_MODEL` | Per-role model override; Codex may be blank to use CLI default | | `LOOP_PARALLEL` / `LOOP_TICK_INTERVAL_S` | Scheduling | | `LOOP_DEPLOY_TASK` | Task target run after merges. Empty = skip | | `LOOP_DEPLOY_DRIFT_HALT=1` | Opt in to the 3-fails-then-halt brake | @@ -394,7 +427,7 @@ The PO pass rewrites thin tickets — but it can't invent intent. Spend two minu - **Subscription billing only.** forge-loop assumes the operator is on a Claude Code subscription (flat fee). The budget tracking that existed in early versions was removed in #38; if you need per-token gating because you're paying per call, file an issue. - **Secrets.** The loop never reads secrets. Workers should fetch them via your project's secret manager (Infisical, Vault, sealed-secrets). The bundled worker brief explicitly forbids plaintext secrets in commits. - **Identity.** All `gh` calls go through the operator's `gh auth login`. Workers commit under the operator's git identity (configurable via `LOOP_COAUTHOR` for the `Co-Authored-By:` trailer). -- **Rate limits.** GitHub: the loop's pickup query is one `gh issue list` per tick (cheap). `gh pr merge --auto` doesn't poll. Anthropic: Opus 4.7 via SDK; rate limits hit naturally if you push parallel > 5 on a free workspace. +- **Rate limits.** GitHub: the loop's pickup query is one `gh issue list` per tick (cheap). `gh pr merge --auto` doesn't poll. Agent-provider limits depend on the configured backend: Anthropic Opus via SDK or the local Codex CLI. - **Cost.** Observed: $3-$5 per shipped PR on Opus 4.7, $9 wasted per duplicate-race (rare). Roughly $50-$100/week for a full unattended-overnight workflow. --- @@ -423,4 +456,4 @@ MIT — see [LICENSE](LICENSE). Contributions under the same license. forge-loop was extracted from the harness that built [Titan](https://github.com/hadamrd/dashboard-plugin) — a post-Jenkins CI/CD product — on its own backlog. The recursive-bootstrap dogfooding pattern (the loop shipping its own features) is documented in the repository's commit history; PRs #2, #27, #41, #62 are particularly worth reading. -Built on [Claude Code](https://claude.com/claude-code) + the [Claude Agent SDK](https://github.com/anthropics/claude-agent-sdk-python). +Built on [Claude Code](https://claude.com/claude-code) + the [Claude Agent SDK](https://github.com/anthropics/claude-agent-sdk-python), with optional Codex CLI support. diff --git a/forge-loop.example.yaml b/forge-loop.example.yaml index 61b8975..65642af 100644 --- a/forge-loop.example.yaml +++ b/forge-loop.example.yaml @@ -36,6 +36,26 @@ labels: # Optional: issues tagged this label skip auto-merge (human review required). risk_gate: "risk:high" +agent: + # Default provider for worker / PO / critic. Use per-role provider fields + # below to mix providers. + provider: claude + +worker: + provider: claude + model: claude-opus-4-7 + thinking: medium + +po: + provider: claude + model: claude-opus-4-7 + thinking: high + +critic: + provider: claude + model: claude-sonnet-4-6 + thinking: off + # Optional: configure the Lumen test-discovery pattern. Match the test-file # glob for your stack: e.g. "**/*Test.java" for JUnit, "**/test_*.py" for pytest. lumen: diff --git a/pyproject.toml b/pyproject.toml index 7349a7d..0e4080a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "forge-loop" version = "0.1.0" -description = "Autonomous multi-worker dispatcher for Claude Code — picks up GitHub issues by label, dispatches parallel workers in git worktrees, watches PRs, merges, and redeploys." +description = "Autonomous multi-worker dispatcher for Claude Code and Codex — picks up GitHub issues by label, dispatches parallel workers in git worktrees, watches PRs, merges, and redeploys." readme = "README.md" requires-python = ">=3.11" # STABLE surface only (issue #39). Anything heavier — fastapi, uvicorn, diff --git a/src/forge_loop/agent_backend.py b/src/forge_loop/agent_backend.py new file mode 100644 index 0000000..7ac8401 --- /dev/null +++ b/src/forge_loop/agent_backend.py @@ -0,0 +1,156 @@ +"""Agent-provider subprocess backends. + +The primary worker path remains the Claude Agent SDK. This module contains the +provider-neutral glue for CLI-backed agents, starting with Codex. +""" + +from __future__ import annotations + +import json +import os +import re +import subprocess +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any + + +@dataclass(frozen=True) +class AgentRunResult: + provider: str + log_path: Path + last_message: str + duration_s: float + timed_out: bool = False + error: str | None = None + + +def subagent_env() -> dict[str, str]: + """Environment for nested agent CLIs launched from an IDE agent session.""" + env = dict(os.environ) + env.pop("CLAUDECODE", None) + env.pop("CLAUDE_CODE_SSE_PORT", None) + return env + + +def build_codex_exec_argv( + *, + cwd: Path, + last_message_path: Path, + model: str | None = None, + add_dirs: list[Path] | None = None, +) -> list[str]: + """Build the noninteractive Codex CLI argv. + + The prompt is supplied on stdin via ``-`` to avoid argv-size limits on large + issue bodies and rendered briefs. + """ + argv = [ + "codex", + "exec", + "-", + "--json", + "-C", + str(cwd), + "-s", + "danger-full-access", + "--dangerously-bypass-approvals-and-sandbox", + "--skip-git-repo-check", + "--output-last-message", + str(last_message_path), + ] + if model: + argv.extend(["-m", model]) + for directory in add_dirs or []: + argv.extend(["--add-dir", str(directory)]) + return argv + + +def run_codex_exec( + *, + prompt: str, + cwd: Path, + log_path: Path, + timeout_s: int, + model: str | None = None, + add_dirs: list[Path] | None = None, +) -> AgentRunResult: + """Run Codex in noninteractive mode and capture JSONL plus final text.""" + log_path.parent.mkdir(parents=True, exist_ok=True) + last_message_path = log_path.with_suffix(log_path.suffix + ".last.txt") + started = time.time() + argv = build_codex_exec_argv( + cwd=cwd, + last_message_path=last_message_path, + model=model, + add_dirs=add_dirs, + ) + try: + with open(log_path, "w", encoding="utf-8") as logf: + subprocess.run( + argv, + cwd=cwd, + input=prompt, + stdout=logf, + stderr=subprocess.STDOUT, + text=True, + timeout=timeout_s, + env=subagent_env(), + ) + except subprocess.TimeoutExpired: + return AgentRunResult( + provider="codex", + log_path=log_path, + last_message=_read_text(last_message_path), + duration_s=time.time() - started, + timed_out=True, + error=f"codex exceeded {timeout_s}s", + ) + except OSError as exc: + return AgentRunResult( + provider="codex", + log_path=log_path, + last_message=_read_text(last_message_path), + duration_s=time.time() - started, + error=f"codex launch failed: {exc}", + ) + return AgentRunResult( + provider="codex", + log_path=log_path, + last_message=_read_text(last_message_path), + duration_s=time.time() - started, + ) + + +def extract_last_json_object(text: str) -> dict[str, Any] | None: + """Return the last valid JSON object embedded in agent final text.""" + for chunk in reversed(text.strip().splitlines()): + chunk = chunk.strip() + if chunk.startswith("{") and chunk.endswith("}"): + try: + obj = json.loads(chunk) + except json.JSONDecodeError: + continue + if isinstance(obj, dict): + return obj + for match in reversed(list(re.finditer(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}", text, re.DOTALL))): + try: + obj = json.loads(match.group(0)) + except json.JSONDecodeError: + continue + if isinstance(obj, dict): + return obj + return None + + +def extract_github_pr(text: str) -> str | None: + match = re.search(r"https://github\.com/[\w.-]+/[\w.-]+/pull/\d+", text) + return match.group(0) if match else None + + +def _read_text(path: Path) -> str: + try: + return path.read_text(encoding="utf-8", errors="replace") + except OSError: + return "" diff --git a/src/forge_loop/cli.py b/src/forge_loop/cli.py index 36656bd..5dbf997 100644 --- a/src/forge_loop/cli.py +++ b/src/forge_loop/cli.py @@ -212,15 +212,23 @@ def line(status: str, label: str, detail: str = "") -> None: try: local = _sp.run( ["git", "rev-parse", "HEAD"], - cwd=cfg.repo, capture_output=True, text=True, timeout=5, + cwd=cfg.repo, + capture_output=True, + text=True, + timeout=5, ).stdout.strip() _sp.run( ["git", "fetch", "origin", "trunk", "--quiet"], - cwd=cfg.repo, capture_output=True, timeout=10, + cwd=cfg.repo, + capture_output=True, + timeout=10, ) remote = _sp.run( ["git", "rev-parse", "origin/trunk"], - cwd=cfg.repo, capture_output=True, text=True, timeout=5, + cwd=cfg.repo, + capture_output=True, + text=True, + timeout=5, ).stdout.strip() if local and remote and local == remote: line("green", f"code matches origin/trunk @ {local[:8]}") @@ -308,9 +316,7 @@ def _cmd_status(args: SimpleNamespace) -> int: for line in raw[-5:]: try: e = json.loads(line) - last_5_events.append( - {"ts": str(e.get("ts", "?")), "kind": str(e.get("kind", "?"))} - ) + last_5_events.append({"ts": str(e.get("ts", "?")), "kind": str(e.get("kind", "?"))}) except json.JSONDecodeError: pass @@ -318,11 +324,21 @@ def _cmd_status(args: SimpleNamespace) -> int: try: r = subprocess.run( [ - "gh", "issue", "list", "--repo", cfg.github_repo, - "--label", cfg.labels.ready, "--state", "open", - "--json", "number", + "gh", + "issue", + "list", + "--repo", + cfg.github_repo, + "--label", + cfg.labels.ready, + "--state", + "open", + "--json", + "number", ], - capture_output=True, text=True, timeout=15, + capture_output=True, + text=True, + timeout=15, ) if r.returncode == 0: queue_depth = len(json.loads(r.stdout or "[]")) @@ -360,9 +376,8 @@ def _cmd_status(args: SimpleNamespace) -> int: if halt_reason: table.add_row("[red]HALTED[/red]", halt_reason) - pid_render = ( - f"{pid_text or '(no pidfile)'} " - + ("[green](alive)[/green]" if pid_alive else "[red](NOT running)[/red]") + pid_render = f"{pid_text or '(no pidfile)'} " + ( + "[green](alive)[/green]" if pid_alive else "[red](NOT running)[/red]" ) table.add_row("pid", pid_render) table.add_row( @@ -832,8 +847,10 @@ def _cmd_repos_list(args: SimpleNamespace) -> int: except json.JSONDecodeError: continue if e.get("kind") in { - "repo_tick_done", "repo_skipped", - "repo_tick_start", "repo_tick_error", + "repo_tick_done", + "repo_skipped", + "repo_tick_start", + "repo_tick_error", }: repo = e.get("repo") if repo: @@ -992,9 +1009,21 @@ def _cmd_config(args: SimpleNamespace) -> int: "worker_timeout_s": cfg.worker_timeout_s, "state_file": str(cfg.state_file), "events_file": str(cfg.events_file), - "worker": {"model": cfg.worker.model, "thinking": cfg.worker.thinking}, - "po": {"model": cfg.po.model, "thinking": cfg.po.thinking}, - "critic": {"model": cfg.critic.model, "thinking": cfg.critic.thinking}, + "worker": { + "provider": cfg.worker.provider, + "model": cfg.worker.model, + "thinking": cfg.worker.thinking, + }, + "po": { + "provider": cfg.po.provider, + "model": cfg.po.model, + "thinking": cfg.po.thinking, + }, + "critic": { + "provider": cfg.critic.provider, + "model": cfg.critic.model, + "thinking": cfg.critic.thinking, + }, } # Historical surface: ``config`` always emits JSON (the ``--json`` flag # was a no-op kept for back-compat). Preserve that. @@ -1006,21 +1035,21 @@ def _cmd_config(args: SimpleNamespace) -> int: def _cmd_config_models(args: SimpleNamespace) -> int: cfg = load() rows = [ - ("worker", cfg.worker.model, cfg.worker.thinking), - ("po", cfg.po.model, cfg.po.thinking), - ("critic", cfg.critic.model, cfg.critic.thinking), + ("worker", cfg.worker.provider, cfg.worker.model, cfg.worker.thinking), + ("po", cfg.po.provider, cfg.po.model, cfg.po.thinking), + ("critic", cfg.critic.provider, cfg.critic.model, cfg.critic.thinking), ] if getattr(args, "json", False): typer.echo( json.dumps( - {role: {"model": m, "thinking": t} for role, m, t in rows}, + {role: {"provider": p, "model": m, "thinking": t} for role, p, m, t in rows}, indent=2, ) ) return 0 - typer.echo(f"{'ROLE':<8} {'MODEL':<22} THINKING") - for role, model, thinking in rows: - typer.echo(f"{role:<8} {model:<22} {thinking}") + typer.echo(f"{'ROLE':<8} {'PROVIDER':<8} {'MODEL':<22} THINKING") + for role, provider, model, thinking in rows: + typer.echo(f"{role:<8} {provider:<8} {model or '':<22} {thinking}") return 0 @@ -1038,12 +1067,9 @@ def _cmd_roles_list(args: SimpleNamespace) -> int: "model": r.model, "timeout_s": r.timeout_s, "budget_usd": r.budget_usd, - "triggers": [ - {"on": t.on, "filter": t.filter} for t in r.triggers - ], + "triggers": [{"on": t.on, "filter": t.filter} for t in r.triggers], "actions": [ - {"mcp_tools": list(a.mcp_tools), "shell": a.shell} - for a in r.actions + {"mcp_tools": list(a.mcp_tools), "shell": a.shell} for a in r.actions ], "output_schema": r.output_schema, "source": r.source_path, @@ -1051,9 +1077,7 @@ def _cmd_roles_list(args: SimpleNamespace) -> int: } for r in result.roles ], - "errors": [ - {"source": e.source, "message": e.message} for e in result.errors - ], + "errors": [{"source": e.source, "message": e.message} for e in result.errors], } sys.stdout.write(json.dumps(payload, indent=2) + "\n") return 0 @@ -1063,7 +1087,8 @@ def _cmd_roles_list(args: SimpleNamespace) -> int: for r in result.roles: triggers = ( ", ".join( - t.on + (f"[{','.join(f'{k}={v}' for k, v in t.filter.items())}]" if t.filter else "") + t.on + + (f"[{','.join(f'{k}={v}' for k, v in t.filter.items())}]" if t.filter else "") for t in r.triggers ) or "(none)" @@ -1178,11 +1203,7 @@ def cmd_dashboard( typer.echo("dashboard: choose --web or --tui, not both", err=True) raise typer.Exit(code=2) mode = "tui" if tui else "web" - _exit( - _cmd_dashboard( - SimpleNamespace(mode=mode, host=host, port=port, roles_dir=roles_dir) - ) - ) + _exit(_cmd_dashboard(SimpleNamespace(mode=mode, host=host, port=port, roles_dir=roles_dir))) @app.command("init", help="Scaffold forge-loop config in a project.") @@ -1194,9 +1215,7 @@ def cmd_init( ) -> None: _exit( _cmd_init( - SimpleNamespace( - target=target, repo=repo, force=force, create_labels=create_labels - ) + SimpleNamespace(target=target, repo=repo, force=force, create_labels=create_labels) ) ) @@ -1215,8 +1234,11 @@ def cmd_record_session( _exit( _cmd_record_session( SimpleNamespace( - issue=issue, issue_file=issue_file, out=out, - worktree=worktree, timeout=timeout, + issue=issue, + issue_file=issue_file, + out=out, + worktree=worktree, + timeout=timeout, ) ) ) @@ -1239,8 +1261,14 @@ def cmd_brief( _exit( _cmd_brief( SimpleNamespace( - kind=kind, issue=issue, issue_file=issue_file, worktree=worktree, - pr=pr, repo=repo, risk_gated=risk_gated, raw=raw, + kind=kind, + issue=issue, + issue_file=issue_file, + worktree=worktree, + pr=pr, + repo=repo, + risk_gated=risk_gated, + raw=raw, ) ) ) @@ -1341,8 +1369,12 @@ def cmd_replay( _exit( _cmd_replay( SimpleNamespace( - tick=tick, role=role, brief=brief, - fixtures_dir=fixtures_dir, suffix=suffix, dry_plan=dry_plan, + tick=tick, + role=role, + brief=brief, + fixtures_dir=fixtures_dir, + suffix=suffix, + dry_plan=dry_plan, ) ) ) diff --git a/src/forge_loop/config.py b/src/forge_loop/config.py index c2f37ad..ba358ac 100644 --- a/src/forge_loop/config.py +++ b/src/forge_loop/config.py @@ -25,9 +25,9 @@ # startup if an operator sets ``LOOP_*_MODEL`` (or the yaml equivalent) to # something that does not parse — much better than a cryptic SDK failure # at first dispatch. -_MODEL_PATTERN = re.compile( - r"^claude-(opus|sonnet|haiku)-\d+-\d+(-[a-z0-9.-]+)?$" -) +_MODEL_PATTERN = re.compile(r"^claude-(opus|sonnet|haiku)-\d+-\d+(-[a-z0-9.-]+)?$") +_CODEX_MODEL_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._:-]*$") +_AGENT_PROVIDERS = frozenset({"claude", "codex"}) # Thinking-budget tiers we expose. ``off`` disables extended thinking # entirely; ``low``/``medium``/``high`` map to ascending budgets the SDK @@ -43,7 +43,23 @@ class ModelConfigError(ValueError): """ -def _validate_model(value: str, source: str) -> str: +def _validate_provider(value: str, source: str) -> str: + if value not in _AGENT_PROVIDERS: + raise ModelConfigError( + f"unknown agent provider {value!r} for {source}: " + f"expected one of {sorted(_AGENT_PROVIDERS)}" + ) + return value + + +def _validate_model(value: str, source: str, provider: str = "claude") -> str: + if provider == "codex": + if value == "" or _CODEX_MODEL_PATTERN.match(value): + return value + raise ModelConfigError( + f"unknown Codex model alias {value!r} for {source}: " + "expected an empty value for the Codex CLI default or a safe model name" + ) if not _MODEL_PATTERN.match(value): raise ModelConfigError( f"unknown model alias {value!r} for {source}: " @@ -116,6 +132,7 @@ class CriticConfig: # surface stays uniform across roles. model: str = "claude-sonnet-4-6" thinking: str = "off" + provider: str = "claude" @dataclass(frozen=True) @@ -131,12 +148,14 @@ class POConfig: yet expose a thinking-budget flag. The field is wired through and will activate once the PO migrates to the SDK (see follow-up of issue #34). """ + enabled: bool = True timeout_s: int = 480 max_to_expand_per_tick: int = 2 # Per-role model (issue #34). PO needs hard thinking about spec quality. model: str = "claude-opus-4-7" thinking: str = "high" + provider: str = "claude" # Bundled default of MCP servers the worker is allowed to call (issue #60). @@ -161,8 +180,10 @@ class WorkerConfig: list would break the worker since it relies on at least the forge-loop server. """ + model: str = "claude-opus-4-7" thinking: str = "medium" + provider: str = "claude" allowed_mcp_tools: tuple[str, ...] = DEFAULT_ALLOWED_MCP_SERVERS @@ -180,6 +201,7 @@ class LumenConfig: (K discovered + 1 authored). Graceful-degrade is non-negotiable: if Lumen is offline the brief still renders and the worker continues. """ + top_k: int = 3 @@ -195,7 +217,9 @@ class Config: parallel: int = 3 tick_interval_s: int = 60 max_ticks: int = 0 - worker_timeout_s: int = 7200 # fail-safe wall ceiling; idle-kill is the primary killer (watchdog) + worker_timeout_s: int = ( + 7200 # fail-safe wall ceiling; idle-kill is the primary killer (watchdog) + ) maintenance_every_n_ticks: int = 0 # 0 = off # Deploy (no default — operators set LOOP_DEPLOY_TASK or repo.deploy.task in YAML) @@ -341,19 +365,37 @@ def load() -> Config: critic_block = y.get("critic") or {} po_block = y.get("po") or {} worker_block = y.get("worker") or {} + agent_block = y.get("agent") or {} attempts_block = y.get("attempts") or {} lumen_block = y.get("lumen") or {} def _resolve_role( - env_model: str, env_thinking: str, + env_model: str, + env_thinking: str, + env_provider: str, block: dict[str, Any], - default_model: str, default_thinking: str, - ) -> tuple[str, str]: + default_model: str, + default_thinking: str, + ) -> tuple[str, str, str]: + raw_provider = os.environ.get(env_provider) + provider_source = env_provider + if raw_provider is None: + raw_provider = os.environ.get("LOOP_AGENT_PROVIDER") + provider_source = "LOOP_AGENT_PROVIDER" + if raw_provider is None: + raw_provider = block.get("provider", agent_block.get("provider", "claude")) + provider_source = ( + f"yaml: {env_provider.lower().replace('loop_', '').replace('_provider', '')}" + ".provider" + ) + provider = _validate_provider(str(raw_provider), provider_source) raw_model = os.environ.get(env_model) model_source = env_model if raw_model is None: - raw_model = block.get("model", default_model) - model_source = f"yaml: {env_model.lower().replace('loop_', '').replace('_model', '')}.model" + raw_model = block.get("model", default_model if provider == "claude" else "") + model_source = ( + f"yaml: {env_model.lower().replace('loop_', '').replace('_model', '')}.model" + ) raw_thinking = os.environ.get(env_thinking) thinking_source = env_thinking if raw_thinking is None: @@ -363,22 +405,35 @@ def _resolve_role( ".thinking" ) return ( - _validate_model(str(raw_model), model_source), + provider, + _validate_model(str(raw_model), model_source, provider), _validate_thinking(str(raw_thinking), thinking_source), ) - worker_model, worker_thinking = _resolve_role( - "LOOP_WORKER_MODEL", "LOOP_WORKER_THINKING", - worker_block, "claude-opus-4-7", "medium", + worker_provider, worker_model, worker_thinking = _resolve_role( + "LOOP_WORKER_MODEL", + "LOOP_WORKER_THINKING", + "LOOP_WORKER_PROVIDER", + worker_block, + "claude-opus-4-7", + "medium", ) worker_allowed = _resolve_allowed_mcp_tools(worker_block) - po_model, po_thinking = _resolve_role( - "LOOP_PO_MODEL", "LOOP_PO_THINKING", - po_block, "claude-opus-4-7", "high", + po_provider, po_model, po_thinking = _resolve_role( + "LOOP_PO_MODEL", + "LOOP_PO_THINKING", + "LOOP_PO_PROVIDER", + po_block, + "claude-opus-4-7", + "high", ) - critic_model, critic_thinking = _resolve_role( - "LOOP_CRITIC_MODEL", "LOOP_CRITIC_THINKING", - critic_block, "claude-sonnet-4-6", "off", + critic_provider, critic_model, critic_thinking = _resolve_role( + "LOOP_CRITIC_MODEL", + "LOOP_CRITIC_THINKING", + "LOOP_CRITIC_PROVIDER", + critic_block, + "claude-sonnet-4-6", + "off", ) github_repo = os.environ.get("LOOP_GH_REPO") or repo_block.get("github") @@ -428,6 +483,7 @@ def _resolve_role( ), model=critic_model, thinking=critic_thinking, + provider=critic_provider, ), po=POConfig( enabled=bool(po_block.get("enabled", True)), @@ -435,10 +491,12 @@ def _resolve_role( max_to_expand_per_tick=int(po_block.get("max_to_expand_per_tick", 2)), model=po_model, thinking=po_thinking, + provider=po_provider, ), worker=WorkerConfig( model=worker_model, thinking=worker_thinking, + provider=worker_provider, allowed_mcp_tools=worker_allowed, ), attempts=AttemptsConfig( diff --git a/src/forge_loop/critic.py b/src/forge_loop/critic.py index b6a1d86..55ef328 100644 --- a/src/forge_loop/critic.py +++ b/src/forge_loop/critic.py @@ -128,6 +128,7 @@ def review_pr( brief_template: str | None = None, emit: Callable[[str, dict[str, Any]], None] | None = None, model: str | None = None, + provider: str = "claude", ) -> CriticOutcome: """Spawn the critic subagent against an open PR. Synchronous. @@ -148,6 +149,56 @@ def review_pr( parse_error: str | None = None retries = 0 + if provider == "codex": + from forge_loop.agent_backend import run_codex_exec + + log_path = logs_dir / f"critic-{issue_number}-{int(time.time())}-codex.log" + result = run_codex_exec( + prompt=brief, + cwd=repo, + log_path=log_path, + timeout_s=timeout_s, + model=model, + add_dirs=[repo], + ) + if result.timed_out: + return CriticOutcome( + verdict="error", + reasons=[], + duration_s=result.duration_s, + stdout_tail="(timeout)", + error=result.error, + ) + report, parse_error = parse_report_from_text(result.last_message) + tail = _tail(log_path, 500) + if report is None: + if emit is not None: + emit( + "critic_parse_failed", + { + "issue": issue_number, + "pr": pr_url, + "err": (parse_error or result.error or "no_json_found")[:200], + "retries": 0, + }, + ) + return CriticOutcome( + verdict="error", + reasons=[], + duration_s=result.duration_s, + stdout_tail=tail, + error=parse_error or result.error or "critic_parse_failed", + ) + verdict = _verdict_from_overall(report.overall) + reasons = [f"[{f.severity}/{f.category}] {f.message}" for f in report.findings] + return CriticOutcome( + verdict=verdict, + reasons=reasons, + duration_s=result.duration_s, + stdout_tail=tail, + report=report, + ) + for attempt in range(2): # initial + 1 retry log_path = logs_dir / f"critic-{issue_number}-{int(time.time())}-{attempt}.log" last_log_path = log_path diff --git a/src/forge_loop/init.py b/src/forge_loop/init.py index 8af6071..0259857 100644 --- a/src/forge_loop/init.py +++ b/src/forge_loop/init.py @@ -41,10 +41,28 @@ # Set to "" to disable. Default: risk:high risk_gate: "risk:high" +agent: + # Default provider for worker / PO / critic. Use `codex` to route roles + # through the local Codex CLI. + provider: claude + +worker: + provider: claude + model: claude-opus-4-7 + thinking: medium + +po: + provider: claude + model: claude-opus-4-7 + thinking: high + # Optional: enable the critic agent. When true, a critic subagent reviews # every PR a worker opens BEFORE auto-merge fires (~30-90s per PR). critic: enabled: false + provider: claude + model: claude-sonnet-4-6 + thinking: off # Optional: per-issue attempt history persisted as GH issue comments. # Workers read past attempts before starting; humans see them in the UI. @@ -128,7 +146,9 @@ def detect_github_repo(target_dir: Path) -> str: r = subprocess.run( ["git", "-C", str(target_dir), "remote", "get-url", "origin"], - capture_output=True, text=True, check=False, + capture_output=True, + text=True, + check=False, ) if r.returncode != 0: return "owner/repo" @@ -153,9 +173,21 @@ def ensure_labels_via_gh(repo: str, labels: list[tuple[str, str, str]]) -> list[ created: list[str] = [] for name, color, desc in labels: r = subprocess.run( - ["gh", "label", "create", name, - "--repo", repo, "--color", color, "--description", desc], - capture_output=True, text=True, check=False, + [ + "gh", + "label", + "create", + name, + "--repo", + repo, + "--color", + color, + "--description", + desc, + ], + capture_output=True, + text=True, + check=False, ) if r.returncode == 0: created.append(name) diff --git a/src/forge_loop/multirepo/loader.py b/src/forge_loop/multirepo/loader.py index 33b3d56..7f654e6 100644 --- a/src/forge_loop/multirepo/loader.py +++ b/src/forge_loop/multirepo/loader.py @@ -212,12 +212,19 @@ def build_config_for_repo(spec: RepoSpec, *, template: Config | None = None) -> timeout_s=tmpl.critic.timeout_s, block_on_sev2=tmpl.critic.block_on_sev2, min_findings_for_approve=tmpl.critic.min_findings_for_approve, + model=tmpl.critic.model, + thinking=tmpl.critic.thinking, + provider=tmpl.critic.provider, ), po=POConfig( enabled=tmpl.po.enabled, timeout_s=tmpl.po.timeout_s, max_to_expand_per_tick=tmpl.po.max_to_expand_per_tick, + model=tmpl.po.model, + thinking=tmpl.po.thinking, + provider=tmpl.po.provider, ), + worker=tmpl.worker, attempts=AttemptsConfig( enabled=tmpl.attempts.enabled, max_history_in_brief=tmpl.attempts.max_history_in_brief, diff --git a/src/forge_loop/po.py b/src/forge_loop/po.py index b7fac68..7c3c614 100644 --- a/src/forge_loop/po.py +++ b/src/forge_loop/po.py @@ -84,6 +84,7 @@ def expand_thin_specs( brief_template: str | None = None, max_to_expand: int = 3, model: str | None = None, + provider: str = "claude", ) -> list[POOutcome]: """Run the PO pass over up to ``max_to_expand`` thin issues. @@ -112,7 +113,15 @@ def expand_thin_specs( github_repo=github_repo, ) outcomes.append( - _run_one(issue["number"], brief, repo, logs_dir, timeout_s, model=model) + _run_one( + issue["number"], + brief, + repo, + logs_dir, + timeout_s, + model=model, + provider=provider, + ) ) n_expanded += 1 @@ -162,11 +171,47 @@ def _run_one( timeout_s: int, *, model: str | None = None, + provider: str = "claude", ) -> POOutcome: logs_dir.mkdir(parents=True, exist_ok=True) log_path = logs_dir / f"po-{issue_number}-{int(time.time())}.log" started = time.time() + if provider == "codex": + from forge_loop.agent_backend import extract_last_json_object, run_codex_exec + + result = run_codex_exec( + prompt=brief, + cwd=repo, + log_path=log_path, + timeout_s=timeout_s, + model=model, + add_dirs=[repo], + ) + if result.timed_out: + return POOutcome( + issue=issue_number, + skipped=False, + reason="po-timeout", + sections_added=[], + duration_s=result.duration_s, + stdout_tail="(timeout)", + error=result.error, + ) + parsed = extract_last_json_object(result.last_message) or { + "skipped": False, + "reason": "no-final-json", + } + return POOutcome( + issue=issue_number, + skipped=bool(parsed.get("skipped", False)), + reason=str(parsed.get("reason", "")), + sections_added=list(parsed.get("sections_added", []) or []), + duration_s=result.duration_s, + stdout_tail=_tail(log_path, 400), + error=result.error, + ) + try: with open(log_path, "wb") as logf: subprocess.run( diff --git a/src/forge_loop/runner/_pipeline_driver.py b/src/forge_loop/runner/_pipeline_driver.py index 7a51ba6..7750322 100644 --- a/src/forge_loop/runner/_pipeline_driver.py +++ b/src/forge_loop/runner/_pipeline_driver.py @@ -99,7 +99,10 @@ def worker_handler(ctx: StepContext) -> StepOutcome: meta = ctx.extras.get("meta") or {} tick = ctx.extras.get("tick", 0) out = run_worker_fn( - ctx.issue, cfg.repo, cfg.logs_dir, cfg.worker_timeout_s, + ctx.issue, + cfg.repo, + cfg.logs_dir, + cfg.worker_timeout_s, risk_gated=meta.get("risk_gated", False), past_attempts=meta.get("past_attempts") or [], emit=bus_emit, @@ -109,6 +112,7 @@ def worker_handler(ctx: StepContext) -> StepOutcome: tick=tick, model=cfg.worker.model, thinking=cfg.worker.thinking, + provider=getattr(cfg.worker, "provider", "claude"), allowed_mcp_servers=cfg.worker.allowed_mcp_tools, ) ok = out.status in {"merged", "open"} @@ -137,15 +141,19 @@ def critic_handler(ctx: StepContext) -> StepOutcome: return StepOutcome(role="critic", status="ok", detail="worker not open/merged") try: c = critic_review_fn( - worker_outcome.pr_url, worker_outcome.issue, - cfg.repo, cfg.logs_dir, + worker_outcome.pr_url, + worker_outcome.issue, + cfg.repo, + cfg.logs_dir, timeout_s=cfg.critic.timeout_s, emit=bus_emit, model=cfg.critic.model, + provider=getattr(cfg.critic, "provider", "claude"), ) except Exception as e: # noqa: BLE001 — boundary return StepOutcome( - role="critic", status="failed", + role="critic", + status="failed", detail=f"{type(e).__name__}: {e}", ) return StepOutcome(role="critic", status="ok", detail=getattr(c, "verdict", "?"), payload=c) @@ -209,31 +217,41 @@ def _events_sink(e: dict[str, Any]) -> None: # StepContext.upstream_outcomes). Handlers mutate this dict # via a thin wrapper below. all_prior: dict[str, StepOutcome] = {} - extras = {"meta": meta, "cfg": cfg, "tick": tick, - "_all_prior_outcomes": all_prior} + extras = {"meta": meta, "cfg": cfg, "tick": tick, "_all_prior_outcomes": all_prior} # Wrap each handler to record its outcome into ``all_prior`` # so later steps (e.g. critic after reviewer after worker) can # see what worker produced even when they aren't direct parents. wrapped: dict[str, Any] = {} for role, h in handlers.items(): + def _wrap(role_=role, h_=h, sink=all_prior): def runner(ctx): out = h_(ctx) sink[role_] = out return out + return runner + wrapped[role] = _wrap() executor = PipelineExecutor(dag, wrapped, events=_events_sink) - bus_emit("pipeline_run_start", { - "issue": issue["number"], "roles": list(dag.order), - "tick": tick, - }) + bus_emit( + "pipeline_run_start", + { + "issue": issue["number"], + "roles": list(dag.order), + "tick": tick, + }, + ) result = executor.run(issue, extras=extras) - bus_emit("pipeline_run_done", { - "issue": issue["number"], "end_state": result.end_state, - "tick": tick, - "statuses": {r: o.status for r, o in result.outcomes.items()}, - }) + bus_emit( + "pipeline_run_done", + { + "issue": issue["number"], + "end_state": result.end_state, + "tick": tick, + "statuses": {r: o.status for r, o in result.outcomes.items()}, + }, + ) worker_step = result.outcomes.get("worker") if worker_step is not None and isinstance(worker_step.payload, WorkerOutcome): @@ -242,13 +260,15 @@ def runner(ctx): # Worker step did not execute (skipped) or returned a non- # WorkerOutcome payload. Synthesise a no-op WorkerOutcome so # callers can keep their flat list invariant. - outcomes.append(WorkerOutcome( - issue=issue["number"], - title=issue.get("title", ""), - pr_url=None, - status="no_pr", - duration_s=0.0, - stdout_tail="", - error=f"pipeline end_state={result.end_state}", - )) + outcomes.append( + WorkerOutcome( + issue=issue["number"], + title=issue.get("title", ""), + pr_url=None, + status="no_pr", + duration_s=0.0, + stdout_tail="", + error=f"pipeline end_state={result.end_state}", + ) + ) return outcomes diff --git a/src/forge_loop/runner/boot.py b/src/forge_loop/runner/boot.py index a26b7df..d4fd2e8 100644 --- a/src/forge_loop/runner/boot.py +++ b/src/forge_loop/runner/boot.py @@ -73,16 +73,20 @@ def _validate_pipeline_if_configured(cfg: Config) -> None: return try: from forge_loop.pipeline import build_dag, load_pipeline + spec = load_pipeline(pipeline_yaml) dag = build_dag(spec) except Exception as e: # noqa: BLE001 — boundary append_event( - cfg.events_file, "pipeline_invalid", - path=str(pipeline_yaml), error=str(e), + cfg.events_file, + "pipeline_invalid", + path=str(pipeline_yaml), + error=str(e), ) raise append_event( - cfg.events_file, "pipeline_loaded", + cfg.events_file, + "pipeline_loaded", path=str(pipeline_yaml), roles=list(dag.order), roots=list(dag.roots), @@ -227,40 +231,55 @@ def _bus_emit(kind: str, payload: dict[str, Any]) -> None: append_event(cfg.events_file, kind, **payload) append_event( - cfg.events_file, "loop_start", - parallel=cfg.parallel, tick_interval=cfg.tick_interval_s, - max_ticks=cfg.max_ticks, label=cfg.labels.ready, + cfg.events_file, + "loop_start", + parallel=cfg.parallel, + tick_interval=cfg.tick_interval_s, + max_ticks=cfg.max_ticks, + label=cfg.labels.ready, orchestrator="async", pools={"po": pools.po, "worker": pools.worker, "critic": pools.critic}, ) - write_state(cfg.state_file, { - "state": "starting", "tick": 0, - "orchestrator": "async", - "pools": {"po": pools.po, "worker": pools.worker, "critic": pools.critic}, - }) + write_state( + cfg.state_file, + { + "state": "starting", + "tick": 0, + "orchestrator": "async", + "pools": {"po": pools.po, "worker": pools.worker, "critic": pools.critic}, + }, + ) async def _po_fn(issue: dict[str, Any]) -> dict[str, Any]: if not cfg.po.enabled: return {"issue": issue["number"], "skipped": True, "reason": "po_disabled"} outs = await asyncio.to_thread( - _po_expand, [issue], cfg.repo, cfg.logs_dir, + _po_expand, + [issue], + cfg.repo, + cfg.logs_dir, github_repo=cfg.github_repo, timeout_s=cfg.po.timeout_s, max_to_expand=1, model=cfg.po.model, + provider=getattr(cfg.po, "provider", "claude"), ) if not outs: return {"issue": issue["number"], "skipped": True, "reason": "po_no_op"} o = outs[0] return { - "issue": o.issue, "skipped": o.skipped, "reason": o.reason, + "issue": o.issue, + "skipped": o.skipped, + "reason": o.reason, "sections_added": o.sections_added, } async def _worker_fn(issue: dict[str, Any], _po: dict[str, Any]) -> dict[str, Any]: if not _po.get("skipped"): fresh = await asyncio.to_thread( - fetch_issue, issue["number"], cfg.github_repo, + fetch_issue, + issue["number"], + cfg.github_repo, ) if fresh: issue = fresh @@ -269,51 +288,82 @@ async def _worker_fn(issue: dict[str, Any], _po: dict[str, Any]) -> dict[str, An past: list[dict[str, Any]] = [] if cfg.attempts.enabled: past = await asyncio.to_thread( - _attempts.fetch_history, issue["number"], cfg.github_repo, + _attempts.fetch_history, + issue["number"], + cfg.github_repo, ) - past = past[-cfg.attempts.max_history_in_brief:] if past else [] + past = past[-cfg.attempts.max_history_in_brief :] if past else [] o = await asyncio.to_thread( - run_worker, issue, cfg.repo, cfg.logs_dir, cfg.worker_timeout_s, - risk_gated=gated, past_attempts=past, emit=_bus_emit, + run_worker, + issue, + cfg.repo, + cfg.logs_dir, + cfg.worker_timeout_s, + risk_gated=gated, + past_attempts=past, + emit=_bus_emit, lumen_top_k=cfg.lumen.top_k, lumen_test_pattern=cfg.lumen_test_pattern, coauthor=cfg.coauthor, model=cfg.worker.model, thinking=cfg.worker.thinking, + provider=getattr(cfg.worker, "provider", "claude"), allowed_mcp_servers=cfg.worker.allowed_mcp_tools, ) return { - "issue": o.issue, "title": o.title, - "pr_url": o.pr_url, "status": o.status, - "duration_s": o.duration_s, "error": o.error, + "issue": o.issue, + "title": o.title, + "pr_url": o.pr_url, + "status": o.status, + "duration_s": o.duration_s, + "error": o.error, } async def _critic_fn(wr: dict[str, Any]) -> dict[str, Any]: if not cfg.critic.enabled or not wr.get("pr_url"): return {"issue": wr.get("issue"), "verdict": "skipped", "reasons": []} c = await asyncio.to_thread( - _critic_review, wr["pr_url"], wr["issue"], - cfg.repo, cfg.logs_dir, cfg.critic.timeout_s, None, _bus_emit, + _critic_review, + wr["pr_url"], + wr["issue"], + cfg.repo, + cfg.logs_dir, + cfg.critic.timeout_s, + None, + _bus_emit, cfg.critic.model, + getattr(cfg.critic, "provider", "claude"), ) if c.report is not None: try: lines = await asyncio.to_thread( - _gh.pr_changed_lines, wr["pr_url"], cfg.github_repo, + _gh.pr_changed_lines, + wr["pr_url"], + cfg.github_repo, ) await asyncio.to_thread( apply_critic_report, - c.report, wr["pr_url"], lines, + c.report, + wr["pr_url"], + lines, cfg.critic.block_on_sev2, cfg.critic.min_findings_for_approve, - _gh, cfg.github_repo, _bus_emit, + _gh, + cfg.github_repo, + _bus_emit, ) except Exception as act_ex: - append_event(cfg.events_file, "critic_actions_failed", - issue=wr.get("issue"), err=str(act_ex)[:200]) + append_event( + cfg.events_file, + "critic_actions_failed", + issue=wr.get("issue"), + err=str(act_ex)[:200], + ) return { - "issue": wr.get("issue"), "verdict": c.verdict, - "reasons": c.reasons, "duration_s": c.duration_s, + "issue": wr.get("issue"), + "verdict": c.verdict, + "reasons": c.reasons, + "duration_s": c.duration_s, "sev_counts": _sev_counts(c), "parse_retries": c.parse_retries, } @@ -325,7 +375,10 @@ async def _one_tick() -> None: tick += 1 try: issues = await asyncio.to_thread( - top_issues, cfg.labels.ready, cfg.parallel, cfg.github_repo, + top_issues, + cfg.labels.ready, + cfg.parallel, + cfg.github_repo, ) except subprocess.CalledProcessError as e: append_event(cfg.events_file, "gh_list_failed", err=(e.stderr or "")[:200]) @@ -337,35 +390,51 @@ async def _one_tick() -> None: await asyncio.sleep(cfg.tick_interval_s) return - append_event(cfg.events_file, "tick_start", tick=tick, - issues=[i["number"] for i in issues], orchestrator="async") - write_state(cfg.state_file, { - "state": "running", "tick": tick, - "dispatched": [{"issue": i["number"], "title": i["title"]} for i in issues], - }) + append_event( + cfg.events_file, + "tick_start", + tick=tick, + issues=[i["number"] for i in issues], + orchestrator="async", + ) + write_state( + cfg.state_file, + { + "state": "running", + "tick": tick, + "dispatched": [{"issue": i["number"], "title": i["title"]} for i in issues], + }, + ) results, stats = await run_async_tick( - issues, pools=pools, caps=caps, - po_fn=_po_fn, worker_fn=_worker_fn, critic_fn=_critic_fn, + issues, + pools=pools, + caps=caps, + po_fn=_po_fn, + worker_fn=_worker_fn, + critic_fn=_critic_fn, emit=_bus_emit, po_timeout_s=float(cfg.po.timeout_s), worker_timeout_s=float(cfg.worker_timeout_s), critic_timeout_s=float(cfg.critic.timeout_s), ) - merged = [r["issue"] for r in results - if (r.get("worker") or {}).get("status") == "merged"] - append_event(cfg.events_file, "tick_done", tick=tick, - merged=merged, results=results, - stats={ - "po": stats.po.__dict__, - "worker": stats.worker.__dict__, - "critic": stats.critic.__dict__, - }) + merged = [r["issue"] for r in results if (r.get("worker") or {}).get("status") == "merged"] + append_event( + cfg.events_file, + "tick_done", + tick=tick, + merged=merged, + results=results, + stats={ + "po": stats.po.__dict__, + "worker": stats.worker.__dict__, + "critic": stats.critic.__dict__, + }, + ) for issue_num in merged: _reap_worktree(cfg.repo, issue_num) if merged and cfg.deploy_task: ok, log = await asyncio.to_thread(redeploy, cfg.repo, cfg.deploy_task) - append_event(cfg.events_file, "redeploy", - task=cfg.deploy_task, ok=ok, detail=log) + append_event(cfg.events_file, "redeploy", task=cfg.deploy_task, ok=ok, detail=log) write_state(cfg.state_file, {"state": "between-ticks", "tick": tick}) await asyncio.sleep(cfg.tick_interval_s) diff --git a/src/forge_loop/runner/dispatch.py b/src/forge_loop/runner/dispatch.py index f5e7776..cf036b1 100644 --- a/src/forge_loop/runner/dispatch.py +++ b/src/forge_loop/runner/dispatch.py @@ -63,20 +63,27 @@ def _run_workers( from forge_loop.runner._pipeline_driver import ( pipeline_driven_enabled as _pipeline_enabled, ) + used_pipeline = False if _pipeline_enabled(cfg): used_pipeline = True - append_event(cfg.events_file, "pipeline_dispatch_start", tick=tick, - issues=[i["number"] for i in issues]) + append_event( + cfg.events_file, + "pipeline_dispatch_start", + tick=tick, + issues=[i["number"] for i in issues], + ) try: outcomes = _pipeline_dispatch( - cfg, issues, workers_meta, tick, + cfg, + issues, + workers_meta, + tick, master_log_path=master_log_path, bus_emit=bus_emit, ) except Exception as ex_: # noqa: BLE001 — must not kill the tick - append_event(cfg.events_file, "pipeline_dispatch_failed", - tick=tick, err=str(ex_)[:300]) + append_event(cfg.events_file, "pipeline_dispatch_failed", tick=tick, err=str(ex_)[:300]) # Fall back to the legacy chain so a broken pipeline.yaml # does not strand the loop. used_pipeline = False @@ -85,7 +92,11 @@ def _run_workers( with ThreadPoolExecutor(max_workers=cfg.parallel) as ex: futures = [ ex.submit( - run_worker, i, cfg.repo, cfg.logs_dir, cfg.worker_timeout_s, + run_worker, + i, + cfg.repo, + cfg.logs_dir, + cfg.worker_timeout_s, risk_gated=meta["risk_gated"], past_attempts=meta["past_attempts"], emit=bus_emit, @@ -95,6 +106,7 @@ def _run_workers( tick=tick, model=cfg.worker.model, thinking=cfg.worker.thinking, + provider=getattr(cfg.worker, "provider", "claude"), allowed_mcp_servers=cfg.worker.allowed_mcp_tools, ) for i, meta in dispatch @@ -103,9 +115,10 @@ def _run_workers( outcomes.append(fut.result()) for o in outcomes: - _mlog.info(master_log_path, - f"worker #{o.issue} {o.status} ({o.duration_s:.0f}s) " - f"pr={o.pr_url or '-'}") + _mlog.info( + master_log_path, + f"worker #{o.issue} {o.status} ({o.duration_s:.0f}s) pr={o.pr_url or '-'}", + ) return outcomes, used_pipeline @@ -120,15 +133,20 @@ def _run_critic_for_outcomes( if o.status in {"open", "merged"} and o.pr_url: try: critic_outcome = _critic_review( - o.pr_url, o.issue, - cfg.repo, cfg.logs_dir, + o.pr_url, + o.issue, + cfg.repo, + cfg.logs_dir, timeout_s=cfg.critic.timeout_s, emit=bus_emit, model=cfg.critic.model, + provider=getattr(cfg.critic, "provider", "claude"), ) append_event( - cfg.events_file, "critic_done", - issue=o.issue, pr=o.pr_url, + cfg.events_file, + "critic_done", + issue=o.issue, + pr=o.pr_url, verdict=critic_outcome.verdict, reasons=critic_outcome.reasons, duration_s=round(critic_outcome.duration_s, 1), @@ -140,17 +158,23 @@ def _run_critic_for_outcomes( lines = _gh.pr_changed_lines(o.pr_url, repo=cfg.github_repo) apply_critic_report( critic_outcome.report, - o.pr_url, lines, + o.pr_url, + lines, cfg.critic.block_on_sev2, cfg.critic.min_findings_for_approve, - gh=_gh, repo=cfg.github_repo, emit=bus_emit, + gh=_gh, + repo=cfg.github_repo, + emit=bus_emit, ) except Exception as act_ex: - append_event(cfg.events_file, "critic_actions_failed", - issue=o.issue, err=str(act_ex)[:200]) + append_event( + cfg.events_file, + "critic_actions_failed", + issue=o.issue, + err=str(act_ex)[:200], + ) except Exception as ex_: - append_event(cfg.events_file, "critic_failed", - issue=o.issue, err=str(ex_)[:200]) + append_event(cfg.events_file, "critic_failed", issue=o.issue, err=str(ex_)[:200]) def run_multirepo( @@ -174,6 +198,7 @@ def run_multirepo( specs = load_repos(repos_dir) except RepoLoadError as e: import sys + sys.stderr.write(f"[multirepo] failed to load repos: {e}\n") return 2 @@ -182,16 +207,18 @@ def run_multirepo( sidecar_events.parent.mkdir(parents=True, exist_ok=True) state = MultirepoRunState() - append_event(sidecar_events, "multirepo_loop_start", - repos=[s.name for s in specs]) + append_event(sidecar_events, "multirepo_loop_start", repos=[s.name for s in specs]) tick = 0 while _boot._RUN: tick += 1 run_multirepo_tick( - specs, tick, - state=state, template=template, - events_file=sidecar_events, tick_fn=_tick, + specs, + tick, + state=state, + template=template, + events_file=sidecar_events, + tick_fn=_tick, ) if template and template.max_ticks and tick >= template.max_ticks: append_event(sidecar_events, "max_ticks_reached", tick=tick) diff --git a/src/forge_loop/runner/tick.py b/src/forge_loop/runner/tick.py index 27cb84c..12558f8 100644 --- a/src/forge_loop/runner/tick.py +++ b/src/forge_loop/runner/tick.py @@ -63,25 +63,36 @@ def _tick(cfg: Config, tick: int) -> None: write_state(cfg.state_file, {"state": "maintenance", "tick": tick}) append_event(cfg.events_file, "maintenance_start", tick=tick) brief = cfg.briefs.maintenance # may be None → maintenance.run_maintenance uses default - outcome = run_maintenance( - cfg.repo, cfg.logs_dir, - brief=brief if brief else None, # type: ignore[arg-type] - ) if brief else run_maintenance(cfg.repo, cfg.logs_dir) + outcome = ( + run_maintenance( + cfg.repo, + cfg.logs_dir, + brief=brief if brief else None, # type: ignore[arg-type] + ) + if brief + else run_maintenance(cfg.repo, cfg.logs_dir) + ) append_event( - cfg.events_file, "maintenance_done", tick=tick, + cfg.events_file, + "maintenance_done", + tick=tick, acted_on=outcome.acted_on, added_ready=outcome.added_ready, closed_dupes=outcome.closed_dupes, retitled=outcome.retitled, duration_s=round(outcome.duration_s, 1), ) - write_state(cfg.state_file, { - "state": "between-ticks", "tick": tick, - "last_maintenance": { - "acted_on": outcome.acted_on, - "added_ready": outcome.added_ready, + write_state( + cfg.state_file, + { + "state": "between-ticks", + "tick": tick, + "last_maintenance": { + "acted_on": outcome.acted_on, + "added_ready": outcome.added_ready, + }, }, - }) + ) _short_sleep(cfg.tick_interval_s, cfg) return @@ -107,25 +118,35 @@ def _tick(cfg: Config, tick: int) -> None: # dispatch). Idempotent — issues already expanded carry the marker. if cfg.po.enabled: write_state(cfg.state_file, {"state": "po_expanding", "tick": tick}) - append_event(cfg.events_file, "po_start", tick=tick, - issues=[i["number"] for i in issues]) + append_event(cfg.events_file, "po_start", tick=tick, issues=[i["number"] for i in issues]) po_outcomes = _po_expand( - issues, cfg.repo, cfg.logs_dir, + issues, + cfg.repo, + cfg.logs_dir, github_repo=cfg.github_repo, timeout_s=cfg.po.timeout_s, max_to_expand=cfg.po.max_to_expand_per_tick, model=cfg.po.model, + provider=getattr(cfg.po, "provider", "claude"), ) expanded_nums = [o.issue for o in po_outcomes if not o.skipped] append_event( - cfg.events_file, "po_done", tick=tick, + cfg.events_file, + "po_done", + tick=tick, expanded=expanded_nums, skipped=[o.issue for o in po_outcomes if o.skipped], - outcomes=[{"issue": o.issue, "skipped": o.skipped, - "reason": o.reason, - "sections_added": o.sections_added, - "duration_s": round(o.duration_s, 1), - "error": o.error} for o in po_outcomes], + outcomes=[ + { + "issue": o.issue, + "skipped": o.skipped, + "reason": o.reason, + "sections_added": o.sections_added, + "duration_s": round(o.duration_s, 1), + "error": o.error, + } + for o in po_outcomes + ], ) # Re-fetch any issues whose bodies were just rewritten so the workers # see the new spec, not the stale snapshot we captured at tick start. @@ -165,41 +186,57 @@ def _tick(cfg: Config, tick: int) -> None: corrupt = 0 if cfg.attempts.enabled: past, corrupt = _attempts.fetch_history_strict( - i["number"], repo=cfg.github_repo, + i["number"], + repo=cfg.github_repo, ) if corrupt: append_event( - cfg.events_file, "attempts_corrupt", - issue=i["number"], rows=corrupt, + cfg.events_file, + "attempts_corrupt", + issue=i["number"], + rows=corrupt, ) fp = _attempts.compute_fingerprint( - i["number"], i.get("body") or "", brief_hash, + i["number"], + i.get("body") or "", + brief_hash, ) forced = i["number"] in force_set if cfg.attempts.enabled and not forced: decision = _attempts.classify_skip( - past, fp, cooldown_s=cooldown_s, + past, + fp, + cooldown_s=cooldown_s, ) if decision.kind == "in_flight": append_event( - cfg.events_file, "worker_skip_in_flight", - issue=i["number"], pr_url=decision.pr_url, - fingerprint=fp[:12], matched_ts=decision.matched_ts, + cfg.events_file, + "worker_skip_in_flight", + issue=i["number"], + pr_url=decision.pr_url, + fingerprint=fp[:12], + matched_ts=decision.matched_ts, ) continue if decision.kind == "cooldown": append_event( - cfg.events_file, "worker_skip_cooldown", - issue=i["number"], fingerprint=fp[:12], + cfg.events_file, + "worker_skip_cooldown", + issue=i["number"], + fingerprint=fp[:12], cooldown_remaining_s=decision.cooldown_remaining_s, matched_ts=decision.matched_ts, ) continue - trimmed = past[-cfg.attempts.max_history_in_brief:] if past else [] - workers_meta.append({ - "risk_gated": gated, "past_attempts": trimmed, - "brief_fingerprint": fp, "forced": forced, - }) + trimmed = past[-cfg.attempts.max_history_in_brief :] if past else [] + workers_meta.append( + { + "risk_gated": gated, + "past_attempts": trimmed, + "brief_fingerprint": fp, + "forced": forced, + } + ) issues_to_dispatch.append(i) issues = issues_to_dispatch @@ -220,9 +257,10 @@ def _bus_emit(kind: str, payload: dict[str, Any]) -> None: append_event(cfg.events_file, kind, **payload) master_log_path = cfg.logs_dir / "master.log" - _mlog.info(master_log_path, - f"tick {tick} dispatching {len(issues)} worker(s): " - f"{[i['number'] for i in issues]}") + _mlog.info( + master_log_path, + f"tick {tick} dispatching {len(issues)} worker(s): {[i['number'] for i in issues]}", + ) # forge-loop assumes Claude Code subscription-mode billing (flat). The # per-tick token-cost gate was removed in issue #38: it only made sense @@ -230,7 +268,10 @@ def _bus_emit(kind: str, payload: dict[str, Any]) -> None: # subscription operator persona we actually support. outcomes: list[WorkerOutcome] outcomes, _used_pipeline = _run_workers( - cfg, issues, workers_meta, tick, + cfg, + issues, + workers_meta, + tick, master_log_path=master_log_path, bus_emit=_bus_emit, ) @@ -244,7 +285,9 @@ def _bus_emit(kind: str, payload: dict[str, Any]) -> None: for o in outcomes: try: _attempts.record( - o.issue, status=o.status, pr_url=o.pr_url, + o.issue, + status=o.status, + pr_url=o.pr_url, duration_s=o.duration_s, note=(o.error or "")[:200], event_count=len(o.events or []), @@ -252,8 +295,9 @@ def _bus_emit(kind: str, payload: dict[str, Any]) -> None: brief_fingerprint=fingerprint_by_issue.get(o.issue, ""), ) except Exception as ex_: # don't fail tick on history-write error - append_event(cfg.events_file, "attempt_record_failed", - issue=o.issue, err=str(ex_)[:200]) + append_event( + cfg.events_file, "attempt_record_failed", issue=o.issue, err=str(ex_)[:200] + ) # Critic agent: review PRs the workers opened, before auto-merge fires. # In pipeline-driven mode the critic ran as a chain step already. @@ -269,6 +313,7 @@ def _bus_emit(kind: str, payload: dict[str, Any]) -> None: # ticket. from forge_loop import gh as _gh from forge_loop.runner.merge_gate import apply_issue_closed_gate + refused = apply_issue_closed_gate( outcomes, gh=_gh, @@ -315,9 +360,13 @@ def _bus_emit(kind: str, payload: dict[str, Any]) -> None: # Drift detector (gap #3): record outcome signature, halt if 3-in-a-row. had_workers = bool(outcomes) all_failed = had_workers and all(o.status not in {"merged", "open"} for o in outcomes) - sig = "ok" if not all_failed else _error_signature( - outcomes[0].error if outcomes else None, - outcomes[0].stdout_tail if outcomes else "", + sig = ( + "ok" + if not all_failed + else _error_signature( + outcomes[0].error if outcomes else None, + outcomes[0].stdout_tail if outcomes else "", + ) ) _RECENT_OUTCOMES.append((had_workers, all_failed, sig)) if _check_drift_and_maybe_halt(cfg): diff --git a/src/forge_loop/worker.py b/src/forge_loop/worker.py index a96824d..567233e 100644 --- a/src/forge_loop/worker.py +++ b/src/forge_loop/worker.py @@ -113,8 +113,8 @@ def make_brief( " STOP. DO NOT enable auto-merge. The `risk:high` label on this issue\n" " means a human must review. Post a comment on the PR: 'Risk-gated;\n" " ready for human review.' Your status is `open` (not `merged`)." - if risk_gated else - "10. `gh pr create` with a clear title + body (the body should restate\n" + if risk_gated + else "10. `gh pr create` with a clear title + body (the body should restate\n" " the acceptance criteria and how they're tested).\n" "11. `gh pr merge --squash --auto --delete-branch`." ) @@ -123,8 +123,8 @@ def make_brief( final_status = ( f'{{"issue": {n}, "pr": "", "status": "open", "note": "risk-gated"}}' - if risk_gated else - f'{{"issue": {n}, "pr": "", "status": "merged|open|failed", "note": ""}}' + if risk_gated + else f'{{"issue": {n}, "pr": "", "status": "merged|open|failed", "note": ""}}' ) coauthor_line = f"Sign as: Co-Authored-By: {coauthor}" if coauthor else "" @@ -147,6 +147,7 @@ def make_brief( ) if dry_run: from forge_loop.replay import apply_dry_run_to_brief + rendered = apply_dry_run_to_brief(rendered) return rendered @@ -220,7 +221,8 @@ def _prep_worktree(repo: Path, n: int, branch: str) -> tuple[Path, str | None]: subprocess.run(["chmod", "-R", "u+w", str(claude_dir)], capture_output=True) subprocess.run( ["git", "worktree", "remove", "--force", str(wt)], - cwd=repo, capture_output=True, + cwd=repo, + capture_output=True, ) # If a previous failed attempt left a local branch lying around, delete # it so `git worktree add -B` can recreate it cleanly off the freshest @@ -228,7 +230,8 @@ def _prep_worktree(repo: Path, n: int, branch: str) -> tuple[Path, str | None]: # an explicit delete to fail loudly if the branch is still in use. subprocess.run( ["git", "branch", "-D", branch], - cwd=repo, capture_output=True, + cwd=repo, + capture_output=True, ) # Force-update origin/trunk so the worktree always starts at the freshest # commit, even if many PRs landed during the prior tick. `+refs/heads/...` @@ -236,11 +239,14 @@ def _prep_worktree(repo: Path, n: int, branch: str) -> tuple[Path, str | None]: # happen for trunk, but if it does we want the upstream view). subprocess.run( ["git", "fetch", "--prune", "origin", "+refs/heads/trunk:refs/remotes/origin/trunk"], - cwd=repo, capture_output=True, + cwd=repo, + capture_output=True, ) r = subprocess.run( ["git", "worktree", "add", str(wt), "-B", branch, "origin/trunk"], - cwd=repo, capture_output=True, text=True, + cwd=repo, + capture_output=True, + text=True, ) if r.returncode != 0: return wt, r.stderr @@ -310,6 +316,7 @@ def run_worker( tick: int | None = None, model: str | None = None, thinking: str | None = None, + provider: str = "claude", allowed_mcp_servers: tuple[str, ...] | None = None, ) -> WorkerOutcome: """Run one claude-code worker against an issue. @@ -327,21 +334,37 @@ def run_worker( worktree, err = _prep_worktree(repo, n, branch) if err is not None: return WorkerOutcome( - issue=n, title=title, pr_url=None, status="failed", - duration_s=0.0, stdout_tail=err[-500:], + issue=n, + title=title, + pr_url=None, + status="failed", + duration_s=0.0, + stdout_tail=err[-500:], error="worktree-create-failed", ) logs_dir.mkdir(parents=True, exist_ok=True) log_path = logs_dir / f"worker-{n}-{int(time.time())}.log" brief = make_brief( - issue, worktree, - risk_gated=risk_gated, past_attempts=past_attempts, + issue, + worktree, + risk_gated=risk_gated, + past_attempts=past_attempts, lumen_top_k=lumen_top_k, lumen_test_pattern=lumen_test_pattern, coauthor=coauthor, ) + if provider == "codex": + return _run_worker_codex( + issue=issue, + worktree=worktree, + log_path=log_path, + brief=brief, + timeout_s=timeout_s, + model=model, + ) + return _run_worker_sdk( issue=issue, worktree=worktree, @@ -356,6 +379,68 @@ def run_worker( ) +def _run_worker_codex( + *, + issue: dict[str, Any], + worktree: Path, + log_path: Path, + brief: str, + timeout_s: int, + model: str | None = None, +) -> WorkerOutcome: + """Drive a worker through ``codex exec`` and map it to WorkerOutcome.""" + from forge_loop.agent_backend import ( + extract_github_pr, + extract_last_json_object, + run_codex_exec, + ) + + n = issue["number"] + title = issue["title"] + result = run_codex_exec( + prompt=brief, + cwd=worktree, + log_path=log_path, + timeout_s=timeout_s, + model=model, + add_dirs=[worktree], + ) + if result.timed_out: + return WorkerOutcome( + issue=n, + title=title, + pr_url=None, + status="timeout", + duration_s=result.duration_s, + stdout_tail="(timeout)", + error=result.error, + usage={}, + model=model or "", + ) + obj = extract_last_json_object(result.last_message) or {} + pr_url = obj.get("pr") if isinstance(obj.get("pr"), str) else None + status = obj.get("status") if isinstance(obj.get("status"), str) else "no_pr" + if pr_url is None: + pr_url = extract_github_pr(result.last_message) + if pr_url: + status = "open" + if result.error and pr_url is None: + status = "failed" + return WorkerOutcome( + issue=n, + title=title, + pr_url=pr_url, + status=status, + duration_s=result.duration_s, + stdout_tail=_tail(log_path, 500), + events=_read_subagent_events(worktree), + cost_usd=0.0, + usage={}, + model=model or "", + error=result.error, + ) + + def _run_worker_sdk( *, issue: dict[str, Any], @@ -383,6 +468,7 @@ def _run_worker_sdk( started = time.time() with open(log_path, "w", encoding="utf-8") as log_fh: + def _on_event(ev: dict[str, Any]) -> None: with contextlib.suppress(OSError): log_fh.write(json.dumps(ev, default=str) + "\n") @@ -414,10 +500,16 @@ async def _drive() -> Any: # Record the *requested* model so the audit trail is informative # even when no response ever arrived (issue #34). return WorkerOutcome( - issue=n, title=title, pr_url=None, status="timeout", - duration_s=duration, stdout_tail="(timeout)", + issue=n, + title=title, + pr_url=None, + status="timeout", + duration_s=duration, + stdout_tail="(timeout)", error=f"worker exceeded {timeout_s}s", - cost_usd=0.0, usage={}, model=model or "", + cost_usd=0.0, + usage={}, + model=model or "", ) assert result is not None @@ -433,10 +525,16 @@ async def _drive() -> Any: events = _read_subagent_events(worktree) return WorkerOutcome( - issue=n, title=title, pr_url=pr_url, status=status, - duration_s=duration, stdout_tail=_tail(log_path, 500), + issue=n, + title=title, + pr_url=pr_url, + status=status, + duration_s=duration, + stdout_tail=_tail(log_path, 500), events=events, - cost_usd=cost_usd, usage=dict(result.usage or {}), model=model_seen, + cost_usd=cost_usd, + usage=dict(result.usage or {}), + model=model_seen, error=result.error, ) diff --git a/tests/test_codex_backend.py b/tests/test_codex_backend.py new file mode 100644 index 0000000..dcb37f0 --- /dev/null +++ b/tests/test_codex_backend.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from pathlib import Path + +from forge_loop.agent_backend import ( + build_codex_exec_argv, + extract_github_pr, + extract_last_json_object, +) + + +def test_build_codex_exec_argv_uses_stdin_prompt_and_last_message_file(tmp_path: Path) -> None: + last = tmp_path / "last.txt" + argv = build_codex_exec_argv( + cwd=tmp_path, + last_message_path=last, + model="gpt-5-codex", + add_dirs=[tmp_path / "extra"], + ) + assert argv[:3] == ["codex", "exec", "-"] + assert "--json" in argv + assert ["-C", str(tmp_path)] == argv[argv.index("-C") : argv.index("-C") + 2] + assert argv[argv.index("-m") : argv.index("-m") + 2] == ["-m", "gpt-5-codex"] + assert str(last) in argv + assert str(tmp_path / "extra") in argv + + +def test_extract_last_json_object_tolerates_prose() -> None: + obj = extract_last_json_object( + 'first {"ignored": true}\nDone.\n{"issue": 1, "status": "merged"}' + ) + assert obj == {"issue": 1, "status": "merged"} + + +def test_extract_github_pr_fallback() -> None: + assert extract_github_pr("opened https://github.com/o/r/pull/42") == ( + "https://github.com/o/r/pull/42" + ) diff --git a/tests/test_config.py b/tests/test_config.py index 991e809..bfdbc45 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -73,7 +73,8 @@ def test_coauthor_env(fake_repo: Path, monkeypatch: pytest.MonkeyPatch) -> None: def test_yaml_in_repo_root_is_picked_up(fake_repo: Path) -> None: - (fake_repo / "forge-loop.yaml").write_text(dedent(""" + (fake_repo / "forge-loop.yaml").write_text( + dedent(""" scheduling: parallel: 7 maintenance_every_n_ticks: 3 @@ -81,7 +82,8 @@ def test_yaml_in_repo_root_is_picked_up(fake_repo: Path) -> None: task: my-custom-task labels: ready: "ready-now" - """)) + """) + ) cfg = config_mod.load() assert cfg.parallel == 7 assert cfg.maintenance_every_n_ticks == 3 @@ -106,11 +108,13 @@ def test_env_overrides_yaml(fake_repo: Path, monkeypatch: pytest.MonkeyPatch) -> def test_briefs_yaml_loads_maintenance_template(fake_repo: Path) -> None: - (fake_repo / "forge-loop.yaml").write_text(dedent(""" + (fake_repo / "forge-loop.yaml").write_text( + dedent(""" briefs: maintenance: | Custom maintenance brief content - """)) + """) + ) cfg = config_mod.load() assert cfg.briefs.maintenance is not None assert "Custom maintenance brief" in cfg.briefs.maintenance @@ -136,15 +140,19 @@ def test_model_defaults_match_issue_34(fake_repo: Path) -> None: """ cfg = config_mod.load() assert cfg.worker.model == "claude-opus-4-7" + assert cfg.worker.provider == "claude" assert cfg.worker.thinking == "medium" assert cfg.po.model == "claude-opus-4-7" + assert cfg.po.provider == "claude" assert cfg.po.thinking == "high" assert cfg.critic.model == "claude-sonnet-4-6" + assert cfg.critic.provider == "claude" assert cfg.critic.thinking == "off" def test_yaml_overrides_model_defaults(fake_repo: Path) -> None: - (fake_repo / "forge-loop.yaml").write_text(dedent(""" + (fake_repo / "forge-loop.yaml").write_text( + dedent(""" worker: model: claude-sonnet-4-6 thinking: low @@ -154,7 +162,8 @@ def test_yaml_overrides_model_defaults(fake_repo: Path) -> None: critic: model: claude-haiku-4-5 thinking: off - """)) + """) + ) cfg = config_mod.load() assert cfg.worker.model == "claude-sonnet-4-6" assert cfg.worker.thinking == "low" @@ -165,12 +174,14 @@ def test_yaml_overrides_model_defaults(fake_repo: Path) -> None: def test_env_overrides_yaml_for_role_model( fake_repo: Path, monkeypatch: pytest.MonkeyPatch ) -> None: - (fake_repo / "forge-loop.yaml").write_text(dedent(""" + (fake_repo / "forge-loop.yaml").write_text( + dedent(""" worker: model: claude-opus-4-7 critic: thinking: low - """)) + """) + ) monkeypatch.setenv("LOOP_WORKER_MODEL", "claude-sonnet-4-6") monkeypatch.setenv("LOOP_WORKER_THINKING", "high") monkeypatch.setenv("LOOP_CRITIC_THINKING", "off") @@ -185,9 +196,12 @@ def test_missing_env_and_yaml_falls_back_to_documented_defaults( ) -> None: # Make explicit: no yaml, no env -> the documented defaults. for k in ( - "LOOP_WORKER_MODEL", "LOOP_WORKER_THINKING", - "LOOP_PO_MODEL", "LOOP_PO_THINKING", - "LOOP_CRITIC_MODEL", "LOOP_CRITIC_THINKING", + "LOOP_WORKER_MODEL", + "LOOP_WORKER_THINKING", + "LOOP_PO_MODEL", + "LOOP_PO_THINKING", + "LOOP_CRITIC_MODEL", + "LOOP_CRITIC_THINKING", ): monkeypatch.delenv(k, raising=False) cfg = config_mod.load() @@ -216,3 +230,54 @@ def test_unknown_thinking_value_raises_clear_error( with pytest.raises(config_mod.ModelConfigError) as excinfo: config_mod.load() assert "ultra" in str(excinfo.value) + + +def test_global_codex_provider_uses_codex_cli_default_model(fake_repo: Path) -> None: + (fake_repo / "forge-loop.yaml").write_text( + dedent(""" + agent: + provider: codex + """) + ) + cfg = config_mod.load() + assert cfg.worker.provider == "codex" + assert cfg.worker.model == "" + assert cfg.po.provider == "codex" + assert cfg.critic.provider == "codex" + + +def test_role_provider_overrides_global_agent_provider(fake_repo: Path) -> None: + (fake_repo / "forge-loop.yaml").write_text( + dedent(""" + agent: + provider: codex + worker: + provider: claude + model: claude-sonnet-4-6 + """) + ) + cfg = config_mod.load() + assert cfg.worker.provider == "claude" + assert cfg.worker.model == "claude-sonnet-4-6" + assert cfg.po.provider == "codex" + + +def test_codex_model_alias_is_allowed_from_env( + fake_repo: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("LOOP_WORKER_PROVIDER", "codex") + monkeypatch.setenv("LOOP_WORKER_MODEL", "gpt-5-codex") + cfg = config_mod.load() + assert cfg.worker.provider == "codex" + assert cfg.worker.model == "gpt-5-codex" + + +def test_unknown_agent_provider_raises_clear_error( + fake_repo: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("LOOP_AGENT_PROVIDER", "wizard") + with pytest.raises(config_mod.ModelConfigError) as excinfo: + config_mod.load() + assert "wizard" in str(excinfo.value) diff --git a/tests/test_critic_model.py b/tests/test_critic_model.py index 196652d..a6201f3 100644 --- a/tests/test_critic_model.py +++ b/tests/test_critic_model.py @@ -26,9 +26,10 @@ def fake_run(argv, **_kw): captured["argv"] = list(argv) return MagicMock(returncode=0) - with patch.object( - critic, "subprocess", MagicMock(run=fake_run, TimeoutExpired=Exception) - ), patch.object(critic, "ensure_subagent_trusted", lambda *_a, **_k: None): + with ( + patch.object(critic, "subprocess", MagicMock(run=fake_run, TimeoutExpired=Exception)), + patch.object(critic, "ensure_subagent_trusted", lambda *_a, **_k: None), + ): critic.review_pr( "https://github.com/o/r/pull/1", 123, @@ -50,9 +51,10 @@ def fake_run(argv, **_kw): captured["argv"] = list(argv) return MagicMock(returncode=0) - with patch.object( - critic, "subprocess", MagicMock(run=fake_run, TimeoutExpired=Exception) - ), patch.object(critic, "ensure_subagent_trusted", lambda *_a, **_k: None): + with ( + patch.object(critic, "subprocess", MagicMock(run=fake_run, TimeoutExpired=Exception)), + patch.object(critic, "ensure_subagent_trusted", lambda *_a, **_k: None), + ): critic.review_pr( "https://github.com/o/r/pull/1", 123, @@ -62,3 +64,33 @@ def fake_run(argv, **_kw): brief_template="dummy", ) assert "--model" not in captured["argv"] + + +def test_review_pr_codex_provider_uses_codex_backend(tmp_path: Path, monkeypatch) -> None: + from forge_loop import agent_backend + + captured: dict[str, object] = {} + + def fake_codex(**kwargs): + captured.update(kwargs) + return agent_backend.AgentRunResult( + provider="codex", + log_path=kwargs["log_path"], + last_message='{"overall": "approve", "findings": []}', + duration_s=0.5, + ) + + monkeypatch.setattr(agent_backend, "run_codex_exec", fake_codex) + with patch.object(critic, "ensure_subagent_trusted", lambda *_a, **_k: None): + outcome = critic.review_pr( + "https://github.com/o/r/pull/1", + 123, + tmp_path, + tmp_path / "logs", + timeout_s=10, + brief_template="dummy", + provider="codex", + model="gpt-5-codex", + ) + assert captured["model"] == "gpt-5-codex" + assert outcome.verdict == "approved" diff --git a/tests/test_po_model.py b/tests/test_po_model.py index 3ac9714..ae10777 100644 --- a/tests/test_po_model.py +++ b/tests/test_po_model.py @@ -35,8 +35,10 @@ def fake_run(argv, **_kw): captured["argv"] = list(argv) return MagicMock(returncode=0) - with patch.object(po, "subprocess", MagicMock(run=fake_run, TimeoutExpired=Exception)), \ - patch.object(po, "ensure_subagent_trusted", lambda *_a, **_k: None): + with ( + patch.object(po, "subprocess", MagicMock(run=fake_run, TimeoutExpired=Exception)), + patch.object(po, "ensure_subagent_trusted", lambda *_a, **_k: None), + ): po.expand_thin_specs( [{"number": 1, "title": "t", "body": "thin body"}], tmp_path, @@ -60,11 +62,14 @@ def fake_run(argv, **_kw): captured["argv"] = list(argv) return MagicMock(returncode=0) - with patch.object(po, "subprocess", MagicMock(run=fake_run, TimeoutExpired=Exception)), \ - patch.object(po, "ensure_subagent_trusted", lambda *_a, **_k: None): + with ( + patch.object(po, "subprocess", MagicMock(run=fake_run, TimeoutExpired=Exception)), + patch.object(po, "ensure_subagent_trusted", lambda *_a, **_k: None), + ): po.expand_thin_specs( [{"number": 1, "title": "t", "body": "thin"}], - tmp_path, tmp_path / "logs", + tmp_path, + tmp_path / "logs", github_repo="o/r", timeout_s=10, brief_template="dummy", @@ -72,3 +77,38 @@ def fake_run(argv, **_kw): ) assert "--model" not in captured["argv"] + + +def test_expand_thin_specs_codex_provider_uses_codex_backend( + tmp_path: Path, + monkeypatch, +) -> None: + from forge_loop import agent_backend + + captured: dict[str, object] = {} + + def fake_codex(**kwargs): + captured.update(kwargs) + return agent_backend.AgentRunResult( + provider="codex", + log_path=kwargs["log_path"], + last_message='{"skipped": false, "reason": "expanded", "sections_added": ["Acceptance"]}', + duration_s=0.5, + ) + + monkeypatch.setattr(agent_backend, "run_codex_exec", fake_codex) + with patch.object(po, "ensure_subagent_trusted", lambda *_a, **_k: None): + outcomes = po.expand_thin_specs( + [{"number": 1, "title": "t", "body": "thin"}], + tmp_path, + tmp_path / "logs", + github_repo="o/r", + timeout_s=10, + brief_template="dummy", + max_to_expand=1, + provider="codex", + model="gpt-5-codex", + ) + + assert captured["model"] == "gpt-5-codex" + assert outcomes[0].sections_added == ["Acceptance"] diff --git a/tests/test_worker.py b/tests/test_worker.py index c11cd3e..ee6a566 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -21,6 +21,7 @@ _read_subagent_events, _tail, make_brief, + run_worker, ) @@ -147,8 +148,12 @@ def test_make_brief_includes_history_section_when_past_attempts(tmp_path: Path) issue = {"number": 942, "title": "fix x", "body": "Do the thing."} past = [ {"ts": "2026-05-26T10:00:00Z", "status": "failed", "note": "test missing", "pr_url": None}, - {"ts": "2026-05-26T11:00:00Z", "status": "merged", "note": "shipped", - "pr_url": "https://github.com/h/r/pull/9"}, + { + "ts": "2026-05-26T11:00:00Z", + "status": "merged", + "note": "shipped", + "pr_url": "https://github.com/h/r/pull/9", + }, ] brief = make_brief(issue, tmp_path / "w", past_attempts=past) assert "PREVIOUS ATTEMPTS" in brief @@ -178,6 +183,45 @@ def test_make_brief_default_keeps_automerge(tmp_path: Path) -> None: assert "DO NOT enable auto-merge" not in brief +def test_run_worker_codex_provider_maps_final_json( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + from forge_loop import agent_backend + + worktree = tmp_path / "wt" + worktree.mkdir() + + def fake_prep(_repo: Path, _n: int, _branch: str) -> tuple[Path, None]: + return worktree, None + + def fake_codex(**kwargs: Any) -> agent_backend.AgentRunResult: + assert kwargs["cwd"] == worktree + assert kwargs["model"] == "gpt-5-codex" + return agent_backend.AgentRunResult( + provider="codex", + log_path=kwargs["log_path"], + last_message=( + 'Done.\n{"issue": 12, "pr": "https://github.com/o/r/pull/9", "status": "open"}' + ), + duration_s=1.5, + ) + + monkeypatch.setattr("forge_loop.worker._prep_worktree", fake_prep) + monkeypatch.setattr(agent_backend, "run_codex_exec", fake_codex) + out = run_worker( + {"number": 12, "title": "ship codex", "body": "body"}, + tmp_path, + tmp_path / "logs", + 30, + provider="codex", + model="gpt-5-codex", + ) + assert out.status == "open" + assert out.pr_url == "https://github.com/o/r/pull/9" + assert out.model == "gpt-5-codex" + + # Gradle/WSL-OOM guard tests removed: forge-loop is stack-agnostic; the # generic brief now says "avoid full-suite runs" without project-specific # JVM flag pinning. Operators add their own gates via project tooling. @@ -248,6 +292,7 @@ def __init__(self, **kwargs: Any) -> None: def patch_sdk_types(monkeypatch: pytest.MonkeyPatch) -> None: """Swap claude_agent_sdk in sys.modules for a stub the tests can drive.""" import types as _types + fake = _types.ModuleType("claude_agent_sdk") fake.AssistantMessage = _FakeAssistantMessage # type: ignore[attr-defined] fake.ResultMessage = _FakeResultMessage # type: ignore[attr-defined] @@ -268,21 +313,26 @@ async def _stub_query(**_kw: Any) -> Any: # pragma: no cover def _make_stream(messages: list[Any]) -> Any: """Return a callable matching SDK `query(prompt=..., options=...)`.""" + async def _q(*, prompt: str, options: Any) -> Any: for m in messages: yield m + return _q def _run(messages: list[Any], **kw: Any) -> _worker_sdk.SDKRunResult: import anyio - return anyio.run(lambda: _worker_sdk.run_sdk_session( - "irrelevant brief", - cwd=Path("/tmp"), - query_fn=_make_stream(messages), - options_cls=_FakeOptions, - **kw, - )) + + return anyio.run( + lambda: _worker_sdk.run_sdk_session( + "irrelevant brief", + cwd=Path("/tmp"), + query_fn=_make_stream(messages), + options_cls=_FakeOptions, + **kw, + ) + ) def test_sdk_happy_path_extracts_pr_and_merged(patch_sdk_types: None) -> None: @@ -295,11 +345,21 @@ def test_sdk_happy_path_extracts_pr_and_merged(patch_sdk_types: None) -> None: ], model="claude-sonnet-4-6", ), - _FakeUserMessage(content=[_FakeToolResultBlock( - tool_use_id="t1", content="file contents", is_error=False, - )]), + _FakeUserMessage( + content=[ + _FakeToolResultBlock( + tool_use_id="t1", + content="file contents", + is_error=False, + ) + ] + ), _FakeAssistantMessage( - content=[_FakeTextBlock(text='Done.\n{"issue":2,"pr":"https://github.com/o/r/pull/77","status":"merged"}')], + content=[ + _FakeTextBlock( + text='Done.\n{"issue":2,"pr":"https://github.com/o/r/pull/77","status":"merged"}' + ) + ], ), _FakeResultMessage( result='Done.\n{"issue":2,"pr":"https://github.com/o/r/pull/77","status":"merged"}', @@ -322,8 +382,13 @@ def test_sdk_happy_path_extracts_pr_and_merged(patch_sdk_types: None) -> None: # immediately after ``turn_start``; the original event order is # otherwise preserved. assert kinds == [ - "turn_start", "worker_mcp_filtered", "assistant_text", "tool_use", - "tool_result", "assistant_text", "final_result", + "turn_start", + "worker_mcp_filtered", + "assistant_text", + "tool_use", + "tool_result", + "assistant_text", + "final_result", ] # seq is monotonic assert [e["seq"] for e in captured] == list(range(1, len(captured) + 1)) @@ -349,9 +414,15 @@ def test_sdk_tool_error_event_preserved(patch_sdk_types: None) -> None: _FakeAssistantMessage( content=[_FakeToolUseBlock(id="t9", name="Bash", input={"command": "git push"})], ), - _FakeUserMessage(content=[_FakeToolResultBlock( - tool_use_id="t9", content="permission denied", is_error=True, - )]), + _FakeUserMessage( + content=[ + _FakeToolResultBlock( + tool_use_id="t9", + content="permission denied", + is_error=True, + ) + ] + ), _FakeResultMessage(result="failed: cannot push", total_cost_usd=0.05), ] events: list[dict[str, Any]] = [] @@ -365,18 +436,24 @@ def test_sdk_tool_error_event_preserved(patch_sdk_types: None) -> None: def test_sdk_rate_limit_mid_stream_emits_error_not_crash(patch_sdk_types: None) -> None: """Adversarial: SDK raises 429 mid-iteration — worker must absorb it.""" + async def _q(**_kw: Any) -> Any: yield _FakeSystemMessage(subtype="init") yield _FakeAssistantMessage(content=[_FakeTextBlock(text="working")]) raise RuntimeError("HTTP 429: rate_limit_error — retry-after: 30s") import anyio + events: list[dict[str, Any]] = [] - res = anyio.run(lambda: _worker_sdk.run_sdk_session( - "x", cwd=Path("/tmp"), - query_fn=_q, options_cls=_FakeOptions, - on_event=events.append, - )) + res = anyio.run( + lambda: _worker_sdk.run_sdk_session( + "x", + cwd=Path("/tmp"), + query_fn=_q, + options_cls=_FakeOptions, + on_event=events.append, + ) + ) # Loop survives — no exception propagates err_events = [e for e in events if e["kind"] == "error"] assert len(err_events) == 1 @@ -396,15 +473,15 @@ def test_sdk_path_does_not_import_subprocess() -> None: issue #2's 'shrink or disappear' wording). """ import importlib + mod = importlib.reload(_worker_sdk) - assert "subprocess" not in mod.__dict__, ( - "_worker_sdk must remain subprocess-free per issue #2" - ) + assert "subprocess" not in mod.__dict__, "_worker_sdk must remain subprocess-free per issue #2" # And the source text itself doesn't reference it src = Path(mod.__file__).read_text() # only allowed reference is the explanatory docstring/comment code_lines = [ - ln for ln in src.splitlines() + ln + for ln in src.splitlines() if not ln.strip().startswith("#") and not ln.strip().startswith('"') ] code_blob = "\n".join(code_lines) @@ -421,9 +498,7 @@ def test_extract_pr_status_handles_trailing_object() -> None: def test_extract_pr_status_regex_fallback() -> None: - pr, status = _worker_sdk._extract_pr_status( - "see https://github.com/foo/bar/pull/9 for review" - ) + pr, status = _worker_sdk._extract_pr_status("see https://github.com/foo/bar/pull/9 for review") assert pr == "https://github.com/foo/bar/pull/9" assert status == "open" @@ -448,13 +523,21 @@ def test_run_sdk_session_writes_typed_events_to_callback(patch_sdk_types: None) messages = [ _FakeSystemMessage(subtype="init"), - _FakeAssistantMessage(content=[ - _FakeTextBlock(text="hi"), - _FakeToolUseBlock(id="t", name="Read", input={"p": "/x"}), - ]), - _FakeUserMessage(content=[_FakeToolResultBlock( - tool_use_id="t", content="ok", is_error=False, - )]), + _FakeAssistantMessage( + content=[ + _FakeTextBlock(text="hi"), + _FakeToolUseBlock(id="t", name="Read", input={"p": "/x"}), + ] + ), + _FakeUserMessage( + content=[ + _FakeToolResultBlock( + tool_use_id="t", + content="ok", + is_error=False, + ) + ] + ), _FakeResultMessage(result="{}", total_cost_usd=0.001), ] seen: list[str] = [] From af177c612dcafea518f9cf236ad1cb4ed728ffa4 Mon Sep 17 00:00:00 2001 From: MAJDOUB Khalid Date: Fri, 29 May 2026 23:43:09 +0200 Subject: [PATCH 2/2] Support configurable forge-loop base branch Add repo.base_branch and LOOP_BASE_BRANCH so worker worktrees and doctor checks can target repositories that use main instead of trunk. Thread the setting through worker dispatch paths and cover yaml/env loading. --- src/forge_loop/cli.py | 11 ++++++----- src/forge_loop/config.py | 2 ++ src/forge_loop/mcp_server.py | 2 +- src/forge_loop/runner/_pipeline_driver.py | 1 + src/forge_loop/runner/boot.py | 1 + src/forge_loop/runner/dispatch.py | 1 + src/forge_loop/worker.py | 21 +++++++++++++++------ tests/test_config.py | 19 +++++++++++++++++++ tests/test_worker.py | 4 +++- 9 files changed, 49 insertions(+), 13 deletions(-) diff --git a/src/forge_loop/cli.py b/src/forge_loop/cli.py index 5dbf997..89b1503 100644 --- a/src/forge_loop/cli.py +++ b/src/forge_loop/cli.py @@ -218,28 +218,28 @@ def line(status: str, label: str, detail: str = "") -> None: timeout=5, ).stdout.strip() _sp.run( - ["git", "fetch", "origin", "trunk", "--quiet"], + ["git", "fetch", "origin", cfg.base_branch, "--quiet"], cwd=cfg.repo, capture_output=True, timeout=10, ) remote = _sp.run( - ["git", "rev-parse", "origin/trunk"], + ["git", "rev-parse", f"origin/{cfg.base_branch}"], cwd=cfg.repo, capture_output=True, text=True, timeout=5, ).stdout.strip() if local and remote and local == remote: - line("green", f"code matches origin/trunk @ {local[:8]}") + line("green", f"code matches origin/{cfg.base_branch} @ {local[:8]}") elif local and remote: line( "yellow", - "local trunk behind origin", + f"local {cfg.base_branch} behind origin", f"local={local[:8]} origin={remote[:8]}", ) else: - line("yellow", "could not compare to origin/trunk") + line("yellow", f"could not compare to origin/{cfg.base_branch}") except (_sp.SubprocessError, OSError): line("yellow", "git probe failed") @@ -1002,6 +1002,7 @@ def _cmd_config(args: SimpleNamespace) -> int: cfg = load() out = { "repo": str(cfg.repo), + "base_branch": cfg.base_branch, "parallel": cfg.parallel, "tick_interval_s": cfg.tick_interval_s, "max_ticks": cfg.max_ticks, diff --git a/src/forge_loop/config.py b/src/forge_loop/config.py index ba358ac..5a1910c 100644 --- a/src/forge_loop/config.py +++ b/src/forge_loop/config.py @@ -209,6 +209,7 @@ class LumenConfig: class Config: repo: Path github_repo: str | None = None + base_branch: str = "trunk" coauthor: str = "" lumen_test_pattern: str = "**/*Test.*" worktree_root: Path = field(default_factory=lambda: Path("/tmp")) @@ -447,6 +448,7 @@ def _resolve_role( return Config( repo=repo, github_repo=github_repo, + base_branch=_env_str("LOOP_BASE_BRANCH", repo_block.get("base_branch", "trunk")), coauthor=os.environ.get("LOOP_COAUTHOR", ""), lumen_test_pattern=lumen_block.get("test_pattern", "**/*Test.*"), worktree_root=Path(repo_block.get("worktree_root", "/tmp")), diff --git a/src/forge_loop/mcp_server.py b/src/forge_loop/mcp_server.py index 4030e9c..6745563 100644 --- a/src/forge_loop/mcp_server.py +++ b/src/forge_loop/mcp_server.py @@ -259,7 +259,7 @@ def dispatch_worker(issue_number: int, timeout_s: int = 3600) -> dict[str, Any]: issue = json.loads(r.stdout) else: issue = matches[0] - outcome = _run_worker(issue, cfg.repo, cfg.logs_dir, timeout_s) + outcome = _run_worker(issue, cfg.repo, cfg.logs_dir, timeout_s, base_branch=cfg.base_branch) return asdict(outcome) diff --git a/src/forge_loop/runner/_pipeline_driver.py b/src/forge_loop/runner/_pipeline_driver.py index 7750322..e008711 100644 --- a/src/forge_loop/runner/_pipeline_driver.py +++ b/src/forge_loop/runner/_pipeline_driver.py @@ -114,6 +114,7 @@ def worker_handler(ctx: StepContext) -> StepOutcome: thinking=cfg.worker.thinking, provider=getattr(cfg.worker, "provider", "claude"), allowed_mcp_servers=cfg.worker.allowed_mcp_tools, + base_branch=cfg.base_branch, ) ok = out.status in {"merged", "open"} return StepOutcome( diff --git a/src/forge_loop/runner/boot.py b/src/forge_loop/runner/boot.py index d4fd2e8..553b916 100644 --- a/src/forge_loop/runner/boot.py +++ b/src/forge_loop/runner/boot.py @@ -309,6 +309,7 @@ async def _worker_fn(issue: dict[str, Any], _po: dict[str, Any]) -> dict[str, An thinking=cfg.worker.thinking, provider=getattr(cfg.worker, "provider", "claude"), allowed_mcp_servers=cfg.worker.allowed_mcp_tools, + base_branch=cfg.base_branch, ) return { "issue": o.issue, diff --git a/src/forge_loop/runner/dispatch.py b/src/forge_loop/runner/dispatch.py index cf036b1..7199182 100644 --- a/src/forge_loop/runner/dispatch.py +++ b/src/forge_loop/runner/dispatch.py @@ -108,6 +108,7 @@ def _run_workers( thinking=cfg.worker.thinking, provider=getattr(cfg.worker, "provider", "claude"), allowed_mcp_servers=cfg.worker.allowed_mcp_tools, + base_branch=cfg.base_branch, ) for i, meta in dispatch ] diff --git a/src/forge_loop/worker.py b/src/forge_loop/worker.py index 567233e..4b4993b 100644 --- a/src/forge_loop/worker.py +++ b/src/forge_loop/worker.py @@ -211,7 +211,9 @@ def _drop_permissive_settings(worktree: Path) -> None: cdir.chmod(0o555) -def _prep_worktree(repo: Path, n: int, branch: str) -> tuple[Path, str | None]: +def _prep_worktree( + repo: Path, n: int, branch: str, base_branch: str = "trunk" +) -> tuple[Path, str | None]: wt = Path(f"/tmp/wt-loop-{n}") # chmod the planted .claude/ back to writable so worktree remove can # delete it (we set it read-only at the end of last run to prevent worker @@ -226,24 +228,30 @@ def _prep_worktree(repo: Path, n: int, branch: str) -> tuple[Path, str | None]: ) # If a previous failed attempt left a local branch lying around, delete # it so `git worktree add -B` can recreate it cleanly off the freshest - # origin/trunk. `-B` would overwrite anyway, but we use plain `-b` after + # remote base branch. `-B` would overwrite anyway, but we use plain `-b` after # an explicit delete to fail loudly if the branch is still in use. subprocess.run( ["git", "branch", "-D", branch], cwd=repo, capture_output=True, ) - # Force-update origin/trunk so the worktree always starts at the freshest + # Force-update the remote base branch so the worktree always starts at the freshest # commit, even if many PRs landed during the prior tick. `+refs/heads/...` # makes the fetch force the ref update (defensive — non-FF should never # happen for trunk, but if it does we want the upstream view). subprocess.run( - ["git", "fetch", "--prune", "origin", "+refs/heads/trunk:refs/remotes/origin/trunk"], + [ + "git", + "fetch", + "--prune", + "origin", + f"+refs/heads/{base_branch}:refs/remotes/origin/{base_branch}", + ], cwd=repo, capture_output=True, ) r = subprocess.run( - ["git", "worktree", "add", str(wt), "-B", branch, "origin/trunk"], + ["git", "worktree", "add", str(wt), "-B", branch, f"origin/{base_branch}"], cwd=repo, capture_output=True, text=True, @@ -318,6 +326,7 @@ def run_worker( thinking: str | None = None, provider: str = "claude", allowed_mcp_servers: tuple[str, ...] | None = None, + base_branch: str = "trunk", ) -> WorkerOutcome: """Run one claude-code worker against an issue. @@ -331,7 +340,7 @@ def run_worker( title = issue["title"] branch = _branch_name(n, title) - worktree, err = _prep_worktree(repo, n, branch) + worktree, err = _prep_worktree(repo, n, branch, base_branch) if err is not None: return WorkerOutcome( issue=n, diff --git a/tests/test_config.py b/tests/test_config.py index bfdbc45..8895bac 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -55,6 +55,25 @@ def test_repo_from_yaml(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None assert cfg.github_repo == "foo/bar" +def test_base_branch_from_yaml(fake_repo: Path) -> None: + (fake_repo / "forge-loop.yaml").write_text( + "repo:\n github: foo/bar\n base_branch: main\n" + ) + cfg = config_mod.load() + assert cfg.base_branch == "main" + + +def test_base_branch_env_overrides_yaml( + fake_repo: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + (fake_repo / "forge-loop.yaml").write_text( + "repo:\n github: foo/bar\n base_branch: main\n" + ) + monkeypatch.setenv("LOOP_BASE_BRANCH", "release") + cfg = config_mod.load() + assert cfg.base_branch == "release" + + def test_env_overrides_yaml_repo(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(config_mod, "_repo_root", lambda: tmp_path) for k in list(os.environ): diff --git a/tests/test_worker.py b/tests/test_worker.py index ee6a566..2db97d9 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -192,7 +192,9 @@ def test_run_worker_codex_provider_maps_final_json( worktree = tmp_path / "wt" worktree.mkdir() - def fake_prep(_repo: Path, _n: int, _branch: str) -> tuple[Path, None]: + def fake_prep( + _repo: Path, _n: int, _branch: str, _base_branch: str = "trunk" + ) -> tuple[Path, None]: return worktree, None def fake_codex(**kwargs: Any) -> agent_backend.AgentRunResult: