diff --git a/CHANGELOG.md b/CHANGELOG.md index c5f3196..38b48a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,19 @@ All notable changes to vouch are documented here. Format follows ## [Unreleased] +### Added +- `kb.triage_pending` — advisory triage scoring over the pending-review queue. + Scores each pending proposal on fit, citation quality, duplication risk, and + contradiction risk, then attaches a `_meta.vouch_triage` block + (`recommendation`, `score`, `signals`, `rationale`) to help a reviewer + prioritize a long `kb.list_pending`. Read-only and advisory only: it never + calls `kb.approve` / `kb.reject` and never moves a proposal out of pending — + a human still decides. Duplication and fit reuse the propose-time embedding + similarity path; without the `[embeddings]` extra, duplication degrades to a + `difflib` heuristic and fit to entity overlap alone. Opt-in via + `triage.enabled: true` in `config.yaml`. `vouch triage [proposal-id...]` + mirrors it on the CLI with `--json` and `--reverse` (#322). + ## [1.1.0] — 2026-07-03 ### Added diff --git a/src/vouch/capabilities.py b/src/vouch/capabilities.py index 2efc39a..7f06c1e 100644 --- a/src/vouch/capabilities.py +++ b/src/vouch/capabilities.py @@ -32,6 +32,7 @@ "kb.list_relations", "kb.list_sources", "kb.list_pending", + "kb.triage_pending", "kb.register_source", "kb.register_source_from_path", "kb.propose_claim", diff --git a/src/vouch/cli.py b/src/vouch/cli.py index f8b4a0c..a6cbc0f 100644 --- a/src/vouch/cli.py +++ b/src/vouch/cli.py @@ -784,6 +784,52 @@ def show(proposal_id: str) -> None: click.echo(yaml.safe_dump(pr.model_dump(mode="json"), sort_keys=False)) +@cli.command() +@click.argument("proposal_ids", nargs=-1) +@click.option( + "--json", "as_json", is_flag=True, + help="Emit machine-readable _meta.vouch_triage blocks.", +) +@click.option( + "--reverse", is_flag=True, + help="Ascending order (worst-first) instead of the default descending (best-first).", +) +def triage(proposal_ids: tuple[str, ...], as_json: bool, 
reverse: bool) -> None: + """Advisory triage scoring over pending proposals (opt-in: triage.enabled). + + Scores each proposal on fit, citation quality, duplication risk, and + contradiction risk, then prints a ranked table. Never approves or + rejects — a human still decides via `vouch approve` / `vouch reject`. + """ + from . import triage as triage_mod + + store = _load_store() + with _cli_errors(): + results = triage_mod.triage_pending(store, proposal_ids=list(proposal_ids) or None) + results.sort(key=lambda r: r["_meta"]["vouch_triage"]["score"], reverse=not reverse) + + if as_json: + _emit_json(results) + return + if not results: + click.echo("no pending proposals to triage") + return + for r in results: + block = r["_meta"]["vouch_triage"] + preview = ( + r["payload"].get("text") + or r["payload"].get("title") + or r["payload"].get("name") + or r["payload"].get("id") + or "-" + ) + click.echo( + f"{block['score']:.2f} [{block['recommendation']:>11}] " + f"{r['id']} [{r['kind']}] {str(preview).strip()[:80]}" + ) + click.echo(f" {block['rationale']}") + + @cli.command() @click.argument("proposal_ids", nargs=-1, required=True) @click.option("--reason", default=None) diff --git a/src/vouch/jsonl_server.py b/src/vouch/jsonl_server.py index 5f42e16..f13f8b0 100644 --- a/src/vouch/jsonl_server.py +++ b/src/vouch/jsonl_server.py @@ -273,6 +273,12 @@ def _h_list_pending(_: dict) -> list[dict]: ] +def _h_triage_pending(p: dict) -> list[dict]: + from . 
import triage as triage_mod + + return triage_mod.triage_pending(_store(), proposal_ids=p.get("proposal_ids")) + + def _h_register_source(p: dict) -> dict: s = _store() src = s.put_source( @@ -691,6 +697,7 @@ def _h_propose_theme(p: dict) -> dict: "kb.list_relations": _h_list_relations, "kb.list_sources": _h_list_sources, "kb.list_pending": _h_list_pending, + "kb.triage_pending": _h_triage_pending, "kb.register_source": _h_register_source, "kb.register_source_from_path": _h_register_source_from_path, "kb.propose_claim": _h_propose_claim, diff --git a/src/vouch/server.py b/src/vouch/server.py index 81fa5c2..9c0867f 100644 --- a/src/vouch/server.py +++ b/src/vouch/server.py @@ -336,6 +336,23 @@ def kb_list_pending() -> list[dict[str, Any]]: ] +@mcp.tool() +def kb_triage_pending(proposal_ids: list[str] | None = None) -> list[dict[str, Any]]: + """Advisory triage scoring over the pending-review queue. + + Attaches `_meta.vouch_triage` (recommendation/score/signals/rationale) + to each pending proposal's view. Read-only — never approves, rejects, + or otherwise decides; a human still calls `kb_approve` / `kb_reject`. + Opt-in: disabled unless `triage.enabled: true` is set in config.yaml. + """ + from . import triage as triage_mod + + try: + return triage_mod.triage_pending(_store(), proposal_ids=proposal_ids) + except (ValueError, ArtifactNotFoundError) as e: + raise ValueError(str(e)) from e + + # === write tools — gated (produce proposals) ============================= diff --git a/src/vouch/triage.py b/src/vouch/triage.py new file mode 100644 index 0000000..31202e4 --- /dev/null +++ b/src/vouch/triage.py @@ -0,0 +1,503 @@ +"""Advisory triage scoring for the pending-review queue (issue #322). + +Read-only. Scores each pending proposal on four signals — fit, citation +quality, duplication risk, and contradiction risk — and folds them into a +composite ``score`` plus an advisory ``recommendation``. 
The result is +attached as ``_meta.vouch_triage`` on the proposal's own ``model_dump``. + +This never decides anything: no call here ever reaches +``proposals.approve``, ``proposals.reject``, ``store.put_*``, or +``store.move_proposal_to_decided``. A human still calls ``kb.approve`` / +``kb.reject``; ``recommendation`` is a hint the reviewer may ignore. + +Opt-in: disabled unless ``triage.enabled: true`` is set in +``.vouch/config.yaml`` (mirrors the defensive yaml-read pattern in +``salience.reflex_cfg`` / ``embeddings.similarity.similarity_threshold`` — +no pydantic Config model yet, see issue #243). + +Duplication risk reuses the embedding path already built for propose-time +warnings (``embeddings.similarity.find_similar_on_propose``); fit uses the +same underlying primitive (``index_db.search_embedding``) at a lower +threshold band so a near-duplicate hit doesn't also inflate fit and cancel +out its own duplication penalty (see ``_topical_fit_scores``). When no +embedder is registered (base install, no ``[embeddings]`` extra), duplication +risk falls back to a ``difflib`` text-similarity heuristic and fit to entity +overlap alone, so the method still returns a full block. 
+""" + +from __future__ import annotations + +import difflib +import re +from dataclasses import dataclass +from typing import Any + +import yaml + +from .models import Proposal, ProposalKind, ProposalStatus +from .proposals import _payload_block_reason +from .storage import ArtifactNotFoundError, KBStore + +DEFAULT_WEIGHTS: dict[str, float] = { + "fit": 0.3, + "citation_quality": 0.3, + "duplication_risk": 0.2, + "contradiction_risk": 0.2, +} + +_APPROVE_THRESHOLD = 0.7 +_REJECT_THRESHOLD = 0.35 +_FUZZY_MATCH_FLOOR = 0.3 +_CONTRADICTION_CANDIDATE_FLOOR = 0.35 + +_NEGATION_MARKERS = frozenset({ + "not", "no", "never", "cannot", "isnt", "doesnt", "wont", "wasnt", + "arent", "dont", "didnt", "hasnt", "havent", "without", "neither", "nor", +}) + + +class TriageError(ValueError): + """Raised when `kb.triage_pending` is invoked while disabled, or misused.""" + + +@dataclass(frozen=True) +class TriageConfig: + enabled: bool + backend: str + weights: dict[str, float] + + +def triage_cfg(store: KBStore) -> TriageConfig: + """Read `triage.*` from config.yaml defensively. 
Default: disabled.""" + cfg: dict[str, Any] = {} + try: + loaded = yaml.safe_load(store.config_path.read_text(encoding="utf-8")) + if isinstance(loaded, dict): + cfg = loaded + except Exception: + pass + + triage = cfg.get("triage") + triage = triage if isinstance(triage, dict) else {} + + enabled = triage.get("enabled", False) + enabled = bool(enabled) if isinstance(enabled, bool) else False + + backend = triage.get("backend", "embeddings") + backend = backend if isinstance(backend, str) else "embeddings" + + weights = dict(DEFAULT_WEIGHTS) + weights_cfg = triage.get("weights") + if isinstance(weights_cfg, dict): + for key in weights: + value = weights_cfg.get(key) + if isinstance(value, int | float) and not isinstance(value, bool): + weights[key] = float(value) + + return TriageConfig(enabled=enabled, backend=backend, weights=weights) + + +# --- shared helpers --------------------------------------------------------- + + +def _referenced_entity_ids(proposal: Proposal) -> list[str]: + if proposal.kind in (ProposalKind.CLAIM, ProposalKind.PAGE): + return list(proposal.payload.get("entities") or []) + return [] + + +def _has_negation(text: str) -> bool: + tokens = set(re.findall(r"[a-z']+", text.casefold())) + tokens = {t.replace("'", "") for t in tokens} + return bool(tokens & _NEGATION_MARKERS) + + +def _safe_embedder() -> Any | None: + try: + from .embeddings import get_embedder + + return get_embedder() + except Exception: + return None + + +def _best_fuzzy_match( + text: str, pool: list[tuple[str, str]], +) -> tuple[str | None, float]: + needle = text.casefold() + best_id: str | None = None + best_ratio = 0.0 + for cid, candidate in pool: + candidate = (candidate or "").strip() + if not candidate: + continue + ratio = difflib.SequenceMatcher(None, needle, candidate.casefold()).ratio() + if ratio > best_ratio: + best_ratio, best_id = ratio, cid + if best_id is None or best_ratio < _FUZZY_MATCH_FLOOR: + return None, 0.0 + return best_id, best_ratio + + +def 
_claim_text_pool( + store: KBStore, *, exclude_proposal_id: str, exclude_claim_id: str | None, +) -> list[tuple[str, str]]: + pool = [ + (c.id, c.text) for c in store.list_claims() if c.id != exclude_claim_id + ] + pool += [ + (p.id, str(p.payload.get("text", ""))) + for p in store.list_proposals(ProposalStatus.PENDING) + if p.kind == ProposalKind.CLAIM and p.id != exclude_proposal_id + ] + return pool + + +def _embedding_hits_for_claim( + store: KBStore, proposal: Proposal, *, use_embeddings: bool, +) -> list[dict[str, Any]] | None: + """`find_similar_on_propose` hits, or None when the embedding path can't run. + + None means "no embedder available (or backend forced to heuristic)" — + callers fall back to a difflib heuristic. `[]` means the embedder ran + and genuinely found nothing similar. Every hit returned is, by that + function's own contract, at or above the near-duplicate threshold — + it's a duplicate detector, not a general similarity search. Used by + `duplication_risk` and `contradiction_risk`; deliberately NOT reused + for `fit` (see `_topical_fit_scores`). + """ + if not use_embeddings or proposal.kind != ProposalKind.CLAIM: + return None + text = str(proposal.payload.get("text", "")).strip() + if not text or _safe_embedder() is None: + return None + try: + from .embeddings.similarity import find_similar_on_propose + except ImportError: + return None + return find_similar_on_propose( + store, text, exclude_claim_id=proposal.payload.get("id"), + ) + + +def _topical_fit_scores(store: KBStore, proposal: Proposal, embedder: Any) -> list[float]: + """Cosine scores against the approved corpus, below the duplicate band. + + A near-duplicate hit is already penalized by `duplication_risk`; letting + it also inflate `fit` would let the two signals cancel each other out + for the exact-duplicate case. 
So this looks at a lower, wider band + (`min_score=0.3`) and excludes anything at or above the near-duplicate + threshold (`review.similarity_threshold`, default 0.95). + """ + text = str(proposal.payload.get("text", "")).strip() + if not text: + return [] + try: + from . import index_db + from .embeddings.similarity import similarity_threshold + + vec = embedder.encode(text) + dup_threshold = similarity_threshold(store) + hits = index_db.search_embedding( + store.kb_dir, query_vec=vec, kinds=("claim", "page"), limit=5, min_score=0.3, + ) + except Exception: + return [] + exclude_id = proposal.payload.get("id") + return [ + float(cos) for _kind, cid, _snip, cos in hits + if cid != exclude_id and cos < dup_threshold + ] + + +# --- signals ----------------------------------------------------------------- + + +def _signal_citation_quality(store: KBStore, proposal: Proposal) -> dict[str, Any]: + block = _payload_block_reason(store, proposal) + if block: + return {"score": 0.0, "reason": block} + if proposal.kind in (ProposalKind.CLAIM, ProposalKind.RELATION): + n = len(proposal.payload.get("evidence") or []) + if n == 0: + # Relations may legitimately have no evidence; claims can't reach + # here with n == 0 (Claim._at_least_one_citation already blocked). 
+ return { + "score": 0.6, + "reason": "relation has no evidence citation (allowed, but weaker)", + } + score = min(1.0, 0.7 + 0.15 * (n - 1)) + return {"score": round(score, 4), "reason": f"{n} evidence citation(s) resolve cleanly"} + if proposal.kind == ProposalKind.PAGE: + sources = proposal.payload.get("sources") or [] + claims = proposal.payload.get("claims") or [] + if not sources and not claims: + return {"score": 0.5, "reason": "page has no source/claim citations"} + return { + "score": 1.0, + "reason": f"{len(sources)} source(s), {len(claims)} claim(s) resolve cleanly", + } + return {"score": 1.0, "reason": "entity payload resolves cleanly"} + + +def _signal_fit( + store: KBStore, proposal: Proposal, embedder: Any | None, +) -> dict[str, Any]: + entity_ids = _referenced_entity_ids(proposal) + known = {e.id for e in store.list_entities()} + overlap: float | None = None + if entity_ids: + overlap = sum(1 for e in entity_ids if e in known) / len(entity_ids) + + topical: float | None = None + if embedder is not None and proposal.kind == ProposalKind.CLAIM: + scores = _topical_fit_scores(store, proposal, embedder) + if scores: + topical = sum(scores) / len(scores) + + parts = [v for v in (overlap, topical) if v is not None] + if not parts: + return { + "score": 0.5, + "reason": "no referenced entities or approved-corpus signal; neutral fit", + } + bits = [] + if overlap is not None: + bits.append(f"{overlap:.0%} of referenced entities already known") + if topical is not None: + bits.append(f"mean topical similarity to approved corpus {topical:.2f}") + return {"score": round(sum(parts) / len(parts), 4), "reason": "; ".join(bits)} + + +def _duplication_risk_structural(store: KBStore, proposal: Proposal) -> dict[str, Any]: + if proposal.kind == ProposalKind.RELATION: + triple = ( + proposal.payload.get("source"), + proposal.payload.get("relation"), + proposal.payload.get("target"), + ) + for r in store.list_relations(): + if (r.source, r.relation.value, r.target) 
== triple: + return {"score": 1.0, "reason": f"identical relation already approved: {r.id}"} + for p in store.list_proposals(ProposalStatus.PENDING): + if p.kind != ProposalKind.RELATION or p.id == proposal.id: + continue + other = (p.payload.get("source"), p.payload.get("relation"), p.payload.get("target")) + if other == triple: + return {"score": 1.0, "reason": f"identical relation already pending: {p.id}"} + return {"score": 0.0, "reason": "no identical relation found"} + + if proposal.kind == ProposalKind.ENTITY: + name = str(proposal.payload.get("name", "")).strip() + pool = [(e.id, e.name) for e in store.list_entities()] + pool += [ + (p.id, str(p.payload.get("name", ""))) + for p in store.list_proposals(ProposalStatus.PENDING) + if p.kind == ProposalKind.ENTITY and p.id != proposal.id + ] + else: # PAGE + name = str(proposal.payload.get("title", "")).strip() + pool = [(pg.id, pg.title) for pg in store.list_pages()] + pool += [ + (p.id, str(p.payload.get("title", ""))) + for p in store.list_proposals(ProposalStatus.PENDING) + if p.kind == ProposalKind.PAGE and p.id != proposal.id + ] + + if not name: + return {"score": 0.0, "reason": "no name/title to compare"} + best_id, best_ratio = _best_fuzzy_match(name, pool) + if best_id is None: + return {"score": 0.0, "reason": "no similarly-named artifact found (heuristic backend)"} + return { + "score": round(best_ratio, 4), + "reason": f"name similarity {best_ratio:.2f} vs {best_id} (heuristic backend)", + } + + +def _signal_duplication_risk( + store: KBStore, proposal: Proposal, hits: list[dict[str, Any]] | None, +) -> dict[str, Any]: + if proposal.kind != ProposalKind.CLAIM: + return _duplication_risk_structural(store, proposal) + + text = str(proposal.payload.get("text", "")).strip() + if not text: + return {"score": 0.0, "reason": "no claim text to compare"} + + if hits is None: + pool = _claim_text_pool( + store, exclude_proposal_id=proposal.id, + exclude_claim_id=proposal.payload.get("id"), + ) + best_id, 
best_ratio = _best_fuzzy_match(text, pool) + if best_id is None: + return {"score": 0.0, "reason": "no near-duplicate claims found (heuristic backend)"} + return { + "score": round(best_ratio, 4), + "reason": f"text similarity {best_ratio:.2f} vs {best_id} (heuristic backend)", + } + + if not hits: + return {"score": 0.0, "reason": "no near-duplicate claims found (embedding backend)"} + top = max(hits, key=lambda w: w["cosine"]) + return { + "score": round(float(top["cosine"]), 4), + "reason": ( + f"cosine {top['cosine']:.2f} vs {top['artifact_kind']} " + f"{top['artifact_id']} (embedding backend)" + ), + } + + +def _signal_contradiction_risk( + store: KBStore, proposal: Proposal, hits: list[dict[str, Any]] | None, +) -> dict[str, Any]: + if proposal.kind != ProposalKind.CLAIM: + return {"score": 0.0, "reason": "contradiction risk is only assessed for claim proposals"} + + text = str(proposal.payload.get("text", "")).strip() + if not text: + return {"score": 0.0, "reason": "no claim text to compare"} + + entity_ids = set(proposal.payload.get("entities") or []) + neg = _has_negation(text) + + if hits is not None: + backend = "embedding" + candidates = [ + (h["artifact_id"], float(h["cosine"])) + for h in hits + if h.get("artifact_kind") == "claim" + ] + else: + backend = "heuristic" + pool = _claim_text_pool( + store, exclude_proposal_id=proposal.id, + exclude_claim_id=proposal.payload.get("id"), + ) + candidates = [ + (cid, ratio) + for cid, ratio in ( + (cid, difflib.SequenceMatcher(None, text.casefold(), ctext.casefold()).ratio()) + for cid, ctext in pool + ) + if ratio >= _CONTRADICTION_CANDIDATE_FLOOR + ] + + if not candidates: + return {"score": 0.0, "reason": f"no topically related claims found ({backend} backend)"} + + conflicts: list[tuple[str, float]] = [] + for cid, sim in candidates: + try: + claim = store.get_claim(cid) + except ArtifactNotFoundError: + continue # candidate is a pending proposal, not yet an approved claim + if entity_ids & 
set(claim.entities) and _has_negation(claim.text) != neg: + conflicts.append((cid, sim)) + + if not conflicts: + return { + "score": 0.0, + "reason": ( + f"{len(candidates)} related claim(s), " + f"no polarity conflict ({backend} backend)" + ), + } + top_id, top_sim = max(conflicts, key=lambda c: c[1]) + score = round(min(1.0, 0.5 + top_sim / 2), 4) + return { + "score": score, + "reason": ( + f"possible polarity conflict with {top_id} " + f"(similarity {top_sim:.2f}, {backend} backend)" + ), + } + + +# --- composite --------------------------------------------------------------- + + +def _composite_score(signals: dict[str, dict[str, Any]], weights: dict[str, float]) -> float: + goodness = { + "fit": signals["fit"]["score"], + "citation_quality": signals["citation_quality"]["score"], + "duplication_risk": 1.0 - signals["duplication_risk"]["score"], + "contradiction_risk": 1.0 - signals["contradiction_risk"]["score"], + } + total_weight = sum(weights.get(k, 0.0) for k in goodness) or 1.0 + raw = sum(goodness[k] * weights.get(k, 0.0) for k in goodness) / total_weight + return round(min(1.0, max(0.0, raw)), 4) + + +def _recommendation(score: float, signals: dict[str, dict[str, Any]]) -> str: + if signals["citation_quality"]["score"] == 0.0: + # A blocked payload can't be approved as-is (approve() would raise) — + # no composite score should be able to override that. 
+ return "reject" + if score >= _APPROVE_THRESHOLD: + return "approve" + if score <= _REJECT_THRESHOLD: + return "reject" + return "needs-human" + + +def _rationale(recommendation: str, score: float, signals: dict[str, dict[str, Any]]) -> str: + parts = "; ".join(f"{name}: {sig['reason']}" for name, sig in signals.items()) + return f"{recommendation} (score {score:.2f}) — {parts}" + + +def score_proposal( + store: KBStore, proposal: Proposal, *, + weights: dict[str, float] | None = None, + use_embeddings: bool = True, +) -> dict[str, Any]: + """Compute the `_meta.vouch_triage` block for one pending proposal.""" + weights = weights or DEFAULT_WEIGHTS + hits = _embedding_hits_for_claim(store, proposal, use_embeddings=use_embeddings) + embedder = _safe_embedder() if use_embeddings else None + signals = { + "fit": _signal_fit(store, proposal, embedder), + "citation_quality": _signal_citation_quality(store, proposal), + "duplication_risk": _signal_duplication_risk(store, proposal, hits), + "contradiction_risk": _signal_contradiction_risk(store, proposal, hits), + } + score = _composite_score(signals, weights) + recommendation = _recommendation(score, signals) + return { + "recommendation": recommendation, + "score": score, + "signals": signals, + "rationale": _rationale(recommendation, score, signals), + } + + +def triage_pending( + store: KBStore, proposal_ids: list[str] | None = None, +) -> list[dict[str, Any]]: + """Score pending proposals (default: all of them) — read-only, advisory. + + Raises TriageError if `triage.enabled` isn't `true` in config.yaml. 
+ """ + cfg = triage_cfg(store) + if not cfg.enabled: + raise TriageError( + "triage is disabled; set triage.enabled: true in .vouch/config.yaml to opt in" + ) + use_embeddings = cfg.backend != "heuristic" + + if proposal_ids: + proposals = [store.get_proposal(pid) for pid in proposal_ids] + proposals = [p for p in proposals if p.status == ProposalStatus.PENDING] + else: + proposals = store.list_proposals(ProposalStatus.PENDING) + + out: list[dict[str, Any]] = [] + for p in proposals: + result = p.model_dump(mode="json") + result.setdefault("_meta", {})["vouch_triage"] = score_proposal( + store, p, weights=cfg.weights, use_embeddings=use_embeddings, + ) + out.append(result) + return out diff --git a/tests/test_triage.py b/tests/test_triage.py new file mode 100644 index 0000000..ed46b8c --- /dev/null +++ b/tests/test_triage.py @@ -0,0 +1,433 @@ +"""Advisory triage scoring over the pending-review queue — issue #322.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +import yaml +from click.testing import CliRunner + +from vouch import triage +from vouch.cli import cli +from vouch.jsonl_server import HANDLERS, handle_request +from vouch.models import Claim, Entity, EntityType, Proposal, ProposalKind, ProposalStatus +from vouch.proposals import propose_claim, propose_entity +from vouch.storage import KBStore + +SIGNAL_NAMES = {"fit", "citation_quality", "duplication_risk", "contradiction_risk"} + + +@pytest.fixture +def store(tmp_path: Path) -> KBStore: + return KBStore.init(tmp_path) + + +def _enable_triage(store: KBStore, **overrides: object) -> None: + cfg = {"triage": {"enabled": True, **overrides}} + store.config_path.write_text(yaml.safe_dump(cfg), encoding="utf-8") + + +def _no_embedder(monkeypatch: pytest.MonkeyPatch) -> None: + def _raise(name: str | None = None) -> None: + raise KeyError("no embedder registered") + + monkeypatch.setattr("vouch.embeddings.get_embedder", _raise) + + +def 
_assert_block_shape(block: dict) -> None: + assert set(block) == {"recommendation", "score", "signals", "rationale"} + assert block["recommendation"] in {"approve", "reject", "needs-human"} + assert 0.0 <= block["score"] <= 1.0 + assert set(block["signals"]) == SIGNAL_NAMES + for sig in block["signals"].values(): + assert 0.0 <= sig["score"] <= 1.0 + assert isinstance(sig["reason"], str) and sig["reason"] + assert isinstance(block["rationale"], str) and block["rationale"] + + +# --- opt-in gate ------------------------------------------------------------- + + +def test_disabled_by_default_raises(store: KBStore) -> None: + with pytest.raises(triage.TriageError, match="disabled"): + triage.triage_pending(store) + + +def test_enabled_scores_pending_proposals(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="vouch requires citations", evidence=[src.id], proposed_by="agent") + results = triage.triage_pending(store) + assert len(results) == 1 + + +# --- output shape -------------------------------------------------------------- + + +def test_triage_block_shape(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="vouch requires citations", evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + assert result["kind"] == "claim" + _assert_block_shape(result["_meta"]["vouch_triage"]) + + +# --- no-write invariant --------------------------------------------------------- + + +def test_never_mutates_pending_queue(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + p1 = propose_claim(store, text="a claim", evidence=[src.id], proposed_by="agent").id + p2 = propose_entity(store, name="widget", 
entity_type="concept", proposed_by="agent").id + + before = {p.id for p in store.list_proposals(ProposalStatus.PENDING)} + triage.triage_pending(store) + after = {p.id for p in store.list_proposals(ProposalStatus.PENDING)} + + assert before == after == {p1, p2} + assert store.list_proposals(ProposalStatus.APPROVED) == [] + assert store.list_proposals(ProposalStatus.REJECTED) == [] + assert store.list_claims() == [] + assert store.list_entities() == [] + + +# --- citation_quality: reuses proposals._payload_block_reason ----------------- + + +def test_citation_quality_flags_dangling_ref_and_forces_reject( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + # Bypass propose_claim's own ref validation (store.put_proposal is raw + # I/O) to simulate a dangling reference slipping into the queue — + # the same shape proposals._payload_block_reason guards at approve time. + bad = Proposal( + id="bad-1", kind=ProposalKind.CLAIM, proposed_by="agent", + payload={ + "id": "c-bad", "text": "x", "type": "observation", "confidence": 0.7, + "evidence": ["missing-source"], "entities": [], "tags": [], + }, + ) + store.put_proposal(bad) + [result] = triage.triage_pending(store) + block = result["_meta"]["vouch_triage"] + assert block["signals"]["citation_quality"]["score"] == 0.0 + assert "missing-source" in block["signals"]["citation_quality"]["reason"] + assert block["recommendation"] == "reject" + + +def test_citation_quality_scores_clean_claim_positively( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="a well cited claim", evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + assert result["_meta"]["vouch_triage"]["signals"]["citation_quality"]["score"] > 0.0 + + +# --- duplication_risk: heuristic fallback (default in this dev env) ---------- + + +def 
test_duplication_risk_heuristic_fallback_flags_near_duplicate( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + text = "auth uses jwts in the authorization header for every request" + store.put_claim(Claim(id="c1", text=text, evidence=[src.id])) + propose_claim(store, text=text, evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + dup = result["_meta"]["vouch_triage"]["signals"]["duplication_risk"] + assert dup["score"] > 0.9 + assert "heuristic backend" in dup["reason"] + assert "c1" in dup["reason"] + + +def test_duplication_risk_heuristic_no_match_for_unrelated_text( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + store.put_claim(Claim(id="c1", text="apples and oranges", evidence=[src.id])) + propose_claim( + store, text="zebras run fast in the savanna", evidence=[src.id], proposed_by="agent", + ) + [result] = triage.triage_pending(store) + dup = result["_meta"]["vouch_triage"]["signals"]["duplication_risk"] + assert dup["score"] == 0.0 + assert "heuristic backend" in dup["reason"] + + +def test_duplication_risk_relation_exact_match( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + from vouch.models import Relation + + store.put_entity(Entity(id="a", name="A", type=EntityType.CONCEPT)) + store.put_entity(Entity(id="b", name="B", type=EntityType.CONCEPT)) + store.put_relation( + Relation(id="a--relates_to--b", source="a", relation="relates_to", target="b") + ) + dup_proposal = Proposal( + id="rel-1", kind=ProposalKind.RELATION, proposed_by="agent", + payload={ + "id": "a--relates_to--b-2", "source": "a", "relation": "relates_to", + "target": "b", "confidence": 0.7, "evidence": [], + }, + ) + store.put_proposal(dup_proposal) + [result] = 
triage.triage_pending(store) + dup = result["_meta"]["vouch_triage"]["signals"]["duplication_risk"] + assert dup["score"] == 1.0 + assert "already approved" in dup["reason"] + + +# --- fit: entity-overlap heuristic (no embeddings needed) --------------------- + + +def test_fit_scores_high_when_entities_already_known( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + store.put_entity(Entity(id="jwt", name="JWT", type=EntityType.CONCEPT)) + src = store.put_source(b"evidence") + propose_claim( + store, text="jwt tokens expire after an hour", evidence=[src.id], + entities=["jwt"], proposed_by="agent", + ) + [result] = triage.triage_pending(store) + fit = result["_meta"]["vouch_triage"]["signals"]["fit"] + assert fit["score"] == 1.0 + + +def test_fit_neutral_when_no_entities_referenced( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="an unrelated observation", evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + fit = result["_meta"]["vouch_triage"]["signals"]["fit"] + assert fit["score"] == 0.5 + + +# --- contradiction_risk -------------------------------------------------------- + + +def test_contradiction_risk_flags_polarity_conflict( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + store.put_entity(Entity(id="api", name="API", type=EntityType.CONCEPT)) + src = store.put_source(b"evidence") + store.put_claim(Claim( + id="c1", text="the api requires an auth token for every request", + evidence=[src.id], entities=["api"], + )) + propose_claim( + store, text="the api does not require an auth token for every request", + evidence=[src.id], entities=["api"], proposed_by="agent", + ) + [result] = triage.triage_pending(store) + conflict = 
result["_meta"]["vouch_triage"]["signals"]["contradiction_risk"] + assert conflict["score"] > 0.0 + assert "c1" in conflict["reason"] + + +def test_contradiction_risk_no_conflict_without_shared_entity( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + store.put_claim(Claim( + id="c1", text="the api requires an auth token for every request", + evidence=[src.id], + )) + propose_claim( + store, text="the api does not require an auth token for every request", + evidence=[src.id], proposed_by="agent", + ) + [result] = triage.triage_pending(store) + conflict = result["_meta"]["vouch_triage"]["signals"]["contradiction_risk"] + assert conflict["score"] == 0.0 + + +def test_contradiction_risk_not_applicable_to_non_claim_kind( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + propose_entity(store, name="widget", entity_type="concept", proposed_by="agent") + [result] = triage.triage_pending(store) + conflict = result["_meta"]["vouch_triage"]["signals"]["contradiction_risk"] + assert conflict["score"] == 0.0 + assert "only assessed for claim proposals" in conflict["reason"] + + +# --- embeddings-present path (requires numpy; skipped without it) ------------ + + +@pytest.fixture +def _mock_embedder() -> None: + pytest.importorskip("numpy") + from tests.embeddings._fakes import MockEmbedder + from vouch.embeddings import register + from vouch.embeddings.base import DEFAULT_MODEL_NAME + + register(DEFAULT_MODEL_NAME, lambda: MockEmbedder(dim=8)) + + +def test_duplication_risk_embedding_backend_flags_exact_duplicate( + store: KBStore, _mock_embedder: None, +) -> None: + _enable_triage(store) + src = store.put_source(b"evidence") + text = "auth uses jwts in the authorization header" + store.put_claim(Claim(id="c1", text=text, evidence=[src.id])) + propose_claim(store, text=text, evidence=[src.id], 
proposed_by="agent") + [result] = triage.triage_pending(store) + block = result["_meta"]["vouch_triage"] + dup = block["signals"]["duplication_risk"] + assert dup["score"] >= 0.95 + assert "embedding backend" in dup["reason"] + # A near-duplicate hit is penalized by duplication_risk and must not + # also inflate fit via the same signal (see _topical_fit_scores). + assert block["recommendation"] != "approve" + + +def test_backend_heuristic_config_forces_fallback_even_with_embedder( + store: KBStore, _mock_embedder: None, +) -> None: + _enable_triage(store, backend="heuristic") + src = store.put_source(b"evidence") + text = "auth uses jwts in the authorization header" + store.put_claim(Claim(id="c1", text=text, evidence=[src.id])) + propose_claim(store, text=text, evidence=[src.id], proposed_by="agent") + [result] = triage.triage_pending(store) + dup = result["_meta"]["vouch_triage"]["signals"]["duplication_risk"] + assert "heuristic backend" in dup["reason"] + + +# --- proposal_ids filter / config plumbing ------------------------------------ + + +def test_proposal_ids_filters_to_subset(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + p1 = propose_claim(store, text="first claim", evidence=[src.id], proposed_by="agent").id + propose_claim(store, text="second claim", evidence=[src.id], proposed_by="agent") + results = triage.triage_pending(store, proposal_ids=[p1]) + assert [r["id"] for r in results] == [p1] + + +def test_custom_weights_read_from_config(store: KBStore) -> None: + custom_weights = { + "fit": 1.0, "citation_quality": 0.0, "duplication_risk": 0.0, "contradiction_risk": 0.0, + } + _enable_triage(store, weights=custom_weights) + cfg = triage.triage_cfg(store) + assert cfg.weights == custom_weights + + +def test_disabled_config_value_keeps_default_false(store: KBStore) -> None: + raw = yaml.safe_dump({"triage": {"weights": {"fit": 0.9}}}) + 
store.config_path.write_text(raw, encoding="utf-8") + cfg = triage.triage_cfg(store) + assert cfg.enabled is False + + +# --- registration sites -------------------------------------------------------- + + +def test_jsonl_handler_registered() -> None: + assert "kb.triage_pending" in HANDLERS + + +def test_jsonl_triage_pending_disabled_returns_invalid_request( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + import vouch.jsonl_server as jsonl_server + + monkeypatch.setattr(jsonl_server, "_store", lambda: store) + resp = handle_request({"id": "1", "method": "kb.triage_pending", "params": {}}) + assert resp["ok"] is False + assert resp["error"]["code"] == "invalid_request" + + +def test_jsonl_triage_pending_enabled_returns_blocks( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + import vouch.jsonl_server as jsonl_server + + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="a claim", evidence=[src.id], proposed_by="agent") + monkeypatch.setattr(jsonl_server, "_store", lambda: store) + resp = handle_request({"id": "1", "method": "kb.triage_pending", "params": {}}) + assert resp["ok"] is True + [item] = resp["result"] + _assert_block_shape(item["_meta"]["vouch_triage"]) + + +def test_cli_triage_disabled_shows_clean_error( + store: KBStore, monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.chdir(store.root) + result = CliRunner().invoke(cli, ["triage"]) + assert result.exit_code != 0 + assert "Traceback" not in result.output + assert "Error:" in result.output + assert "disabled" in result.output + + +def test_cli_triage_json_output(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="a claim", evidence=[src.id], proposed_by="agent") + monkeypatch.chdir(store.root) + result = CliRunner().invoke(cli, ["triage", "--json"]) + assert 
result.exit_code == 0, result.output + data = json.loads(result.output) + _assert_block_shape(data[0]["_meta"]["vouch_triage"]) + + +def test_cli_triage_sorts_ranked_table(store: KBStore, monkeypatch: pytest.MonkeyPatch) -> None: + _no_embedder(monkeypatch) + _enable_triage(store) + src = store.put_source(b"evidence") + propose_claim(store, text="a well cited unique claim", evidence=[src.id], proposed_by="agent") + bad = Proposal( + id="bad-1", kind=ProposalKind.CLAIM, proposed_by="agent", + payload={ + "id": "c-bad", "text": "y", "type": "observation", "confidence": 0.7, + "evidence": ["missing-source"], "entities": [], "tags": [], + }, + ) + store.put_proposal(bad) + monkeypatch.chdir(store.root) + result = CliRunner().invoke(cli, ["triage"]) + assert result.exit_code == 0, result.output + lines = [ln for ln in result.output.splitlines() if ln and ln[0].isdigit()] + scores = [float(ln.split()[0]) for ln in lines] + assert scores == sorted(scores, reverse=True)