Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/forge_loop/_brainstormer_sdk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""SDK-driven brainstormer shim (issue #123).

Mirrors :mod:`forge_loop._critic_sdk`: a single synchronous entry point
that drives one Claude Agent SDK session and returns ``last_message`` +
``duration_s`` + ``error``. The brainstormer reads the final assistant
message as a JSON ``BrainstormReport`` payload.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

from forge_loop._critic_sdk import CriticSdkResult, run_critic_sdk

# Type alias for callers that want a self-documenting return name.
BrainstormerSdkResult = CriticSdkResult


def run_brainstormer_sdk(
prompt: str,
*,
cwd: Path,
timeout_s: int,
model: str | None = None,
thinking_budget: str | None = None,
allowed_mcp_servers: tuple[str, ...] | None = None,
load_timeout_ms: int | None = None,
strict_mcp_config: bool = False,
mcp_servers: dict[str, Any] | None = None,
add_dirs: tuple[Path, ...] = (),
query_fn: Any = None,
options_cls: Any = None,
) -> BrainstormerSdkResult:
"""One-shot SDK session for the brainstormer.

Shape-identical to :func:`forge_loop._critic_sdk.run_critic_sdk` so
tests can swap one for the other. Implemented as a thin pass-through
rather than a copy of the asyncio plumbing — there's exactly one
correct way to drive a single SDK session, and we already have it.
"""
return run_critic_sdk(
prompt,
cwd=cwd,
timeout_s=timeout_s,
model=model,
thinking_budget=thinking_budget,
allowed_mcp_servers=allowed_mcp_servers,
load_timeout_ms=load_timeout_ms,
strict_mcp_config=strict_mcp_config,
mcp_servers=mcp_servers,
add_dirs=add_dirs,
query_fn=query_fn,
options_cls=options_cls,
)


__all__ = ["BrainstormerSdkResult", "run_brainstormer_sdk"]
325 changes: 325 additions & 0 deletions src/forge_loop/brainstormer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
"""Brainstormer — propose axis-aligned epics + tickets from product vision.

Issue #123 (part of epic #121). Depends on ``ProductVision`` from #122.

The brainstormer is the sprint loop's *generator* of new work. Without
it, the loop drains the existing backlog and then drifts into cosmetic
tinkering. With it, every newly proposed item must:

1. Cite an axis (by exact name) declared in :class:`ProductVision`.
2. Carry a customer story tied to that axis's ``customer`` field.
3. Survive the anti-cosmetic guardrail — substring-match against the
axis's ``rejected_as_cosmetic`` list.

The SDK session does the proposing; the guardrail (this module) drops
anything that fails the rubric. The session is *not* trusted to enforce
its own refusal contract — defense in depth.
"""

from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable

from pydantic import BaseModel, ConfigDict, Field, ValidationError

from forge_loop.log import get_logger
from forge_loop.product_vision import ProductVision

__all__ = [
"BrainstormReport",
"Brainstormer",
"ProposedEpic",
"ProposedTicket",
]

_log = get_logger("forge_loop.brainstormer")


class _ProposedBase(BaseModel):
"""Common shape for proposed epics + tickets.

Extra keys ignored for forward compatibility with future SDK output
versions — we only enforce the fields the rubric cares about.
"""

model_config = ConfigDict(extra="ignore")

title: str
body: str = ""
axis: str = ""
customer_story: str = ""


class ProposedEpic(_ProposedBase):
"""An epic proposed by the brainstormer SDK session."""


class ProposedTicket(_ProposedBase):
"""A ticket proposed by the brainstormer SDK session."""


class BrainstormReport(BaseModel):
"""Filtered output — only items that survived the anti-cosmetic guardrail."""

model_config = ConfigDict(extra="ignore")

proposed_epics: list[ProposedEpic] = Field(default_factory=list)
proposed_tickets: list[ProposedTicket] = Field(default_factory=list)


# ---------------------------------------------------------------------------
# Internals: prompt rendering + filtering
# ---------------------------------------------------------------------------


def _render_axes_block(vision: ProductVision) -> str:
"""Render axes as a deterministic, prompt-friendly structured block."""
import yaml

payload = {
"axes": [
{
"name": a.name,
"customer": a.customer,
"valuable_means": a.valuable_means,
"acceptable_work": list(a.acceptable_work),
"rejected_as_cosmetic": list(a.rejected_as_cosmetic),
}
for a in vision.axes
]
}
return yaml.safe_dump(payload, sort_keys=False).strip()


def _render_backlog_block(backlog: Any) -> str:
"""Render the open backlog as a compact, prompt-friendly block.

Accepts an ``OpenBacklog`` (preferred) or any object exposing ``epics``
and ``tickets`` iterables of ``(number, title)``-shaped items. The
flexibility keeps tests free of githubkit-shape coupling.
"""
def _fmt(items: Any) -> str:
out: list[str] = []
for it in items or []:
num = getattr(it, "number", None) or (it.get("number") if isinstance(it, dict) else None)
title = getattr(it, "title", None) or (it.get("title") if isinstance(it, dict) else "")
out.append(f" - #{num}: {title}")
return "\n".join(out) if out else " (none)"

epics = getattr(backlog, "epics", []) or []
tickets = getattr(backlog, "tickets", []) or []
return f"Open epics:\n{_fmt(epics)}\n\nOpen tickets:\n{_fmt(tickets)}"


def _parse_sdk_payload(last_message: str) -> dict[str, Any]:
"""Extract the trailing JSON object from the SDK's last message.

The prompt requires the JSON object to be on the LAST line. We accept
"anywhere in the tail" defensively — find the last ``{`` that begins
a balanced object. Raises ``ValueError`` (callers re-raise as
``RuntimeError``) on malformed input.
"""
text = (last_message or "").strip()
if not text:
raise ValueError("empty SDK output")

# Try a fast path: the whole message is JSON.
try:
obj = json.loads(text)
if isinstance(obj, dict):
return obj
except json.JSONDecodeError:
pass

# Fall back: scan for the last balanced ``{...}`` substring.
depth = 0
start = -1
best: str | None = None
for i, ch in enumerate(text):
if ch == "{":
if depth == 0:
start = i
depth += 1
elif ch == "}":
depth -= 1
if depth == 0 and start >= 0:
best = text[start : i + 1]
if best is None:
raise ValueError("no JSON object found in SDK output")
try:
obj = json.loads(best)
except json.JSONDecodeError as exc:
raise ValueError(f"malformed JSON in SDK output: {exc}") from exc
if not isinstance(obj, dict):
raise ValueError("SDK output JSON is not an object")
return obj


def _filter_items(
items: list[Any],
vision: ProductVision,
*,
kind: str,
) -> list[Any]:
"""Drop items that fail the rubric. Returns the surviving list.

Drop reasons (logged at INFO):
* ``missing_axis`` — empty ``axis``
* ``missing_customer_story`` — empty ``customer_story``
* ``unknown_axis`` — ``axis`` not in ``vision.axes``
* ``cosmetic_match`` — title/body contains a ``rejected_as_cosmetic``
phrase (case-insensitive substring) for the cited axis
"""
axes_by_name = {a.name: a for a in vision.axes}
survivors: list[Any] = []
for it in items:
title = it.title
axis_name = (it.axis or "").strip()
story = (it.customer_story or "").strip()
if not axis_name:
_log.info("brainstormer_dropped", kind=kind, title=title, reason="missing_axis")
continue
if not story:
_log.info(
"brainstormer_dropped", kind=kind, title=title, reason="missing_customer_story"
)
continue
axis = axes_by_name.get(axis_name)
if axis is None:
_log.info(
"brainstormer_dropped",
kind=kind,
title=title,
axis=axis_name,
reason="unknown_axis",
)
continue
haystack = f"{title}\n{it.body}".lower()
cosmetic_hit = next(
(
phrase
for phrase in axis.rejected_as_cosmetic
if phrase and phrase.lower() in haystack
),
None,
)
if cosmetic_hit is not None:
_log.info(
"brainstormer_dropped",
kind=kind,
title=title,
axis=axis_name,
phrase=cosmetic_hit,
reason="cosmetic_match",
)
continue
survivors.append(it)
return survivors


# ---------------------------------------------------------------------------
# Public class
# ---------------------------------------------------------------------------


@dataclass
class Brainstormer:
"""Drive one brainstormer SDK session and return a filtered report.

Args:
repo_path: Working directory the SDK session runs in.
owner / repo: GitHub coordinates for the open-backlog scan. If
either is empty, the backlog block renders as "(none)" — the
session still runs but without backlog context.
gh_client: Injected :class:`forge_loop.gh_client.GhClient`. If
None, a real :class:`GithubkitClient` is constructed lazily.
sdk_fn: Injection point for ``run_brainstormer_sdk`` (tests stub
this to avoid network + SDK install).
timeout_s: Hard cap on the SDK session.
"""

repo_path: Path = Path(".")
owner: str = ""
repo: str = ""
gh_client: Any = None
sdk_fn: Callable[..., Any] | None = None
timeout_s: int = 300
model: str | None = None

def run(self, vision: ProductVision) -> BrainstormReport:
"""Entry point — see module docstring."""
if vision is None or not getattr(vision, "axes", None):
raise ValueError(
"brainstormer requires a non-empty ProductVision with at least one axis"
)

# 1. Build the prompt.
backlog = self._scan_backlog()
prompt = self._render_prompt(vision, backlog)

# 2. Drive the SDK session.
sdk_fn = self.sdk_fn or self._default_sdk_fn()
result = sdk_fn(prompt, cwd=self.repo_path, timeout_s=self.timeout_s, model=self.model)
err = getattr(result, "error", None)
timed_out = getattr(result, "timed_out", False)
if timed_out or err == "timeout":
raise RuntimeError("brainstormer SDK session timed out — no report produced")
if err:
raise RuntimeError(f"brainstormer SDK session failed: {err}")

last_message = getattr(result, "last_message", "") or ""

# 3. Parse + validate the payload.
try:
payload = _parse_sdk_payload(last_message)
raw = BrainstormReport.model_validate(payload)
except (ValueError, ValidationError) as exc:
raise RuntimeError(
f"brainstormer SDK returned malformed output ({exc}); "
f"last_message={last_message!r}"
) from exc

# 4. Apply the anti-cosmetic guardrail.
return BrainstormReport(
proposed_epics=_filter_items(list(raw.proposed_epics), vision, kind="epic"),
proposed_tickets=_filter_items(list(raw.proposed_tickets), vision, kind="ticket"),
)

# -- helpers --------------------------------------------------------

def _scan_backlog(self) -> Any:
"""Return open backlog via gh_client (or an empty stand-in)."""
from forge_loop.gh_client import OpenBacklog, list_open_backlog

if not self.owner or not self.repo:
return OpenBacklog()
client = self.gh_client
if client is None:
try:
from forge_loop.gh_client import GithubkitClient

client = GithubkitClient()
except Exception: # noqa: BLE001 — boundary; degrade gracefully
return OpenBacklog()
try:
return list_open_backlog(client, self.owner, self.repo)
except Exception: # noqa: BLE001 — boundary; degrade gracefully
return OpenBacklog()

def _render_prompt(self, vision: ProductVision, backlog: Any) -> str:
from forge_loop.briefs import render_brief

return render_brief(
"brainstormer",
vision_markdown=vision.vision_markdown,
axes_block=_render_axes_block(vision),
backlog_block=_render_backlog_block(backlog),
)

def _default_sdk_fn(self) -> Callable[..., Any]:
from forge_loop._brainstormer_sdk import run_brainstormer_sdk

return run_brainstormer_sdk
3 changes: 2 additions & 1 deletion src/forge_loop/briefs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
from pathlib import Path
from typing import Any

KINDS = ("worker", "po", "critic")
KINDS = ("worker", "po", "critic", "brainstormer")

_ENV_OVERRIDES = {
"worker": "LOOP_WORKER_BRIEF",
"po": "LOOP_PO_BRIEF",
"critic": "LOOP_CRITIC_BRIEF",
"brainstormer": "LOOP_BRAINSTORMER_BRIEF",
}


Expand Down
Loading
Loading