Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/vouch/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"kb.stats",
"kb.search",
"kb.neighbors",
"kb.experts",
"kb.context",
"kb.synthesize",
"kb.read_page",
Expand Down
33 changes: 33 additions & 0 deletions src/vouch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,39 @@ def _parse_meta(pairs: tuple[str, ...]) -> dict[str, Any]:
return out


@cli.command(name="experts")
@click.argument("topic")
@click.option("--limit", default=10, show_default=True, type=int)
@click.option("--min-claims", "min_claims", default=1, show_default=True, type=int)
@click.option(
    "--weight",
    default="count",
    show_default=True,
    help="ranking weight: count | recency | citation (unknown falls back to count).",
)
@click.option("--json", "as_json", is_flag=True, help="emit the ranking as JSON.")
def experts_cmd(
    topic: str, limit: int, min_claims: int, weight: str, as_json: bool
) -> None:
    """Rank entities by evidence density on TOPIC (read-only)."""
    # Imported lazily, like the other handlers for this feature, so the
    # ranking module is only loaded when the command actually runs.
    from .experts import rank_experts

    store = _load_store()
    ranking = rank_experts(
        store, topic, limit=limit, min_claims=min_claims, weight=weight
    )

    if as_json:
        # Machine-readable mode: same {"experts": [...]} envelope as the servers.
        _emit_json({"experts": ranking})
    elif ranking:
        # Human-readable mode: one line per ranked entity.
        for entry in ranking:
            identity = f"{entry['name']} ({entry['type']}) "
            tallies = (
                f"claims={entry['claim_count']} "
                f"citations={entry['citation_count']} "
            )
            click.echo(identity + tallies + f"score={entry['score']}")
    else:
        click.echo("no experts found.")


@cli.group(name="schema")
def schema() -> None:
"""inspect and validate config-declared page kinds (issue #234)."""
Expand Down
113 changes: 113 additions & 0 deletions src/vouch/experts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""kb.experts - rank entities by evidence density on a topic (issue #315).

Read-only aggregation over approved, live claims. Given a free-text topic,
return the entities carrying the most matched evidence, ranked by one of three
weightings (count / recency / citation). It never proposes, writes, or mutates
anything, makes no network or LLM call, and reads only claims already past the
review gate - so the review gate is untouched by construction.
"""

from __future__ import annotations

from datetime import datetime
from typing import Any

from . import index_db
from .models import Claim, ClaimStatus, utcnow
from .salience import _substring_entity_ids
from .storage import KBStore

# Claims in any of these statuses are no longer live evidence, so they must
# never inflate an entity ranking (consistent with issue #78).
_EXCLUDED_STATUSES = frozenset(
    (ClaimStatus.SUPERSEDED, ClaimStatus.ARCHIVED, ClaimStatus.REDACTED)
)
# Weightings understood by the ranking; any other value is treated as "count".
_VALID_WEIGHTS = frozenset(("count", "recency", "citation"))
# Under the "recency" weighting a claim's contribution halves every 30 days.
_RECENCY_HALF_LIFE_DAYS = 30.0


def _claim_weight(claim: Claim, weight: str, now: datetime) -> float:
    """Return how much a single claim adds to an entity's score.

    ``citation`` multiplies the number of distinct evidence ids by the claim's
    confidence; ``recency`` applies an exponential half-life decay to the
    claim's age in days; every other value contributes a flat ``1.0`` (count).
    """
    if weight == "citation":
        distinct_sources = set(claim.evidence)
        return float(len(distinct_sources)) * float(claim.confidence)
    if weight != "recency":
        return 1.0  # count
    # Prefer the last confirmation time; fall back to the last update.
    stamp = claim.last_confirmed_at or claim.updated_at
    elapsed_days = (now - stamp).total_seconds() / 86400.0
    if elapsed_days < 0.0:
        # A timestamp in the future is treated as "just now".
        elapsed_days = 0.0
    return 2.0 ** (-elapsed_days / _RECENCY_HALF_LIFE_DAYS)


def rank_experts(
    store: KBStore,
    topic: str,
    *,
    limit: int = 10,
    min_claims: int = 1,
    weight: str = "count",
) -> list[dict[str, Any]]:
    """Return entities ranked by evidence density on ``topic``.

    A claim counts as evidence when its status is not superseded, archived or
    redacted and it either appears among the full-text search hits for
    ``topic`` or references an entity matched against the topic. Every known
    entity referenced by such a claim is credited with it once.

    Args:
        store: knowledge base to read from.
        topic: free-text topic to match.
        limit: maximum number of rows returned; zero or a negative value
            yields an empty list.
        min_claims: entities credited with fewer matched claims are dropped.
        weight: ``count`` | ``recency`` | ``citation``; an unknown value
            (including a non-string one) falls back to ``count`` and never
            raises, matching the defensive-config style used elsewhere.

    Returns:
        One dict per entity with the keys ``entity_id``, ``name``, ``type``,
        ``claim_count``, ``citation_count`` (distinct evidence ids across the
        entity's matched claims), ``score`` (rounded to six places) and
        ``top_claim_ids`` (at most three, largest contribution first).
        Ordered by descending score, then descending claim count, with a
        stable tie-break on ``entity_id``.
    """
    # A non-positive limit can never produce rows. Returning early also keeps
    # a negative value away from ``rows[:limit]``, where it would silently
    # drop entries from the tail instead of capping the result.
    if limit <= 0:
        return []

    # ``weight`` can arrive straight from a JSON payload; an unhashable value
    # (list / dict) would make the frozenset membership test raise, so check
    # the type first to honour the never-raises fallback.
    if not isinstance(weight, str) or weight not in _VALID_WEIGHTS:
        weight = "count"

    entities = store.list_entities()
    by_id = {ent.id: ent for ent in entities}
    topic_entity_ids = set(_substring_entity_ids(entities, topic))

    # Candidate claims: FTS hits on the topic, plus every claim that references
    # an entity whose name/alias matches the topic.
    fetch = max(limit * 5, 50)
    fts_claim_ids = {
        cid
        for kind, cid, _snip, _score in index_db.search(store.kb_dir, topic, limit=fetch)
        if kind == "claim"
    }

    now = utcnow()
    counts: dict[str, int] = {}
    citations: dict[str, set[str]] = {}
    scores: dict[str, float] = {}
    top_claims: dict[str, list[tuple[float, str]]] = {}

    for claim in store.list_claims():
        if claim.status in _EXCLUDED_STATUSES:
            continue
        matched = claim.id in fts_claim_ids or bool(
            set(claim.entities) & topic_entity_ids
        )
        if not matched:
            continue
        contrib = _claim_weight(claim, weight, now)
        # De-duplicate (order-preserving) so a claim that lists the same
        # entity twice is still credited to that entity only once.
        for eid in dict.fromkeys(claim.entities):
            if eid not in by_id:
                continue  # dangling reference - skip (graph gate should prevent)
            counts[eid] = counts.get(eid, 0) + 1
            citations.setdefault(eid, set()).update(claim.evidence)
            scores[eid] = scores.get(eid, 0.0) + contrib
            top_claims.setdefault(eid, []).append((contrib, claim.id))

    rows: list[dict[str, Any]] = []
    for eid, count in counts.items():
        if count < min_claims:
            continue
        ent = by_id[eid]
        # Strongest contribution first; claim id breaks ties deterministically.
        ranked = sorted(top_claims[eid], key=lambda item: (-item[0], item[1]))
        rows.append(
            {
                "entity_id": eid,
                "name": ent.name,
                "type": str(ent.type),
                "claim_count": count,
                "citation_count": len(citations.get(eid, set())),
                "score": round(scores[eid], 6),
                "top_claim_ids": [cid for _w, cid in ranked[:3]],
            }
        )

    rows.sort(key=lambda row: (-row["score"], -row["claim_count"], row["entity_id"]))
    return rows[:limit]
15 changes: 15 additions & 0 deletions src/vouch/jsonl_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,20 @@ def _load_cfg(store: KBStore) -> dict:
return loaded if isinstance(loaded, dict) else {}


def _h_experts(p: dict) -> dict:
    """Handle ``kb.experts``: rank entities for the required ``topic`` param.

    ``limit`` and ``min_claims`` are coerced with ``int`` and default to 10
    and 1; ``weight`` defaults to ``"count"``. The ranking is returned under
    the ``"experts"`` key.
    """
    from .experts import rank_experts

    # Resolve the store first, then read params in the same order as before
    # so a missing ``topic`` still surfaces as the same KeyError.
    kb = _store()
    topic = p["topic"]
    max_rows = int(p.get("limit", 10))
    claim_floor = int(p.get("min_claims", 1))
    weighting = p.get("weight", "count")

    ranking = rank_experts(
        kb,
        topic,
        limit=max_rows,
        min_claims=claim_floor,
        weight=weighting,
    )
    return {"experts": ranking}


def _h_neighbors(p: dict) -> dict:
from .graph import find_neighbors

Expand Down Expand Up @@ -679,6 +693,7 @@ def _h_propose_theme(p: dict) -> dict:
"kb.stats": _h_stats,
"kb.search": _h_search,
"kb.neighbors": _h_neighbors,
"kb.experts": _h_experts,
"kb.context": _h_context,
"kb.synthesize": _h_synthesize,
"kb.read_page": _h_read_page,
Expand Down
17 changes: 17 additions & 0 deletions src/vouch/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,23 @@ def _load_cfg(store: KBStore) -> dict[str, Any]:
return loaded if isinstance(loaded, dict) else {}


@mcp.tool()
def kb_experts(
    topic: str,
    limit: int = 10,
    min_claims: int = 1,
    weight: str = "count",
) -> dict[str, Any]:
    """Rank entities by evidence density on a topic (read-only)."""
    # Lazy import keeps the ranking module off the server's import path
    # until the tool is actually invoked.
    from .experts import rank_experts

    ranking = rank_experts(
        _store(),
        topic,
        limit=limit,
        min_claims=min_claims,
        weight=weight,
    )
    # Same {"experts": [...]} envelope as the JSONL handler and the CLI.
    return {"experts": ranking}


@mcp.tool()
def kb_neighbors(
node_id: str,
Expand Down
151 changes: 151 additions & 0 deletions tests/test_experts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""kb.experts - rank entities by evidence density on a topic (issue #315).

Read-only: aggregates approved, live claims and returns a ranking. These tests
seed entities + claims directly and match on the topic via the entity
name/alias substring pass, so they exercise the ranking without depending on
the FTS index being populated.
"""

from __future__ import annotations

from pathlib import Path

import pytest

from vouch.experts import rank_experts
from vouch.jsonl_server import handle_request
from vouch.models import Claim, ClaimStatus, Entity, EntityType
from vouch.storage import KBStore


@pytest.fixture
def store(tmp_path: Path) -> KBStore:
    """Provide a knowledge base initialised inside pytest's temporary directory."""
    kb = KBStore.init(tmp_path)
    return kb


def _seed(store: KBStore) -> None:
    """Populate ``store`` with the shared JWT scenario used by these tests.

    Three entities (``jwt``, ``alice``, ``bob``) and three claims: alice is on
    two JWT claims (one citing two distinct sources), bob is on one.
    """
    first_src = store.put_source(b"evidence-bytes")
    second_src = store.put_source(b"other-evidence")

    entity_specs = (
        ("jwt", "JWT", EntityType.CONCEPT),
        ("alice", "alice", EntityType.PERSON),
        ("bob", "bob", EntityType.PERSON),
    )
    for ent_id, ent_name, ent_type in entity_specs:
        store.put_entity(Entity(id=ent_id, name=ent_name, type=ent_type))

    # (claim id, text, evidence ids, entity ids) in insertion order.
    claim_specs = (
        ("c1", "jwt auth by alice", [first_src.id], ["jwt", "alice"]),
        ("c2", "jwt rotation by alice", [first_src.id, second_src.id], ["jwt", "alice"]),
        ("c3", "jwt review by bob", [first_src.id], ["jwt", "bob"]),
    )
    for claim_id, text, evidence, entities in claim_specs:
        store.put_claim(
            Claim(id=claim_id, text=text, evidence=evidence, entities=entities)
        )


def test_ranks_by_claim_count(store: KBStore) -> None:
    """Under ``count`` weighting, entities on more matched claims rank higher."""
    _seed(store)
    ranking = rank_experts(store, "JWT", weight="count")
    order = [row["name"] for row in ranking]
    # The topic entity itself is on all 3 claims, so it leads.
    assert order[0] == "JWT"
    # alice (2 claims) must come before bob (1 claim).
    assert order.index("alice") < order.index("bob")
    alice_rows = [row for row in ranking if row["name"] == "alice"]
    assert alice_rows[0]["claim_count"] == 2


def test_min_claims_and_limit(store: KBStore) -> None:
    """``min_claims`` filters thin entities and ``limit`` caps the ranking."""
    _seed(store)
    with_two_or_more = rank_experts(store, "JWT", min_claims=2)
    surviving_names = {row["name"] for row in with_two_or_more}
    assert "bob" not in surviving_names  # bob has only 1 claim
    top_only = rank_experts(store, "JWT", limit=1)
    assert top_only[0]["name"] == "JWT"


def test_citation_weight_rewards_source_breadth(store: KBStore) -> None:
    """alice's row reports the two distinct sources cited across her claims."""
    _seed(store)
    ranking = rank_experts(store, "JWT", weight="citation")
    alice_rows = [row for row in ranking if row["name"] == "alice"]
    # c2 cites two distinct sources, one of which c1 also cites.
    assert alice_rows[0]["citation_count"] == 2


def test_excludes_superseded_archived_redacted(store: KBStore) -> None:
    """Only the live claim is scored; superseded/archived/redacted ones are not."""
    source = store.put_source(b"x")
    store.put_entity(Entity(id="e", name="ghost", type=EntityType.CONCEPT))

    def _ghost_claim(claim_id: str, text: str, status: ClaimStatus) -> Claim:
        # Every claim in this test shares the same evidence and entity.
        return Claim(
            id=claim_id,
            text=text,
            evidence=[source.id],
            entities=["e"],
            status=status,
        )

    store.put_claim(_ghost_claim("live", "ghost live", ClaimStatus.STABLE))
    dead_statuses = (
        ClaimStatus.SUPERSEDED,
        ClaimStatus.ARCHIVED,
        ClaimStatus.REDACTED,
    )
    for idx, status in enumerate(dead_statuses):
        store.put_claim(_ghost_claim(f"dead{idx}", "ghost dead", status))

    ghost_rows = [r for r in rank_experts(store, "ghost") if r["name"] == "ghost"]
    assert ghost_rows[0]["claim_count"] == 1  # only the live claim scored


def test_unknown_weight_falls_back_to_count(store: KBStore) -> None:
    """An unrecognised weight produces the same ordering as ``count``."""
    _seed(store)

    def _ordering(weight: str) -> list[str]:
        return [row["entity_id"] for row in rank_experts(store, "JWT", weight=weight)]

    fallback = _ordering("nonsense")
    baseline = _ordering("count")
    assert fallback == baseline


def test_empty_kb_and_no_match(store: KBStore) -> None:
    """An empty KB and a topic matching nothing both give an empty ranking."""
    on_empty_kb = rank_experts(store, "anything")
    assert on_empty_kb == []
    _seed(store)
    on_unknown_topic = rank_experts(store, "no-such-topic-xyz")
    assert on_unknown_topic == []


def test_deterministic_tie_break_on_entity_id(store: KBStore) -> None:
    """Entities with equal scores come back in ascending ``entity_id`` order."""
    source = store.put_source(b"y")
    store.put_entity(Entity(id="t", name="topic-x", type=EntityType.CONCEPT))
    # Insert a2 before a1 so insertion order cannot explain the result.
    for person_id in ("a2", "a1"):
        store.put_entity(Entity(id=person_id, name=person_id, type=EntityType.PERSON))
    for claim_id, text, person_id in (
        ("k1", "topic-x one", "a1"),
        ("k2", "topic-x two", "a2"),
    ):
        store.put_claim(
            Claim(id=claim_id, text=text, evidence=[source.id], entities=["t", person_id])
        )

    people = {"a1", "a2"}
    tied = [
        row["entity_id"]
        for row in rank_experts(store, "topic-x")
        if row["entity_id"] in people
    ]
    assert tied == ["a1", "a2"]  # equal score -> ascending entity_id
Comment thread
coderabbitai[bot] marked this conversation as resolved.


def test_jsonl_experts_envelope_success(store: KBStore, monkeypatch) -> None:
    """A well-formed ``kb.experts`` request returns the success envelope."""
    # The JSONL contract wraps results as {id, ok, result}; the ranking is
    # expected under result["experts"].
    _seed(store)
    monkeypatch.chdir(store.root)
    request = {"id": "e1", "method": "kb.experts", "params": {"topic": "JWT"}}
    response = handle_request(request)
    assert response["id"] == "e1"
    assert response["ok"] is True
    ranked_names = [row["name"] for row in response["result"]["experts"]]
    assert "alice" in ranked_names


def test_jsonl_experts_envelope_missing_topic_errors(store: KBStore, monkeypatch) -> None:
    """Omitting ``topic`` yields the failure envelope instead of an exception."""
    # Expected shape: {id, ok: false, error} with a "missing_param" code.
    monkeypatch.chdir(store.root)
    request = {"id": "e2", "method": "kb.experts", "params": {}}
    response = handle_request(request)
    assert response["id"] == "e2"
    assert response["ok"] is False
    assert response["error"]["code"] == "missing_param"