From ea1eb87ef884cde7013b25c192833e86a0aa8df6 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 10:04:50 +0000 Subject: [PATCH 1/4] feat(context): memory consolidation engine (#498, #679, #680, #681, #682, #683) Distill episodic memory into durable, deduplicated, provenance-stamped facts. - context/consolidation.py: deterministic cluster_episodes (#679), promote_clusters with provenance + max-sensitivity inheritance (#680), decay_episodes/decay_facts report-only over append-only stores (#681), and the consolidate() orchestrator (idempotent apply via content-addressed fact IDs). Private helpers in _consolidation_helpers.py keep it <=300 lines. - context/_consolidation_merge.py: optional fail-closed call_fn canonicalizer that rejects ungrounded completions, deterministic fallback (#682). - context/consolidation_types.py: ConsolidationPolicy / EpisodeCluster / PromotedFact / ConsolidationReport with to_dict/from_dict. - eval/consolidation.py: evaluate_consolidation -> ConsolidationEvalReport (precision/coverage + dedup ratio), offline + deterministic (#683). - CLI: `contextweaver consolidate` subcommand over JSON-serialised stores. - Re-exports via contextweaver.context and contextweaver.eval; regenerated api/public_api.txt; AGENTS.md module map + Key Types; CHANGELOG. Pure stdlib, no new dependency. Standalone functions mirror handoff (no new ContextManager method surface). Tests: test_consolidation.py, test_eval_consolidation.py, plus CLI tests in test_cli.py. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01JiHvVpWeqApRTyQReS4QJX --- AGENTS.md | 6 + CHANGELOG.md | 22 ++ api/public_api.txt | 25 ++ src/contextweaver/__main__.py | 94 +++++ src/contextweaver/context/__init__.py | 24 ++ .../context/_consolidation_helpers.py | 111 ++++++ .../context/_consolidation_merge.py | 85 ++++ src/contextweaver/context/consolidation.py | 285 ++++++++++++++ .../context/consolidation_types.py | 252 ++++++++++++ src/contextweaver/eval/__init__.py | 3 + src/contextweaver/eval/consolidation.py | 139 +++++++ tests/test_cli.py | 63 +++ tests/test_consolidation.py | 363 ++++++++++++++++++ tests/test_eval_consolidation.py | 68 ++++ 14 files changed, 1540 insertions(+) create mode 100644 src/contextweaver/context/_consolidation_helpers.py create mode 100644 src/contextweaver/context/_consolidation_merge.py create mode 100644 src/contextweaver/context/consolidation.py create mode 100644 src/contextweaver/context/consolidation_types.py create mode 100644 src/contextweaver/eval/consolidation.py create mode 100644 tests/test_consolidation.py create mode 100644 tests/test_eval_consolidation.py diff --git a/AGENTS.md b/AGENTS.md index 93e44756..efefa753 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -62,6 +62,10 @@ It prepares context and routes tools but never calls models or executes tools. | `context/memory_source.py` | `memory_entries_to_context_items` / `select_memory_for_phase` helpers that materialise memory entries into budgeted `memory_fact` candidates (issue #293). | | `context/handoff_types.py` | `HandoffEntry` + `SessionHandoffPack` dataclasses and canonical handoff category constants (issue #294). | | `context/handoff.py` | `build_session_handoff_pack` / `render_handoff_pack` — deterministic, budget-aware, sensitivity- and firewall-respecting session continuity snapshot (issue #294). | +| `context/consolidation_types.py` | `ConsolidationPolicy` / `EpisodeCluster` / `PromotedFact` / `ConsolidationReport` (+ `CONSOLIDATION_REPORT_VERSION`) — pure-data config and result types for the memory consolidation engine (issue #498). | +| `context/consolidation.py` | Memory consolidation engine (issue #498): `cluster_episodes` (deterministic episodic clustering/dedupe, #679), `promote_clusters` (fact promotion with provenance + max-sensitivity inheritance, #680; optional fail-closed `call_fn` merge, #682), `decay_episodes` / `decay_facts` (report-only decay over append-only stores, #681), and the `consolidate(...)` orchestrator → `ConsolidationReport`. Deterministic; `apply=True` upserts content-addressed facts (idempotent). Standalone functions (not a `ContextManager` method) mirroring `handoff.py`. | +| `context/_consolidation_helpers.py` | Private deterministic helpers for `consolidation.py` (clustering canonical text, max-sensitivity, session counting, ISO-timestamp parsing, content-addressed fact IDs, decay predicate) — keeps `consolidation.py` ≤300 lines. Not public API. | +| `context/_consolidation_merge.py` | Private optional model-assisted canonicalizer for consolidation (issue #682): `refine_canonical_text` runs a user-supplied `call_fn` under fail-closed guardrails (no LLM SDK dep; rejects blank/ungrounded completions that introduce tokens absent from the source cluster, falling back to the deterministic text). Not public API. | | `context/explanation.py` | `ContextBuildExplanation` + `CandidateExplanation` opt-in debug surface returned by `ContextManager.build(..., explain=True)` (issue #291); carries `resolved_weights` (the per-phase scoring weights applied, issue #487). Sister to `routing/explanation.py` on the routing side. | | `context/build_policy.py` | Pure build-pipeline policy helpers (not public API): `override_phase_budget` / `adjust_budget_for_header` (budget math), `enforce_overflow_policy` (`ContextPolicy.overflow_action`, issue #510), and `render_pack_prompt` (caller-owned `renderer` hook, issue #410). Extracted from `build.py` to keep it within its size ceiling. | | `context/classify.py` | Opt-in deterministic ingestion-time sensitivity classification (issue #542): `HeuristicSensitivityClassifier` (implements the `SensitivityClassifier` protocol) + `detect_sensitivity()`. Runs at the start of the pipeline's sensitivity stage and over fact/episode header content; may only **raise** a label, never lower it. Reuses `secrets.contains_secret` plus PII markers. | @@ -134,6 +138,7 @@ It prepares context and routes tools but never calls models or executes tools. | `extras/memory/_zep_common.py` | Internal helpers backing `zep.py` (keeps it ≤300 lines): shared `cw_*` constants, the `ZepBackendError` exception, the JSON/scan helpers (`_episode_records` / `_episode_uuid` / `_episode_payload`), the defensive payload-coercion helpers (`_coerce_str_tags` / `_coerce_metadata`), and the `_ZepStoreBase` scope/scan/write base. Carries the same `[zep]`-extra import guard (issue #195). | | `extras/memory/langmem.py` | `LangMemEpisodicStore` + `LangMemFactStore` — wrap any LangGraph `BaseStore` scoped by a `namespace` tuple; canonical ID is the store key, value is the dataclass `to_dict()` payload (direct, lossless KV). `search` delegates to `BaseStore.search`. Gated behind the `[langmem]` extra (issue #195). | | `eval/` | Evaluation harness (issue #12): `EvalCase` / `EvalDataset` (gold datasets), `evaluate_routing` → `RoutingEvalReport` (top-k recall, MRR, confidence gap, beam steps), `evaluate_context` → `ContextEvalReport` (budget utilisation + token savings vs naive concat). Pure-stdlib, deterministic; backs the `eval` CLI subcommand. | +| `eval/consolidation.py` | Consolidation quality evaluation harness (issue #683): `evaluate_consolidation` → `ConsolidationEvalReport` (precision / coverage against an optional gold set + dedup ratio). Pure-stdlib, offline, deterministic. | | `eval/metrics.py` | Canonical rank-based routing metrics — `recall_at_k` (classic fractional recall@k), `precision_at_k`, `reciprocal_rank` (issue #354). Single source of truth imported by both `eval/routing.py` and `benchmarks/benchmark.py` so the harness and the benchmark script can no longer define the same names with different semantics. | | `__main__.py` | CLI: 11 subcommands (`demo`, `build`, `route`, `print-tree`, `init`, `ingest`, `replay`, `stats`, `inspect`, `budget-check`, `eval`) plus the `mcp` and `catalog` Typer sub-apps. `inspect` renders payload-safe context/routing/artifact JSON or Markdown (issue #398); `catalog lint` surfaces `NormalizationReport` + reference findings with `--json` and CI exit codes (issue #538). | | `_mcp_cli.py` | Backs the `mcp` Typer sub-app. Hosts `mcp serve`, `mcp inspect`, and `mcp stats`; accepts native contextweaver, raw MCP `tools/list`, and `{tools:[...]}` catalog shapes. `mcp serve --diagnostics FILE` appends sanitized JSONL and `--quiet` suppresses lifecycle stderr; both are config-file keys. `mcp serve --state-dir DIR` (config key `state_dir`) persists gateway state — `events.sqlite3` + `artifacts/` — so artifact handles and event history survive a restart (issue #511); omit it for the in-memory default. The packaged serve path remains a static catalog + stub upstream. | @@ -183,6 +188,7 @@ For full pipeline descriptions and design rationale, see [docs/agent-context/arc | `Mode` | Determinism mode (`strict` / `seeded` / `adaptive` placeholder) on `ProfileConfig` | | `MaskRedactionHook` | Built-in redaction hook for sensitivity enforcement | | `HydrationResult` | Result of hydrating a tool call with context | +| `ConsolidationReport` | Deterministic result of a `consolidate()` run: episode clusters, promoted facts (with provenance + inherited sensitivity), and report-only decayed episode/fact IDs (issue #498) | | `ViewRegistry` | Maps content-type patterns to view generators for progressive disclosure | | `ProxyRuntime` | Shared core for MCP proxy (#13) and gateway (#28) modes — owns upstream catalog, per-session `ContextManager`, browse / execute / view dispatch; persisted text results are returned as envelope artifact refs for `tool_view`. Hardens the untrusted-input boundary (issues #464/#484/#485/#488): `on_invalid` (skip/raise) + `schema_limits` + `last_refresh_report` at ingest, cached per-`tool_id` validators, classified+redacted upstream errors, and opt-in `tolerant_args`. Opt-in dispatch-path controls (issues #529/#482/#512/#483): `retry_policy`, `rate_limiter`, `result_cache`, and `tool_execute(dry_run=True)` — all inert by default; catalog refresh rebuilds all derived state atomically (#507). | | `ExposureMode` | `TRANSPARENT` (#13) vs `GATEWAY` (#28) for `ProxyRuntime` | diff --git a/CHANGELOG.md b/CHANGELOG.md index c2fc3367..4fd3fa68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Memory consolidation engine (#498, #679, #680, #681, #682, #683).** + New `contextweaver.context.consolidate(...)` distills episodic memory into + durable, deduplicated, provenance-stamped facts. The deterministic core + clusters similar episodes (`cluster_episodes`, #679), promotes clusters that + meet `ConsolidationPolicy` thresholds (`min_occurrences` / `min_sessions`) + into `PromotedFact` records carrying full source provenance and the **maximum** + source sensitivity (`promote_clusters`, #680), and reports entries past the + decay horizon without deleting them — the stores are append-only + (`decay_episodes` / `decay_facts`, #681). An optional, fail-closed `call_fn` + may refine a fact's canonical text, rejecting any completion that introduces + ungrounded tokens (#682). `consolidate(..., apply=True)` upserts the promoted + facts with content-addressed IDs, so re-running over an unchanged store is a + no-op (idempotent). Results are returned as a `ConsolidationReport` + (serialisable via `to_dict`/`from_dict`). New public surface in + `contextweaver.context`: `consolidate`, `cluster_episodes`, `promote_clusters`, + `decay_episodes`, `decay_facts`, `ConsolidationPolicy`, `ConsolidationReport`, + `PromotedFact`, `EpisodeCluster`. A new `contextweaver consolidate` CLI + subcommand runs the pipeline over JSON-serialised stores. Quality is + measurable offline via `contextweaver.eval.evaluate_consolidation` → + `ConsolidationEvalReport` (precision / coverage + dedup ratio, #683). Pure + stdlib; no new dependency. + - **Routing-scale index cache + profiler (#543, #624, #685, #684, #686).** New `contextweaver.routing.RoutingIndexCache` + `CachedRetriever` persist and reuse the fitted first-stage retriever index — the dominant cost of the first diff --git a/api/public_api.txt b/api/public_api.txt index 241b2221..cd7eb891 100644 --- a/api/public_api.txt +++ b/api/public_api.txt @@ -600,9 +600,18 @@ def to_weaver_selectable_item(item: 'SelectableItem') -> '_ws_types.SelectableItem' ## contextweaver.context + CONSOLIDATION_REPORT_VERSION: str class CandidateExplanation(item_id: 'str', kind: 'str', sensitivity: 'str', score: 'float | None' = ..., included: 'bool' = ..., drop_reason: 'str' = ..., dependency_closure: 'bool' = ...) -> None def from_dict(cls, data: 'dict[str, Any]') -> 'CandidateExplanation' def to_dict(self) -> 'dict[str, Any]' + class ConsolidationPolicy(min_occurrences: 'int' = ..., min_sessions: 'int' = ..., similarity_threshold: 'float' = ..., decay_after_days: 'int | None' = ..., timestamp_key: 'str' = ..., session_key: 'str' = ...) -> None + def from_dict(cls, data: 'dict[str, Any]') -> 'ConsolidationPolicy' + def to_dict(self) -> 'dict[str, Any]' + def validate(self) -> 'None' + class ConsolidationReport(clusters: 'list[EpisodeCluster]' = ..., promoted: 'list[PromotedFact]' = ..., decayed_episode_ids: 'list[str]' = ..., decayed_fact_ids: 'list[str]' = ..., applied: 'bool' = ..., version: 'str' = ...) -> None + def from_dict(cls, data: 'dict[str, Any]') -> 'ConsolidationReport' + def summary(self) -> 'str' + def to_dict(self) -> 'dict[str, Any]' class ContextBuildExplanation(version: 'int' = ..., phase: 'str' = ..., query: 'str' = ..., total_candidates: 'int' = ..., included_count: 'int' = ..., dropped_count: 'int' = ..., dropped_reasons: 'dict[str, int]' = ..., dependency_closures: 'int' = ..., sensitivity_drops: 'int' = ..., dedup_removed: 'int' = ..., budget_tokens: 'int' = ..., resolved_weights: 'dict[str, float]' = ..., candidates: 'list[CandidateExplanation]' = ...) -> None def from_dict(cls, data: 'dict[str, Any]') -> 'ContextBuildExplanation' def to_dict(self) -> 'dict[str, Any]' @@ -610,6 +619,9 @@ def drilldown(self, handle: 'str', selector: 'dict[str, Any]', *, inject: 'bool' = ..., parent_id: 'str | None' = ...) -> 'str' def drilldown_sync(self, handle: 'str', selector: 'dict[str, Any]', *, inject: 'bool' = ..., parent_id: 'str | None' = ...) -> 'str' EXPLANATION_VERSION: int + class EpisodeCluster(cluster_id: 'str', episode_ids: 'list[str]' = ..., canonical_text: 'str' = ...) -> None + def from_dict(cls, data: 'dict[str, Any]') -> 'EpisodeCluster' + def to_dict(self) -> 'dict[str, Any]' HANDOFF_CATEGORIES: tuple HANDOFF_PACK_VERSION: str class HandoffEntry(id: 'str', text: 'str', category: 'str', source_ids: 'list[str]' = ..., confidence: 'float' = ..., token_estimate: 'int' = ...) -> None @@ -627,6 +639,9 @@ def is_expired(self, *, now: 'float | None' = ...) -> 'bool' def to_dict(self) -> 'dict[str, Any]' PHASE_SCOPE_PREFERENCES: dict + class PromotedFact(fact_id: 'str', key: 'str', text: 'str', source_episode_ids: 'list[str]' = ..., occurrences: 'int' = ..., sessions: 'int' = ..., first_seen: 'str | None' = ..., last_seen: 'str | None' = ..., sensitivity: 'Sensitivity' = ..., merged_by_llm: 'bool' = ...) -> None + def from_dict(cls, data: 'dict[str, Any]') -> 'PromotedFact' + def to_dict(self) -> 'dict[str, Any]' class SessionHandoffPack(decisions: 'list[HandoffEntry]' = ..., conventions: 'list[HandoffEntry]' = ..., unresolved_tasks: 'list[HandoffEntry]' = ..., pitfalls: 'list[HandoffEntry]' = ..., next_inspections: 'list[HandoffEntry]' = ..., artifact_refs: 'list[ArtifactRef]' = ..., sensitivity_dropped: 'int' = ..., token_estimate: 'int' = ..., version: 'str' = ...) -> None def all_entries(self) -> 'list[HandoffEntry]' def from_dict(cls, data: 'dict[str, Any]') -> 'SessionHandoffPack' @@ -639,12 +654,17 @@ def apply_sensitivity_filter(items: 'list[ContextItem]', policy: 'ContextPolicy', estimator: 'TokenEstimator | None' = ...) -> 'tuple[list[ContextItem], int]' def build_schema_header(hydration: 'HydrationResult', schema: 'dict[str, Any] | None' = ..., examples: 'list[str] | None' = ..., constraints: 'dict[str, Any] | None' = ...) -> 'str' def build_session_handoff_pack(event_log: 'EventLog', artifact_store: 'ArtifactStore', policy: 'ContextPolicy', estimator: 'TokenEstimator', *, budget_tokens: 'int' = ...) -> 'SessionHandoffPack' + def cluster_episodes(episodes: 'list[Episode]', *, similarity_threshold: 'float' = ...) -> 'list[EpisodeCluster]' + def consolidate(episodic_store: 'EpisodicStore', fact_store: 'FactStore', policy: 'ConsolidationPolicy | None' = ..., *, as_of: 'datetime | None' = ..., call_fn: 'Callable[[str], str] | None' = ..., deterministic: 'bool' = ..., apply: 'bool' = ...) -> 'ConsolidationReport' + def decay_episodes(episodes: 'list[Episode]', policy: 'ConsolidationPolicy', *, as_of: 'datetime') -> 'list[str]' + def decay_facts(facts: 'list[Fact]', policy: 'ConsolidationPolicy', *, as_of: 'datetime') -> 'list[str]' def deduplicate_candidates(scored: 'list[tuple[float, ContextItem]]', similarity_threshold: 'float' = ...) -> 'tuple[list[tuple[float, ContextItem]], int]' def drilldown_tool_spec() -> 'SelectableItem' def generate_candidates(event_log: 'EventLog', phase: 'Phase', policy: 'ContextPolicy') -> 'list[ContextItem]' def generate_views(ref: 'ArtifactRef', data: 'bytes', registry: 'ViewRegistry | None' = ...) -> 'list[ViewSpec]' def memory_entries_to_context_items(entries: 'list[MemoryEntry]', *, estimator: 'TokenEstimator | None' = ..., now: 'float | None' = ...) -> 'list[ContextItem]' def passthrough_renderer(items: 'list[ContextItem]') -> 'str' + def promote_clusters(clusters: 'list[EpisodeCluster]', episodes_by_id: 'dict[str, Episode]', policy: 'ConsolidationPolicy', *, call_fn: 'Callable[[str], str] | None' = ..., deterministic: 'bool' = ...) -> 'list[PromotedFact]' def register_redaction_hook(name: 'str', hook: 'RedactionHook') -> 'None' def render_context(items: 'list[ContextItem]', separator: 'str' = ..., header: 'str' = ..., footer: 'str' = ...) -> 'str' def render_handoff_pack(pack: 'SessionHandoffPack') -> 'str' @@ -661,6 +681,10 @@ def gateway_catalog_path() -> 'Path' ## contextweaver.eval + class ConsolidationEvalReport(clusters_found: 'int' = ..., facts_promoted: 'int' = ..., episodes_decayed: 'int' = ..., facts_decayed: 'int' = ..., dedup_ratio: 'float' = ..., precision: 'float' = ..., coverage: 'float' = ..., gold_size: 'int' = ...) -> None + def from_dict(cls, data: 'dict[str, Any]') -> 'ConsolidationEvalReport' + def summary(self) -> 'str' + def to_dict(self) -> 'dict[str, Any]' class ContextEvalReport(phase: 'str' = ..., prompt_tokens: 'int' = ..., budget_tokens: 'int' = ..., budget_utilization_pct: 'float' = ..., naive_tokens: 'int' = ..., token_savings: 'int' = ..., token_savings_pct: 'float' = ..., total_candidates: 'int' = ..., items_included: 'int' = ..., items_dropped: 'int' = ..., dedup_removed: 'int' = ...) -> None def from_dict(cls, data: 'dict[str, Any]') -> 'ContextEvalReport' def summary(self) -> 'str' @@ -676,6 +700,7 @@ def from_dict(cls, data: 'dict[str, Any]') -> 'RoutingEvalReport' def summary(self) -> 'str' def to_dict(self) -> 'dict[str, Any]' + def evaluate_consolidation(report: 'ConsolidationReport', expected_texts: 'Iterable[str] | None' = ..., *, total_episodes: 'int | None' = ...) -> 'ConsolidationEvalReport' def evaluate_context(manager: 'ContextManager', phase: 'Phase' = ..., query: 'str' = ..., *, estimator: 'TokenEstimator | None' = ...) -> 'ContextEvalReport' def evaluate_routing(router: 'Router', dataset: 'EvalDataset', *, catalog_ids: 'set[str] | None' = ...) -> 'RoutingEvalReport' def precision_at_k(predicted: 'Sequence[str]', expected: 'Collection[str]', k: 'int') -> 'float' diff --git a/src/contextweaver/__main__.py b/src/contextweaver/__main__.py index 6da8092c..bad0d527 100644 --- a/src/contextweaver/__main__.py +++ b/src/contextweaver/__main__.py @@ -33,6 +33,7 @@ import json import sys +from datetime import datetime from enum import Enum from pathlib import Path from typing import Annotated, Any @@ -56,6 +57,8 @@ from contextweaver.adapters.mcp import mcp_tool_to_selectable from contextweaver.adapters.sidecar import SidecarApp, SidecarConfig from contextweaver.config import ContextBudget +from contextweaver.context.consolidation import consolidate +from contextweaver.context.consolidation_types import ConsolidationPolicy from contextweaver.context.manager import ContextManager from contextweaver.eval.dataset import EvalDataset from contextweaver.eval.routing import evaluate_routing @@ -74,6 +77,8 @@ from contextweaver.routing.normalizer import CatalogNormalizer, NormalizationReport from contextweaver.routing.router import Router from contextweaver.routing.tree import TreeBuilder +from contextweaver.store.episodic import InMemoryEpisodicStore +from contextweaver.store.facts import InMemoryFactStore from contextweaver.types import ContextItem, ItemKind, Phase, SelectableItem # --------------------------------------------------------------------------- @@ -687,6 +692,95 @@ def eval_cmd( print(report.summary()) +# --------------------------------------------------------------------------- +# consolidate (issue #498) +# --------------------------------------------------------------------------- + + +@app.command("consolidate") +def consolidate_cmd( + episodes: Annotated[ + Path, + typer.Option(..., "--episodes", help="Episodic-store JSON file ({'episodes': [...]})."), + ], + facts: Annotated[ + Path | None, + typer.Option("--facts", help="Optional fact-store JSON file ({'facts': [...]})."), + ] = None, + apply: Annotated[ + bool, typer.Option("--apply", help="Write promoted facts into the fact store.") + ] = False, + facts_out: Annotated[ + Path | None, + typer.Option("--facts-out", help="Write the updated fact store here (with --apply)."), + ] = None, + min_occurrences: Annotated[ + int, typer.Option("--min-occurrences", help="Min clustered episodes to promote.") + ] = 3, + min_sessions: Annotated[ + int, typer.Option("--min-sessions", help="Min distinct sessions to promote.") + ] = 2, + similarity: Annotated[ + float, typer.Option("--similarity", help="Jaccard similarity threshold for clustering.") + ] = 0.5, + decay_after_days: Annotated[ + int, + typer.Option("--decay-after-days", help="Decay horizon in days; negative disables decay."), + ] = 90, + as_of: Annotated[ + str | None, + typer.Option("--as-of", help="ISO-8601 reference time for decay reporting."), + ] = None, + json_output: Annotated[ + bool, typer.Option("--json", help="Emit machine-readable JSON.") + ] = False, +) -> None: + """Consolidate episodic memory into durable facts (issue #498). + + Loads an episodic store (and an optional fact store) from JSON, runs the + deterministic consolidation pipeline, and prints the report. With + ``--apply`` the promoted facts are upserted into the fact store; pass + ``--facts-out`` to persist the updated store. + """ + try: + ep_store = InMemoryEpisodicStore.from_dict(json.loads(episodes.read_text(encoding="utf-8"))) + fact_store = ( + InMemoryFactStore.from_dict(json.loads(facts.read_text(encoding="utf-8"))) + if facts is not None + else InMemoryFactStore() + ) + except (OSError, json.JSONDecodeError) as exc: + raise typer.BadParameter(f"could not read store JSON: {exc}") from exc + + policy = ConsolidationPolicy( + min_occurrences=min_occurrences, + min_sessions=min_sessions, + similarity_threshold=similarity, + decay_after_days=None if decay_after_days < 0 else decay_after_days, + ) + parsed_as_of = None + if as_of is not None: + try: + parsed_as_of = datetime.fromisoformat(as_of) + except ValueError as exc: + raise typer.BadParameter(f"invalid --as-of timestamp: {exc}") from exc + + try: + report = consolidate(ep_store, fact_store, policy, as_of=parsed_as_of, apply=apply) + except ContextWeaverError as exc: + raise typer.BadParameter(str(exc)) from exc + + if apply and facts_out is not None: + facts_out.write_text( + json.dumps(fact_store.to_dict(), indent=2, sort_keys=True), encoding="utf-8" + ) + + if json_output: + print(json.dumps(report.to_dict(), indent=2, sort_keys=True)) + else: + print(report.summary()) + + # --------------------------------------------------------------------------- # verify (issue #657) # --------------------------------------------------------------------------- diff --git a/src/contextweaver/context/__init__.py b/src/contextweaver/context/__init__.py index 1071f233..0bee17d6 100644 --- a/src/contextweaver/context/__init__.py +++ b/src/contextweaver/context/__init__.py @@ -9,6 +9,20 @@ from contextweaver.context.call_prompt import build_schema_header from contextweaver.context.candidates import generate_candidates, resolve_dependency_closure +from contextweaver.context.consolidation import ( + cluster_episodes, + consolidate, + decay_episodes, + decay_facts, + promote_clusters, +) +from contextweaver.context.consolidation_types import ( + CONSOLIDATION_REPORT_VERSION, + ConsolidationPolicy, + ConsolidationReport, + EpisodeCluster, + PromotedFact, +) from contextweaver.context.dedup import deduplicate_candidates from contextweaver.context.explanation import ( EXPLANATION_VERSION, @@ -44,10 +58,14 @@ from contextweaver.context.views import ViewRegistry, drilldown_tool_spec, generate_views __all__ = [ + "CONSOLIDATION_REPORT_VERSION", "CandidateExplanation", + "ConsolidationPolicy", + "ConsolidationReport", "ContextBuildExplanation", "ContextManager", "EXPLANATION_VERSION", + "EpisodeCluster", "HANDOFF_CATEGORIES", "HANDOFF_PACK_VERSION", "HandoffEntry", @@ -55,6 +73,7 @@ "MaskRedactionHook", "MemoryEntry", "PHASE_SCOPE_PREFERENCES", + "PromotedFact", "SessionHandoffPack", "ViewRegistry", "apply_firewall", @@ -62,12 +81,17 @@ "apply_sensitivity_filter", "build_schema_header", "build_session_handoff_pack", + "cluster_episodes", + "consolidate", + "decay_episodes", + "decay_facts", "deduplicate_candidates", "drilldown_tool_spec", "generate_candidates", "generate_views", "memory_entries_to_context_items", "passthrough_renderer", + "promote_clusters", "register_redaction_hook", "unregister_redaction_hook", "render_context", diff --git a/src/contextweaver/context/_consolidation_helpers.py b/src/contextweaver/context/_consolidation_helpers.py new file mode 100644 index 00000000..90ef82db --- /dev/null +++ b/src/contextweaver/context/_consolidation_helpers.py @@ -0,0 +1,111 @@ +"""Private deterministic helpers for the consolidation engine (issue #498). + +Pure functions extracted from :mod:`contextweaver.context.consolidation` to keep +that module within the project's ≤300-line ceiling. Not public API — the +public surface is re-exported from ``consolidation``. +""" + +from __future__ import annotations + +import hashlib +from datetime import datetime + +from contextweaver._utils import tokenize +from contextweaver.store.episodic import Episode +from contextweaver.types import Sensitivity + +#: Fact key under which consolidated facts are stored. +CONSOLIDATED_FACT_KEY = "consolidated" + +#: Severity ranking used to inherit the maximum sensitivity of source episodes. +_SENSITIVITY_RANK: dict[Sensitivity, int] = { + Sensitivity.public: 0, + Sensitivity.internal: 1, + Sensitivity.confidential: 2, + Sensitivity.restricted: 3, +} + + +def canonical_member(members: list[Episode]) -> str: + """Return the deterministic representative summary for *members*. + + Picks the summary with the most tokens (most informative), breaking ties by + the smallest ``episode_id`` so the choice is reproducible. + """ + best = min(members, key=lambda ep: (-len(tokenize(ep.summary)), ep.episode_id)) + return best.summary + + +def max_sensitivity(members: list[Episode]) -> Sensitivity: + """Return the highest sensitivity among *members* (defaults to public).""" + return max( + (ep.sensitivity for ep in members), + key=lambda s: _SENSITIVITY_RANK[s], + default=Sensitivity.public, + ) + + +def count_sessions(members: list[Episode], session_key: str) -> int: + """Count distinct sessions in *members*. + + Episodes lacking a session marker collectively count as one shared session. + """ + sessions: set[str] = set() + for ep in members: + value = ep.metadata.get(session_key) + sessions.add(str(value) if value is not None else "\x00unscoped") + return len(sessions) + + +def episode_iso(ep: Episode, key: str) -> str | None: + """Return *ep*'s ISO-8601 timestamp metadata value, or ``None``.""" + value = ep.metadata.get(key) + return value if isinstance(value, str) and value else None + + +def parse_iso(value: str | None) -> datetime | None: + """Parse an ISO-8601 *value* to a ``datetime``, or ``None`` on failure.""" + if not value: + return None + try: + return datetime.fromisoformat(value) + except ValueError: + return None + + +def seen_bounds(members: list[Episode], key: str) -> tuple[str | None, str | None]: + """Return the (first_seen, last_seen) ISO timestamps across *members*.""" + stamped = [(iso, parse_iso(iso)) for ep in members if (iso := episode_iso(ep, key))] + parsed = [(iso, dt) for iso, dt in stamped if dt is not None] + if not parsed: + return None, None + first = min(parsed, key=lambda pair: pair[1])[0] + last = max(parsed, key=lambda pair: pair[1])[0] + return first, last + + +def canonical_fact_id(source_ids: list[str]) -> str: + """Return a deterministic, content-addressed fact ID for *source_ids*.""" + digest = hashlib.sha1("\n".join(sorted(source_ids)).encode("utf-8")).hexdigest()[:12] + return f"fact:{CONSOLIDATED_FACT_KEY}:{digest}" + + +def is_decayed(iso: str | None, as_of: datetime, decay_after_days: int) -> bool: + """Return ``True`` when *iso* is older than *decay_after_days* before *as_of*.""" + stamp = parse_iso(iso) + if stamp is None: + return False + return (as_of - stamp).days > decay_after_days + + +__all__ = [ + "CONSOLIDATED_FACT_KEY", + "canonical_fact_id", + "canonical_member", + "count_sessions", + "episode_iso", + "is_decayed", + "max_sensitivity", + "parse_iso", + "seen_bounds", +] diff --git a/src/contextweaver/context/_consolidation_merge.py b/src/contextweaver/context/_consolidation_merge.py new file mode 100644 index 00000000..015ba695 --- /dev/null +++ b/src/contextweaver/context/_consolidation_merge.py @@ -0,0 +1,85 @@ +"""Optional model-assisted canonicalizer for consolidation (issue #682). + +The deterministic consolidation core (:mod:`contextweaver.context.consolidation`) +picks a representative ``canonical_text`` for each cluster without any model. +This private helper lets callers refine that text with a user-supplied +``call_fn`` (the established zero-dependency plugin pattern from +:mod:`contextweaver.extras.llm_summarizer`) under strict guardrails: + +* **Opt-in.** Only runs when a ``call_fn`` is supplied and the run is not in + ``deterministic`` mode. +* **Fail-closed.** Any exception, a non-string / blank completion, or a result + that introduces tokens absent from the source cluster falls back to the + deterministic ``canonical_text``. A model can only ever *rephrase* grounded + content — it can never inject a new entity into a durable fact. + +The "no new tokens" check reuses :func:`contextweaver._utils.tokenize` so the +grounding test matches the rest of the library's text normalisation. +""" + +from __future__ import annotations + +import logging +from collections.abc import Callable + +from contextweaver._utils import tokenize + +logger = logging.getLogger("contextweaver.context") + +#: Instruction prepended to the cluster's source summaries for the merge call. +DEFAULT_MERGE_PROMPT = ( + "Merge the following related memory notes into a single concise fact " + "sentence. Use only information present in the notes; do not add details. " + "Return only the sentence.\n\n" +) + +#: Upper bound on characters sent to the model. +_MAX_INPUT = 4000 + + +def refine_canonical_text( + canonical_text: str, + source_texts: list[str], + call_fn: Callable[[str], str], + *, + system_prompt: str = DEFAULT_MERGE_PROMPT, +) -> tuple[str, bool]: + """Return a model-refined canonical text, or fall back deterministically. + + Args: + canonical_text: The deterministic representative text (the fallback). + source_texts: The cluster's source episode summaries; used both as the + model prompt body and as the grounding vocabulary. + call_fn: User-supplied ``prompt -> completion`` callable. No LLM SDK is + imported; bring your own model. + system_prompt: Instruction prepended to the joined source texts. + + Returns: + A ``(text, merged_by_llm)`` tuple. ``merged_by_llm`` is ``True`` only + when the model produced a grounded, non-blank result that was accepted. + """ + allowed = set() + for text in source_texts: + allowed |= tokenize(text) + body = "\n".join(source_texts)[:_MAX_INPUT] + try: + result = call_fn(f"{system_prompt}{body}") + except Exception as exc: # noqa: BLE001 - any model failure must degrade safely + logger.warning("consolidation merge: call_fn raised (%s); using deterministic text", exc) + return canonical_text, False + + if not isinstance(result, str) or not result.strip(): + logger.warning("consolidation merge: blank/non-string completion; using deterministic text") + return canonical_text, False + merged = result.strip() + new_tokens = tokenize(merged) - allowed + if new_tokens: + logger.warning( + "consolidation merge: ungrounded tokens %s; using deterministic text", + sorted(new_tokens), + ) + return canonical_text, False + return merged, True + + +__all__ = ["DEFAULT_MERGE_PROMPT", "refine_canonical_text"] diff --git a/src/contextweaver/context/consolidation.py b/src/contextweaver/context/consolidation.py new file mode 100644 index 00000000..40b30bb2 --- /dev/null +++ b/src/contextweaver/context/consolidation.py @@ -0,0 +1,285 @@ +"""Memory consolidation engine (issue #498). + +Distills episodic memory into durable, deduplicated, provenance-stamped facts: + +1. :func:`cluster_episodes` — deterministic similarity clustering of episodes + (issue #679). +2. :func:`promote_clusters` — promote clusters that meet the policy thresholds + into :class:`~contextweaver.context.consolidation_types.PromotedFact` records + with full source provenance and inherited (max) sensitivity (issue #680). An + optional, fail-closed ``call_fn`` may refine the canonical text (issue #682). +3. :func:`decay_episodes` / :func:`decay_facts` — report entries past the decay + horizon without ever deleting them (the stores are append-only; issue #681). +4. :func:`consolidate` — the orchestrator returning a + :class:`~contextweaver.context.consolidation_types.ConsolidationReport`. + +Everything is deterministic given identical store contents, policy, and +``as_of``: clustering iterates episodes in sorted-ID order, ties break by ID, +and promoted fact IDs are content-addressed so re-running ``apply=True`` over an +unchanged store is a no-op (idempotent upsert). Pure helper functions live in +:mod:`contextweaver.context._consolidation_helpers` to keep this module within +the size ceiling. +""" + +from __future__ import annotations + +import logging +from collections.abc import Callable +from datetime import datetime + +from contextweaver._utils import jaccard, tokenize +from contextweaver.context._consolidation_helpers import ( + CONSOLIDATED_FACT_KEY, + canonical_fact_id, + canonical_member, + count_sessions, + episode_iso, + is_decayed, + max_sensitivity, + seen_bounds, +) +from contextweaver.context._consolidation_merge import refine_canonical_text +from contextweaver.context.consolidation_types import ( + ConsolidationPolicy, + ConsolidationReport, + EpisodeCluster, + PromotedFact, +) +from contextweaver.protocols import EpisodicStore, FactStore +from contextweaver.store.episodic import Episode +from contextweaver.store.facts import Fact + +logger = logging.getLogger("contextweaver.context") + + +def cluster_episodes( + episodes: list[Episode], + *, + similarity_threshold: float = 0.5, +) -> list[EpisodeCluster]: + """Group *episodes* into deterministic similarity clusters (issue #679). + + Episodes are processed in sorted-ID order. Each episode joins the first + existing cluster whose seed summary has Jaccard similarity at or above + *similarity_threshold*; otherwise it seeds a new cluster. The result is + stable and idempotent for identical input. + + Args: + episodes: Episodes to cluster. + similarity_threshold: Jaccard similarity in ``[0, 1]`` for joining. + + Returns: + Clusters in creation order, each with sorted ``episode_ids`` and a + deterministic ``canonical_text``. + """ + ordered = sorted(episodes, key=lambda ep: ep.episode_id) + seeds: list[set[str]] = [] + buckets: list[list[Episode]] = [] + for ep in ordered: + tokens = tokenize(ep.summary) + placed = False + for i, seed in enumerate(seeds): + if jaccard(tokens, seed) >= similarity_threshold: + buckets[i].append(ep) + placed = True + break + if not placed: + seeds.append(tokens) + buckets.append([ep]) + + clusters = [ + EpisodeCluster( + cluster_id=f"cluster_{idx:03d}", + episode_ids=sorted(ep.episode_id for ep in members), + canonical_text=canonical_member(members), + ) + for idx, members in enumerate(buckets) + ] + logger.debug("consolidation.cluster: episodes=%d clusters=%d", len(ordered), len(clusters)) + return clusters + + +def promote_clusters( + clusters: list[EpisodeCluster], + episodes_by_id: dict[str, Episode], + policy: ConsolidationPolicy, + *, + call_fn: Callable[[str], str] | None = None, + deterministic: bool = False, +) -> list[PromotedFact]: + """Promote qualifying *clusters* into :class:`PromotedFact` records (#680). + + A cluster is promoted when it has at least ``policy.min_occurrences`` + episodes spanning at least ``policy.min_sessions`` distinct sessions. The + promoted fact inherits the maximum source sensitivity and carries full + provenance. When *call_fn* is supplied and *deterministic* is ``False``, the + canonical text is refined under the fail-closed guardrails in + :func:`~contextweaver.context._consolidation_merge.refine_canonical_text`. + + Args: + clusters: Clusters from :func:`cluster_episodes`. + episodes_by_id: Lookup from episode ID to :class:`Episode`. + policy: Promotion thresholds. + call_fn: Optional ``prompt -> completion`` callable for merge refinement. + deterministic: When ``True``, ``call_fn`` is ignored (fail-closed). + + Returns: + Promoted facts, ordered by ``fact_id`` for determinism. + """ + promoted: list[PromotedFact] = [] + for cluster in clusters: + members = [episodes_by_id[e] for e in cluster.episode_ids if e in episodes_by_id] + if len(members) < policy.min_occurrences: + continue + sessions = count_sessions(members, policy.session_key) + if sessions < policy.min_sessions: + continue + + text = cluster.canonical_text + merged_by_llm = False + if call_fn is not None and not deterministic: + text, merged_by_llm = refine_canonical_text( + cluster.canonical_text, + [ep.summary for ep in members], + call_fn, + ) + + first_seen, last_seen = seen_bounds(members, policy.timestamp_key) + promoted.append( + PromotedFact( + fact_id=canonical_fact_id(cluster.episode_ids), + key=CONSOLIDATED_FACT_KEY, + text=text, + source_episode_ids=list(cluster.episode_ids), + occurrences=len(members), + sessions=sessions, + first_seen=first_seen, + last_seen=last_seen, + sensitivity=max_sensitivity(members), + merged_by_llm=merged_by_llm, + ) + ) + promoted.sort(key=lambda pf: pf.fact_id) + return promoted + + +def decay_episodes( + episodes: list[Episode], + policy: ConsolidationPolicy, + *, + as_of: datetime, +) -> list[str]: + """Return IDs of *episodes* past the decay horizon (report-only; #681). + + Decay never deletes — the episodic store is append-only. Callers decide how + to act on the returned IDs (e.g. status tombstones in their own backend). + """ + if policy.decay_after_days is None: + return [] + return sorted( + ep.episode_id + for ep in episodes + if is_decayed(episode_iso(ep, policy.timestamp_key), as_of, policy.decay_after_days) + ) + + +def decay_facts( + facts: list[Fact], + policy: ConsolidationPolicy, + *, + as_of: datetime, +) -> list[str]: + """Return IDs of *facts* past the decay horizon (report-only; #681).""" + if policy.decay_after_days is None: + return [] + stale: list[str] = [] + for fact in facts: + value = fact.metadata.get(policy.timestamp_key) + iso = value if isinstance(value, str) else None + if is_decayed(iso, as_of, policy.decay_after_days): + stale.append(fact.fact_id) + return sorted(stale) + + +def consolidate( + episodic_store: EpisodicStore, + fact_store: FactStore, + policy: ConsolidationPolicy | None = None, + *, + as_of: datetime | None = None, + call_fn: Callable[[str], str] | None = None, + deterministic: bool = False, + apply: bool = False, +) -> ConsolidationReport: + """Run the consolidation pipeline over *episodic_store* (issue #498). + + Args: + episodic_store: Source of episodes to consolidate. + fact_store: Target fact store (written only when *apply* is ``True``). + policy: Thresholds; defaults to :class:`ConsolidationPolicy`. + as_of: Reference time for decay reporting. When ``None``, no decay is + reported. + call_fn: Optional ``prompt -> completion`` callable for merge refinement. + deterministic: When ``True``, ``call_fn`` is ignored (fail-closed). + apply: When ``True``, promoted facts are upserted into *fact_store* with + provenance metadata. Idempotent: re-running over an unchanged store + rewrites identical facts. + + Returns: + A :class:`ConsolidationReport`. + + Raises: + ConfigError: If *policy* fails validation. + """ + policy = policy if policy is not None else ConsolidationPolicy() + policy.validate() + + episodes = episodic_store.all() + episodes_by_id = {ep.episode_id: ep for ep in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=policy.similarity_threshold) + promoted = promote_clusters( + clusters, episodes_by_id, policy, call_fn=call_fn, deterministic=deterministic + ) + + decayed_episode_ids = decay_episodes(episodes, policy, as_of=as_of) if as_of else [] + decayed_fact_ids = decay_facts(fact_store.all(), policy, as_of=as_of) if as_of else [] + + if apply: + for pf in promoted: + fact_store.put( + Fact( + fact_id=pf.fact_id, + key=pf.key, + value=pf.text, + metadata={ + "consolidated": True, + "source_episode_ids": list(pf.source_episode_ids), + "occurrences": pf.occurrences, + "sessions": pf.sessions, + "first_seen": pf.first_seen, + "last_seen": pf.last_seen, + "merged_by_llm": pf.merged_by_llm, + }, + sensitivity=pf.sensitivity, + ) + ) + + report = ConsolidationReport( + clusters=clusters, + promoted=promoted, + decayed_episode_ids=decayed_episode_ids, + decayed_fact_ids=decayed_fact_ids, + applied=apply, + ) + logger.debug("consolidation.run: %s", report.summary()) + return report + + +__all__ = [ + "CONSOLIDATED_FACT_KEY", + "cluster_episodes", + "consolidate", + "decay_episodes", + "decay_facts", + "promote_clusters", +] diff --git a/src/contextweaver/context/consolidation_types.py b/src/contextweaver/context/consolidation_types.py new file mode 100644 index 00000000..769a8a9d --- /dev/null +++ b/src/contextweaver/context/consolidation_types.py @@ -0,0 +1,252 @@ +"""Dataclasses and constants for the memory consolidation engine (issue #498). + +The consolidation engine distills episodic memory into durable facts. These +pure-data types describe its configuration (:class:`ConsolidationPolicy`), its +intermediate clustering output (:class:`EpisodeCluster`), the promoted facts it +proposes (:class:`PromotedFact`), and the full run report +(:class:`ConsolidationReport`). All carry ``to_dict`` / ``from_dict`` for +lossless JSON round-trips, matching the repo serialization convention. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from contextweaver.exceptions import ConfigError +from contextweaver.types import Sensitivity + +#: Schema version for :class:`ConsolidationReport` payloads. +CONSOLIDATION_REPORT_VERSION = "1" + + +@dataclass +class ConsolidationPolicy: + """Thresholds and policy knobs for a consolidation run. + + Attributes: + min_occurrences: Minimum number of clustered episodes required before a + cluster is promoted to a durable fact. + min_sessions: Minimum number of distinct sessions (counted from each + episode's ``metadata[session_key]``) a cluster must span before it + is promoted. Episodes without a session marker count as one shared + "unscoped" session. + similarity_threshold: Jaccard similarity (0..1) at or above which an + episode joins an existing cluster. + decay_after_days: Episodes / facts whose timestamp is older than this + many days (relative to the run's ``as_of``) are reported as decayed. + ``None`` disables decay reporting. + timestamp_key: ``metadata`` key holding an ISO-8601 timestamp used for + decay and for first/last-seen provenance. + session_key: ``metadata`` key identifying the originating session, used + for the ``min_sessions`` gate. + """ + + min_occurrences: int = 3 + min_sessions: int = 2 + similarity_threshold: float = 0.5 + decay_after_days: int | None = 90 + timestamp_key: str = "timestamp" + session_key: str = "session_id" + + def validate(self) -> None: + """Validate the policy values. + + Raises: + ConfigError: If any threshold is out of range. + """ + if self.min_occurrences < 1: + raise ConfigError(f"min_occurrences must be >= 1, got {self.min_occurrences}") + if self.min_sessions < 1: + raise ConfigError(f"min_sessions must be >= 1, got {self.min_sessions}") + if not 0.0 <= self.similarity_threshold <= 1.0: + raise ConfigError( + f"similarity_threshold must be in [0, 1], got {self.similarity_threshold}" + ) + if self.decay_after_days is not None and self.decay_after_days < 0: + raise ConfigError(f"decay_after_days must be >= 0 or None, got {self.decay_after_days}") + + def to_dict(self) -> dict[str, Any]: + """Serialise to a JSON-compatible dict.""" + return { + "min_occurrences": self.min_occurrences, + "min_sessions": self.min_sessions, + "similarity_threshold": self.similarity_threshold, + "decay_after_days": self.decay_after_days, + "timestamp_key": self.timestamp_key, + "session_key": self.session_key, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ConsolidationPolicy: + """Deserialise from a JSON-compatible dict produced by :meth:`to_dict`.""" + decay = data.get("decay_after_days", 90) + return cls( + min_occurrences=int(data.get("min_occurrences", 3)), + min_sessions=int(data.get("min_sessions", 2)), + similarity_threshold=float(data.get("similarity_threshold", 0.5)), + decay_after_days=None if decay is None else int(decay), + timestamp_key=str(data.get("timestamp_key", "timestamp")), + session_key=str(data.get("session_key", "session_id")), + ) + + +@dataclass +class EpisodeCluster: + """A deterministic grouping of similar episodes (issue #679). + + Attributes: + cluster_id: Stable, zero-padded cluster identifier (``"cluster_000"``). + episode_ids: Member episode IDs, sorted for determinism. + canonical_text: Representative text for the cluster (the deterministic + merge candidate before any optional model-assisted refinement). + """ + + cluster_id: str + episode_ids: list[str] = field(default_factory=list) + canonical_text: str = "" + + def to_dict(self) -> dict[str, Any]: + """Serialise to a JSON-compatible dict.""" + return { + "cluster_id": self.cluster_id, + "episode_ids": list(self.episode_ids), + "canonical_text": self.canonical_text, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> EpisodeCluster: + """Deserialise from a JSON-compatible dict produced by :meth:`to_dict`.""" + return cls( + cluster_id=str(data["cluster_id"]), + episode_ids=[str(e) for e in data.get("episode_ids", [])], + canonical_text=str(data.get("canonical_text", "")), + ) + + +@dataclass +class PromotedFact: + """A durable fact promoted from a cluster of episodes (issue #680). + + Every promoted fact carries complete provenance back to its source + episodes so promotions are auditable and reversible. + + Attributes: + fact_id: Deterministic, content-addressed fact ID (idempotent across + re-runs over an unchanged store). + key: Fact key under which the fact is stored (``"consolidated"``). + text: Canonical fact text (post optional model-assisted merge). + source_episode_ids: Source episode IDs, sorted. + occurrences: Number of source episodes in the cluster. + sessions: Number of distinct sessions the cluster spans. + first_seen: Earliest source timestamp (ISO-8601), or ``None``. + last_seen: Latest source timestamp (ISO-8601), or ``None``. + sensitivity: Maximum sensitivity of the source episodes (inherited up, + never down). + merged_by_llm: ``True`` when an optional ``call_fn`` produced the text. + """ + + fact_id: str + key: str + text: str + source_episode_ids: list[str] = field(default_factory=list) + occurrences: int = 0 + sessions: int = 0 + first_seen: str | None = None + last_seen: str | None = None + sensitivity: Sensitivity = Sensitivity.public + merged_by_llm: bool = False + + def to_dict(self) -> dict[str, Any]: + """Serialise to a JSON-compatible dict.""" + return { + "fact_id": self.fact_id, + "key": self.key, + "text": self.text, + "source_episode_ids": list(self.source_episode_ids), + "occurrences": self.occurrences, + "sessions": self.sessions, + "first_seen": self.first_seen, + "last_seen": self.last_seen, + "sensitivity": self.sensitivity.value, + "merged_by_llm": self.merged_by_llm, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> PromotedFact: + """Deserialise from a JSON-compatible dict produced by :meth:`to_dict`.""" + return cls( + fact_id=str(data["fact_id"]), + key=str(data.get("key", "consolidated")), + text=str(data.get("text", "")), + source_episode_ids=[str(e) for e in data.get("source_episode_ids", [])], + occurrences=int(data.get("occurrences", 0)), + sessions=int(data.get("sessions", 0)), + first_seen=data.get("first_seen"), + last_seen=data.get("last_seen"), + sensitivity=Sensitivity(data.get("sensitivity", Sensitivity.public.value)), + merged_by_llm=bool(data.get("merged_by_llm", False)), + ) + + +@dataclass +class ConsolidationReport: + """Deterministic summary of a single consolidation run. + + Attributes: + clusters: All episode clusters discovered. + promoted: Facts promoted from clusters meeting the policy thresholds. + decayed_episode_ids: Episode IDs past the decay horizon (report-only; + never deleted — the stores are append-only). + decayed_fact_ids: Fact IDs past the decay horizon (report-only). + applied: ``True`` when promoted facts were written to the fact store. + version: Schema version tag. + """ + + clusters: list[EpisodeCluster] = field(default_factory=list) + promoted: list[PromotedFact] = field(default_factory=list) + decayed_episode_ids: list[str] = field(default_factory=list) + decayed_fact_ids: list[str] = field(default_factory=list) + applied: bool = False + version: str = CONSOLIDATION_REPORT_VERSION + + def to_dict(self) -> dict[str, Any]: + """Serialise to a JSON-compatible dict.""" + return { + "version": self.version, + "clusters": [c.to_dict() for c in self.clusters], + "promoted": [p.to_dict() for p in self.promoted], + "decayed_episode_ids": list(self.decayed_episode_ids), + "decayed_fact_ids": list(self.decayed_fact_ids), + "applied": self.applied, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ConsolidationReport: + """Deserialise from a JSON-compatible dict produced by :meth:`to_dict`.""" + return cls( + clusters=[EpisodeCluster.from_dict(c) for c in data.get("clusters", [])], + promoted=[PromotedFact.from_dict(p) for p in data.get("promoted", [])], + decayed_episode_ids=[str(e) for e in data.get("decayed_episode_ids", [])], + decayed_fact_ids=[str(f) for f in data.get("decayed_fact_ids", [])], + applied=bool(data.get("applied", False)), + version=str(data.get("version", CONSOLIDATION_REPORT_VERSION)), + ) + + def summary(self) -> str: + """Return a compact, human-readable one-block summary.""" + return ( + f"Consolidation (v{self.version}): " + f"clusters={len(self.clusters)} promoted={len(self.promoted)} " + f"decayed_episodes={len(self.decayed_episode_ids)} " + f"decayed_facts={len(self.decayed_fact_ids)} applied={self.applied}" + ) + + +__all__ = [ + "CONSOLIDATION_REPORT_VERSION", + "ConsolidationPolicy", + "ConsolidationReport", + "EpisodeCluster", + "PromotedFact", +] diff --git a/src/contextweaver/eval/__init__.py b/src/contextweaver/eval/__init__.py index ca876efe..0b118af5 100644 --- a/src/contextweaver/eval/__init__.py +++ b/src/contextweaver/eval/__init__.py @@ -14,16 +14,19 @@ from __future__ import annotations +from contextweaver.eval.consolidation import ConsolidationEvalReport, evaluate_consolidation from contextweaver.eval.context import ContextEvalReport, evaluate_context from contextweaver.eval.dataset import EvalCase, EvalDataset from contextweaver.eval.metrics import precision_at_k, recall_at_k, reciprocal_rank from contextweaver.eval.routing import RoutingEvalReport, evaluate_routing __all__ = [ + "ConsolidationEvalReport", "ContextEvalReport", "EvalCase", "EvalDataset", "RoutingEvalReport", + "evaluate_consolidation", "evaluate_context", "evaluate_routing", "precision_at_k", diff --git a/src/contextweaver/eval/consolidation.py b/src/contextweaver/eval/consolidation.py new file mode 100644 index 00000000..5fdd14ca --- /dev/null +++ b/src/contextweaver/eval/consolidation.py @@ -0,0 +1,139 @@ +"""Consolidation quality evaluation harness (issue #683). + +:func:`evaluate_consolidation` scores a +:class:`~contextweaver.context.consolidation_types.ConsolidationReport` against +an optional gold set of expected fact texts and reports precision / coverage +plus deduplication metrics. It is pure-stdlib, offline, and deterministic: given +the same report and gold set it always produces the same numbers, so it can gate +quality regressions in CI fixtures. + +Matching is on normalised text (lower-cased, whitespace-collapsed) so trivial +formatting differences between a promoted fact and its gold expectation do not +count as misses. +""" + +from __future__ import annotations + +from collections.abc import Iterable +from dataclasses import dataclass +from typing import Any + +from contextweaver.context.consolidation_types import ConsolidationReport + +__all__ = ["ConsolidationEvalReport", "evaluate_consolidation"] + + +def _normalise(text: str) -> str: + """Lower-case and collapse whitespace for tolerant text matching.""" + return " ".join(text.lower().split()) + + +@dataclass +class ConsolidationEvalReport: + """Quality metrics for one consolidation run. + + Attributes: + clusters_found: Number of clusters discovered. + facts_promoted: Number of facts promoted. + episodes_decayed: Number of episodes reported past the decay horizon. + facts_decayed: Number of facts reported past the decay horizon. + dedup_ratio: ``1 - clusters / total_episodes`` — fraction of episodic + redundancy collapsed by clustering (``0.0`` when unknown). + precision: Fraction of promoted facts present in the gold set + (``0.0`` when no gold set is supplied). + coverage: Fraction of gold facts that were promoted (``0.0`` when no + gold set is supplied). + gold_size: Number of distinct gold facts evaluated against. + """ + + clusters_found: int = 0 + facts_promoted: int = 0 + episodes_decayed: int = 0 + facts_decayed: int = 0 + dedup_ratio: float = 0.0 + precision: float = 0.0 + coverage: float = 0.0 + gold_size: int = 0 + + def to_dict(self) -> dict[str, Any]: + """Serialise to a JSON-compatible dict.""" + return { + "clusters_found": self.clusters_found, + "facts_promoted": self.facts_promoted, + "episodes_decayed": self.episodes_decayed, + "facts_decayed": self.facts_decayed, + "dedup_ratio": self.dedup_ratio, + "precision": self.precision, + "coverage": self.coverage, + "gold_size": self.gold_size, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ConsolidationEvalReport: + """Build a :class:`ConsolidationEvalReport` from a raw dict.""" + return cls( + clusters_found=int(data.get("clusters_found", 0)), + facts_promoted=int(data.get("facts_promoted", 0)), + episodes_decayed=int(data.get("episodes_decayed", 0)), + facts_decayed=int(data.get("facts_decayed", 0)), + dedup_ratio=float(data.get("dedup_ratio", 0.0)), + precision=float(data.get("precision", 0.0)), + coverage=float(data.get("coverage", 0.0)), + gold_size=int(data.get("gold_size", 0)), + ) + + def summary(self) -> str: + """Return a compact, human-readable one-block summary.""" + return ( + f"Consolidation eval: clusters={self.clusters_found} " + f"promoted={self.facts_promoted} dedup_ratio={self.dedup_ratio:.2f}\n" + f" precision={self.precision:.2f} coverage={self.coverage:.2f} " + f"(gold={self.gold_size})\n" + f" decayed_episodes={self.episodes_decayed} decayed_facts={self.facts_decayed}" + ) + + +def evaluate_consolidation( + report: ConsolidationReport, + expected_texts: Iterable[str] | None = None, + *, + total_episodes: int | None = None, +) -> ConsolidationEvalReport: + """Score *report* and return a :class:`ConsolidationEvalReport`. + + Args: + report: The consolidation report to evaluate. + expected_texts: Optional gold set of fact texts the run *should* have + promoted. When supplied, precision and coverage are computed via + normalised-text matching; otherwise both are ``0.0``. + total_episodes: Total episodes the run saw, used for ``dedup_ratio``. + When ``None`` or zero, ``dedup_ratio`` is ``0.0``. + + Returns: + A populated :class:`ConsolidationEvalReport`. + """ + promoted_norm = {_normalise(p.text) for p in report.promoted} + gold_norm = {_normalise(t) for t in expected_texts} if expected_texts is not None else set() + + if gold_norm: + hits = len(promoted_norm & gold_norm) + precision = hits / len(promoted_norm) if promoted_norm else 0.0 + coverage = hits / len(gold_norm) + else: + precision = 0.0 + coverage = 0.0 + + dedup_ratio = 0.0 + if total_episodes: + dedup_ratio = round(1.0 - len(report.clusters) / total_episodes, 4) + + return ConsolidationEvalReport( + clusters_found=len(report.clusters), + facts_promoted=len(report.promoted), + episodes_decayed=len(report.decayed_episode_ids), + facts_decayed=len(report.decayed_fact_ids), + dedup_ratio=dedup_ratio, + precision=round(precision, 4), + coverage=round(coverage, 4), + gold_size=len(gold_norm), + ) diff --git a/tests/test_cli.py b/tests/test_cli.py index c19b39af..436379c9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -721,3 +721,66 @@ def test_verify_subcommand_json_mode() -> None: assert "routing" in check_names assert all(c["ok"] is True for c in payload["checks"]) assert "next_step" in payload + + +# ------------------------------------------------------------------ +# consolidate (issue #498) +# ------------------------------------------------------------------ + + +def _write_episodes(path: Path) -> None: + summary = "customer prefers email contact for support" + episodes = [ + {"episode_id": f"ep{i}", "summary": summary, "metadata": {"session_id": f"s{i}"}} + for i in range(3) + ] + path.write_text(json.dumps({"episodes": episodes}), encoding="utf-8") + + +def test_consolidate_subcommand_json(tmp_path: Path) -> None: + eps = tmp_path / "episodes.json" + _write_episodes(eps) + result = _run( + "consolidate", + "--episodes", + str(eps), + "--min-occurrences", + "3", + "--min-sessions", + "2", + "--json", + ) + assert result.returncode == 0, result.stderr + report = json.loads(result.stdout) + assert len(report["promoted"]) == 1 + assert report["promoted"][0]["occurrences"] == 3 + assert report["applied"] is False + + +def test_consolidate_subcommand_apply_writes_facts(tmp_path: Path) -> None: + eps = tmp_path / "episodes.json" + out = tmp_path / "facts.json" + _write_episodes(eps) + result = _run( + "consolidate", + "--episodes", + str(eps), + "--apply", + "--facts-out", + str(out), + "--min-occurrences", + "3", + "--min-sessions", + "2", + ) + assert result.returncode == 0, result.stderr + assert "applied=True" in result.stdout + written = json.loads(out.read_text(encoding="utf-8")) + assert len(written["facts"]) == 1 + assert written["facts"][0]["key"] == "consolidated" + + +def test_consolidate_subcommand_bad_file(tmp_path: Path) -> None: + missing = tmp_path / "nope.json" + result = _run("consolidate", "--episodes", str(missing)) + assert result.returncode != 0 diff --git a/tests/test_consolidation.py b/tests/test_consolidation.py new file mode 100644 index 00000000..7e5b0a79 --- /dev/null +++ b/tests/test_consolidation.py @@ -0,0 +1,363 @@ +"""Tests for the memory consolidation engine (issues #498, #679-#682).""" + +from __future__ import annotations + +from datetime import datetime, timedelta + +import pytest + +from contextweaver.context.consolidation import ( + cluster_episodes, + consolidate, + decay_episodes, + promote_clusters, +) +from contextweaver.context.consolidation_types import ( + CONSOLIDATION_REPORT_VERSION, + ConsolidationPolicy, + ConsolidationReport, + EpisodeCluster, + PromotedFact, +) +from contextweaver.exceptions import ConfigError +from contextweaver.store.episodic import Episode, InMemoryEpisodicStore +from contextweaver.store.facts import InMemoryFactStore +from contextweaver.types import Sensitivity + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _ep( + eid: str, + summary: str, + *, + session: str | None = None, + ts: str | None = None, + sensitivity: Sensitivity = Sensitivity.public, +) -> Episode: + metadata: dict[str, object] = {} + if session is not None: + metadata["session_id"] = session + if ts is not None: + metadata["timestamp"] = ts + return Episode(episode_id=eid, summary=summary, metadata=metadata, sensitivity=sensitivity) + + +def _store(*episodes: Episode) -> InMemoryEpisodicStore: + store = InMemoryEpisodicStore() + for ep in episodes: + store.add(ep) + return store + + +_EMAIL = "customer prefers email contact for support" + + +# --------------------------------------------------------------------------- +# Clustering (#679) +# --------------------------------------------------------------------------- + + +def test_cluster_groups_similar_and_separates_dissimilar() -> None: + episodes = [ + _ep("a", "customer prefers email contact for support"), + _ep("b", "customer prefers email contact when reaching support"), + _ep("c", "the build failed due to a missing dependency"), + ] + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + assert len(clusters) == 2 + # Email episodes cluster together; the build episode is its own cluster. + grouped = {c.cluster_id: c.episode_ids for c in clusters} + assert grouped["cluster_000"] == ["a", "b"] + assert grouped["cluster_001"] == ["c"] + + +def test_cluster_is_deterministic_and_idempotent() -> None: + episodes = [ + _ep("z", "customer prefers email contact for support"), + _ep("a", "customer prefers email contact for support today"), + _ep("m", "deploy pipeline timed out on staging"), + ] + first = cluster_episodes(episodes, similarity_threshold=0.4) + second = cluster_episodes(list(reversed(episodes)), similarity_threshold=0.4) + assert [c.to_dict() for c in first] == [c.to_dict() for c in second] + + +def test_cluster_canonical_text_picks_most_informative() -> None: + episodes = [ + _ep("a", "email preferred"), + _ep("b", "customer prefers email contact for support tickets"), + ] + [cluster] = cluster_episodes(episodes, similarity_threshold=0.1) + assert cluster.canonical_text == "customer prefers email contact for support tickets" + + +def test_cluster_empty() -> None: + assert cluster_episodes([], similarity_threshold=0.5) == [] + + +# --------------------------------------------------------------------------- +# Promotion (#680) +# --------------------------------------------------------------------------- + + +def test_promote_requires_min_occurrences_and_sessions() -> None: + episodes = [ + _ep("a", _EMAIL, session="s1"), + _ep("b", _EMAIL, session="s2"), + _ep("c", _EMAIL, session="s3"), + ] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + policy = ConsolidationPolicy(min_occurrences=3, min_sessions=2) + [fact] = promote_clusters(clusters, by_id, policy) + assert fact.occurrences == 3 + assert fact.sessions == 3 + assert fact.source_episode_ids == ["a", "b", "c"] + assert fact.key == "consolidated" + + +def test_promote_blocked_below_occurrence_threshold() -> None: + episodes = [_ep("a", _EMAIL, session="s1"), _ep("b", _EMAIL, session="s2")] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + policy = ConsolidationPolicy(min_occurrences=3, min_sessions=2) + assert promote_clusters(clusters, by_id, policy) == [] + + +def test_promote_blocked_below_session_threshold() -> None: + # Three occurrences but all from the same session. + episodes = [_ep(e, _EMAIL, session="s1") for e in ("a", "b", "c")] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + policy = ConsolidationPolicy(min_occurrences=3, min_sessions=2) + assert promote_clusters(clusters, by_id, policy) == [] + + +def test_promote_inherits_max_sensitivity() -> None: + episodes = [ + _ep("a", _EMAIL, session="s1", sensitivity=Sensitivity.public), + _ep("b", _EMAIL, session="s2", sensitivity=Sensitivity.confidential), + _ep("c", _EMAIL, session="s3", sensitivity=Sensitivity.internal), + ] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + [fact] = promote_clusters( + clusters, by_id, ConsolidationPolicy(min_occurrences=3, min_sessions=2) + ) + assert fact.sensitivity is Sensitivity.confidential + + +def test_promote_records_seen_bounds() -> None: + episodes = [ + _ep("a", _EMAIL, session="s1", ts="2026-01-05T00:00:00"), + _ep("b", _EMAIL, session="s2", ts="2026-01-01T00:00:00"), + _ep("c", _EMAIL, session="s3", ts="2026-03-09T00:00:00"), + ] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + [fact] = promote_clusters( + clusters, by_id, ConsolidationPolicy(min_occurrences=3, min_sessions=2) + ) + assert fact.first_seen == "2026-01-01T00:00:00" + assert fact.last_seen == "2026-03-09T00:00:00" + + +def test_promote_fact_id_is_content_addressed_and_stable() -> None: + episodes = [_ep(e, _EMAIL, session=f"s{i}") for i, e in enumerate(("a", "b", "c"))] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + pol = ConsolidationPolicy(min_occurrences=3, min_sessions=2) + first = promote_clusters(clusters, by_id, pol)[0].fact_id + second = promote_clusters(clusters, by_id, pol)[0].fact_id + assert first == second + assert first.startswith("fact:consolidated:") + + +# --------------------------------------------------------------------------- +# Optional LLM merge (#682) +# --------------------------------------------------------------------------- + + +def _grounded_call(_prompt: str) -> str: + # Reorders grounded tokens only — no new entity introduced. + return "support email contact customer prefers for" + + +def _hallucinating_call(_prompt: str) -> str: + return "customer prefers carrier pigeons" + + +def test_llm_merge_accepts_grounded_completion() -> None: + episodes = [_ep(e, _EMAIL, session=f"s{i}") for i, e in enumerate(("a", "b", "c"))] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + [fact] = promote_clusters( + clusters, + by_id, + ConsolidationPolicy(min_occurrences=3, min_sessions=2), + call_fn=_grounded_call, + ) + assert fact.merged_by_llm is True + assert fact.text == "support email contact customer prefers for" + + +def test_llm_merge_rejects_ungrounded_completion() -> None: + episodes = [_ep(e, _EMAIL, session=f"s{i}") for i, e in enumerate(("a", "b", "c"))] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + [fact] = promote_clusters( + clusters, + by_id, + ConsolidationPolicy(min_occurrences=3, min_sessions=2), + call_fn=_hallucinating_call, + ) + assert fact.merged_by_llm is False + assert fact.text == clusters[0].canonical_text + + +def test_llm_merge_disabled_under_deterministic() -> None: + episodes = [_ep(e, _EMAIL, session=f"s{i}") for i, e in enumerate(("a", "b", "c"))] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + [fact] = promote_clusters( + clusters, + by_id, + ConsolidationPolicy(min_occurrences=3, min_sessions=2), + call_fn=_grounded_call, + deterministic=True, + ) + assert fact.merged_by_llm is False + + +# --------------------------------------------------------------------------- +# Decay (#681) +# --------------------------------------------------------------------------- + + +def test_decay_boundary_is_strict() -> None: + base = datetime(2026, 1, 1, 0, 0, 0) + episodes = [_ep("a", "stale note", ts=base.isoformat())] + policy = ConsolidationPolicy(decay_after_days=90) + # Exactly 90 days later: not yet decayed (strictly greater-than). + assert decay_episodes(episodes, policy, as_of=base + timedelta(days=90)) == [] + # 91 days later: decayed. + assert decay_episodes(episodes, policy, as_of=base + timedelta(days=91)) == ["a"] + + +def test_decay_disabled_when_none() -> None: + base = datetime(2026, 1, 1) + episodes = [_ep("a", "stale", ts=base.isoformat())] + policy = ConsolidationPolicy(decay_after_days=None) + assert decay_episodes(episodes, policy, as_of=base + timedelta(days=9999)) == [] + + +def test_decay_ignores_episodes_without_timestamp() -> None: + episodes = [_ep("a", "no timestamp")] + policy = ConsolidationPolicy(decay_after_days=1) + assert decay_episodes(episodes, policy, as_of=datetime(2030, 1, 1)) == [] + + +def test_decay_never_mutates_store() -> None: + base = datetime(2026, 1, 1) + store = _store(_ep("a", _EMAIL, session="s1", ts=base.isoformat())) + facts = InMemoryFactStore() + report = consolidate(store, facts, as_of=base + timedelta(days=400)) + assert report.decayed_episode_ids == ["a"] + # Append-only invariant: the episode is still present after a decay report. + assert store.get("a") is not None + + +# --------------------------------------------------------------------------- +# Orchestration / apply (#498) +# --------------------------------------------------------------------------- + + +def test_consolidate_apply_writes_facts_with_provenance() -> None: + store = _store( + _ep("a", _EMAIL, session="s1"), + _ep("b", _EMAIL, session="s2"), + _ep("c", _EMAIL, session="s3"), + ) + facts = InMemoryFactStore() + report = consolidate( + store, facts, ConsolidationPolicy(min_occurrences=3, min_sessions=2), apply=True + ) + assert report.applied is True + stored = facts.all() + assert len(stored) == 1 + fact = stored[0] + assert fact.metadata["consolidated"] is True + assert fact.metadata["source_episode_ids"] == ["a", "b", "c"] + assert fact.metadata["occurrences"] == 3 + + +def test_consolidate_apply_is_idempotent() -> None: + store = _store(*[_ep(e, _EMAIL, session=f"s{i}") for i, e in enumerate(("a", "b", "c"))]) + facts = InMemoryFactStore() + pol = ConsolidationPolicy(min_occurrences=3, min_sessions=2) + consolidate(store, facts, pol, apply=True) + consolidate(store, facts, pol, apply=True) + assert len(facts.all()) == 1 + + +def test_consolidate_no_apply_leaves_fact_store_untouched() -> None: + store = _store(*[_ep(e, _EMAIL, session=f"s{i}") for i, e in enumerate(("a", "b", "c"))]) + facts = InMemoryFactStore() + report = consolidate(store, facts, ConsolidationPolicy(min_occurrences=3, min_sessions=2)) + assert report.applied is False + assert facts.all() == [] + assert len(report.promoted) == 1 + + +def test_consolidate_empty_store() -> None: + report = consolidate(InMemoryEpisodicStore(), InMemoryFactStore()) + assert report.clusters == [] + assert report.promoted == [] + + +def test_consolidate_invalid_policy_raises() -> None: + with pytest.raises(ConfigError): + consolidate( + InMemoryEpisodicStore(), + InMemoryFactStore(), + ConsolidationPolicy(similarity_threshold=1.5), + ) + + +# --------------------------------------------------------------------------- +# Serialization round-trips +# --------------------------------------------------------------------------- + + +def test_report_round_trip() -> None: + store = _store(*[_ep(e, _EMAIL, session=f"s{i}") for i, e in enumerate(("a", "b", "c"))]) + report = consolidate( + store, InMemoryFactStore(), ConsolidationPolicy(min_occurrences=3, min_sessions=2) + ) + restored = ConsolidationReport.from_dict(report.to_dict()) + assert restored.to_dict() == report.to_dict() + assert restored.version == CONSOLIDATION_REPORT_VERSION + + +def test_policy_round_trip_preserves_none_decay() -> None: + policy = ConsolidationPolicy(decay_after_days=None, min_occurrences=5) + restored = ConsolidationPolicy.from_dict(policy.to_dict()) + assert restored == policy + + +def test_dataclass_round_trips() -> None: + cluster = EpisodeCluster(cluster_id="cluster_000", episode_ids=["a"], canonical_text="x") + assert EpisodeCluster.from_dict(cluster.to_dict()) == cluster + fact = PromotedFact( + fact_id="fact:consolidated:abc", + key="consolidated", + text="x", + source_episode_ids=["a"], + occurrences=1, + sessions=1, + sensitivity=Sensitivity.internal, + ) + assert PromotedFact.from_dict(fact.to_dict()) == fact diff --git a/tests/test_eval_consolidation.py b/tests/test_eval_consolidation.py new file mode 100644 index 00000000..17446f2e --- /dev/null +++ b/tests/test_eval_consolidation.py @@ -0,0 +1,68 @@ +"""Tests for the consolidation quality evaluation harness (issue #683).""" + +from __future__ import annotations + +from contextweaver.context.consolidation_types import ( + ConsolidationReport, + EpisodeCluster, + PromotedFact, +) +from contextweaver.eval.consolidation import ConsolidationEvalReport, evaluate_consolidation + + +def _fact(text: str) -> PromotedFact: + return PromotedFact( + fact_id=f"fact:consolidated:{abs(hash(text)) % 9999:04d}", key="c", text=text + ) + + +def _report(*texts: str, clusters: int = 0) -> ConsolidationReport: + return ConsolidationReport( + clusters=[EpisodeCluster(cluster_id=f"cluster_{i:03d}") for i in range(clusters)], + promoted=[_fact(t) for t in texts], + ) + + +def test_precision_and_coverage_with_gold() -> None: + report = _report("Customer prefers email", "Build fails on staging") + ev = evaluate_consolidation( + report, + expected_texts=["customer prefers email", "deploy needs approval"], + ) + # One of two promoted facts matched gold -> precision 0.5; one of two gold + # facts covered -> coverage 0.5. Matching is case/space-insensitive. + assert ev.precision == 0.5 + assert ev.coverage == 0.5 + assert ev.gold_size == 2 + assert ev.facts_promoted == 2 + + +def test_no_gold_yields_zero_precision_coverage() -> None: + ev = evaluate_consolidation(_report("a", "b")) + assert ev.precision == 0.0 + assert ev.coverage == 0.0 + assert ev.gold_size == 0 + + +def test_dedup_ratio() -> None: + report = _report("x", clusters=4) + ev = evaluate_consolidation(report, total_episodes=10) + assert ev.dedup_ratio == 0.6 # 1 - 4/10 + + +def test_dedup_ratio_zero_without_total() -> None: + ev = evaluate_consolidation(_report("x", clusters=4)) + assert ev.dedup_ratio == 0.0 + + +def test_perfect_match() -> None: + ev = evaluate_consolidation(_report("only fact"), expected_texts=["only fact"]) + assert ev.precision == 1.0 + assert ev.coverage == 1.0 + + +def test_eval_report_round_trip() -> None: + ev = evaluate_consolidation(_report("a"), expected_texts=["a"], total_episodes=2) + restored = ConsolidationEvalReport.from_dict(ev.to_dict()) + assert restored == ev + assert "precision" in ev.summary() From e0e29218ea959fb74459403584bbd98e3a062dde Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 10:12:41 +0000 Subject: [PATCH 2/4] fix(context): robust timestamp handling in consolidation (PR #708 review) Addresses Copilot review on #708: - parse_iso: normalise the RFC 3339 `Z` suffix to `+00:00` (parses on Python 3.10) and convert tz-aware values to naive UTC, mirroring the repo's ISO convention (RoutingDecision.from_dict). Single normalisation point feeding both seen_bounds and is_decayed. - is_decayed: compare against a timedelta instead of floored whole days (so a timestamp older than the horizon by <24h still decays) and normalise as_of to naive UTC so a tz-aware as_of/stamp never raises TypeError. - consolidate(apply=True): stamp the policy decay timestamp key (= last_seen) on promoted facts so they are themselves decay-eligible on later runs, not only their source episodes. - CLI `consolidate`: parse --as-of via parse_iso (Z support) and default the decay reference to now so --decay-after-days takes effect without an explicit --as-of. Tests: Z-suffix + tz-aware decay, sub-day decay granularity, seen-bounds string preservation, consolidated-fact decay on a later run, and CLI default-decay / Z --as-of cases. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01JiHvVpWeqApRTyQReS4QJX --- src/contextweaver/__main__.py | 15 +++-- .../context/_consolidation_helpers.py | 30 ++++++++-- src/contextweaver/context/consolidation.py | 24 +++++--- tests/test_cli.py | 27 +++++++++ tests/test_consolidation.py | 55 +++++++++++++++++++ 5 files changed, 131 insertions(+), 20 deletions(-) diff --git a/src/contextweaver/__main__.py b/src/contextweaver/__main__.py index bad0d527..c1225348 100644 --- a/src/contextweaver/__main__.py +++ b/src/contextweaver/__main__.py @@ -57,6 +57,7 @@ from contextweaver.adapters.mcp import mcp_tool_to_selectable from contextweaver.adapters.sidecar import SidecarApp, SidecarConfig from contextweaver.config import ContextBudget +from contextweaver.context._consolidation_helpers import parse_iso from contextweaver.context.consolidation import consolidate from contextweaver.context.consolidation_types import ConsolidationPolicy from contextweaver.context.manager import ContextManager @@ -729,7 +730,7 @@ def consolidate_cmd( ] = 90, as_of: Annotated[ str | None, - typer.Option("--as-of", help="ISO-8601 reference time for decay reporting."), + typer.Option("--as-of", help="ISO-8601 decay reference time (defaults to now)."), ] = None, json_output: Annotated[ bool, typer.Option("--json", help="Emit machine-readable JSON.") @@ -758,12 +759,14 @@ def consolidate_cmd( similarity_threshold=similarity, decay_after_days=None if decay_after_days < 0 else decay_after_days, ) - parsed_as_of = None + # Default the decay reference to now so --decay-after-days takes effect in + # the common case; an explicit --as-of (with RFC 3339 ``Z`` support) wins. if as_of is not None: - try: - parsed_as_of = datetime.fromisoformat(as_of) - except ValueError as exc: - raise typer.BadParameter(f"invalid --as-of timestamp: {exc}") from exc + parsed_as_of = parse_iso(as_of) + if parsed_as_of is None: + raise typer.BadParameter(f"invalid --as-of timestamp: {as_of!r}; expected ISO-8601") + else: + parsed_as_of = datetime.now() try: report = consolidate(ep_store, fact_store, policy, as_of=parsed_as_of, apply=apply) diff --git a/src/contextweaver/context/_consolidation_helpers.py b/src/contextweaver/context/_consolidation_helpers.py index 90ef82db..ba8e9f88 100644 --- a/src/contextweaver/context/_consolidation_helpers.py +++ b/src/contextweaver/context/_consolidation_helpers.py @@ -8,7 +8,7 @@ from __future__ import annotations import hashlib -from datetime import datetime +from datetime import datetime, timedelta, timezone from contextweaver._utils import tokenize from contextweaver.store.episodic import Episode @@ -63,12 +63,27 @@ def episode_iso(ep: Episode, key: str) -> str | None: return value if isinstance(value, str) and value else None +def _to_naive_utc(dt: datetime) -> datetime: + """Return *dt* as a naive UTC datetime (tz-aware inputs are converted).""" + if dt.tzinfo is not None: + return dt.astimezone(timezone.utc).replace(tzinfo=None) + return dt + + def parse_iso(value: str | None) -> datetime | None: - """Parse an ISO-8601 *value* to a ``datetime``, or ``None`` on failure.""" + """Parse an ISO-8601 *value* to a naive-UTC ``datetime``, or ``None``. + + Mirrors the repo's ISO convention (e.g. ``RoutingDecision.from_dict``): the + RFC 3339 ``Z`` UTC suffix is normalised to ``+00:00`` so timestamps parse on + Python 3.10, and tz-aware values are converted to naive UTC so later + arithmetic against a naive reference time never raises. Naive inputs are + assumed to already be UTC. + """ if not value: return None + text = value[:-1] + "+00:00" if value.endswith(("Z", "z")) else value try: - return datetime.fromisoformat(value) + return _to_naive_utc(datetime.fromisoformat(text)) except ValueError: return None @@ -91,11 +106,16 @@ def canonical_fact_id(source_ids: list[str]) -> str: def is_decayed(iso: str | None, as_of: datetime, decay_after_days: int) -> bool: - """Return ``True`` when *iso* is older than *decay_after_days* before *as_of*.""" + """Return ``True`` when *iso* is older than *decay_after_days* before *as_of*. + + Compares against a :class:`~datetime.timedelta` (not floored whole days) so a + timestamp older than the horizon by under 24h still decays, and normalises + *as_of* to naive UTC so a tz-aware ``as_of`` or stamp never raises. + """ stamp = parse_iso(iso) if stamp is None: return False - return (as_of - stamp).days > decay_after_days + return _to_naive_utc(as_of) - stamp > timedelta(days=decay_after_days) __all__ = [ diff --git a/src/contextweaver/context/consolidation.py b/src/contextweaver/context/consolidation.py index 40b30bb2..ad673f23 100644 --- a/src/contextweaver/context/consolidation.py +++ b/src/contextweaver/context/consolidation.py @@ -246,20 +246,26 @@ def consolidate( if apply: for pf in promoted: + metadata: dict[str, object] = { + "consolidated": True, + "source_episode_ids": list(pf.source_episode_ids), + "occurrences": pf.occurrences, + "sessions": pf.sessions, + "first_seen": pf.first_seen, + "last_seen": pf.last_seen, + "merged_by_llm": pf.merged_by_llm, + } + # Stamp the policy's decay timestamp key with the fact's recency + # (its last-seen source time) so the promoted fact is itself + # eligible for decay reporting on later runs, not just its episodes. + if pf.last_seen is not None: + metadata[policy.timestamp_key] = pf.last_seen fact_store.put( Fact( fact_id=pf.fact_id, key=pf.key, value=pf.text, - metadata={ - "consolidated": True, - "source_episode_ids": list(pf.source_episode_ids), - "occurrences": pf.occurrences, - "sessions": pf.sessions, - "first_seen": pf.first_seen, - "last_seen": pf.last_seen, - "merged_by_llm": pf.merged_by_llm, - }, + metadata=metadata, sensitivity=pf.sensitivity, ) ) diff --git a/tests/test_cli.py b/tests/test_cli.py index 436379c9..5a7ff8f3 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -784,3 +784,30 @@ def test_consolidate_subcommand_bad_file(tmp_path: Path) -> None: missing = tmp_path / "nope.json" result = _run("consolidate", "--episodes", str(missing)) assert result.returncode != 0 + + +def test_consolidate_subcommand_reports_decay_by_default(tmp_path: Path) -> None: + eps = tmp_path / "episodes.json" + episodes = [ + { + "episode_id": f"ep{i}", + "summary": "customer prefers email contact for support", + "metadata": {"session_id": f"s{i}", "timestamp": "2020-01-01T00:00:00Z"}, + } + for i in range(3) + ] + eps.write_text(json.dumps({"episodes": episodes}), encoding="utf-8") + # No --as-of: defaults to now, so the 2020 timestamps decay. + result = _run("consolidate", "--episodes", str(eps), "--json") + assert result.returncode == 0, result.stderr + report = json.loads(result.stdout) + assert len(report["decayed_episode_ids"]) == 3 + + +def test_consolidate_subcommand_accepts_z_as_of(tmp_path: Path) -> None: + eps = tmp_path / "episodes.json" + _write_episodes(eps) + result = _run( + "consolidate", "--episodes", str(eps), "--as-of", "2026-06-01T00:00:00Z", "--json" + ) + assert result.returncode == 0, result.stderr diff --git a/tests/test_consolidation.py b/tests/test_consolidation.py index 7e5b0a79..527bcf90 100644 --- a/tests/test_consolidation.py +++ b/tests/test_consolidation.py @@ -361,3 +361,58 @@ def test_dataclass_round_trips() -> None: sensitivity=Sensitivity.internal, ) assert PromotedFact.from_dict(fact.to_dict()) == fact + + +# --------------------------------------------------------------------------- +# Timezone / Z-suffix handling + sub-day decay (review fixes) +# --------------------------------------------------------------------------- + + +def test_decay_handles_z_suffix_and_tz_aware_as_of() -> None: + # RFC 3339 'Z' on the episode + a tz-aware as_of: neither should raise, and + # the stale episode is still reported. + episodes = [_ep("a", "stale note", ts="2026-01-01T00:00:00Z")] + policy = ConsolidationPolicy(decay_after_days=90) + as_of = datetime.fromisoformat("2026-06-01T00:00:00+00:00") + assert decay_episodes(episodes, policy, as_of=as_of) == ["a"] + + +def test_decay_sub_day_granularity() -> None: + base = datetime(2026, 1, 1, 0, 0, 0) + episodes = [_ep("a", "stale", ts=base.isoformat())] + policy = ConsolidationPolicy(decay_after_days=90) + # 90 days + 23h is past the horizon (timedelta comparison, not floored days). + assert decay_episodes(episodes, policy, as_of=base + timedelta(days=90, hours=23)) == ["a"] + + +def test_seen_bounds_preserves_original_z_strings() -> None: + episodes = [ + _ep("a", _EMAIL, session="s1", ts="2026-01-05T00:00:00Z"), + _ep("b", _EMAIL, session="s2", ts="2026-01-01T00:00:00Z"), + _ep("c", _EMAIL, session="s3", ts="2026-03-09T00:00:00Z"), + ] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + [fact] = promote_clusters( + clusters, by_id, ConsolidationPolicy(min_occurrences=3, min_sessions=2) + ) + assert fact.first_seen == "2026-01-01T00:00:00Z" + assert fact.last_seen == "2026-03-09T00:00:00Z" + + +def test_consolidated_fact_is_decayable_on_later_run() -> None: + base = datetime(2026, 1, 1) + store = _store( + *[ + _ep(e, _EMAIL, session=f"s{i}", ts=base.isoformat()) + for i, e in enumerate(("a", "b", "c")) + ] + ) + facts = InMemoryFactStore() + pol = ConsolidationPolicy(min_occurrences=3, min_sessions=2, decay_after_days=90) + consolidate(store, facts, pol, apply=True) + stored = facts.all()[0] + # The promoted fact carries the policy decay key, so a later run reports it. + assert stored.metadata[pol.timestamp_key] == base.isoformat() + later = consolidate(store, facts, pol, as_of=base + timedelta(days=400)) + assert stored.fact_id in later.decayed_fact_ids From 2d91683b4931a2d83180d9a38899183a40d57e21 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 19:41:53 +0000 Subject: [PATCH 3/4] fix(review): use UTC for consolidate CLI decay reference The consolidate CLI defaulted the decay reference to datetime.now() (naive local time), while parse_iso normalises episode/fact timestamps to naive UTC. On a non-UTC host this skewed the decay horizon by the host's offset. Use datetime.now(timezone.utc) so the reference matches the stored stamps. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01NUfbnfYYJDQ5ei5quKAftg --- src/contextweaver/__main__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/contextweaver/__main__.py b/src/contextweaver/__main__.py index c1225348..d96d2497 100644 --- a/src/contextweaver/__main__.py +++ b/src/contextweaver/__main__.py @@ -33,7 +33,7 @@ import json import sys -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Annotated, Any @@ -761,12 +761,15 @@ def consolidate_cmd( ) # Default the decay reference to now so --decay-after-days takes effect in # the common case; an explicit --as-of (with RFC 3339 ``Z`` support) wins. + # Use UTC so the reference matches the naive-UTC timestamps parse_iso + # produces from episode/fact metadata (a naive local now() would skew the + # decay horizon by the host's UTC offset). if as_of is not None: parsed_as_of = parse_iso(as_of) if parsed_as_of is None: raise typer.BadParameter(f"invalid --as-of timestamp: {as_of!r}; expected ISO-8601") else: - parsed_as_of = datetime.now() + parsed_as_of = datetime.now(timezone.utc) try: report = consolidate(ep_store, fact_store, policy, as_of=parsed_as_of, apply=apply) From 484c17f6c833328f9a92a3b8ac372135b18481f8 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 09:10:56 +0000 Subject: [PATCH 4/4] fix(context): harden consolidation merge grounding and decay (review) Audit follow-ups on the memory consolidation engine: - F1 (security): the optional LLM merge guardrail grounded only on tokenize() output, which strips stop-words, so an injected negation ("is safe" -> "is not safe") shared the same content tokens and was accepted, silently inverting a durable fact. Add a negation-term check (_negations) that rejects any polarity word absent from the source notes; soften the docstring's grounding claim; add a regression test. - F4: decay_facts now honours datetime timestamps (not just ISO strings) via the new coerce_iso helper; add a mixed-type decay test. - F3: re-export parse_iso from the public consolidation module so the CLI no longer imports from the private _consolidation_helpers module. - F2: document cluster_episodes' O(n*k) greedy, offline-batch complexity. Keeps consolidation.py at the 300-line ceiling by moving timestamp coercion into _consolidation_helpers. make ci green (the one failing test is the pre-existing offline-tiktoken artifact). Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01JHED1e6xNDDqjgR8v6Cvcm --- CHANGELOG.md | 2 +- src/contextweaver/__main__.py | 3 +- .../context/_consolidation_helpers.py | 12 +++++ .../context/_consolidation_merge.py | 43 +++++++++++++++--- src/contextweaver/context/consolidation.py | 15 +++++-- tests/test_consolidation.py | 45 ++++++++++++++++++- 6 files changed, 106 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e52b2622..59c79567 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 decay horizon without deleting them — the stores are append-only (`decay_episodes` / `decay_facts`, #681). An optional, fail-closed `call_fn` may refine a fact's canonical text, rejecting any completion that introduces - ungrounded tokens (#682). `consolidate(..., apply=True)` upserts the promoted + ungrounded tokens or a negation absent from the source notes (#682). `consolidate(..., apply=True)` upserts the promoted facts with content-addressed IDs, so re-running over an unchanged store is a no-op (idempotent). Results are returned as a `ConsolidationReport` (serialisable via `to_dict`/`from_dict`). New public surface in diff --git a/src/contextweaver/__main__.py b/src/contextweaver/__main__.py index 5a7604d7..bb757983 100644 --- a/src/contextweaver/__main__.py +++ b/src/contextweaver/__main__.py @@ -57,8 +57,7 @@ from contextweaver.adapters.mcp import mcp_tool_to_selectable from contextweaver.adapters.sidecar import SidecarApp, SidecarConfig from contextweaver.config import ContextBudget -from contextweaver.context._consolidation_helpers import parse_iso -from contextweaver.context.consolidation import consolidate +from contextweaver.context.consolidation import consolidate, parse_iso from contextweaver.context.consolidation_types import ConsolidationPolicy from contextweaver.context.manager import ContextManager from contextweaver.eval.dataset import EvalDataset diff --git a/src/contextweaver/context/_consolidation_helpers.py b/src/contextweaver/context/_consolidation_helpers.py index ba8e9f88..5a372880 100644 --- a/src/contextweaver/context/_consolidation_helpers.py +++ b/src/contextweaver/context/_consolidation_helpers.py @@ -63,6 +63,17 @@ def episode_iso(ep: Episode, key: str) -> str | None: return value if isinstance(value, str) and value else None +def coerce_iso(value: object) -> str | None: + """Coerce a metadata timestamp *value* to ISO text, or ``None``. + + Accepts ISO-8601 strings and :class:`~datetime.datetime` values; any other + type (or an empty string) is treated as undated. + """ + if isinstance(value, datetime): + return value.isoformat() + return value if isinstance(value, str) and value else None + + def _to_naive_utc(dt: datetime) -> datetime: """Return *dt* as a naive UTC datetime (tz-aware inputs are converted).""" if dt.tzinfo is not None: @@ -122,6 +133,7 @@ def is_decayed(iso: str | None, as_of: datetime, decay_after_days: int) -> bool: "CONSOLIDATED_FACT_KEY", "canonical_fact_id", "canonical_member", + "coerce_iso", "count_sessions", "episode_iso", "is_decayed", diff --git a/src/contextweaver/context/_consolidation_merge.py b/src/contextweaver/context/_consolidation_merge.py index 015ba695..c6a58bd4 100644 --- a/src/contextweaver/context/_consolidation_merge.py +++ b/src/contextweaver/context/_consolidation_merge.py @@ -8,24 +8,44 @@ * **Opt-in.** Only runs when a ``call_fn`` is supplied and the run is not in ``deterministic`` mode. -* **Fail-closed.** Any exception, a non-string / blank completion, or a result - that introduces tokens absent from the source cluster falls back to the - deterministic ``canonical_text``. A model can only ever *rephrase* grounded - content — it can never inject a new entity into a durable fact. - -The "no new tokens" check reuses :func:`contextweaver._utils.tokenize` so the -grounding test matches the rest of the library's text normalisation. +* **Fail-closed.** Any exception, a non-string / blank completion, a result + that introduces content tokens absent from the source cluster, or one that + introduces a *negation* absent from the source falls back to the deterministic + ``canonical_text``. A model may only ever *rephrase* grounded content — it can + neither inject a new entity nor flip the polarity of a durable fact. + +The content-token check reuses :func:`contextweaver._utils.tokenize` so grounding +matches the rest of the library's text normalisation. Because ``tokenize`` drops +stop-words, it would not catch an injected negation (``"is safe"`` → +``"is not safe"`` share the same content tokens); the separate negation check in +:func:`_negations` closes that gap. """ from __future__ import annotations import logging +import re from collections.abc import Callable from contextweaver._utils import tokenize logger = logging.getLogger("contextweaver.context") +#: Polarity / negation terms that :func:`~contextweaver._utils.tokenize` discards +#: as stop-words but which invert meaning. They must be grounded too: a merge may +#: not introduce a negation absent from the source notes. +_NEGATION_TERMS = frozenset({"not", "no", "never", "none", "cannot", "without", "neither", "nor"}) + + +def _negations(text: str) -> set[str]: + """Return the negation terms present in *text* (lower-cased, incl. ``n't``).""" + words = set(re.findall(r"[a-z']+", text.lower())) + found = {term for term in _NEGATION_TERMS if term in words} + if any(word.endswith("n't") for word in words): + found.add("n't") + return found + + #: Instruction prepended to the cluster's source summaries for the merge call. DEFAULT_MERGE_PROMPT = ( "Merge the following related memory notes into a single concise fact " @@ -59,8 +79,10 @@ def refine_canonical_text( when the model produced a grounded, non-blank result that was accepted. """ allowed = set() + source_negations: set[str] = set() for text in source_texts: allowed |= tokenize(text) + source_negations |= _negations(text) body = "\n".join(source_texts)[:_MAX_INPUT] try: result = call_fn(f"{system_prompt}{body}") @@ -79,6 +101,13 @@ def refine_canonical_text( sorted(new_tokens), ) return canonical_text, False + introduced_negations = _negations(merged) - source_negations + if introduced_negations: + logger.warning( + "consolidation merge: introduced negation(s) %s; using deterministic text", + sorted(introduced_negations), + ) + return canonical_text, False return merged, True diff --git a/src/contextweaver/context/consolidation.py b/src/contextweaver/context/consolidation.py index ad673f23..0c337b65 100644 --- a/src/contextweaver/context/consolidation.py +++ b/src/contextweaver/context/consolidation.py @@ -32,10 +32,12 @@ CONSOLIDATED_FACT_KEY, canonical_fact_id, canonical_member, + coerce_iso, count_sessions, episode_iso, is_decayed, max_sensitivity, + parse_iso, seen_bounds, ) from contextweaver.context._consolidation_merge import refine_canonical_text @@ -71,6 +73,10 @@ def cluster_episodes( Returns: Clusters in creation order, each with sorted ``episode_ids`` and a deterministic ``canonical_text``. + + Note: + Greedy single-pass clustering, ``O(n·k)`` for ``n`` episodes / ``k`` + clusters (each tokenised once) — for offline batch use, not a hot path. """ ordered = sorted(episodes, key=lambda ep: ep.episode_id) seeds: list[set[str]] = [] @@ -189,13 +195,15 @@ def decay_facts( *, as_of: datetime, ) -> list[str]: - """Return IDs of *facts* past the decay horizon (report-only; #681).""" + """Return IDs of *facts* past the decay horizon (report-only; #681). + + Honours both ISO-8601 string and :class:`~datetime.datetime` timestamps. + """ if policy.decay_after_days is None: return [] stale: list[str] = [] for fact in facts: - value = fact.metadata.get(policy.timestamp_key) - iso = value if isinstance(value, str) else None + iso = coerce_iso(fact.metadata.get(policy.timestamp_key)) if is_decayed(iso, as_of, policy.decay_after_days): stale.append(fact.fact_id) return sorted(stale) @@ -287,5 +295,6 @@ def consolidate( "consolidate", "decay_episodes", "decay_facts", + "parse_iso", "promote_clusters", ] diff --git a/tests/test_consolidation.py b/tests/test_consolidation.py index 527bcf90..14f350d7 100644 --- a/tests/test_consolidation.py +++ b/tests/test_consolidation.py @@ -10,6 +10,7 @@ cluster_episodes, consolidate, decay_episodes, + decay_facts, promote_clusters, ) from contextweaver.context.consolidation_types import ( @@ -21,7 +22,7 @@ ) from contextweaver.exceptions import ConfigError from contextweaver.store.episodic import Episode, InMemoryEpisodicStore -from contextweaver.store.facts import InMemoryFactStore +from contextweaver.store.facts import Fact, InMemoryFactStore from contextweaver.types import Sensitivity # --------------------------------------------------------------------------- @@ -190,6 +191,13 @@ def _hallucinating_call(_prompt: str) -> str: return "customer prefers carrier pigeons" +def _negating_call(_prompt: str) -> str: + # Every content token is grounded; only an injected negation ("no") differs, + # which inverts the fact's meaning. tokenize() drops "no" as a stop-word, so + # the content-token check passes — the negation guardrail must catch it. + return "customer prefers no email contact for support" + + def test_llm_merge_accepts_grounded_completion() -> None: episodes = [_ep(e, _EMAIL, session=f"s{i}") for i, e in enumerate(("a", "b", "c"))] by_id = {e.episode_id: e for e in episodes} @@ -232,6 +240,22 @@ def test_llm_merge_disabled_under_deterministic() -> None: assert fact.merged_by_llm is False +def test_llm_merge_rejects_negation_injection() -> None: + # A negation absent from the source must not be accepted even when every + # content token is grounded — it would silently invert the durable fact. + episodes = [_ep(e, _EMAIL, session=f"s{i}") for i, e in enumerate(("a", "b", "c"))] + by_id = {e.episode_id: e for e in episodes} + clusters = cluster_episodes(episodes, similarity_threshold=0.4) + [fact] = promote_clusters( + clusters, + by_id, + ConsolidationPolicy(min_occurrences=3, min_sessions=2), + call_fn=_negating_call, + ) + assert fact.merged_by_llm is False + assert fact.text == clusters[0].canonical_text + + # --------------------------------------------------------------------------- # Decay (#681) # --------------------------------------------------------------------------- @@ -270,6 +294,25 @@ def test_decay_never_mutates_store() -> None: assert store.get("a") is not None +def test_decay_facts_accepts_string_and_datetime_timestamps() -> None: + base = datetime(2026, 1, 1) + policy = ConsolidationPolicy(decay_after_days=90) + as_of = base + timedelta(days=400) + facts = [ + Fact( + fact_id="str_ts", + key="consolidated", + value="x", + metadata={"timestamp": base.isoformat()}, + ), + Fact(fact_id="dt_ts", key="consolidated", value="y", metadata={"timestamp": base}), + Fact(fact_id="int_ts", key="consolidated", value="z", metadata={"timestamp": 12345}), + Fact(fact_id="no_ts", key="consolidated", value="w", metadata={}), + ] + # Both the ISO string and the datetime decay; the int and missing keys do not. + assert decay_facts(facts, policy, as_of=as_of) == ["dt_ts", "str_ts"] + + # --------------------------------------------------------------------------- # Orchestration / apply (#498) # ---------------------------------------------------------------------------