From cc47c3ec1301515b61e741a065cc0ddabc37e811 Mon Sep 17 00:00:00 2001 From: kmajdoub Date: Thu, 28 May 2026 18:52:01 +0200 Subject: [PATCH] feat(brainstormer): axis label namespace + axis-aware status/dispatch filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #126. Introduces the `axis:` GitHub label namespace as pure plumbing so operators can focus a sprint on a single topical concern (dispatch, cli, observability, …) without changing how issues are queued. Surface: - `forge-loop status` now groups open `loop:ready` issues by `axis:*` label. JSON gains `axes` + `unaligned_count` + `axis_filter` keys. Issues with no axis land in an `unaligned` bucket and a yellow warning row is rendered in the human surface. - `forge-loop run --axis ` (repeatable, union semantics) narrows dispatch to matching issues. Omitting the flag preserves pre-#126 behaviour byte-for-byte. - `forge-loop status --axis ` narrows the grouped view. - The runner logs the active axis filter at startup and emits `axis_filter_active` / `axis_filter_empty` events per tick so a focused-sprint launch is auditable from `forge-loop events`. Implementation: - New `forge_loop.axis` module: pure helpers (extract_axes, group_by_axis, filter_issues_by_axes, parse_filter_env). Lowercase-normalises mixed-case labels; treats empty-slug labels (`axis:`) as unaligned. - CLI threads `--axis` through env var `LOOP_AXIS_FILTER` so the tick loop can read it without a new kwarg on `run_loop`. - Dispatch widens its `top_issues` fetch only when a filter is active, then trims back to `cfg.parallel` from the matched subset. Docs: `docs/axis-labels.md` walks the namespace + edge-case matrix. Tests: - tests/test_cli_axis_filter.py — pure axis helpers, `status --json` payload shape, multi-axis dedup, Rich warning row, CLI flag wiring. - tests/test_dispatch_axis_filter.py — single + union filters, regression guard for the no-filter path (identity-preserving), ready-gate priority, malformed-label safety, mixed-case normalisation, zero-match clean exit. --- docs/axis-labels.md | 89 ++++++++++ src/forge_loop/axis.py | 140 ++++++++++++++++ src/forge_loop/cli.py | 88 +++++++++- src/forge_loop/runner/boot.py | 13 ++ src/forge_loop/runner/tick.py | 29 +++- tests/test_cli_axis_filter.py | 252 +++++++++++++++++++++++++++++ tests/test_dispatch_axis_filter.py | 133 +++++++++++++++ 7 files changed, 739 insertions(+), 5 deletions(-) create mode 100644 docs/axis-labels.md create mode 100644 src/forge_loop/axis.py create mode 100644 tests/test_cli_axis_filter.py create mode 100644 tests/test_dispatch_axis_filter.py diff --git a/docs/axis-labels.md b/docs/axis-labels.md new file mode 100644 index 0000000..ce268ac --- /dev/null +++ b/docs/axis-labels.md @@ -0,0 +1,89 @@ +# Axis labels (`axis:`) + +> Introduced in [#126](https://github.com/khalidx/forge-loop/issues/126). +> Plumbing for focused-sprint mode: group / filter the loop's work by +> topical concern without changing how issues are queued. + +## What it is + +A free-form label namespace, `axis:`, applied to GitHub issues +by the brainstormer / PO / operators. The slug is any lowercase string +(e.g. `dispatch`, `cli`, `observability`, `docs`). Multiple `axis:*` +labels on the same issue are fine; they stack. + +There is **no enum, no registry, and no validation**. If you want to +spin up a new axis tonight, just label an issue. + +## Why + +Operators staring at `forge-loop status` saw a flat list of +`loop:ready` issues with no way to tell whether the backlog leaned +toward dispatcher work, CLI work, or observability work. Likewise, +`forge-loop run` greedily grabbed whatever was ready, so an operator +who wanted to grind one area for a sprint had no knob. + +## Surface + +### Status grouping + +``` +$ forge-loop status +... (existing rows) +axes dispatch (6) #12, #14, #18, #19, #22, #27 + cli (5) #11, #13, #21, #24, #26 + unaligned(3) #9, #17, #28 +warning 3 open issue(s) carry no axis:* label +``` + +JSON shape (`forge-loop status --json`): + +```json +{ + "queue_depth": 14, + "axes": { + "dispatch": [{"number": 12, "title": "..."}, ...], + "cli": [{"number": 11, "title": "..."}, ...], + "unaligned": [{"number": 9, "title": "..."}, ...] + }, + "unaligned_count": 3, + "axis_filter": [] +} +``` + +Narrow the view with `--axis`: + +``` +$ forge-loop status --axis dispatch --json +{ ..., "axes": { "dispatch": [...] }, "axis_filter": ["dispatch"] } +``` + +### Dispatch filter + +``` +$ forge-loop run --axis dispatch # only axis:dispatch issues +$ forge-loop run --axis dispatch --axis cli # union +$ forge-loop run # unchanged behaviour (no filter) +``` + +The runner logs the active filter at startup and emits +`axis_filter_active` events per tick so a focused-sprint launch is +auditable from `forge-loop events`. If no ready issues match, the tick +exits cleanly (the loop just idles until a matching issue lands). + +## Edge cases + +| Case | Behaviour | +| ---- | --------- | +| Mixed case label (`Axis:Dispatch`) | Normalised to lowercase before matching. | +| Empty slug label (`axis:`) | Treated as unaligned; logged, no crash. | +| Issue carries multiple `axis:*` labels | Appears under each bucket; counted once in `unaligned_count`. | +| `--axis foo` matches zero issues | Tick idles cleanly with an `axis_filter_empty` event. Exit code 0. | +| `--axis` not passed | Behaviour is byte-identical to the pre-#126 loop. | + +## Out of scope + +- Automatic axis assignment from issue text (separate brainstormer + enhancement). +- Closed registry / enum of allowed axes. +- Persisting the filter into config or state — `--axis` is a + per-invocation knob. diff --git a/src/forge_loop/axis.py b/src/forge_loop/axis.py new file mode 100644 index 0000000..5a19a17 --- /dev/null +++ b/src/forge_loop/axis.py @@ -0,0 +1,140 @@ +"""Axis label namespace helpers (issue #126). + +The brainstormer/PO emits issues across many concerns: CLI, runner, +dispatch, observability, etc. Operators want to focus a sprint on a +single concern — but a flat ``loop:ready`` queue gives them no way to +group or filter. Issue #126 introduces the ``axis:`` label +namespace as plumbing: + +* Any label whose name (lowercased) starts with ``axis:`` is parsed + into a free-form axis slug. No registry, no validation, no enum. +* ``forge-loop status`` groups open ready-issues by axis; issues with + no axis label go into the ``unaligned`` bucket and a soft warning + is surfaced. +* ``forge-loop run --axis `` (repeatable) narrows dispatch to + issues carrying any of those axis labels. Omitting the flag is a + no-op — behaviour is byte-identical to the pre-#126 loop. + +The helpers in this module are intentionally pure: they read labels +off in-memory dicts, normalise case, and never touch GitHub. Callers +fetch issues; we just bucket them. +""" + +from __future__ import annotations + +import os +from typing import Any, Iterable + + +AXIS_PREFIX = "axis:" +UNALIGNED_BUCKET = "unaligned" +AXIS_FILTER_ENV = "LOOP_AXIS_FILTER" + + +def extract_axes(labels: Iterable[Any]) -> set[str]: + """Pull the ``axis:*`` slugs off a label list. + + Accepts either raw strings (``"axis:dispatch"``) or dicts shaped + like the ``gh issue list --json labels`` payload (``{"name": ...}``). + Comparison is case-insensitive: ``Axis:Dispatch`` normalises to + ``dispatch``. + + Empty slugs (the literal label ``axis:`` with nothing after the + colon) are dropped — the acceptance criteria call this out as + "treated as unaligned". + """ + out: set[str] = set() + for raw in labels or []: + if isinstance(raw, dict): + name = str(raw.get("name") or "") + else: + name = str(raw or "") + low = name.strip().lower() + if not low.startswith(AXIS_PREFIX): + continue + slug = low[len(AXIS_PREFIX):].strip() + if not slug: + continue + out.add(slug) + return out + + +def matches_axes(labels: Iterable[Any], wanted: Iterable[str]) -> bool: + """True iff the issue's axes intersect ``wanted``. + + Used by the dispatcher. ``wanted`` empty means "no filter set" — + callers handle that branch separately (this function would return + False for empty intersection, which is the wrong answer). + """ + want = {w.lower() for w in wanted if w} + if not want: + return True + return bool(extract_axes(labels) & want) + + +def group_by_axis( + issues: list[dict[str, Any]], +) -> tuple[dict[str, list[dict[str, Any]]], int]: + """Bucket issues by axis label. + + Returns ``(buckets, unaligned_count)`` where ``buckets`` maps an + axis slug -> list of issues carrying that label, plus a special + ``"unaligned"`` key for issues with zero axis labels. An issue + carrying multiple axis labels appears under each bucket. The + ``unaligned_count`` reflects unique issues, not the sum of bucket + sizes (an issue can be in two axes but only counts once). + """ + buckets: dict[str, list[dict[str, Any]]] = {} + unaligned = 0 + for issue in issues: + axes = extract_axes(issue.get("labels") or []) + if not axes: + buckets.setdefault(UNALIGNED_BUCKET, []).append(issue) + unaligned += 1 + continue + for ax in sorted(axes): + buckets.setdefault(ax, []).append(issue) + return buckets, unaligned + + +def parse_filter_env(env: str | None = None) -> list[str]: + """Read ``LOOP_AXIS_FILTER`` (comma-separated) -> sorted unique slugs. + + The CLI sets this env var when ``--axis`` is passed; the tick loop + consumes it. Empty / missing var -> empty list (= no filter). + """ + raw = env if env is not None else os.environ.get(AXIS_FILTER_ENV, "") + if not raw: + return [] + seen: list[str] = [] + for chunk in raw.split(","): + c = chunk.strip().lower() + if c and c not in seen: + seen.append(c) + return seen + + +def filter_issues_by_axes( + issues: list[dict[str, Any]], + wanted: list[str], +) -> list[dict[str, Any]]: + """Return issues whose axis labels intersect ``wanted``. + + Empty ``wanted`` -> returns input unchanged (preserves today's + behaviour exactly, per the regression-guard acceptance criterion). + """ + if not wanted: + return issues + return [i for i in issues if matches_axes(i.get("labels") or [], wanted)] + + +__all__ = [ + "AXIS_FILTER_ENV", + "AXIS_PREFIX", + "UNALIGNED_BUCKET", + "extract_axes", + "filter_issues_by_axes", + "group_by_axis", + "matches_axes", + "parse_filter_env", +] diff --git a/src/forge_loop/cli.py b/src/forge_loop/cli.py index 7c71426..8938c8c 100644 --- a/src/forge_loop/cli.py +++ b/src/forge_loop/cli.py @@ -102,6 +102,20 @@ def _cmd_run(args: SimpleNamespace) -> int: if queue_url: os.environ["LOOP_QUEUE_URL"] = queue_url + # Issue #126 — axis-aware dispatch filter. The CLI accepts ``--axis`` + # repeatedly; we serialise to a comma-separated env var so the tick + # loop (which is a separate function in another module) can read it + # without us having to thread a new kwarg through ``run_loop`` and + # every adjacent caller. Empty list -> env var stays unset, and the + # dispatcher takes the pre-#126 fast path verbatim. + from forge_loop.axis import AXIS_FILTER_ENV + + axes = [a.strip().lower() for a in (getattr(args, "axis", None) or []) if a and a.strip()] + if axes: + os.environ[AXIS_FILTER_ENV] = ",".join(axes) + else: + os.environ.pop(AXIS_FILTER_ENV, None) + orch = getattr(args, "orchestrator", "sync") if orch == "async": from forge_loop.runner import run_async as run_async_loop @@ -324,7 +338,11 @@ def _cmd_status(args: SimpleNamespace) -> int: except json.JSONDecodeError: pass + # Issue #126 — fetch labels + title alongside number so we can group + # the open ready-queue by ``axis:*`` label below. Cheap: same call, + # one additional JSON field. queue_depth = 0 + ready_issues: list[dict[str, Any]] = [] try: r = subprocess.run( [ @@ -337,18 +355,41 @@ def _cmd_status(args: SimpleNamespace) -> int: cfg.labels.ready, "--state", "open", + "--limit", + "200", "--json", - "number", + "number,title,labels", ], capture_output=True, text=True, timeout=15, ) if r.returncode == 0: - queue_depth = len(json.loads(r.stdout or "[]")) + ready_issues = json.loads(r.stdout or "[]") + queue_depth = len(ready_issues) except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError): queue_depth = -1 + # Group the open ready-queue by axis. Axis filter (--axis) narrows + # the bucketed view to just the requested slugs — this is the + # "sanity check before running" surface called out in the spec. + from forge_loop.axis import UNALIGNED_BUCKET, group_by_axis + + axis_filter = [a.strip().lower() for a in (getattr(args, "axis", None) or []) if a and a.strip()] + grouped_all, unaligned_count = group_by_axis(ready_issues) + if axis_filter: + axes_view: dict[str, list[dict[str, Any]]] = { + k: v for k, v in grouped_all.items() if k in set(axis_filter) + } + else: + axes_view = grouped_all + # Render shape: {axis_slug: [{number, title}], ...} (drop the heavy + # ``labels`` blob from the per-issue payload). + axes_payload: dict[str, list[dict[str, Any]]] = { + k: [{"number": i.get("number"), "title": i.get("title", "")} for i in v] + for k, v in axes_view.items() + } + payload: dict[str, Any] = { "pid": pid_text or None, "pid_alive": pid_alive, @@ -361,6 +402,9 @@ def _cmd_status(args: SimpleNamespace) -> int: "last_failure": last_failure, "last_events": last_5_events, "events_file": str(cfg.events_file), + "axes": axes_payload, + "unaligned_count": unaligned_count, + "axis_filter": axis_filter, } if getattr(args, "json", False): @@ -408,6 +452,28 @@ def _cmd_status(args: SimpleNamespace) -> int: table.add_row("last 5 events", joined) table.add_row("events", str(cfg.events_file)) + # Issue #126 — axis breakdown. Render one row per axis (sorted for + # deterministic output) showing the issue count, plus a yellow + # "unaligned" warning iff any open ready-issue has no axis label. + if axis_filter: + table.add_row("axis filter", ", ".join(sorted(set(axis_filter)))) + if axes_view: + axis_lines = Text() + for ax in sorted(k for k in axes_view if k != UNALIGNED_BUCKET): + nums = ", ".join(f"#{i.get('number')}" for i in axes_view[ax]) + axis_lines.append(f" {ax}", style="cyan") + axis_lines.append(f" ({len(axes_view[ax])}) {nums}\n") + if UNALIGNED_BUCKET in axes_view: + unaligned_nums = ", ".join(f"#{i.get('number')}" for i in axes_view[UNALIGNED_BUCKET]) + axis_lines.append(f" {UNALIGNED_BUCKET}", style="yellow") + axis_lines.append(f" ({len(axes_view[UNALIGNED_BUCKET])}) {unaligned_nums}\n") + table.add_row("axes", axis_lines) + if unaligned_count > 0 and not axis_filter: + table.add_row( + "[yellow]warning[/yellow]", + f"[yellow]{unaligned_count} open issue(s) carry no axis:* label[/yellow]", + ) + console.print(Panel(table, title="[bold]forge-loop status[/bold]", title_align="left")) return 0 @@ -1163,18 +1229,32 @@ def cmd_run( "--queue", help="Queue backend URL. Default in-memory; sqlite:///path for durable.", ), + axis: list[str] = typer.Option( + [], + "--axis", + help=( + "Narrow dispatch to issues carrying ``axis:`` labels. " + "Repeatable; values are unioned. Omit to preserve pre-#126 " + "behaviour (no filter)." + ), + ), ) -> None: if orchestrator not in {"sync", "async"}: typer.echo(f"run: invalid --orchestrator {orchestrator!r}", err=True) raise typer.Exit(code=2) - _exit(_cmd_run(SimpleNamespace(orchestrator=orchestrator, queue=queue))) + _exit(_cmd_run(SimpleNamespace(orchestrator=orchestrator, queue=queue, axis=axis))) @app.command("status", help="Operator-facing health surface.") def cmd_status( json_: bool = typer.Option(False, "--json", help="Emit raw JSON for scripts."), + axis: list[str] = typer.Option( + [], + "--axis", + help="Narrow the axis-grouped view to these slugs (repeatable).", + ), ) -> None: - _exit(_cmd_status(SimpleNamespace(json=json_))) + _exit(_cmd_status(SimpleNamespace(json=json_, axis=axis))) @app.command("doctor", help="One-shot health check (config-independent checks still run).") diff --git a/src/forge_loop/runner/boot.py b/src/forge_loop/runner/boot.py index de50ded..9a85f67 100644 --- a/src/forge_loop/runner/boot.py +++ b/src/forge_loop/runner/boot.py @@ -240,6 +240,12 @@ def run(cfg: Config, state: RunnerState | None = None) -> int: queue = build_queue(queue_url) host_id = default_host_id() + # Issue #126 — surface the axis filter at startup so an operator + # tail'ing events can audit a focused-sprint launch. + from forge_loop.axis import parse_filter_env as _parse_axis_filter + + _axes = _parse_axis_filter() + append_event( cfg.events_file, "loop_start", @@ -251,7 +257,14 @@ def run(cfg: Config, state: RunnerState | None = None) -> int: host_id=host_id, distributed=False, queue_backend=(queue_url or "memory"), + axis_filter=_axes, ) + if _axes: + import logging as _logging + + _logging.getLogger("forge_loop.runner").info( + "axis filter active: %s", ",".join(_axes) + ) write_state(cfg.state_file, {"state": "starting", "tick": 0, "parallel": cfg.parallel}) # Issue #18 — if `.forge/pipeline.yaml` exists, validate it at startup so diff --git a/src/forge_loop/runner/tick.py b/src/forge_loop/runner/tick.py index 7b8bd83..3e7ee38 100644 --- a/src/forge_loop/runner/tick.py +++ b/src/forge_loop/runner/tick.py @@ -492,14 +492,41 @@ def _bus_emit(kind: str, payload: dict[str, Any]) -> None: _short_sleep(cfg.tick_interval_s, cfg) return + # Issue #126 — axis-aware dispatch filter. When ``LOOP_AXIS_FILTER`` + # is set (via ``forge-loop run --axis ...``), the dispatcher pulls a + # wider window of ready issues than ``cfg.parallel`` so the filter + # has something to chew on, then trims back to ``cfg.parallel`` from + # the matched subset. When the env var is empty, behaviour is + # byte-identical to today (same call, same limit). + from forge_loop.axis import filter_issues_by_axes, parse_filter_env + + axis_filter = parse_filter_env() + fetch_limit = max(cfg.parallel, 50) if axis_filter else cfg.parallel try: - issues = top_issues(cfg.labels.ready, cfg.parallel, repo=cfg.github_repo) + issues = top_issues(cfg.labels.ready, fetch_limit, repo=cfg.github_repo) except subprocess.CalledProcessError as e: append_event(cfg.events_file, "gh_list_failed", err=(e.stderr or "")[:200]) write_state(cfg.state_file, {"state": "gh_error", "tick": tick}) _short_sleep(60, cfg) return + if axis_filter: + append_event( + cfg.events_file, + "axis_filter_active", + tick=tick, + axes=axis_filter, + candidates=len(issues), + ) + issues = filter_issues_by_axes(issues, axis_filter)[: cfg.parallel] + if not issues: + append_event( + cfg.events_file, + "axis_filter_empty", + tick=tick, + axes=axis_filter, + ) + if not issues: append_event(cfg.events_file, "tick_idle", tick=tick) write_state( diff --git a/tests/test_cli_axis_filter.py b/tests/test_cli_axis_filter.py new file mode 100644 index 0000000..c5e08b0 --- /dev/null +++ b/tests/test_cli_axis_filter.py @@ -0,0 +1,252 @@ +"""Axis-label grouping + filtering for ``forge-loop status`` (issue #126). + +These tests pin the axis-aware UX: + +* ``status --json`` returns ``axes`` + ``unaligned_count`` keys. +* The Rich human surface emits a yellow ``warning`` row iff there is + at least one unaligned open ready-issue. +* Multi-axis issues appear under every bucket they qualify for, but + the count of unaligned is unique (an issue with two axes is never + counted as unaligned). +* ``status --axis dispatch`` narrows the grouped view. + +We monkeypatch ``subprocess.run`` rather than hitting GitHub, the same +way the rest of the CLI suite stubs I/O. +""" + +from __future__ import annotations + +import json +import os +import subprocess +from pathlib import Path +from types import SimpleNamespace +from typing import Any + +import pytest +from typer.testing import CliRunner + +from forge_loop import axis as _axis +from forge_loop import cli + + +# --------------------------------------------------------------------------- +# Pure axis helpers +# --------------------------------------------------------------------------- + + +def test_extract_axes_picks_lowercased_slugs_only() -> None: + labels = [ + {"name": "axis:dispatch"}, + {"name": "Axis:CLI"}, # mixed case → lowercase + {"name": "axis:"}, # empty slug → dropped + {"name": "loop:ready"}, + "axis:observability", + ] + assert _axis.extract_axes(labels) == {"dispatch", "cli", "observability"} + + +def test_matches_axes_empty_wanted_is_true() -> None: + # "no filter set" branch — callers gate on len(wanted), but the + # helper itself must not refuse to match an unfiltered label list. + assert _axis.matches_axes([{"name": "axis:foo"}], []) is True + + +def test_matches_axes_intersect() -> None: + labels = [{"name": "axis:dispatch"}] + assert _axis.matches_axes(labels, ["dispatch"]) is True + assert _axis.matches_axes(labels, ["cli"]) is False + assert _axis.matches_axes(labels, ["cli", "dispatch"]) is True + + +def test_group_by_axis_buckets_and_dedupes_unaligned() -> None: + issues = [ + {"number": 1, "labels": [{"name": "axis:dispatch"}]}, + {"number": 2, "labels": [{"name": "axis:cli"}, {"name": "axis:dispatch"}]}, + {"number": 3, "labels": [{"name": "loop:ready"}]}, # unaligned + {"number": 4, "labels": [{"name": "axis:"}]}, # malformed → unaligned + ] + buckets, unaligned = _axis.group_by_axis(issues) + + # #2 appears under BOTH dispatch and cli + assert {i["number"] for i in buckets["dispatch"]} == {1, 2} + assert {i["number"] for i in buckets["cli"]} == {2} + # #3 and #4 land in unaligned; unique count = 2 (not summed across axes) + assert {i["number"] for i in buckets["unaligned"]} == {3, 4} + assert unaligned == 2 + + +def test_filter_issues_by_axes_empty_filter_preserves_input() -> None: + issues = [{"number": 1, "labels": [{"name": "axis:dispatch"}]}] + assert _axis.filter_issues_by_axes(issues, []) is issues + + +def test_filter_issues_by_axes_union_semantics() -> None: + issues = [ + {"number": 1, "labels": [{"name": "axis:dispatch"}]}, + {"number": 2, "labels": [{"name": "axis:cli"}]}, + {"number": 3, "labels": [{"name": "axis:docs"}]}, + ] + out = _axis.filter_issues_by_axes(issues, ["dispatch", "cli"]) + assert {i["number"] for i in out} == {1, 2} + + +def test_parse_filter_env_dedupes_and_lowercases() -> None: + assert _axis.parse_filter_env("dispatch, CLI ,dispatch,, ") == ["dispatch", "cli"] + assert _axis.parse_filter_env("") == [] + assert _axis.parse_filter_env(None) == [] # reads env; unset → [] + + +# --------------------------------------------------------------------------- +# CLI surface: ``status --json`` + Rich human output +# --------------------------------------------------------------------------- + + +@pytest.fixture +def _status_cfg(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Stub ``cli.load()`` so ``_cmd_status`` runs against a temp dir.""" + + class _Labels: + ready = "loop:ready" + + cfg = SimpleNamespace( + github_repo="owner/repo", + labels=_Labels(), + state_dir=tmp_path, + state_file=tmp_path / "state.json", + events_file=tmp_path / "events.jsonl", + pid_file=tmp_path / "pid", + ) + monkeypatch.setattr(cli, "load", lambda: cfg) + monkeypatch.delenv(_axis.AXIS_FILTER_ENV, raising=False) + + +def _seed_subprocess(monkeypatch: pytest.MonkeyPatch, payload: list[dict[str, Any]]) -> None: + def fake_run(*args: Any, **kwargs: Any) -> subprocess.CompletedProcess[str]: + return subprocess.CompletedProcess( + args=args[0] if args else [], + returncode=0, + stdout=json.dumps(payload), + stderr="", + ) + + monkeypatch.setattr("forge_loop.cli.subprocess.run", fake_run) + + +def test_status_json_groups_by_axis( + _status_cfg: None, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str] +) -> None: + _seed_subprocess(monkeypatch, [ + {"number": 1, "title": "a", "labels": [{"name": "axis:dispatch"}]}, + {"number": 2, "title": "b", "labels": [{"name": "axis:cli"}, {"name": "axis:dispatch"}]}, + {"number": 3, "title": "c", "labels": [{"name": "loop:ready"}]}, + ]) + rc = cli._cmd_status(SimpleNamespace(json=True, axis=[])) + assert rc == 0 + out = json.loads(capsys.readouterr().out) + assert out["unaligned_count"] == 1 + assert {i["number"] for i in out["axes"]["dispatch"]} == {1, 2} + assert {i["number"] for i in out["axes"]["cli"]} == {2} + assert {i["number"] for i in out["axes"]["unaligned"]} == {3} + + +def test_status_human_warns_on_unaligned( + _status_cfg: None, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str] +) -> None: + _seed_subprocess(monkeypatch, [ + {"number": 7, "title": "alone", "labels": [{"name": "loop:ready"}]}, + ]) + cli._cmd_status(SimpleNamespace(json=False, axis=[])) + out = capsys.readouterr().out + assert "warning" in out + assert "no axis" in out + + +def test_status_human_no_warning_when_aligned( + _status_cfg: None, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str] +) -> None: + _seed_subprocess(monkeypatch, [ + {"number": 1, "title": "x", "labels": [{"name": "axis:dispatch"}]}, + ]) + cli._cmd_status(SimpleNamespace(json=False, axis=[])) + out = capsys.readouterr().out + assert "no axis" not in out + + +def test_status_json_axis_filter_narrows_view( + _status_cfg: None, monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str] +) -> None: + _seed_subprocess(monkeypatch, [ + {"number": 1, "title": "a", "labels": [{"name": "axis:dispatch"}]}, + {"number": 2, "title": "b", "labels": [{"name": "axis:cli"}]}, + ]) + rc = cli._cmd_status(SimpleNamespace(json=True, axis=["dispatch"])) + assert rc == 0 + out = json.loads(capsys.readouterr().out) + assert set(out["axes"].keys()) == {"dispatch"} + assert out["axis_filter"] == ["dispatch"] + + +# --------------------------------------------------------------------------- +# ``run --axis`` sets the env var so the dispatcher can read it. +# --------------------------------------------------------------------------- + + +def test_cmd_run_axis_flag_sets_env_var(monkeypatch: pytest.MonkeyPatch) -> None: + captured: dict[str, Any] = {} + + def fake_run_loop(cfg: Any) -> int: + captured["env"] = os.environ.get(_axis.AXIS_FILTER_ENV) + return 0 + + monkeypatch.setattr(cli, "run_loop", fake_run_loop) + monkeypatch.setattr(cli, "load", lambda: SimpleNamespace()) + monkeypatch.delenv(_axis.AXIS_FILTER_ENV, raising=False) + + cli._cmd_run(SimpleNamespace(orchestrator="sync", queue=None, axis=["Dispatch", "cli"])) + assert captured["env"] == "dispatch,cli" + + +def test_cmd_run_no_axis_clears_env_var(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv(_axis.AXIS_FILTER_ENV, "stale") + monkeypatch.setattr(cli, "run_loop", lambda cfg: 0) + monkeypatch.setattr(cli, "load", lambda: SimpleNamespace()) + cli._cmd_run(SimpleNamespace(orchestrator="sync", queue=None, axis=[])) + assert os.environ.get(_axis.AXIS_FILTER_ENV) is None + + +# --------------------------------------------------------------------------- +# Typer wiring: --axis is repeatable. +# --------------------------------------------------------------------------- + + +def test_cli_run_accepts_repeated_axis_flag(monkeypatch: pytest.MonkeyPatch) -> None: + runner = CliRunner() + captured: dict[str, Any] = {} + + def _stub(args: SimpleNamespace) -> int: + captured["axis"] = args.axis + return 0 + + monkeypatch.setattr(cli, "_cmd_run", _stub) + result = runner.invoke( + cli.app, ["run", "--axis", "dispatch", "--axis", "cli"] + ) + assert result.exit_code == 0 + assert captured["axis"] == ["dispatch", "cli"] + + +def test_cli_status_accepts_axis_flag(monkeypatch: pytest.MonkeyPatch) -> None: + runner = CliRunner() + captured: dict[str, Any] = {} + + def _stub(args: SimpleNamespace) -> int: + captured["axis"] = args.axis + captured["json"] = args.json + return 0 + + monkeypatch.setattr(cli, "_cmd_status", _stub) + result = runner.invoke(cli.app, ["status", "--json", "--axis", "dispatch"]) + assert result.exit_code == 0 + assert captured["axis"] == ["dispatch"] + assert captured["json"] is True diff --git a/tests/test_dispatch_axis_filter.py b/tests/test_dispatch_axis_filter.py new file mode 100644 index 0000000..9962528 --- /dev/null +++ b/tests/test_dispatch_axis_filter.py @@ -0,0 +1,133 @@ +"""Dispatch-side axis filter (issue #126). + +The dispatcher MUST honour ``LOOP_AXIS_FILTER`` when it pulls the open +ready-queue. These tests pin: + +* Issues labelled ``axis:dispatch`` are picked when the filter is + ``--axis dispatch``. +* Union semantics for multiple ``--axis`` values. +* No filter → no change vs today (regression guard). +* An ``axis:dispatch`` issue WITHOUT ``loop:ready`` is still skipped + (the ready-gate is the prior, higher-priority filter — we never + override it). +* Malformed ``axis:`` labels are treated as unaligned (skipped under + a filter, included otherwise). +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from forge_loop.axis import ( + AXIS_FILTER_ENV, + filter_issues_by_axes, + parse_filter_env, +) + + +READY = "loop:ready" + + +def _make(num: int, *labels: str) -> dict[str, Any]: + return { + "number": num, + "title": f"issue-{num}", + "labels": [{"name": name} for name in labels], + } + + +# --------------------------------------------------------------------------- +# Filtering against a synthetic ready-queue (the ``top_issues`` payload). +# --------------------------------------------------------------------------- + + +def test_single_axis_picks_only_matching_issues() -> None: + queue = [ + _make(1, READY, "axis:dispatch"), + _make(2, READY, "axis:cli"), + _make(3, READY, "axis:dispatch", "axis:cli"), + _make(4, READY), # unaligned + ] + picked = filter_issues_by_axes(queue, ["dispatch"]) + assert [i["number"] for i in picked] == [1, 3] + + +def test_multi_axis_unions() -> None: + queue = [ + _make(1, READY, "axis:dispatch"), + _make(2, READY, "axis:cli"), + _make(3, READY, "axis:docs"), + ] + picked = filter_issues_by_axes(queue, ["dispatch", "cli"]) + assert {i["number"] for i in picked} == {1, 2} + + +def test_no_filter_is_byte_identical_to_today() -> None: + """Regression guard: empty filter MUST return the input list as-is.""" + queue = [ + _make(1, READY, "axis:dispatch"), + _make(2, READY), + ] + # Identity-preserving (no copy, no reorder) keeps any callsite that + # relies on list identity (e.g. ``is``-based caches) safe. + assert filter_issues_by_axes(queue, []) is queue + + +def test_axis_match_alone_does_not_bypass_ready_gate() -> None: + """The ready-gate is upstream: `top_issues(loop:ready, …)` only + returns issues already carrying the ready label. The axis filter is + a *narrower* filter on top, never a wider one. We simulate that here + by only feeding ready-labelled issues into the filter — an issue + that has ``axis:dispatch`` but lacks ``loop:ready`` would never be + in the input list at all, and the filter is incapable of + re-introducing it.""" + queue_from_ready_gate = [ + _make(1, READY, "axis:dispatch"), + # #99 (axis:dispatch but no loop:ready) is NOT in this list — + # `top_issues` filtered it out upstream. + ] + picked = filter_issues_by_axes(queue_from_ready_gate, ["dispatch"]) + assert [i["number"] for i in picked] == [1] + + +def test_malformed_axis_label_does_not_crash_and_excludes_under_filter() -> None: + queue = [ + _make(1, READY, "axis:"), # empty slug + _make(2, READY, "axis:dispatch"), + ] + picked = filter_issues_by_axes(queue, ["dispatch"]) + # #1 (malformed) is NOT in the dispatch bucket. + assert [i["number"] for i in picked] == [2] + + +def test_mixed_case_axis_label_normalises() -> None: + queue = [_make(1, READY, "Axis:Dispatch")] + picked = filter_issues_by_axes(queue, ["dispatch"]) + assert [i["number"] for i in picked] == [1] + + +# --------------------------------------------------------------------------- +# Env-var contract: how the CLI tells the dispatcher what to filter. +# --------------------------------------------------------------------------- + + +def test_parse_filter_env_returns_empty_when_unset(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv(AXIS_FILTER_ENV, raising=False) + assert parse_filter_env() == [] + + +def test_parse_filter_env_reads_csv(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv(AXIS_FILTER_ENV, "dispatch,cli") + assert parse_filter_env() == ["dispatch", "cli"] + + +def test_unknown_axis_filter_produces_empty_result_not_error() -> None: + """Sad path: ``--axis nonexistent`` against a queue with zero + matching issues is NOT an error — it's a clean idle tick. The + dispatcher logs a one-liner and moves on (see ``axis_filter_empty`` + event).""" + queue = [_make(1, READY, "axis:dispatch")] + picked = filter_issues_by_axes(queue, ["nonexistent"]) + assert picked == []