From a55cac87121a44510b6402a1d5a965f869326bbf Mon Sep 17 00:00:00 2001 From: jsdevninja Date: Fri, 3 Jul 2026 01:15:51 -0500 Subject: [PATCH] =?UTF-8?q?feat(review):=20kb.triage=5Fpending=20=E2=80=94?= =?UTF-8?q?=20advisory=20triage=20scoring=20for=20the=20pending=20queue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit a long `kb.list_pending` forces the reviewer to reconstruct, per proposal, whether the claim fits the existing kb, whether its citations resolve, whether it duplicates something already filed, and whether it contradicts an approved claim. this adds an optional triage pass that scores each pending proposal on those four signals and attaches a `_meta.vouch_triage` block (recommendation/score/signals/rationale) to help a reviewer prioritize, without ever deciding anything itself. read-only by construction: the pass never calls proposals.approve/reject, store.put_*, or store.move_proposal_to_decided — a human still calls kb.approve/kb.reject. citation_quality reuses proposals._payload_block_reason; duplication_risk reuses the propose-time embedding similarity path (embeddings.similarity.find_similar_on_propose) and degrades to a difflib heuristic when the embeddings extra isn't installed. fit uses a separate, lower-threshold embedding search so a near-duplicate hit doesn't also inflate fit and cancel out its own duplication penalty. opt-in via `triage.enabled: true` in config.yaml (default false). registered at all four kb.* surface sites (server.py, jsonl_server.py, capabilities.py, cli.py) plus `vouch triage [proposal-id...]` with `--json` and `--reverse`. --- CHANGELOG.md | 13 + src/vouch/capabilities.py | 1 + src/vouch/cli.py | 46 ++++ src/vouch/jsonl_server.py | 7 + src/vouch/server.py | 17 ++ src/vouch/triage.py | 503 ++++++++++++++++++++++++++++++++++++++ tests/test_triage.py | 433 ++++++++++++++++++++++++++++++++ 7 files changed, 1020 insertions(+) create mode 100644 src/vouch/triage.py create mode 100644 tests/test_triage.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c5f3196..38b48a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,19 @@ All notable changes to vouch are documented here. Format follows ## [Unreleased] +### Added +- `kb.triage_pending` — advisory triage scoring over the pending-review queue. + Scores each pending proposal on fit, citation quality, duplication risk, and + contradiction risk, then attaches a `_meta.vouch_triage` block + (`recommendation`, `score`, `signals`, `rationale`) to help a reviewer + prioritize a long `kb.list_pending`. Read-only and advisory only: it never + calls `kb.approve` / `kb.reject` and never moves a proposal out of pending — + a human still decides. Duplication and fit reuse the propose-time embedding + similarity path and degrade to a `difflib` heuristic when the `[embeddings]` + extra isn't installed. Opt-in via `triage.enabled: true` in `config.yaml`. + `vouch triage [proposal-id...]` mirrors it on the CLI with `--json` and + `--reverse` (#322). + ## [1.1.0] — 2026-07-03 ### Added diff --git a/src/vouch/capabilities.py b/src/vouch/capabilities.py index 2efc39a..7f06c1e 100644 --- a/src/vouch/capabilities.py +++ b/src/vouch/capabilities.py @@ -32,6 +32,7 @@ "kb.list_relations", "kb.list_sources", "kb.list_pending", + "kb.triage_pending", "kb.register_source", "kb.register_source_from_path", "kb.propose_claim", diff --git a/src/vouch/cli.py b/src/vouch/cli.py index f8b4a0c..a6cbc0f 100644 --- a/src/vouch/cli.py +++ b/src/vouch/cli.py @@ -784,6 +784,52 @@ def show(proposal_id: str) -> None: click.echo(yaml.safe_dump(pr.model_dump(mode="json"), sort_keys=False)) +@cli.command() +@click.argument("proposal_ids", nargs=-1) +@click.option( + "--json", "as_json", is_flag=True, + help="Emit machine-readable _meta.vouch_triage blocks.", +) +@click.option( + "--reverse", is_flag=True, + help="Ascending order (worst-first) instead of the default descending (best-first).", +) +def triage(proposal_ids: tuple[str, ...], as_json: bool, reverse: bool) -> None: + """Advisory triage scoring over pending proposals (opt-in: triage.enabled). + + Scores each proposal on fit, citation quality, duplication risk, and + contradiction risk, then prints a ranked table. Never approves or + rejects — a human still decides via `vouch approve` / `vouch reject`. + """ + from . import triage as triage_mod + + store = _load_store() + with _cli_errors(): + results = triage_mod.triage_pending(store, proposal_ids=list(proposal_ids) or None) + results.sort(key=lambda r: r["_meta"]["vouch_triage"]["score"], reverse=not reverse) + + if as_json: + _emit_json(results) + return + if not results: + click.echo("no pending proposals to triage") + return + for r in results: + block = r["_meta"]["vouch_triage"] + preview = ( + r["payload"].get("text") + or r["payload"].get("title") + or r["payload"].get("name") + or r["payload"].get("id") + or "-" + ) + click.echo( + f"{block['score']:.2f} [{block['recommendation']:>11}] " + f"{r['id']} [{r['kind']}] {str(preview).strip()[:80]}" + ) + click.echo(f" {block['rationale']}") + + @cli.command() @click.argument("proposal_ids", nargs=-1, required=True) @click.option("--reason", default=None) diff --git a/src/vouch/jsonl_server.py b/src/vouch/jsonl_server.py index 5f42e16..f13f8b0 100644 --- a/src/vouch/jsonl_server.py +++ b/src/vouch/jsonl_server.py @@ -273,6 +273,12 @@ def _h_list_pending(_: dict) -> list[dict]: ] +def _h_triage_pending(p: dict) -> list[dict]: + from . import triage as triage_mod + + return triage_mod.triage_pending(_store(), proposal_ids=p.get("proposal_ids")) + + def _h_register_source(p: dict) -> dict: s = _store() src = s.put_source( @@ -691,6 +697,7 @@ def _h_propose_theme(p: dict) -> dict: "kb.list_relations": _h_list_relations, "kb.list_sources": _h_list_sources, "kb.list_pending": _h_list_pending, + "kb.triage_pending": _h_triage_pending, "kb.register_source": _h_register_source, "kb.register_source_from_path": _h_register_source_from_path, "kb.propose_claim": _h_propose_claim, diff --git a/src/vouch/server.py b/src/vouch/server.py index 81fa5c2..9c0867f 100644 --- a/src/vouch/server.py +++ b/src/vouch/server.py @@ -336,6 +336,23 @@ def kb_list_pending() -> list[dict[str, Any]]: ] +@mcp.tool() +def kb_triage_pending(proposal_ids: list[str] | None = None) -> list[dict[str, Any]]: + """Advisory triage scoring over the pending-review queue. + + Attaches `_meta.vouch_triage` (recommendation/score/signals/rationale) + to each pending proposal's view. Read-only — never approves, rejects, + or otherwise decides; a human still calls `kb_approve` / `kb_reject`. + Opt-in: disabled unless `triage.enabled: true` is set in config.yaml. + """ + from . import triage as triage_mod + + try: + return triage_mod.triage_pending(_store(), proposal_ids=proposal_ids) + except (ValueError, ArtifactNotFoundError) as e: + raise ValueError(str(e)) from e + + # === write tools — gated (produce proposals) ============================= diff --git a/src/vouch/triage.py b/src/vouch/triage.py new file mode 100644 index 0000000..31202e4 --- /dev/null +++ b/src/vouch/triage.py @@ -0,0 +1,503 @@ +"""Advisory triage scoring for the pending-review queue (issue #322). + +Read-only. Scores each pending proposal on four signals — fit, citation +quality, duplication risk, and contradiction risk — and folds them into a +composite ``score`` plus an advisory ``recommendation``. The result is +attached as ``_meta.vouch_triage`` on the proposal's own ``model_dump``. + +This never decides anything: no call here ever reaches +``proposals.approve``, ``proposals.reject``, ``store.put_*``, or +``store.move_proposal_to_decided``. A human still calls ``kb.approve`` / +``kb.reject``; ``recommendation`` is a hint the reviewer may ignore. + +Opt-in: disabled unless ``triage.enabled: true`` is set in +``.vouch/config.yaml`` (mirrors the defensive yaml-read pattern in +``salience.reflex_cfg`` / ``embeddings.similarity.similarity_threshold`` — +no pydantic Config model yet, see issue #243). + +Duplication risk reuses the embedding path already built for propose-time +warnings (``embeddings.similarity.find_similar_on_propose``); fit uses the +same underlying primitive (``index_db.search_embedding``) at a lower +threshold band so a near-duplicate hit doesn't also inflate fit and cancel +out its own duplication penalty (see ``_topical_fit_scores``). When no +embedder is registered (base install, no ``[embeddings]`` extra), both +signals fall back to a ``difflib`` text-similarity heuristic so the method +still returns a full block. +""" + +from __future__ import annotations + +import difflib +import re +from dataclasses import dataclass +from typing import Any + +import yaml + +from .models import Proposal, ProposalKind, ProposalStatus +from .proposals import _payload_block_reason +from .storage import ArtifactNotFoundError, KBStore + +DEFAULT_WEIGHTS: dict[str, float] = { + "fit": 0.3, + "citation_quality": 0.3, + "duplication_risk": 0.2, + "contradiction_risk": 0.2, +} + +_APPROVE_THRESHOLD = 0.7 +_REJECT_THRESHOLD = 0.35 +_FUZZY_MATCH_FLOOR = 0.3 +_CONTRADICTION_CANDIDATE_FLOOR = 0.35 + +_NEGATION_MARKERS = frozenset({ + "not", "no", "never", "cannot", "isnt", "doesnt", "wont", "wasnt", + "arent", "dont", "didnt", "hasnt", "havent", "without", "neither", "nor", +}) + + +class TriageError(ValueError): + """Raised when `kb.triage_pending` is invoked while disabled, or misused.""" + + +@dataclass(frozen=True) +class TriageConfig: + enabled: bool + backend: str + weights: dict[str, float] + + +def triage_cfg(store: KBStore) -> TriageConfig: + """Read `triage.*` from config.yaml defensively. Default: disabled.""" + cfg: dict[str, Any] = {} + try: + loaded = yaml.safe_load(store.config_path.read_text(encoding="utf-8")) + if isinstance(loaded, dict): + cfg = loaded + except Exception: + pass + + triage = cfg.get("triage") + triage = triage if isinstance(triage, dict) else {} + + enabled = triage.get("enabled", False) + enabled = bool(enabled) if isinstance(enabled, bool) else False + + backend = triage.get("backend", "embeddings") + backend = backend if isinstance(backend, str) else "embeddings" + + weights = dict(DEFAULT_WEIGHTS) + weights_cfg = triage.get("weights") + if isinstance(weights_cfg, dict): + for key in weights: + value = weights_cfg.get(key) + if isinstance(value, int | float) and not isinstance(value, bool): + weights[key] = float(value) + + return TriageConfig(enabled=enabled, backend=backend, weights=weights) + + +# --- shared helpers --------------------------------------------------------- + + +def _referenced_entity_ids(proposal: Proposal) -> list[str]: + if proposal.kind in (ProposalKind.CLAIM, ProposalKind.PAGE): + return list(proposal.payload.get("entities") or []) + return [] + + +def _has_negation(text: str) -> bool: + tokens = set(re.findall(r"[a-z']+", text.casefold())) + tokens = {t.replace("'", "") for t in tokens} + return bool(tokens & _NEGATION_MARKERS) + + +def _safe_embedder() -> Any | None: + try: + from .embeddings import get_embedder + + return get_embedder() + except Exception: + return None + + +def _best_fuzzy_match( + text: str, pool: list[tuple[str, str]], +) -> tuple[str | None, float]: + needle = text.casefold() + best_id: str | None = None + best_ratio = 0.0 + for cid, candidate in pool: + candidate = (candidate or "").strip() + if not candidate: + continue + ratio = difflib.SequenceMatcher(None, needle, candidate.casefold()).ratio() + if ratio > best_ratio: + best_ratio, best_id = ratio, cid + if best_id is None or best_ratio < _FUZZY_MATCH_FLOOR: + return None, 0.0 + return best_id, best_ratio + + +def _claim_text_pool( + store: KBStore, *, exclude_proposal_id: str, exclude_claim_id: str | None, +) -> list[tuple[str, str]]: + pool = [ + (c.id, c.text) for c in store.list_claims() if c.id != exclude_claim_id + ] + pool += [ + (p.id, str(p.payload.get("text", ""))) + for p in store.list_proposals(ProposalStatus.PENDING) + if p.kind == ProposalKind.CLAIM and p.id != exclude_proposal_id + ] + return pool + + +def _embedding_hits_for_claim( + store: KBStore, proposal: Proposal, *, use_embeddings: bool, +) -> list[dict[str, Any]] | None: + """`find_similar_on_propose` hits, or None when the embedding path can't run. + + None means "no embedder available (or backend forced to heuristic)" — + callers fall back to a difflib heuristic. `[]` means the embedder ran + and genuinely found nothing similar. Every hit returned is, by that + function's own contract, at or above the near-duplicate threshold — + it's a duplicate detector, not a general similarity search. Used by + `duplication_risk` and `contradiction_risk`; deliberately NOT reused + for `fit` (see `_topical_fit_scores`). + """ + if not use_embeddings or proposal.kind != ProposalKind.CLAIM: + return None + text = str(proposal.payload.get("text", "")).strip() + if not text or _safe_embedder() is None: + return None + try: + from .embeddings.similarity import find_similar_on_propose + except ImportError: + return None + return find_similar_on_propose( + store, text, exclude_claim_id=proposal.payload.get("id"), + ) + + +def _topical_fit_scores(store: KBStore, proposal: Proposal, embedder: Any) -> list[float]: + """Cosine scores against the approved corpus, below the duplicate band. + + A near-duplicate hit is already penalized by `duplication_risk`; letting + it also inflate `fit` would let the two signals cancel each other out + for the exact-duplicate case. So this looks at a lower, wider band + (`min_score=0.3`) and excludes anything at or above the near-duplicate + threshold (`review.similarity_threshold`, default 0.95). + """ + text = str(proposal.payload.get("text", "")).strip() + if not text: + return [] + try: + from . import index_db + from .embeddings.similarity import similarity_threshold + + vec = embedder.encode(text) + dup_threshold = similarity_threshold(store) + hits = index_db.search_embedding( + store.kb_dir, query_vec=vec, kinds=("claim", "page"), limit=5, min_score=0.3, + ) + except Exception: + return [] + exclude_id = proposal.payload.get("id") + return [ + float(cos) for _kind, cid, _snip, cos in hits + if cid != exclude_id and cos < dup_threshold + ] + + +# --- signals ----------------------------------------------------------------- + + +def _signal_citation_quality(store: KBStore, proposal: Proposal) -> dict[str, Any]: + block = _payload_block_reason(store, proposal) + if block: + return {"score": 0.0, "reason": block} + if proposal.kind in (ProposalKind.CLAIM, ProposalKind.RELATION): + n = len(proposal.payload.get("evidence") or []) + if n == 0: + # Relations may legitimately have no evidence; claims can't reach + # here with n == 0 (Claim._at_least_one_citation already blocked). + return { + "score": 0.6, + "reason": "relation has no evidence citation (allowed, but weaker)", + } + score = min(1.0, 0.7 + 0.15 * (n - 1)) + return {"score": round(score, 4), "reason": f"{n} evidence citation(s) resolve cleanly"} + if proposal.kind == ProposalKind.PAGE: + sources = proposal.payload.get("sources") or [] + claims = proposal.payload.get("claims") or [] + if not sources and not claims: + return {"score": 0.5, "reason": "page has no source/claim citations"} + return { + "score": 1.0, + "reason": f"{len(sources)} source(s), {len(claims)} claim(s) resolve cleanly", + } + return {"score": 1.0, "reason": "entity payload resolves cleanly"} + + +def _signal_fit( + store: KBStore, proposal: Proposal, embedder: Any | None, +) -> dict[str, Any]: + entity_ids = _referenced_entity_ids(proposal) + known = {e.id for e in store.list_entities()} + overlap: float | None = None + if entity_ids: + overlap = sum(1 for e in entity_ids if e in known) / len(entity_ids) + + topical: float | None = None + if embedder is not None and proposal.kind == ProposalKind.CLAIM: + scores = _topical_fit_scores(store, proposal, embedder) + if scores: + topical = sum(scores) / len(scores) + + parts = [v for v in (overlap, topical) if v is not None] + if not parts: + return { + "score": 0.5, + "reason": "no referenced entities or approved-corpus signal; neutral fit", + } + bits = [] + if overlap is not None: + bits.append(f"{overlap:.0%} of referenced entities already known") + if topical is not None: + bits.append(f"mean topical similarity to approved corpus {topical:.2f}") + return {"score": round(sum(parts) / len(parts), 4), "reason": "; ".join(bits)} + + +def _duplication_risk_structural(store: KBStore, proposal: Proposal) -> dict[str, Any]: + if proposal.kind == ProposalKind.RELATION: + triple = ( + proposal.payload.get("source"), + proposal.payload.get("relation"), + proposal.payload.get("target"), + ) + for r in store.list_relations(): + if (r.source, r.relation.value, r.target) == triple: + return {"score": 1.0, "reason": f"identical relation already approved: {r.id}"} + for p in store.list_proposals(ProposalStatus.PENDING): + if p.kind != ProposalKind.RELATION or p.id == proposal.id: + continue + other = (p.payload.get("source"), p.payload.get("relation"), p.payload.get("target")) + if other == triple: + return {"score": 1.0, "reason": f"identical relation already pending: {p.id}"} + return {"score": 0.0, "reason": "no identical relation found"} + + if proposal.kind == ProposalKind.ENTITY: + name = str(proposal.payload.get("name", "")).strip() + pool = [(e.id, e.name) for e in store.list_entities()] + pool += [ + (p.id, str(p.payload.get("name", ""))) + for p in store.list_proposals(ProposalStatus.PENDING) + if p.kind == ProposalKind.ENTITY and p.id != proposal.id + ] + else: # PAGE + name = str(proposal.payload.get("title", "")).strip() + pool = [(pg.id, pg.title) for pg in store.list_pages()] + pool += [ + (p.id, str(p.payload.get("title", ""))) + for p in store.list_proposals(ProposalStatus.PENDING) + if p.kind == ProposalKind.PAGE and p.id != proposal.id + ] + + if not name: + return {"score": 0.0, "reason": "no name/title to compare"} + best_id, best_ratio = _best_fuzzy_match(name, pool) + if best_id is None: + return {"score": 0.0, "reason": "no similarly-named artifact found (heuristic backend)"} + return { + "score": round(best_ratio, 4), + "reason": f"name similarity {best_ratio:.2f} vs {best_id} (heuristic backend)", + } + + +def _signal_duplication_risk( + store: KBStore, proposal: Proposal, hits: list[dict[str, Any]] | None, +) -> dict[str, Any]: + if proposal.kind != ProposalKind.CLAIM: + return _duplication_risk_structural(store, proposal) + + text = str(proposal.payload.get("text", "")).strip() + if not text: + return {"score": 0.0, "reason": "no claim text to compare"} + + if hits is None: + pool = _claim_text_pool( + store, exclude_proposal_id=proposal.id, + exclude_claim_id=proposal.payload.get("id"), + ) + best_id, best_ratio = _best_fuzzy_match(text, pool) + if best_id is None: + return {"score": 0.0, "reason": "no near-duplicate claims found (heuristic backend)"} + return { + "score": round(best_ratio, 4), + "reason": f"text similarity {best_ratio:.2f} vs {best_id} (heuristic backend)", + } + + if not hits: + return {"score": 0.0, "reason": "no near-duplicate claims found (embedding backend)"} + top = max(hits, key=lambda w: w["cosine"]) + return { + "score": round(float(top["cosine"]), 4), + "reason": ( + f"cosine {top['cosine']:.2f} vs {top['artifact_kind']} " + f"{top['artifact_id']} (embedding backend)" + ), + } + + +def _signal_contradiction_risk( + store: KBStore, proposal: Proposal, hits: list[dict[str, Any]] | None, +) -> dict[str, Any]: + if proposal.kind != ProposalKind.CLAIM: + return {"score": 0.0, "reason": "contradiction risk is only assessed for claim proposals"} + + text = str(proposal.payload.get("text", "")).strip() + if not text: + return {"score": 0.0, "reason": "no claim text to compare"} + + entity_ids = set(proposal.payload.get("entities") or []) + neg = _has_negation(text) + + if hits is not None: + backend = "embedding" + candidates = [ + (h["artifact_id"], float(h["cosine"])) + for h in hits + if h.get("artifact_kind") == "claim" + ] + else: + backend = "heuristic" + pool = _claim_text_pool( + store, exclude_proposal_id=proposal.id, + exclude_claim_id=proposal.payload.get("id"), + ) + candidates = [ + (cid, ratio) + for cid, ratio in ( + (cid, difflib.SequenceMatcher(None, text.casefold(), ctext.casefold()).ratio()) + for cid, ctext in pool + ) + if ratio >= _CONTRADICTION_CANDIDATE_FLOOR + ] + + if not candidates: + return {"score": 0.0, "reason": f"no topically related claims found ({backend} backend)"} + + conflicts: list[tuple[str, float]] = [] + for cid, sim in candidates: + try: + claim = store.get_claim(cid) + except ArtifactNotFoundError: + continue # candidate is a pending proposal, not yet an approved claim + if entity_ids & set(claim.entities) and _has_negation(claim.text) != neg: + conflicts.append((cid, sim)) + + if not conflicts: + return { + "score": 0.0, + "reason": ( + f"{len(candidates)} related claim(s), " + f"no polarity conflict ({backend} backend)" + ), + } + top_id, top_sim = max(conflicts, key=lambda c: c[1]) + score = round(min(1.0, 0.5 + top_sim / 2), 4) + return { + "score": score, + "reason": ( + f"possible polarity conflict with {top_id} " + f"(similarity {top_sim:.2f}, {backend} backend)" + ), + } + + +# --- composite --------------------------------------------------------------- + + +def _composite_score(signals: dict[str, dict[str, Any]], weights: dict[str, float]) -> float: + goodness = { + "fit": signals["fit"]["score"], + "citation_quality": signals["citation_quality"]["score"], + "duplication_risk": 1.0 - signals["duplication_risk"]["score"], + "contradiction_risk": 1.0 - signals["contradiction_risk"]["score"], + } + total_weight = sum(weights.get(k, 0.0) for k in goodness) or 1.0 + raw = sum(goodness[k] * weights.get(k, 0.0) for k in goodness) / total_weight + return round(min(1.0, max(0.0, raw)), 4) + + +def _recommendation(score: float, signals: dict[str, dict[str, Any]]) -> str: + if signals["citation_quality"]["score"] == 0.0: + # A blocked payload can't be approved as-is (approve() would raise) — + # no composite score should be able to override that. + return "reject" + if score >= _APPROVE_THRESHOLD: + return "approve" + if score <= _REJECT_THRESHOLD: + return "reject" + return "needs-human" + + +def _rationale(recommendation: str, score: float, signals: dict[str, dict[str, Any]]) -> str: + parts = "; ".join(f"{name}: {sig['reason']}" for name, sig in signals.items()) + return f"{recommendation} (score {score:.2f}) — {parts}" + + +def score_proposal( + store: KBStore, proposal: Proposal, *, + weights: dict[str, float] | None = None, + use_embeddings: bool = True, +) -> dict[str, Any]: + """Compute the `_meta.vouch_triage` block for one pending proposal.""" + weights = weights or DEFAULT_WEIGHTS + hits = _embedding_hits_for_claim(store, proposal, use_embeddings=use_embeddings) + embedder = _safe_embedder() if use_embeddings else None + signals = { + "fit": _signal_fit(store, proposal, embedder), + "citation_quality": _signal_citation_quality(store, proposal), + "duplication_risk": _signal_duplication_risk(store, proposal, hits), + "contradiction_risk": _signal_contradiction_risk(store, proposal, hits), + } + score = _composite_score(signals, weights) + recommendation = _recommendation(score, signals) + return { + "recommendation": recommendation, + "score": score, + "signals": signals, + "rationale": _rationale(recommendation, score, signals), + } + + +def triage_pending( + store: KBStore, proposal_ids: list[str] | None = None, +) -> list[dict[str, Any]]: + """Score pending proposals (default: all of them) — read-only, advisory. + + Raises TriageError if `triage.enabled` isn't `true` in config.yaml. + """ + cfg = triage_cfg(store) + if not cfg.enabled: + raise TriageError( + "triage is disabled; set triage.enabled: true in .vouch/config.yaml to opt in" + ) + use_embeddings = cfg.backend != "heuristic" + + if proposal_ids: + proposals = [store.get_proposal(pid) for pid in proposal_ids] + proposals = [p for p in proposals if p.status == ProposalStatus.PENDING] + else: + proposals = store.list_proposals(ProposalStatus.PENDING) + + out: list[dict[str, Any]] = [] + for p in proposals: + result = p.model_dump(mode="json") + result.setdefault("_meta", {})["vouch_triage"] = score_proposal( + store, p, weights=cfg.weights, use_embeddings=use_embeddings, + ) + out.append(result) + return out diff --git a/tests/test_triage.py b/tests/test_triage.py new file mode 100644 index 0000000..ed46b8c --- /dev/null +++ b/tests/test_triage.py @@ -0,0 +1,433 @@ +"""Advisory triage scoring over the pending-review queue — issue #322.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +import yaml +from click.testing import CliRunner + +from vouch import triage +from vouch.cli import cli +from vouch.jsonl_server import HANDLERS, handle_request +from vouch.models import Claim, Entity, EntityType, Proposal, ProposalKind, ProposalStatus +from vouch.proposals import propose_claim, propose_entity +from vouch.storage import KBStore + +SIGNAL_NAMES = {"fit", "citation_quality", "duplication_risk", "contradiction_risk"} + + +@pytest.fixture +def store(tmp_path: Path) -> KBStore: + return KBStore.init(tmp_path) + + +def _enable_triage(store: KBStore, **overrides: object) -> None: + cfg = {"triage": {"enabled": True, **overrides}} + store.config_path.write_text(yaml.safe_dump(cfg), encoding="utf-8") + + +def _no_embedder(monkeypatch: pytest.MonkeyPatch) -> None: + def _raise(name: str | None = None) -> None: + raise KeyError("no embedder registered") + + monkeypatch.setattr("vouch.embeddings.get_embedder", _raise) + + +def _assert_block_shape(block: dict) -> None: + assert set(block) == {"recommendation", "score", "signals", "rationale"} + assert block["recommendation"] in {"approve", "reject", "needs-human"} + assert 0.0 <= block["score"] <= 1.0 + assert set(block["signals"]) == SIGNAL_NAMES + for sig in block["signals"].values(): + assert 0.0 <= sig["score"] <= 1.0 + assert isinstance(sig["reason"], str) and sig["reason"] + assert isinstance(block["rationale"], str) and block["rationale"] + + +# --- opt-in gate ------------------------------------------------------------- + + +def test_disabled_by_default_raises(store: KBStore) -> None: + with pytest.raises(triage.TriageError, match="disabled"): + triage.triage_pending(store) + + +def test_enabled_scores_pending_proposals(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="vouch requires citations", evidence=[src.id], proposed_by="agent") + results = triage.triage_pending(store) + assert len(results) == 1 + + +# --- output shape -------------------------------------------------------------- + + +def test_triage_block_shape(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="vouch requires citations", evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + assert result["kind"] == "claim" + _assert_block_shape(result["_meta"]["vouch_triage"]) + + +# --- no-write invariant --------------------------------------------------------- + + +def test_never_mutates_pending_queue(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + p1 = propose_claim(store, text="a claim", evidence=[src.id], proposed_by="agent").id + p2 = propose_entity(store, name="widget", entity_type="concept", proposed_by="agent").id + + before = {p.id for p in store.list_proposals(ProposalStatus.PENDING)} + triage.triage_pending(store) + after = {p.id for p in store.list_proposals(ProposalStatus.PENDING)} + + assert before == after == {p1, p2} + assert store.list_proposals(ProposalStatus.APPROVED) == [] + assert store.list_proposals(ProposalStatus.REJECTED) == [] + assert store.list_claims() == [] + assert store.list_entities() == [] + + +# --- citation_quality: reuses proposals._payload_block_reason ----------------- + + +def test_citation_quality_flags_dangling_ref_and_forces_reject( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + # Bypass propose_claim's own ref validation (store.put_proposal is raw + # I/O) to simulate a dangling reference slipping into the queue — + # the same shape proposals._payload_block_reason guards at approve time. + bad = Proposal( + id="bad-1", kind=ProposalKind.CLAIM, proposed_by="agent", + payload={ + "id": "c-bad", "text": "x", "type": "observation", "confidence": 0.7, + "evidence": ["missing-source"], "entities": [], "tags": [], + }, + ) + store.put_proposal(bad) + [result] = triage.triage_pending(store) + block = result["_meta"]["vouch_triage"] + assert block["signals"]["citation_quality"]["score"] == 0.0 + assert "missing-source" in block["signals"]["citation_quality"]["reason"] + assert block["recommendation"] == "reject" + + +def test_citation_quality_scores_clean_claim_positively( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="a well cited claim", evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + assert result["_meta"]["vouch_triage"]["signals"]["citation_quality"]["score"] > 0.0 + + +# --- duplication_risk: heuristic fallback (default in this dev env) ---------- + + +def test_duplication_risk_heuristic_fallback_flags_near_duplicate( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + text = "auth uses jwts in the authorization header for every request" + store.put_claim(Claim(id="c1", text=text, evidence=[src.id])) + propose_claim(store, text=text, evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + dup = result["_meta"]["vouch_triage"]["signals"]["duplication_risk"] + assert dup["score"] > 0.9 + assert "heuristic backend" in dup["reason"] + assert "c1" in dup["reason"] + + +def test_duplication_risk_heuristic_no_match_for_unrelated_text( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + store.put_claim(Claim(id="c1", text="apples and oranges", evidence=[src.id])) + propose_claim( + store, text="zebras run fast in the savanna", evidence=[src.id], proposed_by="agent", + ) + [result] = triage.triage_pending(store) + dup = result["_meta"]["vouch_triage"]["signals"]["duplication_risk"] + assert dup["score"] == 0.0 + assert "heuristic backend" in dup["reason"] + + +def test_duplication_risk_relation_exact_match( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + from vouch.models import Relation + + store.put_entity(Entity(id="a", name="A", type=EntityType.CONCEPT)) + store.put_entity(Entity(id="b", name="B", type=EntityType.CONCEPT)) + store.put_relation( + Relation(id="a--relates_to--b", source="a", relation="relates_to", target="b") + ) + dup_proposal = Proposal( + id="rel-1", kind=ProposalKind.RELATION, proposed_by="agent", + payload={ + "id": "a--relates_to--b-2", "source": "a", "relation": "relates_to", + "target": "b", "confidence": 0.7, "evidence": [], + }, + ) + store.put_proposal(dup_proposal) + [result] = triage.triage_pending(store) + dup = result["_meta"]["vouch_triage"]["signals"]["duplication_risk"] + assert dup["score"] == 1.0 + assert "already approved" in dup["reason"] + + +# --- fit: entity-overlap heuristic (no embeddings needed) --------------------- + + +def test_fit_scores_high_when_entities_already_known( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + store.put_entity(Entity(id="jwt", name="JWT", type=EntityType.CONCEPT)) + src = store.put_source(b"evidence") + propose_claim( + store, text="jwt tokens expire after an hour", evidence=[src.id], + entities=["jwt"], proposed_by="agent", + ) + [result] = triage.triage_pending(store) + fit = result["_meta"]["vouch_triage"]["signals"]["fit"] + assert fit["score"] == 1.0 + + +def test_fit_neutral_when_no_entities_referenced( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="an unrelated observation", evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + fit = result["_meta"]["vouch_triage"]["signals"]["fit"] + assert fit["score"] == 0.5 + + +# --- contradiction_risk -------------------------------------------------------- + + +def test_contradiction_risk_flags_polarity_conflict( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + store.put_entity(Entity(id="api", name="API", type=EntityType.CONCEPT)) + src = store.put_source(b"evidence") + store.put_claim(Claim( + id="c1", text="the api requires an auth token for every request", + evidence=[src.id], entities=["api"], + )) + propose_claim( + store, text="the api does not require an auth token for every request", + evidence=[src.id], entities=["api"], proposed_by="agent", + ) + [result] = triage.triage_pending(store) + conflict = result["_meta"]["vouch_triage"]["signals"]["contradiction_risk"] + assert conflict["score"] > 0.0 + assert "c1" in conflict["reason"] + + +def test_contradiction_risk_no_conflict_without_shared_entity( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + store.put_claim(Claim( + id="c1", text="the api requires an auth token for every request", + evidence=[src.id], + )) + propose_claim( + store, text="the api does not require an auth token for every request", + evidence=[src.id], proposed_by="agent", + ) + [result] = triage.triage_pending(store) + conflict = result["_meta"]["vouch_triage"]["signals"]["contradiction_risk"] + assert conflict["score"] == 0.0 + + +def test_contradiction_risk_not_applicable_to_non_claim_kind( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + propose_entity(store, name="widget", entity_type="concept", proposed_by="agent") + [result] = triage.triage_pending(store) + conflict = result["_meta"]["vouch_triage"]["signals"]["contradiction_risk"] + assert conflict["score"] == 0.0 + assert "only assessed for claim proposals" in conflict["reason"] + + +# --- embeddings-present path (requires numpy; skipped without it) ------------ + + +@pytest.fixture +def _mock_embedder() -> None: + pytest.importorskip("numpy") + from tests.embeddings._fakes import MockEmbedder + from vouch.embeddings import register + from vouch.embeddings.base import DEFAULT_MODEL_NAME + + register(DEFAULT_MODEL_NAME, lambda: MockEmbedder(dim=8)) + + +def test_duplication_risk_embedding_backend_flags_exact_duplicate( + store: KBStore, _mock_embedder: None, +) -> None: + _enable_triage(store) + src = store.put_source(b"evidence") + text = "auth uses jwts in the authorization header" + store.put_claim(Claim(id="c1", text=text, evidence=[src.id])) + propose_claim(store, text=text, evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + block = result["_meta"]["vouch_triage"] + dup = block["signals"]["duplication_risk"] + assert dup["score"] >= 0.95 + assert "embedding backend" in dup["reason"] + # A near-duplicate hit is penalized by duplication_risk and must not + # also inflate fit via the same signal (see _topical_fit_scores). + assert block["recommendation"] != "approve" + + +def test_backend_heuristic_config_forces_fallback_even_with_embedder( + store: KBStore, _mock_embedder: None, +) -> None: + _enable_triage(store, backend="heuristic") + src = store.put_source(b"evidence") + text = "auth uses jwts in the authorization header" + store.put_claim(Claim(id="c1", text=text, evidence=[src.id])) + propose_claim(store, text=text, evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + dup = result["_meta"]["vouch_triage"]["signals"]["duplication_risk"] + assert "heuristic backend" in dup["reason"] + + +# --- proposal_ids filter / config plumbing ------------------------------------ + + +def test_proposal_ids_filters_to_subset(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + p1 = propose_claim(store, text="first claim", evidence=[src.id], proposed_by="agent").id + propose_claim(store, text="second claim", evidence=[src.id], proposed_by="agent") + results = triage.triage_pending(store, proposal_ids=[p1]) + assert [r["id"] for r in results] == [p1] + + +def test_custom_weights_read_from_config(store: KBStore) -> None: + custom_weights = { + "fit": 1.0, "citation_quality": 0.0, "duplication_risk": 0.0, "contradiction_risk": 0.0, + } + _enable_triage(store, weights=custom_weights) + cfg = triage.triage_cfg(store) + assert cfg.weights == custom_weights + + +def test_disabled_config_value_keeps_default_false(store: KBStore) -> None: + raw = yaml.safe_dump({"triage": {"weights": {"fit": 0.9}}}) + store.config_path.write_text(raw, encoding="utf-8") + cfg = triage.triage_cfg(store) + assert cfg.enabled is False + + +# --- registration sites -------------------------------------------------------- + + +def test_jsonl_handler_registered() -> None: + assert "kb.triage_pending" in HANDLERS + + +def test_jsonl_triage_pending_disabled_returns_invalid_request( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + import vouch.jsonl_server as jsonl_server + + monkeypatch.setattr(jsonl_server, "_store", lambda: store) + resp = handle_request({"id": "1", "method": "kb.triage_pending", "params": {}}) + assert resp["ok"] is False + assert resp["error"]["code"] == "invalid_request" + + +def test_jsonl_triage_pending_enabled_returns_blocks( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + import vouch.jsonl_server as jsonl_server + + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="a claim", evidence=[src.id], proposed_by="agent") + monkeypatch.setattr(jsonl_server, "_store", lambda: store) + resp = handle_request({"id": "1", "method": "kb.triage_pending", "params": {}}) + assert resp["ok"] is True + [item] = resp["result"] + _assert_block_shape(item["_meta"]["vouch_triage"]) + + +def test_cli_triage_disabled_shows_clean_error( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.chdir(store.root) + result = CliRunner().invoke(cli, ["triage"]) + assert result.exit_code != 0 + assert "Traceback" not in result.output + assert "Error:" in result.output + assert "disabled" in result.output + + +def test_cli_triage_json_output(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="a claim", evidence=[src.id], proposed_by="agent") + monkeypatch.chdir(store.root) + result = CliRunner().invoke(cli, ["triage", "--json"]) + assert result.exit_code == 0, result.output + data = json.loads(result.output) + _assert_block_shape(data[0]["_meta"]["vouch_triage"]) + + +def test_cli_triage_sorts_ranked_table(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="a well cited unique claim", evidence=[src.id], proposed_by="agent") + bad = Proposal( + id="bad-1", kind=ProposalKind.CLAIM, proposed_by="agent", + payload={ + "id": "c-bad", "text": "y", "type": "observation", "confidence": 0.7, + "evidence": ["missing-source"], "entities": [], "tags": [], + }, + ) + store.put_proposal(bad) + monkeypatch.chdir(store.root) + result = CliRunner().invoke(cli, ["triage"]) + assert result.exit_code == 0, result.output + lines = [ln for ln in result.output.splitlines() if ln and ln[0].isdigit()] + scores = [float(ln.split()[0]) for ln in lines] + assert scores == sorted(scores, reverse=True)