diff --git a/agents/README.md b/agents/README.md deleted file mode 100644 index 94bf49b..0000000 --- a/agents/README.md +++ /dev/null @@ -1,12 +0,0 @@ ---- -name: rune-agents-readme -description: Documentation for Rune agents. ---- - -# Rune Agents - -This directory contains the agent specifications for Rune: - -- **Scribe**: For capturing organizational context. -- **Retriever**: For searching and synthesizing organizational context. -- **Agent adapters**: `claude/`, `gemini/`, and `codex/` contain client-specific prompt variants where needed. diff --git a/agents/SLACK_SETUP.md b/agents/SLACK_SETUP.md deleted file mode 100644 index d2bf3a5..0000000 --- a/agents/SLACK_SETUP.md +++ /dev/null @@ -1,9 +0,0 @@ ---- -name: rune-slack-setup -description: Instructions for setting up the Rune Slack app. ---- - -# Slack Setup Guide - -To set up the Rune Slack app, follow these steps: -... diff --git a/agents/__init__.py b/agents/__init__.py deleted file mode 100644 index 6aba734..0000000 --- a/agents/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -""" -Rune Agents - -Working implementations of Scribe (context capture) and Retriever (context retrieval). - -Philosophy: -- All memory is reproducible from payload.text (Markdown) -- Evidence-based reasoning: "Why" cannot be confirmed without quotes -- On-device similarity search for decision detection -- Text-only storage (no binary data) - -Usage: - from agents.common import load_config, EmbeddingService, PatternCache - from agents.common.schemas import DecisionRecord, render_payload_text - from agents.scribe import DecisionDetector, RecordBuilder - from agents.retriever import Searcher, Synthesizer -""" - -__version__ = "0.3.1" diff --git a/agents/common/__init__.py b/agents/common/__init__.py deleted file mode 100644 index 108832e..0000000 --- a/agents/common/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -""" -Rune Agents Common Module - -Shared infrastructure for Scribe and Retriever agents. -""" - -from .config import RuneConfig, load_config -from .embedding_service import EmbeddingService -from .envector_client import EnVectorClient -from .pattern_cache import PatternCache, PatternEntry - -__all__ = [ - "RuneConfig", - "load_config", - "EmbeddingService", - "EnVectorClient", - "PatternCache", - "PatternEntry", -] diff --git a/agents/common/config.py b/agents/common/config.py deleted file mode 100644 index 2be3a1f..0000000 --- a/agents/common/config.py +++ /dev/null @@ -1,365 +0,0 @@ -""" -Configuration Management for Rune Agents - -Loads configuration from ~/.rune/config.json and environment variables. -""" - -import os -import json -from pathlib import Path -from dataclasses import dataclass, field - -# Default config paths -CONFIG_DIR = Path.home() / ".rune" -CONFIG_PATH = CONFIG_DIR / "config.json" -LOGS_DIR = CONFIG_DIR / "logs" -KEYS_DIR = CONFIG_DIR / "keys" -REVIEW_QUEUE_PATH = CONFIG_DIR / "review_queue.json" -CAPTURE_LOG_PATH = CONFIG_DIR / "capture_log.jsonl" - -# Project paths (relative to this file) -PROJECT_ROOT = Path(__file__).parent.parent.parent # rune/ -PATTERNS_DIR = PROJECT_ROOT / "patterns" -MCP_SERVER_DIR = PROJECT_ROOT / "mcp" / "server" - - -@dataclass -class VaultConfig: - """Rune-Vault configuration""" - endpoint: str = "" - token: str = "" - ca_cert: str = "" # Path to CA cert PEM. Empty = system CA. - tls_disable: bool = False - - -@dataclass -class EmbeddingConfig: - """Embedding model configuration""" - mode: str = "sbert" # sentence-transformers (on-device) - model: str = "Qwen/Qwen3-Embedding-0.6B" - - -@dataclass -class LLMConfig: - """Shared LLM provider configuration across all agents""" - provider: str = "anthropic" - tier2_provider: str = "anthropic" - anthropic_api_key: str = "" - anthropic_model: str = "claude-sonnet-4-20250514" - openai_api_key: str = "" - openai_model: str = "gpt-4o-mini" - openai_tier2_model: str = "" - google_api_key: str = "" - google_model: str = "gemini-2.0-flash-exp" - google_tier2_model: str = "" - - -@dataclass -class EnVectorConfig: - """enVector Cloud credentials (cached from Vault bundle)""" - endpoint: str = "" - api_key: str = "" - - -@dataclass -class ScribeConfig: - """Scribe agent configuration""" - slack_webhook_port: int = 8080 - similarity_threshold: float = 0.35 # Tier 1: wider net (Tier 2 LLM handles precision) - auto_capture_threshold: float = 0.7 - tier2_enabled: bool = False # Legacy: only enable if API keys configured - tier2_model: str = "claude-haiku-4-5-20251001" - patterns_path: str = str(PATTERNS_DIR / "capture-triggers.md") - slack_signing_secret: str = "" - notion_signing_secret: str = "" - - -@dataclass -class RetrieverConfig: - """Retriever agent configuration""" - topk: int = 10 - confidence_threshold: float = 0.5 - - -@dataclass -class RuneConfig: - """Main Rune configuration""" - vault: VaultConfig = field(default_factory=VaultConfig) - envector: EnVectorConfig = field(default_factory=EnVectorConfig) - embedding: EmbeddingConfig = field(default_factory=EmbeddingConfig) - llm: LLMConfig = field(default_factory=LLMConfig) - scribe: ScribeConfig = field(default_factory=ScribeConfig) - retriever: RetrieverConfig = field(default_factory=RetrieverConfig) - state: str = "dormant" # "active" or "dormant" - dormant_reason: str = "" # raeson why plugin entered dormant state (e.g., "vault_unreachable", "user_deactivated") - dormant_since: str = "" # Timestamp of when dormant state was entered - _env_sourced_keys: set = field(default_factory=set, repr=False) - - -def _parse_vault_config(data: dict) -> VaultConfig: - """Parse vault section from config dict""" - vault_data = data.get("vault", {}) - return VaultConfig( - endpoint=vault_data.get("endpoint") or vault_data.get("url", ""), - token=vault_data.get("token", ""), - ca_cert=vault_data.get("ca_cert", ""), - tls_disable=vault_data.get("tls_disable", False), - ) - - -def _parse_embedding_config(data: dict) -> EmbeddingConfig: - """Parse embedding section from config dict""" - embedding_data = data.get("embedding", {}) - return EmbeddingConfig( - mode=embedding_data.get("mode", "sbert"), - model=embedding_data.get("model", "Qwen/Qwen3-Embedding-0.6B"), - ) - - -def _parse_scribe_config(data: dict) -> ScribeConfig: - """Parse scribe section from config dict""" - scribe_data = data.get("scribe", {}) - return ScribeConfig( - slack_webhook_port=scribe_data.get("slack_webhook_port", 8080), - similarity_threshold=scribe_data.get("similarity_threshold", 0.35), - auto_capture_threshold=scribe_data.get("auto_capture_threshold", 0.7), - tier2_enabled=scribe_data.get("tier2_enabled", False), - tier2_model=scribe_data.get("tier2_model", "claude-haiku-4-5-20251001"), - patterns_path=scribe_data.get("patterns_path", str(PATTERNS_DIR / "capture-triggers.md")), - slack_signing_secret=scribe_data.get("slack_signing_secret", ""), - notion_signing_secret=scribe_data.get("notion_signing_secret", ""), - ) - - -def _parse_retriever_config(data: dict) -> RetrieverConfig: - """Parse retriever section from config dict (non-LLM fields only)""" - retriever_data = data.get("retriever", {}) - return RetrieverConfig( - topk=retriever_data.get("topk", 10), - confidence_threshold=retriever_data.get("confidence_threshold", 0.5), - ) - - -def _parse_envector_config(data: dict) -> EnVectorConfig: - """Parse envector section from config dict""" - ev_data = data.get("envector", {}) - return EnVectorConfig( - endpoint=ev_data.get("endpoint", ""), - api_key=ev_data.get("api_key", ""), - ) - - -def _parse_llm_config(data: dict) -> LLMConfig: - """Parse LLM configuration with backward-compatible migration. - - Reads from ``data["llm"]`` first. If that section is absent, falls back - to reading LLM-specific keys from ``data["retriever"]`` and - ``data["scribe"]["tier2_provider"]`` for backward compatibility with - configs written before the ``llm`` section existed. - """ - llm_data = data.get("llm") - - if llm_data is not None: - # New-style config: read directly from llm section - return LLMConfig( - provider=llm_data.get("provider", "anthropic"), - tier2_provider=llm_data.get("tier2_provider", "anthropic"), - anthropic_api_key=llm_data.get("anthropic_api_key", ""), - anthropic_model=llm_data.get("anthropic_model", "claude-sonnet-4-20250514"), - openai_api_key=llm_data.get("openai_api_key", ""), - openai_model=llm_data.get("openai_model", "gpt-4o-mini"), - openai_tier2_model=llm_data.get("openai_tier2_model", ""), - google_api_key=llm_data.get("google_api_key", ""), - google_model=llm_data.get("google_model", "gemini-2.0-flash-exp"), - google_tier2_model=llm_data.get("google_tier2_model", ""), - ) - - # Migration: fall back to retriever + scribe fields - retriever_data = data.get("retriever", {}) - scribe_data = data.get("scribe", {}) - - return LLMConfig( - provider=retriever_data.get("llm_provider", "anthropic"), - tier2_provider=scribe_data.get("tier2_provider", "anthropic"), - anthropic_api_key=retriever_data.get("anthropic_api_key", ""), - anthropic_model=retriever_data.get("anthropic_model", "claude-sonnet-4-20250514"), - openai_api_key=retriever_data.get("openai_api_key", ""), - openai_model=retriever_data.get("openai_model", "gpt-4o-mini"), - openai_tier2_model="", - google_api_key=retriever_data.get("google_api_key", ""), - google_model=retriever_data.get("google_model", "gemini-2.0-flash-exp"), - google_tier2_model="", - ) - - -def load_config() -> RuneConfig: - """ - Load configuration from file and environment variables. - - Vault credentials are loaded from ~/.rune/config.json. - enVector credentials are cached in config.json (populated from Vault bundle - during pipeline initialization). - Other settings (embedding, scribe, LLM keys) can be overridden via - environment variables. - - Priority (highest to lowest): - 1. Environment variables - 2. Config file (~/.rune/config.json) - 3. Default values - """ - config = RuneConfig() - - # Load from config file if exists - if CONFIG_PATH.exists(): - try: - with open(CONFIG_PATH) as f: - data = json.load(f) - - config.vault = _parse_vault_config(data) - config.envector = _parse_envector_config(data) - config.embedding = _parse_embedding_config(data) - config.llm = _parse_llm_config(data) - config.scribe = _parse_scribe_config(data) - config.retriever = _parse_retriever_config(data) - config.state = data.get("state", "dormant") - config.dormant_reason = data.get("dormant_reason", "") - config.dormant_since = data.get("dormant_since", "") - except (json.JSONDecodeError, IOError) as e: - print(f"[Config] Warning: Failed to load config file: {e}") - - # Environment variable overrides - if os.getenv("EMBEDDING_MODE"): - config.embedding.mode = os.getenv("EMBEDDING_MODE") - if os.getenv("EMBEDDING_MODEL"): - config.embedding.model = os.getenv("EMBEDDING_MODEL") - - if os.getenv("SCRIBE_PORT"): - try: - config.scribe.slack_webhook_port = int(os.getenv("SCRIBE_PORT")) - except ValueError: - print(f"[Config] Warning: invalid SCRIBE_PORT value: {os.getenv('SCRIBE_PORT')}") - if os.getenv("SCRIBE_THRESHOLD"): - try: - config.scribe.similarity_threshold = float(os.getenv("SCRIBE_THRESHOLD")) - except ValueError: - print(f"[Config] Warning: invalid SCRIBE_THRESHOLD value: {os.getenv('SCRIBE_THRESHOLD')}") - if os.getenv("SCRIBE_AUTO_THRESHOLD"): - try: - config.scribe.auto_capture_threshold = float(os.getenv("SCRIBE_AUTO_THRESHOLD")) - except ValueError: - print(f"[Config] Warning: invalid SCRIBE_AUTO_THRESHOLD value: {os.getenv('SCRIBE_AUTO_THRESHOLD')}") - if os.getenv("SLACK_SIGNING_SECRET"): - config.scribe.slack_signing_secret = os.getenv("SLACK_SIGNING_SECRET") - if os.getenv("NOTION_SIGNING_SECRET"): - config.scribe.notion_signing_secret = os.getenv("NOTION_SIGNING_SECRET") - - # LLM env var overrides (target config.llm, track env-sourced keys) - _env_llm_map = { - "ANTHROPIC_API_KEY": "anthropic_api_key", - "ANTHROPIC_MODEL": "anthropic_model", - "OPENAI_API_KEY": "openai_api_key", - "OPENAI_MODEL": "openai_model", - "GOOGLE_API_KEY": "google_api_key", - "GEMINI_API_KEY": "google_api_key", - "GOOGLE_MODEL": "google_model", - "RUNE_LLM_PROVIDER": "provider", - "RUNE_TIER2_LLM_PROVIDER": "tier2_provider", - } - for env_var, attr in _env_llm_map.items(): - val = os.getenv(env_var) - if val: - setattr(config.llm, attr, val) - config._env_sourced_keys.add(attr) - - if os.getenv("RUNE_STATE"): - config.state = os.getenv("RUNE_STATE") - - return config - - -def save_config(config: RuneConfig) -> None: - """Save configuration to file. - - API key fields that were sourced from environment variables are written - as empty strings so that secrets are not persisted to disk. - """ - CONFIG_DIR.mkdir(parents=True, exist_ok=True) - os.chmod(str(CONFIG_DIR), 0o700) # Force 700 regardless of umask - - env_sourced = getattr(config, "_env_sourced_keys", set()) - - # Build llm section, blanking out env-sourced API key fields - _llm_api_key_fields = { - "anthropic_api_key", "openai_api_key", "google_api_key", - } - llm_section = { - "provider": config.llm.provider, - "tier2_provider": config.llm.tier2_provider, - "anthropic_api_key": config.llm.anthropic_api_key, - "anthropic_model": config.llm.anthropic_model, - "openai_api_key": config.llm.openai_api_key, - "openai_model": config.llm.openai_model, - "openai_tier2_model": config.llm.openai_tier2_model, - "google_api_key": config.llm.google_api_key, - "google_model": config.llm.google_model, - "google_tier2_model": config.llm.google_tier2_model, - } - for key in _llm_api_key_fields: - if key in env_sourced: - llm_section[key] = "" - - data = { - "vault": { - "endpoint": config.vault.endpoint, - "token": config.vault.token, - "ca_cert": config.vault.ca_cert, - "tls_disable": config.vault.tls_disable, - }, - "envector": { - "endpoint": config.envector.endpoint, - "api_key": config.envector.api_key, - }, - "embedding": { - "mode": config.embedding.mode, - "model": config.embedding.model, - }, - "llm": llm_section, - "scribe": { - "slack_webhook_port": config.scribe.slack_webhook_port, - "similarity_threshold": config.scribe.similarity_threshold, - "auto_capture_threshold": config.scribe.auto_capture_threshold, - "tier2_enabled": config.scribe.tier2_enabled, - "tier2_model": config.scribe.tier2_model, - "patterns_path": config.scribe.patterns_path, - "slack_signing_secret": config.scribe.slack_signing_secret, - "notion_signing_secret": config.scribe.notion_signing_secret, - }, - "retriever": { - "topk": config.retriever.topk, - "confidence_threshold": config.retriever.confidence_threshold, - }, - "state": config.state, - } - - # Include dormant metadata - if config.state == "dormant": - if config.dormant_reason: - data["dormant_reason"] = config.dormant_reason - if config.dormant_since: - data["dormant_since"] = config.dormant_since - - with open(CONFIG_PATH, "w") as f: - json.dump(data, f, indent=2) - - # Set secure permissions - CONFIG_PATH.chmod(0o600) - - -def ensure_directories() -> None: - """Ensure required directories exist with secure permissions""" - CONFIG_DIR.mkdir(parents=True, exist_ok=True) - os.chmod(str(CONFIG_DIR), 0o700) - LOGS_DIR.mkdir(parents=True, exist_ok=True) - os.chmod(str(LOGS_DIR), 0o700) - KEYS_DIR.mkdir(parents=True, exist_ok=True) - os.chmod(str(KEYS_DIR), 0o700) diff --git a/agents/common/embedding_service.py b/agents/common/embedding_service.py deleted file mode 100644 index 1bc9d38..0000000 --- a/agents/common/embedding_service.py +++ /dev/null @@ -1,178 +0,0 @@ -""" -Embedding Service - -Wraps the existing EmbeddingAdapter from mcp/adapter. -Provides on-device embedding generation using fastembed. -""" - -import logging -import sys -from pathlib import Path -from typing import List, Optional -import numpy as np - -logger = logging.getLogger("rune.common.embedding") - -# Add mcp/ to path so `from adapter import ...` works -MCP_ROOT = Path(__file__).parent.parent.parent / "mcp" -if str(MCP_ROOT) not in sys.path: - sys.path.insert(0, str(MCP_ROOT)) - - -class EmbeddingService: - """ - Singleton embedding service for Rune agents. - - Uses fastembed by default for on-device embedding generation. - This avoids external API calls and keeps data local. - """ - - _instance: Optional["EmbeddingService"] = None - _adapter = None - - def __new__(cls, mode: str = "femb", model: str = "Qwen/Qwen3-Embedding-0.6B"): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._init_adapter(mode, model) - elif cls._instance._model != model or cls._instance._mode != mode: - cls._instance._init_adapter(mode, model) - return cls._instance - - def _init_adapter(self, mode: str, model: str) -> None: - """Initialize the underlying EmbeddingAdapter""" - try: - from adapter.embeddings import EmbeddingAdapter - self._adapter = EmbeddingAdapter(mode=mode, model_name=model) - self._mode = mode - self._model = model - logger.info("Initialized with mode=%s, model=%s", mode, model) - except ImportError as e: - logger.warning("Could not import EmbeddingAdapter: %s", e) - logger.warning("Using fallback mode (no embeddings)") - self._adapter = None - - @property - def is_available(self) -> bool: - """Check if embedding service is available""" - return self._adapter is not None - - def embed(self, texts: List[str]) -> List[List[float]]: - """ - Generate embeddings for a list of texts. - - Args: - texts: List of strings to embed - - Returns: - List of embedding vectors (L2 normalized) - """ - if not self._adapter: - raise RuntimeError("EmbeddingAdapter not initialized") - - if not texts: - return [] - - embeddings = self._adapter.get_embedding(texts) - - # Ensure consistent return type - if isinstance(embeddings, np.ndarray): - return embeddings.tolist() - return embeddings - - def embed_single(self, text: str) -> List[float]: - """ - Generate embedding for a single text. - - Args: - text: String to embed - - Returns: - Embedding vector (L2 normalized) - """ - if not text: - raise ValueError("Cannot embed empty text") - - embeddings = self.embed([text]) - return embeddings[0] - - def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float: - """ - Compute cosine similarity between two vectors. - - Note: EmbeddingAdapter already L2 normalizes vectors, - so dot product equals cosine similarity. - - Args: - vec1: First embedding vector - vec2: Second embedding vector - - Returns: - Cosine similarity score (0.0 to 1.0) - """ - v1 = np.array(vec1) - v2 = np.array(vec2) - - # Handle potential dimension mismatch - if v1.shape != v2.shape: - raise ValueError(f"Vector dimension mismatch: {v1.shape} vs {v2.shape}") - - # For normalized vectors, dot product = cosine similarity - similarity = float(np.dot(v1, v2)) - - # Clamp to valid range (numerical precision issues) - return max(0.0, min(1.0, similarity)) - - def batch_cosine_similarity( - self, - query_vec: List[float], - vectors: List[List[float]] - ) -> List[float]: - """ - Compute cosine similarity between a query and multiple vectors. - - Args: - query_vec: Query embedding vector - vectors: List of embedding vectors to compare against - - Returns: - List of similarity scores - """ - if not vectors: - return [] - - query = np.array(query_vec) - matrix = np.array(vectors) - - # Matrix multiplication for batch similarity - similarities = np.dot(matrix, query) - - # Clamp to valid range - similarities = np.clip(similarities, 0.0, 1.0) - - return similarities.tolist() - - -# Module-level singleton getter -_service_instance: Optional[EmbeddingService] = None - - -def get_embedding_service( - mode: str = "sbert", - model: str = "Qwen/Qwen3-Embedding-0.6B" -) -> EmbeddingService: - """ - Get the singleton EmbeddingService instance. - - Args: - mode: Embedding mode (femb, sbert, hf, openai) - model: Model name - - Returns: - EmbeddingService instance - """ - global _service_instance - - if _service_instance is None: - _service_instance = EmbeddingService(mode=mode, model=model) - - return _service_instance diff --git a/agents/common/envector_client.py b/agents/common/envector_client.py deleted file mode 100644 index 9afda07..0000000 --- a/agents/common/envector_client.py +++ /dev/null @@ -1,220 +0,0 @@ -""" -EnVector Client - -Wraps EnVectorSDKAdapter for direct access to enVector operations. -Avoids MCP protocol overhead by importing adapters directly. -""" - -import json -import logging -import sys -from pathlib import Path -from typing import List, Dict, Any, Optional - -logger = logging.getLogger("rune.common.envector") - -# Add mcp/ to path so `from adapter import ...` works -MCP_ROOT = Path(__file__).parent.parent.parent / "mcp" -if str(MCP_ROOT) not in sys.path: - sys.path.insert(0, str(MCP_ROOT)) - - -class EnVectorClient: - """ - Direct client to enVector operations. - - Uses direct import of EnVectorSDKAdapter instead of MCP protocol - for lower overhead when running on the same machine. - """ - - def __init__( - self, - address: str = "localhost:50050", - key_path: str = "~/.rune/keys", - key_id: str = None, - access_token: Optional[str] = None, - auto_key_setup: bool = True, - agent_id: Optional[str] = None, - agent_dek: Optional[bytes] = None, - ): - """ - Initialize EnVector client. - - Args: - address: enVector server address (host:port or cloud URL) - key_path: Path to store/load encryption keys - key_id: Key identifier - access_token: Cloud access token (for enVector Cloud) - auto_key_setup: Auto-generate keys if not found - agent_id: Per-agent identifier for app-layer metadata encryption - agent_dek: Per-agent AES-256 DEK (32 bytes) from Vault - """ - self._address = address - self._key_path = Path(key_path).expanduser() - self._key_id = key_id - self._access_token = access_token - self._auto_key_setup = auto_key_setup - self._agent_id = agent_id - self._agent_dek = agent_dek - self._adapter = None - self._initialized = False - - def _ensure_initialized(self) -> None: - """Lazily initialize the adapter""" - if self._initialized: - return - - try: - from adapter.envector_sdk import EnVectorSDKAdapter - - # Ensure key directory exists - self._key_path.mkdir(parents=True, exist_ok=True) - - self._adapter = EnVectorSDKAdapter( - address=self._address, - key_id=self._key_id, - key_path=str(self._key_path), - eval_mode="rmp", - query_encryption=False, # Plain queries for simplicity - access_token=self._access_token, - auto_key_setup=self._auto_key_setup, - agent_id=self._agent_id, - agent_dek=self._agent_dek, - ) - self._initialized = True - logger.info("Connected to %s", self._address) - - except ImportError as e: - logger.warning("Could not import EnVectorSDKAdapter: %s", e) - raise RuntimeError(f"EnVectorSDKAdapter not available: {e}") - except Exception as e: - logger.error("Error initializing: %s", e) - raise - - @property - def is_available(self) -> bool: - """Check if client is available""" - try: - self._ensure_initialized() - return self._adapter is not None - except Exception: - return False - - def get_index_list(self) -> Dict[str, Any]: - """Get list of all indexes""" - self._ensure_initialized() - return self._adapter.call_get_index_list() - - def insert( - self, - index_name: str, - vectors: List[List[float]], - metadata: Optional[List[Dict]] = None - ) -> Dict[str, Any]: - """ - Insert vectors into an index. - - Args: - index_name: Target index name - vectors: List of embedding vectors - metadata: Optional list of metadata dicts (one per vector) - - Returns: - Result dict with ok/error status - """ - self._ensure_initialized() - - if metadata: - # Serialize metadata to JSON strings - meta_list = [ - json.dumps(m) if isinstance(m, dict) else str(m) - for m in metadata - ] - else: - meta_list = [json.dumps({"index": i}) for i in range(len(vectors))] - - return self._adapter.call_insert( - index_name=index_name, - vectors=vectors, - metadata=meta_list - ) - - def insert_with_text( - self, - index_name: str, - texts: List[str], - embedding_service, - metadata: Optional[List[Dict]] = None - ) -> Dict[str, Any]: - """ - Embed texts and insert into index. - - Args: - index_name: Target index name - texts: List of texts to embed - embedding_service: EmbeddingService instance - metadata: Optional list of metadata dicts - - Returns: - Result dict with ok/error status - """ - # Generate embeddings - vectors = embedding_service.embed(texts) - - # Add text to metadata if not provided - if metadata is None: - metadata = [{"text": t} for t in texts] - else: - for i, meta in enumerate(metadata): - if "text" not in meta: - meta["text"] = texts[i] - - return self.insert(index_name, vectors, metadata) - - def score( - self, - index_name: str, - query_vector: List[float], - ) -> Dict[str, Any]: - """ - Encrypted similarity scoring (Vault-secured pipeline step 1). - - Returns encrypted score blobs for Vault decryption. - - Args: - index_name: Index to score against - query_vector: Query embedding vector - - Returns: - Result dict with encrypted_blobs list - """ - self._ensure_initialized() - return self._adapter.call_score( - index_name=index_name, - query=[query_vector], - ) - - def remind( - self, - index_name: str, - indices: List[Dict[str, Any]], - output_fields: Optional[List[str]] = None, - ) -> Dict[str, Any]: - """ - Retrieve metadata for indices returned by Vault (Vault-secured pipeline step 3). - - Args: - index_name: Index to fetch metadata from - indices: List of dicts with shard_idx, row_idx, score - output_fields: Fields to include (default: ["metadata"]) - - Returns: - Result dict with metadata entries - """ - self._ensure_initialized() - return self._adapter.call_remind( - index_name=index_name, - indices=indices, - output_fields=output_fields, - ) - diff --git a/agents/common/language.py b/agents/common/language.py deleted file mode 100644 index d6d2597..0000000 --- a/agents/common/language.py +++ /dev/null @@ -1,172 +0,0 @@ -""" -Language Detection Service - -Automatic per-message language detection using langdetect + Unicode script fallback. -Used to route messages to LLM extraction (non-English) or regex extraction (English). -""" - -import re -import unicodedata -from dataclasses import dataclass -from typing import Optional - -# Matches any Hangul, Kana, or CJK character -_NON_LATIN_RE = re.compile( - r'[\u1100-\u11FF\u3040-\u309F\u30A0-\u30FF\u3130-\u318F' - r'\u3400-\u4DBF\u4E00-\u9FFF\uAC00-\uD7AF]' -) - -# Seed langdetect for deterministic results -try: - from langdetect import DetectorFactory - DetectorFactory.seed = 0 -except ImportError: - pass - - -@dataclass(frozen=True) -class LanguageInfo: - """Detected language information""" - code: str # ISO 639-1: "en", "ko", "ja" - confidence: float # 0.0~1.0 - script: str # "Latin", "Hangul", "CJK", "Kana", "Mixed" - - @property - def is_english(self) -> bool: - return self.code == "en" - - @property - def needs_llm_extraction(self) -> bool: - """Non-English text needs LLM extraction path""" - return not self.is_english - - -# Unicode range based script detection -_SCRIPT_RANGES = [ - (0xAC00, 0xD7AF, "Hangul", "ko"), # Hangul Syllables - (0x1100, 0x11FF, "Hangul", "ko"), # Hangul Jamo - (0x3130, 0x318F, "Hangul", "ko"), # Hangul Compatibility Jamo - (0x3040, 0x309F, "Kana", "ja"), # Hiragana - (0x30A0, 0x30FF, "Kana", "ja"), # Katakana - (0x4E00, 0x9FFF, "CJK", "zh"), # CJK Unified Ideographs - (0x3400, 0x4DBF, "CJK", "zh"), # CJK Extension A -] - - -def _detect_script(text: str) -> tuple[str, Optional[str]]: - """Detect dominant script from Unicode character ranges. - - Returns: - (script_name, language_code) or ("Latin", None) for ASCII-dominant text - """ - script_counts: dict[str, int] = {} - lang_counts: dict[str, int] = {} - total = 0 - - for ch in text: - if ch.isspace() or ch in '.,!?;:"\'-()[]{}': - continue - total += 1 - cp = ord(ch) - matched = False - for start, end, script, lang in _SCRIPT_RANGES: - if start <= cp <= end: - script_counts[script] = script_counts.get(script, 0) + 1 - lang_counts[lang] = lang_counts.get(lang, 0) + 1 - matched = True - break - if not matched: - script_counts["Latin"] = script_counts.get("Latin", 0) + 1 - - if total == 0: - return "Latin", None - - # Find dominant script - dominant_script = max(script_counts, key=script_counts.get) - dominant_count = script_counts[dominant_script] - - # If multiple scripts are significant, mark as Mixed - non_latin_scripts = {k: v for k, v in script_counts.items() if k != "Latin"} - if len(non_latin_scripts) > 1: - top_two = sorted(non_latin_scripts.values(), reverse=True) - if len(top_two) >= 2 and top_two[1] > total * 0.2: - # Japanese text often mixes CJK + Kana - if "Kana" in non_latin_scripts and "CJK" in non_latin_scripts: - return "Kana", "ja" - return "Mixed", None - - # Determine language from dominant non-Latin script - if dominant_script != "Latin" and dominant_count > total * 0.15: - # Find the language for this script - for lang, count in lang_counts.items(): - if count == max(lang_counts.values()): - # Special case: CJK with Kana = Japanese - if "Kana" in script_counts and script_counts.get("Kana", 0) > 0: - return "Kana", "ja" - return dominant_script, lang - - return "Latin", None - - -def detect_language(text: str) -> LanguageInfo: - """Detect language of input text. - - Uses langdetect library with Unicode script-based fallback. - Short texts (<10 chars) default to English. - - Args: - text: Input text to detect language for - - Returns: - LanguageInfo with detected language code, confidence, and script - """ - if not text or not text.strip(): - return LanguageInfo(code="en", confidence=1.0, script="Latin") - - cleaned = text.strip() - - # Very short text defaults to English - if len(cleaned) < 10: - # But check for obvious non-Latin scripts - script, lang = _detect_script(cleaned) - if lang: - return LanguageInfo(code=lang, confidence=0.6, script=script) - return LanguageInfo(code="en", confidence=0.5, script="Latin") - - # Determine script first — used to validate langdetect results - script, script_lang = _detect_script(cleaned) - - # Try langdetect - try: - from langdetect import detect_langs - results = detect_langs(cleaned) - if results: - top = results[0] - lang_code = top.lang - confidence = top.prob - - # If text is purely Latin-script (no Hangul/Kana/CJK characters), - # treat as English. langdetect frequently misclassifies short English - # text as fr, af, nl, de, etc. The LLM extraction path is designed - # for CJK scripts (ko, ja, zh), not Latin-script languages. - # However, if there ARE any non-Latin chars (e.g., Korean text with - # English terms like "PostgreSQL"), trust langdetect. - if lang_code != "en" and not _NON_LATIN_RE.search(cleaned): - return LanguageInfo(code="en", confidence=0.5, script="Latin") - - return LanguageInfo( - code=lang_code, - confidence=round(confidence, 4), - script=script, - ) - except ImportError: - pass # langdetect not installed, fall through to Unicode fallback - except Exception: - pass # langdetect failed (e.g., too short), fall through - - # Fallback: Unicode script-based detection - if script_lang: - return LanguageInfo(code=script_lang, confidence=0.7, script=script) - - # Default to English for Latin script - return LanguageInfo(code="en", confidence=0.5, script="Latin") diff --git a/agents/common/llm_client.py b/agents/common/llm_client.py deleted file mode 100644 index 3c886c1..0000000 --- a/agents/common/llm_client.py +++ /dev/null @@ -1,139 +0,0 @@ -""" -Provider-agnostic LLM client for Rune pipelines. - -Supports Anthropic, OpenAI, and Google Gemini with a shared text-generation -interface. -""" - -from __future__ import annotations - -import logging -from typing import Optional - -logger = logging.getLogger("rune.common.llm_client") - - -class LLMClient: - """Unified text generation client across LLM providers.""" - - def __init__( - self, - provider: str = "anthropic", - model: str = "", - anthropic_api_key: Optional[str] = None, - openai_api_key: Optional[str] = None, - google_api_key: Optional[str] = None, - ) -> None: - self.provider = (provider or "anthropic").lower() - self.model = model - self._client = None - - if self.provider == "auto": - raise ValueError( - '"auto" provider must be resolved before creating LLMClient. ' - 'Use _resolve_provider() in the MCP server or scribe server.' - ) - - if self.provider == "anthropic": - if not anthropic_api_key: - logger.info("%s API key not provided, LLM client unavailable", self.provider) - return - try: - import anthropic - - self._client = anthropic.Anthropic(api_key=anthropic_api_key) - except ImportError: - logger.warning("anthropic package not installed") - except Exception as e: - logger.warning("Failed to initialize Anthropic client: %s", e) - return - - if self.provider == "openai": - if not openai_api_key: - logger.info("%s API key not provided, LLM client unavailable", self.provider) - return - try: - from openai import OpenAI - - self._client = OpenAI(api_key=openai_api_key) - except ImportError: - logger.warning("openai package not installed") - except Exception as e: - logger.warning("Failed to initialize OpenAI client: %s", e) - return - - if self.provider == "google": - if not google_api_key: - logger.info("%s API key not provided, LLM client unavailable", self.provider) - return - try: - import google.generativeai as genai - - genai.configure(api_key=google_api_key) - self._client = genai # Store the module, not a model instance - self._google_models = {} # Cache models by system prompt hash - except ImportError: - logger.warning("google-generativeai package not installed") - except Exception as e: - logger.warning("Failed to initialize Gemini client: %s", e) - return - - logger.warning("Unsupported LLM provider: %s", self.provider) - - @property - def is_available(self) -> bool: - return self._client is not None - - def generate( - self, - prompt: str, - *, - system: Optional[str] = None, - max_tokens: int = 512, - timeout: float = 30.0, - ) -> str: - if not self.is_available: - raise RuntimeError("LLM client is not available") - - if self.provider == "anthropic": - response = self._client.messages.create( - model=self.model, - max_tokens=max_tokens, - system=system, - messages=[{"role": "user", "content": prompt}], - timeout=timeout, - ) - return response.content[0].text.strip() - - if self.provider == "openai": - messages = [] - if system: - messages.append({"role": "system", "content": system}) - messages.append({"role": "user", "content": prompt}) - response = self._client.chat.completions.create( - model=self.model, - max_tokens=max_tokens, - messages=messages, - timeout=timeout, - ) - return (response.choices[0].message.content or "").strip() - - if self.provider == "google": - import hashlib - - cache_key = hashlib.md5((system or "").encode()).hexdigest() - if cache_key not in self._google_models: - kwargs = {"model_name": self.model} - if system: - kwargs["system_instruction"] = system - self._google_models[cache_key] = self._client.GenerativeModel(**kwargs) - model = self._google_models[cache_key] - response = model.generate_content( - prompt, - generation_config={"max_output_tokens": max_tokens}, - request_options={"timeout": timeout}, - ) - return response.text.strip() - - raise RuntimeError(f"Unsupported LLM provider: {self.provider}") - diff --git a/agents/common/llm_utils.py b/agents/common/llm_utils.py deleted file mode 100644 index 8a520e0..0000000 --- a/agents/common/llm_utils.py +++ /dev/null @@ -1,39 +0,0 @@ -"""Shared utilities for parsing LLM responses.""" - -from __future__ import annotations - -import json - - -def parse_llm_json(raw: str) -> dict: - """Parse JSON from an LLM response, handling code fences and preamble text. - - Tries in order: - 1. Strip markdown code fences, then json.loads - 2. Direct json.loads on the raw string - 3. Extract substring between first '{' and last '}', then json.loads - 4. Return empty dict - """ - if not raw: - return {} - - text = raw - if text.startswith("```"): - lines = text.split("\n") - lines = [l for l in lines if not l.strip().startswith("```")] - text = "\n".join(lines) - - try: - return json.loads(text) - except json.JSONDecodeError: - pass - - start = raw.find("{") - end = raw.rfind("}") + 1 - if start >= 0 and end > start: - try: - return json.loads(raw[start:end]) - except json.JSONDecodeError: - pass - - return {} diff --git a/agents/common/pattern_cache.py b/agents/common/pattern_cache.py deleted file mode 100644 index 473ce0f..0000000 --- a/agents/common/pattern_cache.py +++ /dev/null @@ -1,203 +0,0 @@ -""" -Pattern Cache - -Pre-embeds trigger patterns from capture-triggers.md at startup. -Used for on-device similarity-based decision detection. -""" - -from typing import List, Dict, Tuple, Optional -from dataclasses import dataclass -import numpy as np - -from .embedding_service import EmbeddingService - - -@dataclass -class PatternEntry: - """A single trigger pattern with its embedding""" - text: str - category: str - priority: str # "high", "medium", "low" - embedding: List[float] - domain: Optional[str] = None - language: Optional[str] = None # ISO 639-1: "en", "ko", "ja" - - -class PatternCache: - """ - Cache of pre-embedded trigger patterns. - - At startup, loads patterns from capture-triggers.md, - embeds them all, and stores for fast similarity lookup. - """ - - def __init__(self, embedding_service: EmbeddingService): - """ - Initialize pattern cache. - - Args: - embedding_service: EmbeddingService instance for generating embeddings - """ - self._embedding = embedding_service - self._patterns: List[PatternEntry] = [] - self._embeddings_matrix: Optional[np.ndarray] = None - self._loaded = False - - @property - def is_loaded(self) -> bool: - """Check if patterns are loaded""" - return self._loaded - - @property - def pattern_count(self) -> int: - """Number of loaded patterns""" - return len(self._patterns) - - def load_patterns(self, patterns: List[Dict]) -> int: - """ - Load and embed patterns. - - Args: - patterns: List of pattern dicts with keys: - - text: Pattern text - - category: Category name - - priority: "high", "medium", or "low" - - domain: Optional domain classification - - Returns: - Number of patterns loaded - """ - if not patterns: - print("[PatternCache] Warning: No patterns to load") - return 0 - - # Extract texts for batch embedding - texts = [p["text"] for p in patterns] - - print(f"[PatternCache] Embedding {len(texts)} patterns...") - embeddings = self._embedding.embed(texts) - - # Create PatternEntry objects - self._patterns = [ - PatternEntry( - text=p["text"], - category=p.get("category", "general"), - priority=p.get("priority", "medium"), - domain=p.get("domain"), - embedding=embeddings[i], - language=p.get("language"), - ) - for i, p in enumerate(patterns) - ] - - # Create embeddings matrix for fast batch similarity - self._embeddings_matrix = np.array([p.embedding for p in self._patterns]) - self._loaded = True - - print(f"[PatternCache] Loaded {len(self._patterns)} patterns") - return len(self._patterns) - - def find_best_match( - self, - text: str, - threshold: float = 0.7 - ) -> Tuple[Optional[PatternEntry], float]: - """ - Find the best matching pattern for input text. - - Args: - text: Input text to match - threshold: Minimum similarity threshold - - Returns: - Tuple of (best_match, score) or (None, best_score) if below threshold - """ - if not self._loaded: - raise RuntimeError("Patterns not loaded. Call load_patterns() first.") - - if not text.strip(): - return (None, 0.0) - - # Embed input text - text_embedding = self._embedding.embed_single(text) - text_vec = np.array(text_embedding) - - # Compute similarities with all patterns (batch operation) - similarities = np.dot(self._embeddings_matrix, text_vec) - - # Find best match - best_idx = int(np.argmax(similarities)) - best_score = float(similarities[best_idx]) - - if best_score >= threshold: - return (self._patterns[best_idx], best_score) - - return (None, best_score) - - def find_top_matches( - self, - text: str, - top_k: int = 5, - threshold: float = 0.5 - ) -> List[Tuple[PatternEntry, float]]: - """ - Find top-k matching patterns for input text. - - Args: - text: Input text to match - top_k: Number of top matches to return - threshold: Minimum similarity threshold - - Returns: - List of (pattern, score) tuples, sorted by score descending - """ - if not self._loaded: - raise RuntimeError("Patterns not loaded. Call load_patterns() first.") - - if not text.strip(): - return [] - - # Embed input text - text_embedding = self._embedding.embed_single(text) - text_vec = np.array(text_embedding) - - # Compute similarities with all patterns - similarities = np.dot(self._embeddings_matrix, text_vec) - - # Get top-k indices - if len(similarities) <= top_k: - top_indices = np.argsort(similarities)[::-1] - else: - top_indices = np.argpartition(similarities, -top_k)[-top_k:] - top_indices = top_indices[np.argsort(similarities[top_indices])[::-1]] - - # Filter by threshold and create results - results = [] - for idx in top_indices: - score = float(similarities[idx]) - if score >= threshold: - results.append((self._patterns[idx], score)) - - return results - - def get_patterns_by_category(self, category: str) -> List[PatternEntry]: - """Get all patterns in a category""" - return [p for p in self._patterns if p.category.lower() == category.lower()] - - def get_patterns_by_priority(self, priority: str) -> List[PatternEntry]: - """Get all patterns with given priority""" - return [p for p in self._patterns if p.priority.lower() == priority.lower()] - - def get_high_priority_patterns(self) -> List[PatternEntry]: - """Get all high-priority patterns""" - return self.get_patterns_by_priority("high") - - def categories(self) -> List[str]: - """Get list of unique categories""" - return list(set(p.category for p in self._patterns)) - - def clear(self) -> None: - """Clear all loaded patterns""" - self._patterns = [] - self._embeddings_matrix = None - self._loaded = False diff --git a/agents/common/schemas/__init__.py b/agents/common/schemas/__init__.py deleted file mode 100644 index d74efc2..0000000 --- a/agents/common/schemas/__init__.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -Rune Decision Record Schemas - -Schema v2 for organizational memory with evidence-based reasoning. -""" - -from .decision_record import ( - DecisionRecord, - DecisionDetail, - Context, - Why, - Evidence, - SourceRef, - SourceType, - Assumption, - Risk, - Quality, - Payload, - Domain, - Sensitivity, - Status, - Certainty, - ReviewState, - generate_record_id, - generate_group_id, -) -from .templates import render_payload_text, render_display_text, PAYLOAD_TEMPLATE, PAYLOAD_HEADERS - -__all__ = [ - "DecisionRecord", - "DecisionDetail", - "Context", - "Why", - "Evidence", - "SourceRef", - "SourceType", - "Assumption", - "Risk", - "Quality", - "Payload", - "Domain", - "Sensitivity", - "Status", - "Certainty", - "ReviewState", - "generate_record_id", - "generate_group_id", - "render_payload_text", - "render_display_text", - "PAYLOAD_TEMPLATE", - "PAYLOAD_HEADERS", -] diff --git a/agents/common/schemas/decision_record.py b/agents/common/schemas/decision_record.py deleted file mode 100644 index 88ce320..0000000 --- a/agents/common/schemas/decision_record.py +++ /dev/null @@ -1,259 +0,0 @@ -""" -Decision Record Schema v2 - -Core principle: Memory items always have a "text payload" that can fully reproduce the context. -The embedding is generated from that text payload. -"Why" cannot be written definitively without evidence. -""" - -from datetime import datetime, timezone -from typing import List, Optional, Literal -from pydantic import BaseModel, Field -from enum import Enum - - -# ============================================================================ -# Enums -# ============================================================================ - -class Domain(str, Enum): - """Decision domain categories""" - ARCHITECTURE = "architecture" - SECURITY = "security" - PRODUCT = "product" - EXEC = "exec" - OPS = "ops" - DESIGN = "design" - DATA = "data" - HR = "hr" - MARKETING = "marketing" - INCIDENT = "incident" - DEBUGGING = "debugging" - QA = "qa" - LEGAL = "legal" - FINANCE = "finance" - SALES = "sales" - CUSTOMER_SUCCESS = "customer_success" - RESEARCH = "research" - RISK = "risk" - GENERAL = "general" - - -class Sensitivity(str, Enum): - """Data sensitivity levels""" - PUBLIC = "public" - INTERNAL = "internal" - RESTRICTED = "restricted" - - -class Status(str, Enum): - """Decision status""" - PROPOSED = "proposed" - ACCEPTED = "accepted" - SUPERSEDED = "superseded" - REVERTED = "reverted" - - -class Certainty(str, Enum): - """Evidence certainty level for 'Why'""" - SUPPORTED = "supported" - PARTIALLY_SUPPORTED = "partially_supported" - UNKNOWN = "unknown" - - -class ReviewState(str, Enum): - """Human review state""" - UNREVIEWED = "unreviewed" - APPROVED = "approved" - EDITED = "edited" - REJECTED = "rejected" - - -class SourceType(str, Enum): - """Source types for evidence""" - SLACK = "slack" - MEETING = "meeting" - DOC = "doc" - GITHUB = "github" - EMAIL = "email" - NOTION = "notion" - OTHER = "other" - - -# ============================================================================ -# Sub-models -# ============================================================================ - -class SourceRef(BaseModel): - """Reference to the source of evidence""" - type: SourceType - url: Optional[str] = None - pointer: Optional[str] = None # e.g., "channel:#arch thread_ts:123" or "timestamp:00:32:14" - - -class Evidence(BaseModel): - """Evidence supporting a claim with direct quote""" - claim: str = Field(..., description="What is being claimed") - quote: str = Field(..., description="Direct quote (1-2 sentences)") - source: SourceRef - - -class Assumption(BaseModel): - """Assumption with confidence level""" - assumption: str - confidence: float = Field(ge=0.0, le=1.0, default=0.5) - - -class Risk(BaseModel): - """Risk with mitigation strategy""" - risk: str - mitigation: Optional[str] = None - - -class DecisionDetail(BaseModel): - """What was decided, by whom, when, where""" - what: str = Field(..., description="The actual decision statement") - who: List[str] = Field(default_factory=list, description="Participants (role:cto, user:alice)") - where: str = Field(default="", description="Channel/meeting where decided") - when: str = Field(default="", description="Date of decision (YYYY-MM-DD)") - - -class Context(BaseModel): - """Context surrounding the decision""" - problem: str = Field(default="", description="Problem being solved") - scope: Optional[str] = None - constraints: List[str] = Field(default_factory=list) - alternatives: List[str] = Field(default_factory=list) - chosen: str = Field(default="", description="Chosen alternative") - trade_offs: List[str] = Field(default_factory=list) - assumptions: List[Assumption] = Field(default_factory=list) - risks: List[Risk] = Field(default_factory=list) - - -class Why(BaseModel): - """ - Rationale for the decision. - - CRITICAL RULE: certainty cannot be 'supported' without evidence. - If evidence is missing, certainty MUST be 'unknown'. - """ - rationale_summary: str = Field(default="", description="Summary of why this decision was made") - certainty: Certainty = Field(default=Certainty.UNKNOWN) - missing_info: List[str] = Field(default_factory=list, description="What information is missing") - - -class Quality(BaseModel): - """Quality metrics for the capture""" - scribe_confidence: float = Field(ge=0.0, le=1.0, default=0.5) - review_state: ReviewState = Field(default=ReviewState.UNREVIEWED) - reviewed_by: Optional[str] = None - review_notes: Optional[str] = None - - -class Payload(BaseModel): - """ - The normalized text payload for embedding. - This is the SINGLE SOURCE OF TRUTH for memory reproduction. - """ - format: Literal["markdown"] = "markdown" - text: str = Field(default="", description="Markdown text for embedding") - - -# ============================================================================ -# Main Schema -# ============================================================================ - -class DecisionRecord(BaseModel): - """ - Decision Record Schema v2.1 - - Core principle: payload.text must be able to fully reproduce the memory. - Embedding target: reusable_insight (schema 2.1+), payload.text (fallback). - """ - schema_version: str = Field(default="2.1") - id: str = Field(..., description="Unique ID: dec_YYYY-MM-DD_domain_slug") - type: Literal["decision_record"] = "decision_record" - - domain: Domain = Field(default=Domain.GENERAL) - sensitivity: Sensitivity = Field(default=Sensitivity.INTERNAL) - status: Status = Field(default=Status.PROPOSED) - superseded_by: Optional[str] = None - timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - - title: str = Field(..., description="Short title for the decision") - decision: DecisionDetail - context: Context = Field(default_factory=Context) - why: Why = Field(default_factory=Why) - evidence: List[Evidence] = Field(default_factory=list) - - links: List[dict] = Field(default_factory=list, description="Related links (ADR, PR, etc.)") - tags: List[str] = Field(default_factory=list) - - # Group fields — for linked long-term memory - group_id: Optional[str] = Field(default=None, description="Shared ID linking all records in a group") - group_type: Optional[str] = Field(default=None, description="Group type: 'phase_chain' (sequential reasoning) or 'bundle' (detail facets)") - phase_seq: Optional[int] = Field(default=None, description="0-indexed position within the group") - phase_total: Optional[int] = Field(default=None, description="Total number of records in the group") - - # Content preservation - original_text: Optional[str] = Field(default=None, description="Original conversation text before extraction") - group_summary: Optional[str] = Field(default=None, description="1-line topic summary shared across all phases for semantic anchoring") - - # PRIMARY embedding target (schema 2.1+) - reusable_insight: str = Field( - default="", - description=( - "Dense natural-language paragraph capturing the core knowledge. " - "PRIMARY text embedded in enVector for semantic search. " - "256-768 tokens, no markdown, self-contained, causality-preserving." - ), - ) - - quality: Quality = Field(default_factory=Quality) - payload: Payload = Field(default_factory=Payload) - - def validate_evidence_certainty(self) -> bool: - """ - Validate that certainty is appropriate given evidence. - Returns True if valid, False if certainty should be downgraded. - """ - has_quotes = any(e.quote for e in self.evidence) - - if self.why.certainty == Certainty.SUPPORTED and not has_quotes: - return False - return True - - def ensure_evidence_certainty_consistency(self) -> None: - """ - Enforce: Why cannot be 'supported' without evidence quotes. - Mutates the record to fix inconsistencies. - """ - has_quotes = any(e.quote for e in self.evidence) - - if not has_quotes: - if self.why.certainty == Certainty.SUPPORTED: - self.why.certainty = Certainty.UNKNOWN - if "No direct quotes found in evidence" not in self.why.missing_info: - self.why.missing_info.append("No direct quotes found in evidence") - - # If no evidence at all, status should be proposed - if not self.evidence: - if self.status == Status.ACCEPTED: - self.status = Status.PROPOSED - - -def generate_record_id(timestamp: datetime, domain: Domain, title: str) -> str: - """Generate a unique ID for a decision record""" - date_str = timestamp.strftime("%Y-%m-%d") - # Create slug from title (first 3 words, lowercase, underscored) - words = title.lower().split()[:3] - slug = "_".join(w for w in words if w.isalnum() or w.replace("_", "").isalnum()) - return f"dec_{date_str}_{domain.value}_{slug}" - - -def generate_group_id(timestamp: datetime, domain: Domain, title: str) -> str: - """Generate a shared group ID for related records (phase_chain or bundle)""" - date_str = timestamp.strftime("%Y-%m-%d") - words = title.lower().split()[:3] - slug = "_".join(w for w in words if w.isalnum() or w.replace("_", "").isalnum()) - return f"grp_{date_str}_{domain.value}_{slug}" diff --git a/agents/common/schemas/embedding.py b/agents/common/schemas/embedding.py deleted file mode 100644 index 2d4e853..0000000 --- a/agents/common/schemas/embedding.py +++ /dev/null @@ -1,56 +0,0 @@ -""" -Embedding text selection and novelty classification for DecisionRecords. - -Schema 2.1+ uses reusable_insight as the primary embedding target. -Schema 2.0 falls back to payload.text. -""" - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from .decision_record import DecisionRecord - - -# Novelty thresholds (Memory-as-Filter) -# Calibrated for Qwen3-Embedding-0.6B (1024dim) via benchmark 2026-04-08 -NOVELTY_THRESHOLD_NOVEL = 0.4 -NOVELTY_THRESHOLD_RELATED = 0.7 -NOVELTY_THRESHOLD_NEAR_DUPLICATE = 0.93 - - -def embedding_text_for_record(record: "DecisionRecord") -> str: - """Select the text to embed in enVector. - - Schema 2.1+: use reusable_insight (dense NL gist). - Schema 2.0 fallback: use payload.text (verbose markdown). - """ - insight = getattr(record, "reusable_insight", "") - if insight and insight.strip(): - return insight.strip() - return record.payload.text - - -def classify_novelty( - max_similarity: float, - threshold_novel: float = NOVELTY_THRESHOLD_NOVEL, - threshold_related: float = NOVELTY_THRESHOLD_RELATED, - threshold_near_duplicate: float = NOVELTY_THRESHOLD_NEAR_DUPLICATE, -) -> dict: - """Classify capture novelty based on similarity to existing memory. - - Returns dict with 'score' (0-1, higher=more novel) and 'class'. - Classes (annotation-only except near_duplicate): - - near_duplicate (>= 0.93): blocks capture - - related (>= 0.7): annotation only - - evolution (>= 0.4): annotation only - - novel (< 0.4): annotation only - """ - novelty_score = 1.0 - max_similarity - if max_similarity >= threshold_near_duplicate: - return {"class": "near_duplicate", "score": round(novelty_score, 4)} - elif max_similarity >= threshold_related: - return {"class": "related", "score": round(novelty_score, 4)} - elif max_similarity >= threshold_novel: - return {"class": "evolution", "score": round(novelty_score, 4)} - else: - return {"class": "novel", "score": round(novelty_score, 4)} diff --git a/agents/common/schemas/templates.py b/agents/common/schemas/templates.py deleted file mode 100644 index 70ea387..0000000 --- a/agents/common/schemas/templates.py +++ /dev/null @@ -1,363 +0,0 @@ -""" -Payload Text Templates - -Renders DecisionRecord to Markdown format for embedding. -The payload.text is the SINGLE SOURCE OF TRUTH for memory reproduction. -""" - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from .decision_record import DecisionRecord - - -PAYLOAD_TEMPLATE = """# Decision Record: {title} -ID: {id} -Status: {status} | Sensitivity: {sensitivity} | Domain: {domain} -When/Where: {when} | {where} - -## Decision -{what} - -## Problem -{problem} - -## Alternatives Considered -{alternatives} - -## Why (Rationale) -{rationale_summary} -Certainty: {certainty} - -## Trade-offs -{trade_offs} - -## Assumptions -{assumptions} - -## Risks & Mitigations -{risks} - -## Evidence (Quotes) -{evidence_block} - -## Links -{links} - -## Tags -{tags} -""" - - -def _format_alternatives(alternatives: list, chosen: str) -> str: - """Format alternatives list with chosen marker""" - if not alternatives: - return "- (none documented)" - - lines = [] - for alt in alternatives: - if alt.lower() == chosen.lower() or chosen.lower() in alt.lower(): - lines.append(f"- {alt} (chosen)") - else: - lines.append(f"- {alt}") - return "\n".join(lines) - - -def _format_trade_offs(trade_offs: list) -> str: - """Format trade-offs list""" - if not trade_offs: - return "- (none documented)" - return "\n".join(f"- {t}" for t in trade_offs) - - -def _format_assumptions(assumptions: list) -> str: - """Format assumptions with confidence""" - if not assumptions: - return "- (none documented)" - - lines = [] - for a in assumptions: - conf = getattr(a, 'confidence', 0.5) - lines.append(f"- {a.assumption} (confidence: {conf:.1f})") - return "\n".join(lines) - - -def _format_risks(risks: list) -> str: - """Format risks with mitigations""" - if not risks: - return "- (none documented)" - - lines = [] - for r in risks: - mitigation = getattr(r, 'mitigation', None) or "TBD" - lines.append(f"- Risk: {r.risk}\n Mitigation: {mitigation}") - return "\n".join(lines) - - -def _format_evidence(evidence: list) -> str: - """Format evidence with quotes and sources""" - if not evidence: - return "(no evidence recorded)" - - lines = [] - for i, e in enumerate(evidence, 1): - source_type = e.source.type.value if hasattr(e.source.type, 'value') else str(e.source.type) - source_url = e.source.url or "(no url)" - source_pointer = e.source.pointer or "" - - lines.append(f"{i}) Claim: {e.claim}") - lines.append(f' Quote: "{e.quote}"') - lines.append(f" Source: {source_type} {source_url}") - if source_pointer: - lines.append(f" Pointer: {source_pointer}") - lines.append("") - - return "\n".join(lines).strip() - - -def _format_links(links: list) -> str: - """Format related links""" - if not links: - return "- (none)" - - lines = [] - for link in links: - rel = link.get('rel', 'link') - url = link.get('url', '') - lines.append(f"- {rel}: {url}") - return "\n".join(lines) - - -def _format_tags(tags: list) -> str: - """Format tags as comma-separated""" - if not tags: - return "(none)" - return ", ".join(tags) - - -def render_payload_text(record: "DecisionRecord") -> str: - """ - Render a DecisionRecord to payload.text (Markdown). - - This text is used for: - 1. Memory reproduction (human-readable full context) - 2. Display in recall results - - NOTE (schema 2.1+): Embedding generation now uses - record.reusable_insight instead of this text. - For schema 2.0 records without reusable_insight, - this text is still used as embedding fallback. - """ - # Extract values with safe defaults - domain = record.domain.value if hasattr(record.domain, 'value') else str(record.domain) - sensitivity = record.sensitivity.value if hasattr(record.sensitivity, 'value') else str(record.sensitivity) - status = record.status.value if hasattr(record.status, 'value') else str(record.status) - certainty = record.why.certainty.value if hasattr(record.why.certainty, 'value') else str(record.why.certainty) - - # Format complex fields - alternatives = _format_alternatives( - record.context.alternatives, - record.context.chosen - ) - trade_offs = _format_trade_offs(record.context.trade_offs) - assumptions = _format_assumptions(record.context.assumptions) - risks = _format_risks(record.context.risks) - evidence_block = _format_evidence(record.evidence) - links = _format_links(record.links) - tags = _format_tags(record.tags) - - # Build rationale with missing info if applicable - rationale = record.why.rationale_summary or "(no rationale documented)" - if record.why.missing_info: - rationale += "\n\nMissing Information:\n" + "\n".join(f"- {m}" for m in record.why.missing_info) - - # Group info (phase_chain or bundle) - phase_line = "" - if getattr(record, 'group_id', None): - seq = (getattr(record, 'phase_seq', None) or 0) + 1 - total = getattr(record, 'phase_total', None) or "?" - gtype = getattr(record, 'group_type', None) or "phase_chain" - phase_line = f"\nPart: {seq} of {total} | Type: {gtype} | Group: {record.group_id}" - - # Render template - text = PAYLOAD_TEMPLATE.format( - title=record.title, - id=record.id, - status=status, - sensitivity=sensitivity, - domain=domain, - when=record.decision.when or "(unknown)", - where=record.decision.where or "(unknown)", - what=record.decision.what, - problem=record.context.problem or "(not documented)", - alternatives=alternatives, - rationale_summary=rationale, - certainty=certainty, - trade_offs=trade_offs, - assumptions=assumptions, - risks=risks, - evidence_block=evidence_block, - links=links, - tags=tags, - ) - - # Insert phase line and group summary after ID line - if phase_line: - lines = text.split("\n") - for i, line in enumerate(lines): - if line.startswith("ID: "): - insert_pos = i + 1 - lines.insert(insert_pos, phase_line.lstrip("\n")) - # Inject group summary as semantic anchor for all phases - group_summary = getattr(record, 'group_summary', None) - if group_summary: - lines.insert(insert_pos + 1, f"Group Summary: {group_summary}") - break - text = "\n".join(lines) - - # Clean up multiple blank lines - while "\n\n\n" in text: - text = text.replace("\n\n\n", "\n\n") - - return text.strip() - - -def render_compact_payload(record: "DecisionRecord") -> str: - """ - Render a compact version for search result previews. - """ - domain = record.domain.value if hasattr(record.domain, 'value') else str(record.domain) - certainty = record.why.certainty.value if hasattr(record.why.certainty, 'value') else str(record.why.certainty) - - return f"""**{record.title}** ({record.id}) -Domain: {domain} | Certainty: {certainty} - -{record.decision.what} - -Why: {record.why.rationale_summary or '(no rationale)'} -""" - - -# Localized headers for display rendering (not for payload.text / embedding) -PAYLOAD_HEADERS = { - "en": { - "decision_record": "Decision Record", - "decision": "Decision", - "problem": "Problem", - "alternatives": "Alternatives Considered", - "rationale": "Why (Rationale)", - "certainty": "Certainty", - "trade_offs": "Trade-offs", - "assumptions": "Assumptions", - "risks": "Risks & Mitigations", - "evidence": "Evidence (Quotes)", - "links": "Links", - "tags": "Tags", - }, - "ko": { - "decision_record": "결정 기록", - "decision": "결정 사항", - "problem": "문제", - "alternatives": "검토한 대안", - "rationale": "근거 (이유)", - "certainty": "확실성", - "trade_offs": "트레이드오프", - "assumptions": "가정", - "risks": "리스크 및 대응", - "evidence": "증거 (인용)", - "links": "링크", - "tags": "태그", - }, - "ja": { - "decision_record": "決定記録", - "decision": "決定事項", - "problem": "問題", - "alternatives": "検討した代替案", - "rationale": "根拠(理由)", - "certainty": "確実性", - "trade_offs": "トレードオフ", - "assumptions": "仮定", - "risks": "リスクと対策", - "evidence": "証拠(引用)", - "links": "リンク", - "tags": "タグ", - }, -} - - -def render_display_text(record: "DecisionRecord", language: str = "en") -> str: - """Render a DecisionRecord with localized headers for user display. - - Unlike render_payload_text() (which is always English for embedding consistency), - this function uses localized section headers for human-readable presentation. - - Args: - record: DecisionRecord to render - language: ISO 639-1 language code - - Returns: - Localized markdown text for display - """ - headers = PAYLOAD_HEADERS.get(language, PAYLOAD_HEADERS["en"]) - - domain = record.domain.value if hasattr(record.domain, 'value') else str(record.domain) - sensitivity = record.sensitivity.value if hasattr(record.sensitivity, 'value') else str(record.sensitivity) - status = record.status.value if hasattr(record.status, 'value') else str(record.status) - certainty = record.why.certainty.value if hasattr(record.why.certainty, 'value') else str(record.why.certainty) - - alternatives = _format_alternatives( - record.context.alternatives, - record.context.chosen - ) - trade_offs = _format_trade_offs(record.context.trade_offs) - assumptions = _format_assumptions(record.context.assumptions) - risks = _format_risks(record.context.risks) - evidence_block = _format_evidence(record.evidence) - links = _format_links(record.links) - tags = _format_tags(record.tags) - - rationale = record.why.rationale_summary or "(no rationale documented)" - if record.why.missing_info: - rationale += "\n\nMissing Information:\n" + "\n".join(f"- {m}" for m in record.why.missing_info) - - text = f"""# {headers['decision_record']}: {record.title} -ID: {record.id} -Status: {status} | Sensitivity: {sensitivity} | Domain: {domain} -When/Where: {record.decision.when or '(unknown)'} | {record.decision.where or '(unknown)'} - -## {headers['decision']} -{record.decision.what} - -## {headers['problem']} -{record.context.problem or '(not documented)'} - -## {headers['alternatives']} -{alternatives} - -## {headers['rationale']} -{rationale} -{headers['certainty']}: {certainty} - -## {headers['trade_offs']} -{trade_offs} - -## {headers['assumptions']} -{assumptions} - -## {headers['risks']} -{risks} - -## {headers['evidence']} -{evidence_block} - -## {headers['links']} -{links} - -## {headers['tags']} -{tags} -""" - - while "\n\n\n" in text: - text = text.replace("\n\n\n", "\n\n") - - return text.strip() diff --git a/agents/retriever/__init__.py b/agents/retriever/__init__.py deleted file mode 100644 index 076d150..0000000 --- a/agents/retriever/__init__.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -Retriever Agent - Organizational Context Retrieval - -Searches organizational memory and synthesizes answers using LLM. - -Key Components: -- QueryProcessor: Parses and expands user queries -- Searcher: Searches enVector for relevant context -- Synthesizer: LLM-based answer synthesis from payload.text - -Pipeline: -1. Parse user query (intent, entities, time scope) -2. Search enVector for relevant Decision Records -3. Extract payload.text from results -4. Synthesize answer with LLM (respecting certainty levels) -""" - -from .query_processor import QueryProcessor, ParsedQuery -from .searcher import Searcher, SearchResult -from .synthesizer import Synthesizer, SynthesizedAnswer - -__all__ = [ - "QueryProcessor", - "ParsedQuery", - "Searcher", - "SearchResult", - "Synthesizer", - "SynthesizedAnswer", -] diff --git a/agents/retriever/query_processor.py b/agents/retriever/query_processor.py deleted file mode 100644 index 81c676f..0000000 --- a/agents/retriever/query_processor.py +++ /dev/null @@ -1,436 +0,0 @@ -""" -Query Processor - -Parses and analyzes user queries to understand intent and extract entities. -Uses patterns from retrieval-patterns.md for intent classification. -Supports multilingual queries via LLM-based intent classification + translation. -""" - -import json -import logging -import re -from typing import List, Optional -from dataclasses import dataclass, field -from enum import Enum - -from ..common.language import LanguageInfo, detect_language -from ..common.llm_client import LLMClient -from ..common.llm_utils import parse_llm_json - -logger = logging.getLogger("rune.retriever.query") - - -class QueryIntent(str, Enum): - """Types of query intent""" - DECISION_RATIONALE = "decision_rationale" # "Why did we choose X?" - FEATURE_HISTORY = "feature_history" # "Have customers asked for X?" - PATTERN_LOOKUP = "pattern_lookup" # "How do we handle X?" - TECHNICAL_CONTEXT = "technical_context" # "What's our architecture for X?" - SECURITY_COMPLIANCE = "security_compliance" # "What are the security requirements?" - HISTORICAL_CONTEXT = "historical_context" # "When did we decide X?" - ATTRIBUTION = "attribution" # "Who decided on X?" - GENERAL = "general" # Catch-all - - -class TimeScope(str, Enum): - """Time scope for queries""" - LAST_WEEK = "last_week" - LAST_MONTH = "last_month" - LAST_QUARTER = "last_quarter" - LAST_YEAR = "last_year" - ALL_TIME = "all_time" - - -@dataclass -class ParsedQuery: - """Parsed representation of a user query""" - original: str - cleaned: str - intent: QueryIntent - time_scope: TimeScope = TimeScope.ALL_TIME - entities: List[str] = field(default_factory=list) - keywords: List[str] = field(default_factory=list) - expanded_queries: List[str] = field(default_factory=list) - language: Optional[LanguageInfo] = None - - -class QueryProcessor: - """ - Processes user queries for organizational memory search. - - Responsibilities: - 1. Clean and normalize query text - 2. Detect query intent (why, how, what, when, who) - 3. Extract entities and keywords - 4. Determine time scope - 5. Generate query expansions for better recall - """ - - # Intent detection patterns (from retrieval-patterns.md) - INTENT_PATTERNS = { - QueryIntent.DECISION_RATIONALE: [ - r"why did we (choose|decide|go with|select|pick)", - r"what was the (reasoning|rationale|logic|thinking)", - r"why .+ over .+", - r"what were the (reasons|factors)", - r"why (not|didn't we)", - r"reasoning behind", - ], - QueryIntent.FEATURE_HISTORY: [ - r"(have|did) (customers?|users?) (asked|requested|wanted)", - r"feature request", - r"why did we (reject|say no|decline)", - r"(how many|which) customers", - r"customer feedback (on|about)", - ], - QueryIntent.PATTERN_LOOKUP: [ - r"how do we (handle|deal with|approach|manage)", - r"what'?s our (approach|process|standard|convention)", - r"is there (an?|existing) (pattern|standard|convention)", - r"what'?s the (best practice|recommended way)", - r"how should (we|I)", - ], - QueryIntent.TECHNICAL_CONTEXT: [ - r"what'?s our (architecture|design|system) for", - r"how (does|is) .+ (implemented|built|designed)", - r"(explain|describe) (the|our) .+ (system|architecture|design)", - r"technical (details|overview) (of|for)", - ], - QueryIntent.SECURITY_COMPLIANCE: [ - r"(security|compliance) (requirements?|considerations?)", - r"what (security|privacy) (measures|controls)", - r"(gdpr|hipaa|sox|pci) (requirements?|compliance)", - r"audit (requirements?|trail)", - ], - QueryIntent.HISTORICAL_CONTEXT: [ - r"when did we (decide|choose|implement|launch)", - r"(history|timeline) of", - r"(have|did) we (ever|previously)", - r"how long (have|has) .+ been", - ], - QueryIntent.ATTRIBUTION: [ - r"who (decided|chose|approved|owns)", - r"which (team|person|group) (is responsible|decided|owns)", - r"(owner|maintainer) of", - ], - } - - # Time scope patterns - TIME_PATTERNS = { - TimeScope.LAST_WEEK: [r"last week", r"this week", r"past week", r"7 days"], - TimeScope.LAST_MONTH: [r"last month", r"this month", r"past month", r"30 days"], - TimeScope.LAST_QUARTER: [r"last quarter", r"this quarter", r"Q[1-4]", r"past 3 months"], - TimeScope.LAST_YEAR: [r"last year", r"this year", r"20\d{2}", r"past year"], - } - - # Stop words to filter from keywords - STOP_WORDS = { - "the", "a", "an", "is", "are", "was", "were", "be", "been", "being", - "have", "has", "had", "do", "does", "did", "will", "would", "could", - "should", "may", "might", "must", "shall", "can", "need", "dare", - "ought", "used", "to", "of", "in", "for", "on", "with", "at", "by", - "from", "up", "about", "into", "over", "after", "we", "our", "us", - "i", "me", "my", "you", "your", "it", "its", "they", "them", "their", - "this", "that", "these", "those", "what", "which", "who", "whom", - "when", "where", "why", "how", "and", "or", "but", "if", "because", - "as", "until", "while", "although", "though", "even", "just", "also", - } - - # LLM prompt for multilingual query parsing - QUERY_PARSE_PROMPT = """Analyze this user query and extract structured information. -The query may be in any language. Translate all outputs to English. - -Respond with a valid JSON object: -{{ - "intent": one of ["decision_rationale", "feature_history", "pattern_lookup", "technical_context", "security_compliance", "historical_context", "attribution", "general"], - "english_query": "the query translated to English", - "entities": ["list", "of", "named", "entities"], - "keywords": ["important", "keywords", "in", "english"], - "time_scope": one of ["last_week", "last_month", "last_quarter", "last_year", "all_time"] -}} - -Query: - -{query} - - -JSON:""" - - def __init__( - self, - llm_provider: str = "anthropic", - anthropic_api_key: Optional[str] = None, - openai_api_key: Optional[str] = None, - google_api_key: Optional[str] = None, - model: str = "claude-sonnet-4-20250514", - ): - """Initialize query processor. - - Args: - llm_provider: LLM provider to use - anthropic_api_key: Optional API key for Anthropic - openai_api_key: Optional API key for OpenAI - google_api_key: Optional API key for Gemini - model: Provider model to use - """ - self._llm = None - self._model = model - - self._llm = LLMClient( - provider=llm_provider, - model=model, - anthropic_api_key=anthropic_api_key, - openai_api_key=openai_api_key, - google_api_key=google_api_key, - ) - - def parse(self, query: str) -> ParsedQuery: - """ - Parse a user query into structured form. - - Args: - query: Raw user query string - - Returns: - ParsedQuery with intent, entities, and expansions - """ - language = detect_language(query) - - if language.is_english or not self._llm or not self._llm.is_available: - # English path: existing regex (unchanged) - return self._parse_english(query, language) - else: - # Non-English path: LLM classification + translation - return self._parse_multilingual(query, language) - - def _parse_english(self, query: str, language: Optional[LanguageInfo] = None) -> ParsedQuery: - """Parse English query using regex patterns (original logic).""" - # Clean query - cleaned = self._clean_query(query) - - # Detect intent - intent = self._detect_intent(cleaned) - - # Detect time scope - time_scope = self._detect_time_scope(cleaned) - - # Extract entities - entities = self._extract_entities(query) - - # Extract keywords - keywords = self._extract_keywords(cleaned) - - # Generate query expansions - expanded = self._generate_expansions(cleaned, intent, entities) - - return ParsedQuery( - original=query, - cleaned=cleaned, - intent=intent, - time_scope=time_scope, - entities=entities, - keywords=keywords, - expanded_queries=expanded, - language=language, - ) - - def _parse_multilingual(self, query: str, language: LanguageInfo) -> ParsedQuery: - """Parse non-English query using LLM for intent classification + translation.""" - try: - prompt = self.QUERY_PARSE_PROMPT.format(query=query) - raw = self._llm.generate( - prompt, - max_tokens=256, - timeout=30.0, - ) - result = parse_llm_json(raw) - - # Map intent string to enum - intent_map = {v.value: v for v in QueryIntent} - intent = intent_map.get(result.get("intent", ""), QueryIntent.GENERAL) - - # Map time_scope string to enum - scope_map = {v.value: v for v in TimeScope} - time_scope = scope_map.get(result.get("time_scope", ""), TimeScope.ALL_TIME) - - english_query = result.get("english_query", query) - - # expanded_queries: original + English translation (both searched) - expanded = [query, english_query] - # Add intent-based expansions on the English translation - english_expansions = self._generate_expansions( - english_query.lower(), intent, result.get("entities", []) - ) - for exp in english_expansions: - if exp not in expanded: - expanded.append(exp) - - return ParsedQuery( - original=query, - cleaned=query, - intent=intent, - time_scope=time_scope, - entities=result.get("entities", []), - keywords=result.get("keywords", []), - expanded_queries=expanded[:7], - language=language, - ) - except Exception as e: - logger.warning("LLM parsing failed: %s", e) - # Fallback to regex parsing - return self._parse_english(query, language) - - def _clean_query(self, query: str) -> str: - """Clean and normalize query text""" - # Lowercase - cleaned = query.lower().strip() - - # Remove extra whitespace - cleaned = re.sub(r'\s+', ' ', cleaned) - - # Remove trailing punctuation (but keep question marks) - cleaned = re.sub(r'[.!,;:]+$', '', cleaned) - - return cleaned - - def _detect_intent(self, query: str) -> QueryIntent: - """Detect the primary intent of the query""" - query_lower = query.lower() - - for intent, patterns in self.INTENT_PATTERNS.items(): - for pattern in patterns: - if re.search(pattern, query_lower, re.IGNORECASE): - return intent - - return QueryIntent.GENERAL - - def _detect_time_scope(self, query: str) -> TimeScope: - """Detect time scope from query""" - query_lower = query.lower() - - for scope, patterns in self.TIME_PATTERNS.items(): - for pattern in patterns: - if re.search(pattern, query_lower, re.IGNORECASE): - return scope - - return TimeScope.ALL_TIME - - def _extract_entities(self, query: str) -> List[str]: - """Extract named entities from query""" - entities = [] - - # Extract quoted strings - quoted = re.findall(r'"([^"]+)"|\'([^\']+)\'', query) - for q in quoted: - entity = q[0] or q[1] - if entity and len(entity) > 1: - entities.append(entity) - - # Extract capitalized words/phrases (potential proper nouns) - # But not at the start of sentences - words = query.split() - for i, word in enumerate(words): - if i > 0 and word[0].isupper() and len(word) > 1: - # Check if it's a multi-word entity - phrase = [word] - j = i + 1 - while j < len(words) and words[j][0].isupper(): - phrase.append(words[j]) - j += 1 - entity = ' '.join(phrase) - if entity not in entities: - entities.append(entity) - - # Extract technology names (common patterns) - tech_patterns = [ - r'\b(PostgreSQL|MySQL|MongoDB|Redis|Elasticsearch|Kafka)\b', - r'\b(React|Vue|Angular|Next\.js|Node\.js|Python|Java|Go)\b', - r'\b(AWS|GCP|Azure|Kubernetes|Docker|Terraform)\b', - r'\b(REST|GraphQL|gRPC|WebSocket|HTTP|HTTPS)\b', - ] - for pattern in tech_patterns: - matches = re.findall(pattern, query, re.IGNORECASE) - entities.extend(matches) - - # Deduplicate and return - return list(dict.fromkeys(entities))[:10] - - def _extract_keywords(self, query: str) -> List[str]: - """Extract important keywords from query""" - # Split into words - words = re.findall(r'\b\w+\b', query.lower()) - - # Filter stop words and short words - keywords = [ - w for w in words - if w not in self.STOP_WORDS and len(w) > 2 - ] - - # Deduplicate and return - return list(dict.fromkeys(keywords))[:15] - - def _generate_expansions( - self, - query: str, - intent: QueryIntent, - entities: List[str] - ) -> List[str]: - """Generate query expansions for better recall""" - expansions = [query] # Include original - - # Intent-based expansions - if intent == QueryIntent.DECISION_RATIONALE: - expansions.extend([ - f"decision {query}", - f"rationale {query}", - f"trade-off {query}", - ]) - elif intent == QueryIntent.FEATURE_HISTORY: - expansions.extend([ - f"customer request {query}", - f"feature rejected {query}", - ]) - elif intent == QueryIntent.PATTERN_LOOKUP: - expansions.extend([ - f"standard approach {query}", - f"best practice {query}", - ]) - elif intent == QueryIntent.TECHNICAL_CONTEXT: - expansions.extend([ - f"architecture {query}", - f"implementation {query}", - ]) - - # Entity-based expansions - for entity in entities[:3]: - expansions.append(f"{entity} decision") - expansions.append(f"why {entity}") - - # Deduplicate and limit - seen = set() - unique = [] - for exp in expansions: - if exp.lower() not in seen: - seen.add(exp.lower()) - unique.append(exp) - - return unique[:5] - - def format_for_search(self, parsed: ParsedQuery) -> str: - """ - Format parsed query for enVector search. - - Combines original query with key entities and keywords - for better semantic matching. - """ - parts = [parsed.cleaned] - - # Add entities - if parsed.entities: - parts.append("entities: " + ", ".join(parsed.entities[:3])) - - # Add keywords - if parsed.keywords: - parts.append("keywords: " + ", ".join(parsed.keywords[:5])) - - return " | ".join(parts) diff --git a/agents/retriever/searcher.py b/agents/retriever/searcher.py deleted file mode 100644 index e1d82bc..0000000 --- a/agents/retriever/searcher.py +++ /dev/null @@ -1,576 +0,0 @@ -""" -Searcher - -Searches organizational memory via enVector. -Uses the Vault-secured pipeline: scoring → decrypt → metadata. -Returns Decision Records with their payload.text for synthesis. - -v0.2.4 changes: -- Recency weighting on returned results (benign re-ranking) -- Group assembly from already-fetched results (no over-fetch) -- Client-side metadata filters (best-effort on returned top-k) - -NOTE: Full metadata filtering, group assembly, and recency weighting -should happen Vault-side (rune-admin) to preserve the security model. -The client NEVER requests more than the user's topk from Vault. -""" - -import json -import logging -from datetime import datetime, timedelta, timezone -from typing import List, Dict, Any, Optional -from dataclasses import dataclass, field - -from ..common.envector_client import EnVectorClient -from ..common.embedding_service import EmbeddingService -from .query_processor import ParsedQuery, TimeScope - -logger = logging.getLogger("rune.retriever.searcher") - -# Recency weighting parameters (applied client-side on returned top-k) -HALF_LIFE_DAYS = 90 -SIMILARITY_WEIGHT = 0.7 -RECENCY_WEIGHT = 0.3 - -STATUS_MULTIPLIER = { - "accepted": 1.0, - "proposed": 0.9, - "superseded": 0.5, - "reverted": 0.3, -} - - -@dataclass -class SearchResult: - """A single search result from enVector""" - record_id: str - title: str - payload_text: str # The key output for synthesis - domain: str - certainty: str - status: str - score: float - reusable_insight: str = "" # Schema 2.1+: dense NL gist (primary embedding text) - adjusted_score: float = 0.0 # After recency weighting + status penalty - metadata: Dict[str, Any] = field(default_factory=dict) - # Group fields (phase_chain or bundle) - group_id: Optional[str] = None - group_type: Optional[str] = None - phase_seq: Optional[int] = None - phase_total: Optional[int] = None - - @property - def is_reliable(self) -> bool: - """Check if result has reliable evidence""" - return self.certainty in ("supported", "partially_supported") - - @property - def is_phase(self) -> bool: - """Check if this result is part of a group (phase_chain or bundle)""" - return self.group_id is not None - - @property - def summary(self) -> str: - """Short summary for display""" - return f"{self.title} ({self.domain}, {self.certainty})" - - -class Searcher: - """ - Searches organizational memory using enVector. - - Uses the Vault-secured pipeline (scoring → decrypt → metadata) - when a vault_client is provided. Falls back to direct search otherwise. - - Security model: the client NEVER requests more than the user's topk - from Vault. Over-fetch + post-filter must happen Vault-side. - - v0.2.4 client-side enhancements (on already-returned results only): - - Assemble groups from results already in the result set - - Apply recency weighting (re-ranking, not filtering) - - Best-effort metadata filters (reduces result count, not ideal) - """ - - def __init__( - self, - envector_client: EnVectorClient, - embedding_service: EmbeddingService, - index_name: str, - vault_client=None, - ): - self._client = envector_client - self._embedding = embedding_service - self._index_name = index_name - self._vault = vault_client - - async def search( - self, - query: ParsedQuery, - topk: Optional[int] = None, - filters: Optional[Dict[str, Any]] = None, - ) -> List[SearchResult]: - """ - Search for relevant Decision Records. - - Args: - query: Parsed query from QueryProcessor - topk: Number of results to return (passed to Vault as-is) - filters: Optional metadata filters (best-effort client-side; - full support requires Vault-side implementation): - - domain: str (e.g. "architecture") - - status: str (e.g. "accepted") - - since: str (ISO date, e.g. "2026-01-01") - - Returns: - List of SearchResult objects sorted by adjusted relevance - """ - topk = topk or 10 - - # Step 1: Search with multi-query expansion (respects Vault's topk limit) - all_results = await self._search_with_expansions(query, topk) - - # Step 2: Expand phase chains for groups with missing siblings - all_results = await self._expand_phase_chains(all_results) - - # Step 3: Assemble groups (order by phase_seq, interleave with standalone) - all_results = self._assemble_groups(all_results) - - # Step 4: Best-effort metadata filters (client-side, on complete results) - # NOTE: This may reduce result count below topk. Full support - # requires Vault-side filtering with internal over-fetch. - if filters: - all_results = self._apply_metadata_filters(all_results, filters) - - # Step 5: Time scope filter - if query.time_scope != TimeScope.ALL_TIME: - all_results = self._filter_by_time(all_results, query.time_scope) - - # Step 6: Recency weighting (re-ranks returned results, no security issue) - all_results = self._apply_recency_weighting(all_results) - - return all_results[:topk] - - async def _search_with_expansions( - self, query: ParsedQuery, topk: int - ) -> List[SearchResult]: - """Search with multiple query expansions, dedup results.""" - all_results = [] - seen_ids = set() - - for expanded_query in query.expanded_queries[:3]: - results = await self._search_single(expanded_query, topk) - for result in results: - if result.record_id not in seen_ids: - seen_ids.add(result.record_id) - all_results.append(result) - - # Also search with original query - if query.original not in query.expanded_queries: - results = await self._search_single(query.original, topk) - for result in results: - if result.record_id not in seen_ids: - seen_ids.add(result.record_id) - all_results.append(result) - - all_results.sort(key=lambda r: r.score, reverse=True) - return all_results - - def _assemble_groups(self, results: List[SearchResult]) -> List[SearchResult]: - """ - Assemble group members from already-fetched results. - - When multiple phases of the same group are in the result set, - group them together ordered by phase_seq. Does NOT over-fetch; - missing siblings are handled by _expand_phase_chains. - """ - if not results: - return results - - groups: Dict[str, List[SearchResult]] = {} - group_best_score: Dict[str, float] = {} - standalone = [] - - for r in results: - if r.is_phase and r.group_id: - groups.setdefault(r.group_id, []).append(r) - group_best_score[r.group_id] = max( - group_best_score.get(r.group_id, 0.0), r.score - ) - else: - standalone.append(r) - - if not groups: - return results - - for gid in groups: - groups[gid].sort(key=lambda r: r.phase_seq if r.phase_seq is not None else 0) - - # Interleave: insert groups at their best-score position - all_items = [] - for r in standalone: - all_items.append((r.score, "standalone", r)) - for gid, best_score in group_best_score.items(): - all_items.append((best_score, "group", gid)) - - all_items.sort(key=lambda x: x[0], reverse=True) - - assembled = [] - inserted_groups = set() - for score, item_type, item in all_items: - if item_type == "standalone": - assembled.append(item) - elif item_type == "group" and item not in inserted_groups: - inserted_groups.add(item) - assembled.extend(groups[item]) - - return assembled - - def _apply_metadata_filters( - self, results: List[SearchResult], filters: Dict[str, Any] - ) -> List[SearchResult]: - """ - Best-effort metadata filters on already-returned results. - - WARNING: This reduces result count and may return fewer than topk. - Full metadata filtering requires Vault-side implementation with - internal over-fetch to maintain result count. - """ - filtered = results - - domain = filters.get("domain") - if domain: - filtered = [r for r in filtered if r.domain == domain] - - status = filters.get("status") - if status: - filtered = [r for r in filtered if r.status == status] - - since = filters.get("since") - if since: - filtered = self._filter_since(filtered, since) - - return filtered - - def _filter_since(self, results: List[SearchResult], since_date: str) -> List[SearchResult]: - """Filter results after a given ISO date.""" - filtered = [] - for r in results: - ts_str = r.metadata.get("timestamp") - if ts_str: - try: - if isinstance(ts_str, str): - ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) - else: - ts = datetime.fromtimestamp(float(ts_str), tz=timezone.utc) - if ts.isoformat() >= since_date: - filtered.append(r) - except (ValueError, TypeError): - filtered.append(r) - else: - filtered.append(r) - return filtered - - def _apply_recency_weighting(self, results: List[SearchResult]) -> List[SearchResult]: - """ - Apply time decay and status-based scoring on returned results. - - This is a benign re-ranking of already-returned results. - No security concern: the client only sees what Vault already returned. - """ - now = datetime.now(timezone.utc) - - for r in results: - age_days = 0 - ts_str = r.metadata.get("timestamp") - if ts_str: - try: - if isinstance(ts_str, str): - ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) - else: - ts = datetime.fromtimestamp(float(ts_str), tz=timezone.utc) - age_days = max(0, (now - ts).days) - except (ValueError, TypeError): - pass - - decay = 0.5 ** (age_days / HALF_LIFE_DAYS) if HALF_LIFE_DAYS > 0 else 1.0 - status_mult = STATUS_MULTIPLIER.get(r.status, 1.0) - r.adjusted_score = (SIMILARITY_WEIGHT * r.score + RECENCY_WEIGHT * decay) * status_mult - - results.sort(key=lambda r: r.adjusted_score, reverse=True) - return results - - # ================================================================ - # Phase chain expansion (additional searches for missing siblings) - # ================================================================ - - async def _expand_phase_chains( - self, - results: List[SearchResult], - max_chains: int = 2, - ) -> List[SearchResult]: - """ - Expand phase chain results by fetching sibling phases. - - When a search result is part of a phase chain, searches for the - group_id to retrieve all sibling phases and inserts them in order. - Each sibling search respects Vault's topk limit. - """ - seen_groups = set() - groups_to_expand = [] - for r in results: - if r.is_phase and r.group_id not in seen_groups: - # Check if all siblings are already present - total = r.phase_total or 0 - present = sum(1 for x in results if x.group_id == r.group_id) - if present < total: - seen_groups.add(r.group_id) - groups_to_expand.append(r.group_id) - - if not groups_to_expand: - return results - - groups_to_expand = groups_to_expand[:max_chains] - - group_siblings: Dict[str, List[SearchResult]] = {} - existing_ids = {r.record_id for r in results} - - for group_id in groups_to_expand: - siblings = await self._search_single(f"Group: {group_id}", topk=10) - chain = [s for s in siblings if s.group_id == group_id and s.record_id not in existing_ids] - chain.sort(key=lambda s: s.phase_seq if s.phase_seq is not None else 0) - group_siblings[group_id] = chain - - expanded = [] - expanded_ids = set() - - for r in results: - if r.record_id in expanded_ids: - continue - - if r.is_phase and r.group_id in group_siblings: - # Insert all siblings (including this one) in phase order - all_in_group = [x for x in results if x.group_id == r.group_id] + group_siblings[r.group_id] - seen_in_group = set() - all_in_group.sort(key=lambda x: x.phase_seq if x.phase_seq is not None else 0) - for sibling in all_in_group: - if sibling.record_id not in expanded_ids and sibling.record_id not in seen_in_group: - expanded.append(sibling) - expanded_ids.add(sibling.record_id) - seen_in_group.add(sibling.record_id) - del group_siblings[r.group_id] - elif r.record_id not in expanded_ids: - expanded.append(r) - expanded_ids.add(r.record_id) - - return expanded - - # ================================================================ - # Low-level search methods - # ================================================================ - - async def _search_single(self, query_text: str, topk: int) -> List[SearchResult]: - """Execute a single search query via Vault-secured pipeline.""" - return await self._search_via_vault(query_text, topk) - - async def _search_via_vault(self, query_text: str, topk: int) -> List[SearchResult]: - """ - Vault-secured search pipeline: - 1. Embed query → encrypted similarity scoring on enVector Cloud - 2. Vault decrypts result ciphertext, selects top-k - 3. Retrieve encrypted metadata from enVector Cloud - 4. Vault decrypts metadata - """ - try: - query_vector = self._embedding.embed_single(query_text) - - scoring_result = self._client.score(self._index_name, query_vector) - if not scoring_result.get("ok"): - logger.warning("Scoring failed: %s", scoring_result.get("error")) - return [] - - blobs = scoring_result.get("encrypted_blobs", []) - if not blobs: - return [] - - vault_result = await self._vault.decrypt_search_results( - encrypted_blob_b64=blobs[0], - top_k=topk, - ) - if not vault_result.ok: - logger.warning("Vault decrypt failed: %s", vault_result.error) - return [] - - if not vault_result.results: - return [] - - metadata_result = self._client.remind( - self._index_name, - vault_result.results, - output_fields=["metadata"], - ) - if not metadata_result.get("ok"): - logger.warning("Metadata retrieval failed: %s", metadata_result.get("error")) - return [] - - encrypted_entries = metadata_result.get("results", []) - - vault_decrypt_items = [] - for idx, entry in enumerate(encrypted_entries): - data = entry.get("data", "") - if not data: - continue - - try: - parsed = json.loads(data) - if isinstance(parsed, dict) and "a" in parsed and "c" in parsed: - vault_decrypt_items.append((idx, data)) - else: - entry["metadata"] = parsed - entry.pop("data", None) - except (json.JSONDecodeError, TypeError): - import base64 - try: - raw = base64.b64decode(data) - parsed = json.loads(raw) - entry["metadata"] = parsed - entry.pop("data", None) - except Exception: - logger.warning("Entry %d: unrecognized metadata format, skipping", idx) - entry["metadata"] = {} - entry.pop("data", None) - - if vault_decrypt_items: - try: - decrypted_metadata = await self._vault.decrypt_metadata( - encrypted_metadata_list=[data for _, data in vault_decrypt_items] - ) - for dec_idx, (entry_idx, _) in enumerate(vault_decrypt_items): - if dec_idx < len(decrypted_metadata): - encrypted_entries[entry_idx]["metadata"] = decrypted_metadata[dec_idx] - encrypted_entries[entry_idx].pop("data", None) - except Exception: - logger.info("Batch decrypt failed, falling back to per-entry decrypt") - for entry_idx, data in vault_decrypt_items: - try: - single = await self._vault.decrypt_metadata( - encrypted_metadata_list=[data] - ) - if single: - encrypted_entries[entry_idx]["metadata"] = single[0] - encrypted_entries[entry_idx].pop("data", None) - except Exception as e: - logger.debug("Entry %d decrypt failed: %s", entry_idx, e) - encrypted_entries[entry_idx]["metadata"] = {} - encrypted_entries[entry_idx].pop("data", None) - - return [self._to_search_result(r) for r in encrypted_entries] - - except Exception as e: - logger.error("Vault search error: %s", e, exc_info=True) - return [] - - def _to_search_result(self, raw: Dict[str, Any]) -> SearchResult: - """Convert raw result to SearchResult""" - metadata = raw.get("metadata", {}) - - record_id = metadata.get("id", raw.get("id", "unknown")) - title = metadata.get("title", "Untitled") - domain = metadata.get("domain", "general") - status = metadata.get("status", "unknown") - - why = metadata.get("why", {}) - if isinstance(why, dict): - certainty = why.get("certainty", "unknown") - else: - certainty = "unknown" - - payload = metadata.get("payload", {}) - if isinstance(payload, dict): - payload_text = payload.get("text", "") - else: - payload_text = metadata.get("text", raw.get("text", "")) - - if not payload_text: - decision = metadata.get("decision", {}) - if isinstance(decision, dict): - payload_text = decision.get("what", "") - - reusable_insight = metadata.get("reusable_insight", "") - - group_id = metadata.get("group_id") - group_type = metadata.get("group_type") - phase_seq = metadata.get("phase_seq") - phase_total = metadata.get("phase_total") - - score = raw.get("score", 0.0) - return SearchResult( - record_id=record_id, - title=title, - payload_text=payload_text, - domain=domain, - certainty=certainty, - status=status, - score=score, - reusable_insight=reusable_insight, - adjusted_score=score, - metadata=metadata, - group_id=group_id, - group_type=group_type, - phase_seq=phase_seq, - phase_total=phase_total, - ) - - def _filter_by_time( - self, - results: List[SearchResult], - time_scope: TimeScope - ) -> List[SearchResult]: - """Filter results by time scope""" - now = datetime.now(timezone.utc) - - time_ranges = { - TimeScope.LAST_WEEK: timedelta(days=7), - TimeScope.LAST_MONTH: timedelta(days=30), - TimeScope.LAST_QUARTER: timedelta(days=90), - TimeScope.LAST_YEAR: timedelta(days=365), - } - - if time_scope not in time_ranges: - return results - - cutoff = now - time_ranges[time_scope] - filtered = [] - - for result in results: - timestamp_str = result.metadata.get("timestamp") - if timestamp_str: - try: - if isinstance(timestamp_str, str): - ts = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00")) - else: - ts = datetime.fromtimestamp(float(timestamp_str), tz=timezone.utc) - if ts >= cutoff: - filtered.append(result) - except (ValueError, TypeError): - filtered.append(result) - else: - filtered.append(result) - - return filtered - - async def search_by_id(self, record_id: str) -> Optional[SearchResult]: - """Search for a specific record by ID.""" - results = await self._search_single(f"ID: {record_id}", topk=5) - for result in results: - if result.record_id == record_id: - return result - return None - - async def get_related(self, record_id: str, topk: int = 5) -> List[SearchResult]: - """Find records related to a given record.""" - record = await self.search_by_id(record_id) - if not record: - return [] - search_text = record.reusable_insight.strip() or record.payload_text[:500] - results = await self._search_single(search_text, topk + 1) - return [r for r in results if r.record_id != record_id][:topk] diff --git a/agents/retriever/synthesizer.py b/agents/retriever/synthesizer.py deleted file mode 100644 index 79c3708..0000000 --- a/agents/retriever/synthesizer.py +++ /dev/null @@ -1,482 +0,0 @@ -""" -Synthesizer - -LLM-based answer synthesis from search results. -Uses payload.text from Decision Records to generate coherent answers. - -Key principle: Respect certainty levels from evidence. -- "supported" → confident answer -- "partially_supported" → "likely" or "based on partial evidence" -- "unknown" → "uncertain" or "no clear evidence found" -""" - -import logging -from typing import List, Optional, Dict, Any -from dataclasses import dataclass, field - -from .searcher import SearchResult -from .query_processor import ParsedQuery -from ..common.llm_client import LLMClient - -logger = logging.getLogger("rune.retriever.synthesizer") - - -@dataclass -class SynthesizedAnswer: - """Synthesized answer from LLM""" - answer: str - confidence: float # 0.0 to 1.0 - sources: List[Dict[str, Any]] - related_queries: List[str] = field(default_factory=list) - warnings: List[str] = field(default_factory=list) # e.g., "based on uncertain evidence" - - -# Synthesis prompt template -SYNTHESIS_PROMPT = """You are an AI assistant that answers questions based on organizational decision records. - -{language_instruction} - -Your task is to synthesize an answer from the search results below. Follow these rules strictly: - -1. ONLY use information from the provided records. Do NOT make up information. -2. Respect the certainty level of each record: - - "supported": You can state this confidently - - "partially_supported": Qualify with "likely" or "based on available evidence" - - "unknown": State "uncertain" or "no clear evidence found" -3. Always cite sources by their record ID -4. If no relevant information is found, say "No relevant records found in organizational memory." -5. Be concise but complete. - -User Question: {query} - -Search Results (Decision Records): -{records} - -Instructions: -- Synthesize a clear, direct answer to the question -- Cite record IDs in brackets like [dec_2024-01-01_arch_example] -- Note any uncertainty from records with "unknown" or "partially_supported" certainty -- Suggest follow-up queries if helpful - -Your Answer:""" - - -# Fallback templates per language -FALLBACK_TEMPLATES = { - "en": """## Search Results for: "{query}" - -Found {count} relevant record(s): - -{formatted_results} - ---- -**Note**: This is a direct listing without LLM synthesis. -Configure an LLM provider key (Anthropic/OpenAI/Google) for natural language answers. -""", - "ko": """## "{query}" 검색 결과 - -{count}개의 관련 레코드를 찾았습니다: - -{formatted_results} - ---- -**참고**: LLM 합성 없이 직접 목록을 표시합니다. -자연어 답변을 위해 LLM 제공자 키(Anthropic/OpenAI/Google)를 설정하세요. -""", - "ja": """## "{query}" の検索結果 - -{count}件の関連レコードが見つかりました: - -{formatted_results} - ---- -**注意**: LLM合成なしの直接リスティングです。 -自然言語での回答にはLLMプロバイダーキー(Anthropic/OpenAI/Google)を設定してください。 -""", -} - -# Keep original for backward compatibility -FALLBACK_TEMPLATE = FALLBACK_TEMPLATES["en"] - - -class Synthesizer: - """ - Synthesizes answers from search results using LLM. - - Falls back to simple formatting if LLM is not available. - """ - - def __init__( - self, - llm_provider: str = "anthropic", - anthropic_api_key: Optional[str] = None, - openai_api_key: Optional[str] = None, - google_api_key: Optional[str] = None, - model: str = "claude-sonnet-4-20250514" - ): - """ - Initialize synthesizer. - - Args: - llm_provider: LLM provider to use - anthropic_api_key: Anthropic API key (optional) - openai_api_key: OpenAI API key (optional) - google_api_key: Gemini API key (optional) - model: Model to use for synthesis - """ - self._provider = llm_provider - self._model = model - self._llm = LLMClient( - provider=llm_provider, - model=model, - anthropic_api_key=anthropic_api_key, - openai_api_key=openai_api_key, - google_api_key=google_api_key, - ) - - @property - def has_llm(self) -> bool: - """Check if LLM is available""" - return self._llm.is_available - - def synthesize( - self, - query: ParsedQuery, - results: List[SearchResult] - ) -> SynthesizedAnswer: - """ - Synthesize an answer from search results. - - Args: - query: Parsed user query - results: Search results from Searcher - - Returns: - SynthesizedAnswer with answer text and metadata - """ - if not results: - return SynthesizedAnswer( - answer="No relevant records found in organizational memory.", - confidence=0.0, - sources=[], - related_queries=self._suggest_alternatives(query), - warnings=["No search results found"], - ) - - # Try LLM synthesis first - if self.has_llm: - try: - return self._synthesize_with_llm(query, results) - except Exception as e: - logger.warning("LLM synthesis failed: %s", e) - # Fall through to fallback - - # Fallback to simple formatting - return self._synthesize_fallback(query, results) - - def _synthesize_with_llm( - self, - query: ParsedQuery, - results: List[SearchResult] - ) -> SynthesizedAnswer: - """Synthesize using LLM""" - # Format records for prompt - records_text = self._format_records_for_prompt(results) - - # Determine language instruction - if query.language and not query.language.is_english: - language_instruction = ( - f"IMPORTANT: The user asked in {query.language.code}. " - f"Respond in the SAME language ({query.language.code}). " - f"The source records may be in English — translate relevant parts." - ) - else: - language_instruction = "Respond in English." - - # Build prompt - prompt = SYNTHESIS_PROMPT.format( - query=query.original, - records=records_text, - language_instruction=language_instruction, - ) - - # Call LLM - answer_text = self._llm.generate( - prompt, - max_tokens=1024, - timeout=30.0, - ) - - # Calculate confidence based on results - confidence = self._calculate_confidence(results) - - # Extract sources - sources = [ - { - "record_id": r.record_id, - "title": r.title, - "domain": r.domain, - "certainty": r.certainty, - "score": r.score, - } - for r in results[:5] - ] - - # Check for warnings - warnings = [] - uncertain_count = sum(1 for r in results if r.certainty == "unknown") - if uncertain_count > 0: - warnings.append(f"{uncertain_count} record(s) have uncertain evidence") - - partial_count = sum(1 for r in results if r.certainty == "partially_supported") - if partial_count > 0: - warnings.append(f"{partial_count} record(s) have partial evidence") - - return SynthesizedAnswer( - answer=answer_text, - confidence=confidence, - sources=sources, - related_queries=self._suggest_followups(query, results), - warnings=warnings, - ) - - def _synthesize_fallback( - self, - query: ParsedQuery, - results: List[SearchResult] - ) -> SynthesizedAnswer: - """Fallback synthesis without LLM""" - # Format results - formatted_results = [] - for i, r in enumerate(results[:5], 1): - certainty_marker = { - "supported": "✓", - "partially_supported": "~", - "unknown": "?", - }.get(r.certainty, "?") - - formatted_results.append(f""" -### {i}. {r.title} [{r.record_id}] -**Domain**: {r.domain} | **Certainty**: {certainty_marker} {r.certainty} | **Score**: {r.score:.2f} - -{r.payload_text[:500]}{"..." if len(r.payload_text) > 500 else ""} -""") - - # Select language-specific fallback template - lang_code = query.language.code if query.language else "en" - template = FALLBACK_TEMPLATES.get(lang_code, FALLBACK_TEMPLATES["en"]) - - answer = template.format( - query=query.original, - count=len(results), - formatted_results="\n".join(formatted_results) - ) - - confidence = self._calculate_confidence(results) - - sources = [ - { - "record_id": r.record_id, - "title": r.title, - "domain": r.domain, - "certainty": r.certainty, - "score": r.score, - } - for r in results[:5] - ] - - return SynthesizedAnswer( - answer=answer, - confidence=confidence, - sources=sources, - related_queries=self._suggest_followups(query, results), - warnings=["LLM not available - showing raw results"], - ) - - def _format_records_for_prompt(self, results: List[SearchResult]) -> str: - """Format search results for LLM prompt, grouping linked records.""" - # Group results: linked groups together, standalone separate - groups = [] # List of (group_id_or_none, group_type, [results]) - seen_groups = set() - - for r in results: - if r.is_phase and r.group_id: - if r.group_id not in seen_groups: - seen_groups.add(r.group_id) - chain = [x for x in results if x.group_id == r.group_id] - chain.sort(key=lambda x: x.phase_seq if x.phase_seq is not None else 0) - gtype = r.group_type or "phase_chain" - groups.append((r.group_id, gtype, chain)) - elif not r.is_phase: - groups.append((None, None, [r])) - - formatted = [] - record_num = 1 - - for group_id, group_type, group_results in groups: - if group_id and len(group_results) > 1: - first = group_results[0] - if group_type == "bundle": - # Bundle — detail facets of a single decision - formatted.append(f""" ---- -Record {record_num}: Decision Bundle [{group_id}] -Overall Domain: {first.domain} -Facets: {len(group_results)} -""") - for facet in group_results: - seq = (facet.phase_seq or 0) + 1 - formatted.append(f""" -Facet {seq}: {facet.title} [{facet.record_id}] -Certainty: {facet.certainty} - -{facet.payload_text[:800]} -""") - else: - # Phase chain — sequential reasoning - formatted.append(f""" ---- -Record {record_num}: Phase Chain [{group_id}] -Overall Domain: {first.domain} -Phases: {len(group_results)} -""") - for phase in group_results: - seq = (phase.phase_seq or 0) + 1 - total = phase.phase_total or len(group_results) - formatted.append(f""" -Phase {seq}/{total}: {phase.title} [{phase.record_id}] -Certainty: {phase.certainty} - -{phase.payload_text[:800]} -""") - formatted.append("---\n") - record_num += 1 - else: - # Single record (standalone) - r = group_results[0] - formatted.append(f""" ---- -Record {record_num}: [{r.record_id}] -Title: {r.title} -Domain: {r.domain} -Certainty: {r.certainty} -Relevance Score: {r.score:.2f} - -Content: -{r.payload_text[:1000]} ---- -""") - record_num += 1 - - return "\n".join(formatted) - - def _calculate_confidence(self, results: List[SearchResult]) -> float: - """Calculate overall confidence from results""" - if not results: - return 0.0 - - # Weights for certainty levels - certainty_weights = { - "supported": 1.0, - "partially_supported": 0.6, - "unknown": 0.3, - } - - # Weighted average of top results - total_weight = 0.0 - total_score = 0.0 - - for i, r in enumerate(results[:5]): - # Position weight (higher for top results) - position_weight = 1.0 / (i + 1) - - # Certainty weight - cert_weight = certainty_weights.get(r.certainty, 0.3) - - # Combined weight - weight = position_weight * cert_weight * r.score - total_weight += weight - total_score += weight - - if total_weight == 0: - return 0.0 - - # Normalize to 0-1 range - confidence = min(1.0, total_score / 2.0) # Divide by 2 for reasonable scaling - - return round(confidence, 2) - - def _suggest_alternatives(self, query: ParsedQuery) -> List[str]: - """Suggest alternative queries when no results found""" - suggestions = [] - - # Broader search suggestions - if query.entities: - for entity in query.entities[:2]: - suggestions.append(f"Tell me about {entity}") - - # By intent - if query.intent.value != "general": - suggestions.append(f"What decisions have we made about {' '.join(query.keywords[:3])}") - - # Generic - suggestions.append("What recent decisions have we made?") - - return suggestions[:3] - - def _suggest_followups( - self, - query: ParsedQuery, - results: List[SearchResult] - ) -> List[str]: - """Suggest follow-up queries based on results""" - suggestions = [] - - # Based on result domains - domains = set(r.domain for r in results[:3]) - for domain in domains: - if domain != "general": - suggestions.append(f"What other {domain} decisions have we made?") - - # Based on entities in results - for r in results[:2]: - if r.title: - # Extract key term from title - words = r.title.split()[:3] - if words: - suggestions.append(f"Why did we decide on {' '.join(words)}?") - - # Generic follow-ups - suggestions.append("What were the alternatives considered?") - suggestions.append("Who was involved in this decision?") - - return suggestions[:3] - - -def format_answer_for_display(answer: SynthesizedAnswer) -> str: - """Format synthesized answer for CLI/UI display""" - lines = [ - answer.answer, - "", - f"**Confidence**: {answer.confidence:.0%}", - ] - - if answer.warnings: - lines.append("") - lines.append("**Warnings**:") - for w in answer.warnings: - lines.append(f" - {w}") - - if answer.sources: - lines.append("") - lines.append("**Sources**:") - for s in answer.sources[:3]: - lines.append(f" - [{s['record_id']}] {s['title']} ({s['certainty']})") - - if answer.related_queries: - lines.append("") - lines.append("**Related queries**:") - for q in answer.related_queries: - lines.append(f" - {q}") - - return "\n".join(lines) diff --git a/agents/scribe/__init__.py b/agents/scribe/__init__.py deleted file mode 100644 index 029b66e..0000000 --- a/agents/scribe/__init__.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -Scribe Agent - Organizational Context Capture - -Monitors team communications (Slack, GitHub, Notion) to identify and capture -significant decisions using on-device similarity search. - -Key Components: -- DecisionDetector: Pattern-based decision detection -- RecordBuilder: Creates Decision Record with evidence -- ReviewQueue: Manages human review for low-confidence captures -- Handlers: Source-specific event processing (Slack, GitHub, etc.) - -10 Rules for Scribe: -1. Not a logger - only capture significant decisions -2. Schema v2 only - JSON + payload.text -3. Why cannot be confirmed without evidence -4. Evidence requires at least 1 quote -5. Quotes should be 1-2 sentences (direct) -6. No assumptions about finality without explicit signals -7. Update status on decision reversal -8. Default sensitivity to 'internal' when unclear -9. Remove PII/credentials, note in review_notes -10. Output always includes JSON + payload.text -""" - -from .detector import DecisionDetector, DetectionResult -from .record_builder import RecordBuilder, RawEvent -from .review_queue import ReviewQueue, ReviewItem, ReviewAnswers - -__all__ = [ - "DecisionDetector", - "DetectionResult", - "RecordBuilder", - "RawEvent", - "ReviewQueue", - "ReviewItem", - "ReviewAnswers", -] - -# Legacy (available via direct import but not promoted) -# from .tier2_filter import Tier2Filter -# from .llm_extractor import LLMExtractor -# from .pattern_parser import parse_capture_triggers diff --git a/agents/scribe/detector.py b/agents/scribe/detector.py deleted file mode 100644 index 86157e4..0000000 --- a/agents/scribe/detector.py +++ /dev/null @@ -1,225 +0,0 @@ -""" -Decision Detector - -Similarity-based decision detection using pre-embedded patterns. -Core component of the Scribe agent's Stage 1 pipeline. -""" - -from dataclasses import dataclass -from typing import Optional, List, Tuple - -from ..common.pattern_cache import PatternCache, PatternEntry - - -@dataclass -class DetectionResult: - """Result of decision detection""" - is_significant: bool - confidence: float - matched_pattern: Optional[str] = None - category: Optional[str] = None - domain: Optional[str] = None - priority: Optional[str] = None - top_matches: Optional[List[Tuple[str, float]]] = None # For debugging - - -class DecisionDetector: - """ - Detects significant decisions using similarity search. - - Algorithm: - 1. Embed incoming text - 2. Compute similarity to all pre-embedded patterns - 3. If max similarity > threshold, mark as significant - 4. Return detection result with confidence and matched pattern - - This replaces ML-based classification with on-device similarity search. - """ - - def __init__( - self, - pattern_cache: PatternCache, - threshold: float = 0.35, - high_confidence_threshold: float = 0.7 - ): - """ - Initialize decision detector. - - Args: - pattern_cache: PatternCache with pre-embedded patterns - threshold: Minimum similarity to consider significant - high_confidence_threshold: Threshold for auto-capture (no review) - """ - self._cache = pattern_cache - self._threshold = threshold - self._high_confidence_threshold = high_confidence_threshold - - @property - def threshold(self) -> float: - return self._threshold - - @property - def high_confidence_threshold(self) -> float: - return self._high_confidence_threshold - - def detect(self, text: str) -> DetectionResult: - """ - Detect if text contains a significant decision. - - Args: - text: Input text to analyze - - Returns: - DetectionResult with significance, confidence, and matched pattern - """ - if not text or not text.strip(): - return DetectionResult( - is_significant=False, - confidence=0.0, - ) - - # Skip very short messages - if len(text.strip()) < 20: - return DetectionResult( - is_significant=False, - confidence=0.0, - ) - - # Find best matching pattern - match, score = self._cache.find_best_match(text, threshold=0.0) - - # Determine significance - is_significant = score >= self._threshold - - if match: - return DetectionResult( - is_significant=is_significant, - confidence=score, - matched_pattern=match.text, - category=match.category, - domain=match.domain, - priority=match.priority, - ) - - return DetectionResult( - is_significant=False, - confidence=score, - ) - - def detect_with_details(self, text: str, top_k: int = 5) -> DetectionResult: - """ - Detect with additional details including top matches. - - Useful for debugging and understanding why a decision was detected. - - Args: - text: Input text to analyze - top_k: Number of top matches to include - - Returns: - DetectionResult with top_matches for debugging - """ - if not text or not text.strip(): - return DetectionResult( - is_significant=False, - confidence=0.0, - top_matches=[], - ) - - # Find top matches - matches = self._cache.find_top_matches(text, top_k=top_k, threshold=0.0) - - if not matches: - return DetectionResult( - is_significant=False, - confidence=0.0, - top_matches=[], - ) - - # Best match - best_pattern, best_score = matches[0] - is_significant = best_score >= self._threshold - - # Format top matches for debugging - top_matches = [(p.text, s) for p, s in matches] - - return DetectionResult( - is_significant=is_significant, - confidence=best_score, - matched_pattern=best_pattern.text, - category=best_pattern.category, - domain=best_pattern.domain, - priority=best_pattern.priority, - top_matches=top_matches, - ) - - def should_auto_capture(self, result: DetectionResult) -> bool: - """ - Check if detection result warrants auto-capture (skip review). - - Auto-capture when: - - Is significant AND - - Confidence >= high_confidence_threshold - - Args: - result: Detection result to evaluate - - Returns: - True if should auto-capture, False if needs review - """ - if not result.is_significant: - return False - - return result.confidence >= self._high_confidence_threshold - - def needs_review(self, result: DetectionResult) -> bool: - """ - Check if detection result needs human review. - - Review needed when: - - Is significant but confidence < high_confidence_threshold - - Medium priority pattern - - Args: - result: Detection result to evaluate - - Returns: - True if needs review, False otherwise - """ - if not result.is_significant: - return False - - return not self.should_auto_capture(result) - - def explain_detection(self, result: DetectionResult) -> str: - """ - Generate human-readable explanation of detection. - - Args: - result: Detection result to explain - - Returns: - Explanation string - """ - if not result.is_significant: - return f"Not significant (confidence: {result.confidence:.2f}, threshold: {self._threshold})" - - lines = [ - f"Significant decision detected (confidence: {result.confidence:.2f})", - f" Matched pattern: \"{result.matched_pattern}\"", - f" Category: {result.category}", - f" Domain: {result.domain}", - f" Priority: {result.priority}", - ] - - if self.should_auto_capture(result): - lines.append(" Action: AUTO-CAPTURE (high confidence)") - else: - lines.append(" Action: NEEDS REVIEW (moderate confidence)") - - if result.top_matches: - lines.append(" Top matches:") - for pattern, score in result.top_matches[:3]: - lines.append(f" - \"{pattern[:50]}...\" ({score:.2f})") - - return "\n".join(lines) diff --git a/agents/scribe/handlers/__init__.py b/agents/scribe/handlers/__init__.py deleted file mode 100644 index ac45927..0000000 --- a/agents/scribe/handlers/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Source Handlers - -Extensible handlers for different event sources. -Each handler converts source-specific events to a common Message format. - -Available Handlers: -- SlackHandler: Slack webhook events -- NotionHandler: Notion webhook events - -Future Handlers: -- GitHubHandler: PR/Issue webhooks -""" - -from .base import BaseHandler, Message -from .slack import SlackHandler -from .notion import NotionHandler - -__all__ = [ - "BaseHandler", - "Message", - "SlackHandler", - "NotionHandler", -] diff --git a/agents/scribe/handlers/base.py b/agents/scribe/handlers/base.py deleted file mode 100644 index e4d49ec..0000000 --- a/agents/scribe/handlers/base.py +++ /dev/null @@ -1,143 +0,0 @@ -""" -Base Handler - -Abstract base class for source-specific event handlers. -Provides a common interface for converting events to Messages. -""" - -from abc import ABC, abstractmethod -from dataclasses import dataclass, field -from typing import Optional, Dict, Any -from datetime import datetime - - -@dataclass -class Message: - """ - Common message format for all sources. - - This is the standardized format that Scribe works with, - regardless of the original source (Slack, GitHub, etc.). - """ - text: str - user: str - channel: str - source: str # "slack", "github", "notion", etc. - timestamp: str - thread_ts: Optional[str] = None - url: Optional[str] = None - is_bot: bool = False - mentions: list = field(default_factory=list) - reactions: list = field(default_factory=list) - attachments: list = field(default_factory=list) - raw_data: Optional[Dict[str, Any]] = None - - @property - def datetime(self) -> Optional[datetime]: - """Parse timestamp to datetime""" - try: - return datetime.fromtimestamp(float(self.timestamp)) - except (ValueError, TypeError): - return None - - @property - def is_valid(self) -> bool: - """Check if message has minimum required fields""" - return bool(self.text and self.text.strip()) - - -class BaseHandler(ABC): - """ - Abstract base class for source handlers. - - Each handler must implement: - - parse_event: Convert raw event to Message - - verify_signature: Verify webhook signature (if applicable) - """ - - def __init__(self, source_name: str): - """ - Initialize handler. - - Args: - source_name: Name of the source (e.g., "slack", "github") - """ - self.source_name = source_name - - @abstractmethod - async def parse_event(self, raw_data: Dict[str, Any]) -> Optional[Message]: - """ - Parse raw event data into a Message. - - Args: - raw_data: Raw event data from the source - - Returns: - Message object or None if event should be ignored - """ - pass - - @abstractmethod - def verify_signature( - self, - body: bytes, - signature: str, - timestamp: str - ) -> bool: - """ - Verify the webhook signature. - - Args: - body: Raw request body - signature: Signature from headers - timestamp: Timestamp from headers - - Returns: - True if signature is valid - """ - pass - - def should_process(self, message: Message) -> bool: - """ - Check if message should be processed. - - Default implementation filters out: - - Bot messages - - Empty messages - - Very short messages - - Override in subclass for source-specific filtering. - - Args: - message: Parsed message - - Returns: - True if message should be processed - """ - # Skip invalid messages - if not message.is_valid: - return False - - # Skip bot messages - if message.is_bot: - return False - - # Skip very short messages - if len(message.text.strip()) < 20: - return False - - return True - - def extract_thread_context(self, raw_data: Dict[str, Any]) -> Optional[str]: - """ - Extract thread context if message is part of a thread. - - Override in subclass for source-specific implementation. - - Args: - raw_data: Raw event data - - Returns: - Thread context or None - """ - return None diff --git a/agents/scribe/handlers/notion.py b/agents/scribe/handlers/notion.py deleted file mode 100644 index 0844aca..0000000 --- a/agents/scribe/handlers/notion.py +++ /dev/null @@ -1,260 +0,0 @@ -""" -Notion Handler - -Handles Notion webhook events and converts them to Messages. -Supports page.created, page.updated, and database.updated events -via Notion's Send Webhooks API. -""" - -import hmac -import hashlib -import time -import logging -from typing import Optional, Dict, Any, List - -from .base import BaseHandler, Message - -logger = logging.getLogger("rune.scribe.notion") - - -class NotionHandler(BaseHandler): - """ - Handler for Notion webhook events. - - Processes: - - page.created: New page creation - - page.updated: Page content or property updates - - database.updated: Database schema or entry changes - - Ignores: - - Bot/automation edits - - Template instantiation - - Very short pages (likely stubs) - """ - - def __init__(self, signing_secret: str = ""): - """ - Initialize Notion handler. - - Args: - signing_secret: Notion webhook secret for HMAC-SHA256 verification - """ - super().__init__("notion") - self._signing_secret = signing_secret - - async def parse_event(self, raw_data: Dict[str, Any]) -> Optional[Message]: - """ - Parse Notion webhook event into Message. - - Args: - raw_data: Raw Notion webhook payload - - Returns: - Message object or None if event should be ignored - """ - event_type = raw_data.get("type", "") - - if event_type in ("page.created", "page.updated"): - return self._parse_page_event(raw_data) - elif event_type == "database.updated": - return self._parse_database_event(raw_data) - - return None - - def _parse_page_event(self, raw_data: Dict[str, Any]) -> Optional[Message]: - """Parse a page.created or page.updated event""" - page = raw_data.get("data", raw_data.get("page", {})) - - # Extract title from properties - title = self._extract_title(page) - - # Extract rich_text content from page blocks (if included) - body = self._extract_body(raw_data) - - # Combine title + body - text = title - if body: - text = f"{title}\n\n{body}" if title else body - - if not text: - return None - - # Extract user - user = self._extract_user(page) - - # Extract parent context as channel - channel = self._extract_parent_name(page) - - # Extract timestamp — prefer last_edited_time, fall back to created_time - timestamp = self._extract_timestamp(page) - - # Page URL - url = page.get("url") - - # Detect bot/automation edits - is_bot = self._is_bot_edit(page) - - return Message( - text=text, - user=user, - channel=channel, - source="notion", - timestamp=timestamp, - thread_ts=None, - url=url, - is_bot=is_bot, - raw_data=raw_data, - ) - - def _parse_database_event(self, raw_data: Dict[str, Any]) -> Optional[Message]: - """Parse a database.updated event""" - database = raw_data.get("data", raw_data.get("database", {})) - - # Database title - title_parts = database.get("title", []) - title = "".join( - t.get("plain_text", "") for t in title_parts - ) - - if not title: - return None - - text = f"Database updated: {title}" - - # Description if available - description_parts = database.get("description", []) - if description_parts: - desc = "".join(t.get("plain_text", "") for t in description_parts) - if desc: - text = f"{text}\n\n{desc}" - - user = self._extract_user(database) - timestamp = self._extract_timestamp(database) - - return Message( - text=text, - user=user, - channel=title, - source="notion", - timestamp=timestamp, - thread_ts=None, - url=database.get("url"), - is_bot=self._is_bot_edit(database), - raw_data=raw_data, - ) - - def _extract_title(self, page: Dict[str, Any]) -> str: - """Extract page title from properties""" - properties = page.get("properties", {}) - for prop in properties.values(): - if prop.get("type") == "title": - title_parts = prop.get("title", []) - return "".join(t.get("plain_text", "") for t in title_parts) - return "" - - def _extract_body(self, raw_data: Dict[str, Any]) -> str: - """Extract body text from rich_text blocks if included in payload""" - blocks = raw_data.get("blocks", raw_data.get("children", [])) - parts: List[str] = [] - - for block in blocks: - block_type = block.get("type", "") - block_data = block.get(block_type, {}) - - # Extract rich_text from common block types - rich_text = block_data.get("rich_text", []) - text = "".join(t.get("plain_text", "") for t in rich_text) - if text: - parts.append(text) - - return "\n".join(parts) - - def _extract_user(self, obj: Dict[str, Any]) -> str: - """Extract user ID from last_edited_by or created_by""" - editor = obj.get("last_edited_by", obj.get("created_by", {})) - return editor.get("id", "unknown") - - def _extract_parent_name(self, page: Dict[str, Any]) -> str: - """Extract parent context (database name or parent page) as channel""" - parent = page.get("parent", {}) - parent_type = parent.get("type", "") - - if parent_type == "database_id": - return f"db:{parent.get('database_id', 'unknown')}" - elif parent_type == "page_id": - return f"page:{parent.get('page_id', 'unknown')}" - elif parent_type == "workspace": - return "workspace" - - return "notion" - - def _extract_timestamp(self, obj: Dict[str, Any]) -> str: - """Extract timestamp, converting ISO to unix epoch string""" - iso_time = obj.get("last_edited_time", obj.get("created_time", "")) - if iso_time: - try: - from datetime import datetime, timezone - # Notion uses ISO 8601 format - dt = datetime.fromisoformat(iso_time.replace("Z", "+00:00")) - return str(dt.timestamp()) - except (ValueError, TypeError): - pass - return str(time.time()) - - def _is_bot_edit(self, obj: Dict[str, Any]) -> bool: - """Check if edit was made by a bot/integration""" - editor = obj.get("last_edited_by", obj.get("created_by", {})) - return editor.get("type") == "bot" - - def verify_signature( - self, - body: bytes, - signature: str, - timestamp: str - ) -> bool: - """ - Verify Notion webhook signature using HMAC-SHA256. - - Args: - body: Raw request body - signature: X-Notion-Signature header value - timestamp: X-Notion-Timestamp header value (unused for Notion, - kept for BaseHandler interface compatibility) - - Returns: - True if signature is valid - """ - if not self._signing_secret: - return True - - if not signature: - return False - - expected_sig = hmac.new( - self._signing_secret.encode("utf-8"), - body, - hashlib.sha256 - ).hexdigest() - - return hmac.compare_digest(expected_sig, signature) - - def should_process(self, message: Message) -> bool: - """ - Check if Notion message should be processed. - - Filters out: - - Bot/automation edits (via base class) - - Very short content (via base class) - - Template instantiation (title starts with common template markers) - """ - if not super().should_process(message): - return False - - title_line = message.text.split("\n")[0].strip() - - # Skip untitled/template pages - skip_prefixes = ("Untitled", "Template:", "[Template]", "Copy of ") - if title_line.startswith(skip_prefixes): - return False - - return True diff --git a/agents/scribe/handlers/slack.py b/agents/scribe/handlers/slack.py deleted file mode 100644 index 2890fde..0000000 --- a/agents/scribe/handlers/slack.py +++ /dev/null @@ -1,236 +0,0 @@ -""" -Slack Handler - -Handles Slack webhook events and converts them to Messages. -""" - -import hmac -import hashlib -import time -from typing import Optional, Dict, Any, List - -from .base import BaseHandler, Message - - -class SlackHandler(BaseHandler): - """ - Handler for Slack Events API webhooks. - - Processes: - - message events (new messages in channels) - - message_changed events (edited messages) - - Ignores: - - Bot messages - - Message deletions - - Reaction events (for now) - """ - - def __init__(self, signing_secret: str = ""): - """ - Initialize Slack handler. - - Args: - signing_secret: Slack signing secret for verification - """ - super().__init__("slack") - self._signing_secret = signing_secret - - async def parse_event(self, raw_data: Dict[str, Any]) -> Optional[Message]: - """ - Parse Slack event into Message. - - Args: - raw_data: Raw Slack event data - - Returns: - Message object or None if event should be ignored - """ - # Handle URL verification challenge - if raw_data.get("type") == "url_verification": - return None - - # Handle event callback - if raw_data.get("type") != "event_callback": - return None - - event = raw_data.get("event", {}) - event_type = event.get("type", "") - - # Handle message events - if event_type == "message": - if event.get("subtype") == "message_changed": - return self._parse_message_changed_event(event, raw_data) - return self._parse_message_event(event, raw_data) - - return None - - def _parse_message_event( - self, - event: Dict[str, Any], - raw_data: Dict[str, Any] - ) -> Optional[Message]: - """Parse a standard message event""" - # Skip bot messages - if event.get("bot_id") or event.get("subtype") == "bot_message": - return None - - # Skip message subtypes we don't care about - ignored_subtypes = [ - "channel_join", "channel_leave", "channel_topic", - "channel_purpose", "channel_name", "message_deleted", - "file_share", "thread_broadcast", - ] - if event.get("subtype") in ignored_subtypes: - return None - - text = event.get("text", "") - user = event.get("user", "") - channel = event.get("channel", "") - ts = event.get("ts", "") - thread_ts = event.get("thread_ts") - - # Extract mentions - mentions = self._extract_mentions(text) - - # Extract reactions (if available) - reactions = event.get("reactions", []) - - # Build URL if team info available - url = None - team_id = raw_data.get("team_id") - if team_id and channel and ts: - url = f"https://slack.com/archives/{channel}/p{ts.replace('.', '')}" - - return Message( - text=text, - user=user, - channel=channel, - source="slack", - timestamp=ts, - thread_ts=thread_ts, - url=url, - is_bot=False, - mentions=mentions, - reactions=reactions, - raw_data=event, - ) - - def _parse_message_changed_event( - self, - event: Dict[str, Any], - raw_data: Dict[str, Any] - ) -> Optional[Message]: - """Parse a message_changed event""" - message = event.get("message", {}) - - # Skip bot messages - if message.get("bot_id"): - return None - - return Message( - text=message.get("text", ""), - user=message.get("user", ""), - channel=event.get("channel", ""), - source="slack", - timestamp=message.get("ts", ""), - thread_ts=message.get("thread_ts"), - is_bot=False, - mentions=self._extract_mentions(message.get("text", "")), - raw_data=event, - ) - - def _extract_mentions(self, text: str) -> List[str]: - """Extract user mentions from text""" - import re - # Slack mentions format: <@U12345678> - matches = re.findall(r'<@(U[A-Z0-9]+)>', text) - return matches - - def verify_signature( - self, - body: bytes, - signature: str, - timestamp: str - ) -> bool: - """ - Verify Slack request signature. - - Args: - body: Raw request body - signature: X-Slack-Signature header - timestamp: X-Slack-Request-Timestamp header - - Returns: - True if signature is valid - """ - if not self._signing_secret: - # Skip verification if no secret configured - return True - - if not signature or not timestamp: - return False - - # Check timestamp is recent (within 5 minutes) - try: - ts = int(timestamp) - if abs(time.time() - ts) > 300: - return False - except ValueError: - return False - - # Compute expected signature - sig_basestring = f"v0:{timestamp}:{body.decode('utf-8')}" - expected_sig = "v0=" + hmac.new( - self._signing_secret.encode('utf-8'), - sig_basestring.encode('utf-8'), - hashlib.sha256 - ).hexdigest() - - # Compare signatures - return hmac.compare_digest(expected_sig, signature) - - def should_process(self, message: Message) -> bool: - """ - Check if Slack message should be processed. - - Additional Slack-specific filtering. - """ - if not super().should_process(message): - return False - - # Skip messages that are just mentions or links - text = message.text.strip() - - # Skip if mostly mentions - import re - clean_text = re.sub(r'<@U[A-Z0-9]+>', '', text).strip() - if len(clean_text) < 15: - return False - - # Skip if mostly URLs - clean_text = re.sub(r']+>', '', clean_text).strip() - if len(clean_text) < 15: - return False - - return True - - def format_channel_name(self, channel_id: str, channel_name: str = None) -> str: - """Format channel for display""" - if channel_name: - return f"#{channel_name}" - return f"#{channel_id}" - - def is_thread_reply(self, message: Message) -> bool: - """Check if message is a thread reply""" - return message.thread_ts is not None and message.thread_ts != message.timestamp - - def is_url_verification(self, raw_data: Dict[str, Any]) -> bool: - """Check if request is URL verification""" - return raw_data.get("type") == "url_verification" - - def get_challenge(self, raw_data: Dict[str, Any]) -> Optional[str]: - """Get challenge for URL verification""" - if self.is_url_verification(raw_data): - return raw_data.get("challenge") - return None diff --git a/agents/scribe/llm_extractor.py b/agents/scribe/llm_extractor.py deleted file mode 100644 index edbde7e..0000000 --- a/agents/scribe/llm_extractor.py +++ /dev/null @@ -1,421 +0,0 @@ -""" -LLM-based Field Extractor - -Extracts structured decision record fields from non-English text using LLM. -All outputs are translated to English for embedding consistency. - -Supports phase-aware extraction: long reasoning processes (>800 chars) are -automatically split into logical phases, each becoming a linked DecisionRecord. -""" - -import json -import logging -from dataclasses import dataclass, field -from typing import List, Optional - -from ..common.llm_client import LLMClient -from ..common.llm_utils import parse_llm_json - -logger = logging.getLogger("rune.scribe.llm_extractor") - -# Texts longer than this threshold trigger multi-phase extraction -PHASE_SPLIT_THRESHOLD = 800 - -# Texts longer than this, or with many detail items, trigger bundle split -BUNDLE_SPLIT_THRESHOLD = 1500 - - -@dataclass -class ExtractedFields: - """Fields extracted by LLM from non-English text""" - title: str = "" - rationale: str = "" - problem: str = "" - alternatives: List[str] = field(default_factory=list) - trade_offs: List[str] = field(default_factory=list) - status_hint: str = "" # "proposed" | "accepted" | "rejected" - tags: List[str] = field(default_factory=list) - - -@dataclass -class PhaseExtractedFields: - """Fields for a single phase within a multi-phase reasoning chain""" - phase_title: str = "" - phase_decision: str = "" - phase_rationale: str = "" - phase_problem: str = "" - alternatives: List[str] = field(default_factory=list) - trade_offs: List[str] = field(default_factory=list) - tags: List[str] = field(default_factory=list) - - -@dataclass -class ExtractionResult: - """Result of LLM extraction — may be single, multi-phase, or bundle""" - group_title: str = "" - group_type: str = "" # "phase_chain", "bundle", or "" (single) - group_summary: str = "" # 1-line semantic anchor shared across all phases - status_hint: str = "" - tags: List[str] = field(default_factory=list) - confidence: Optional[float] = None # Agent-provided confidence (0.0-1.0) - single: Optional[ExtractedFields] = None - phases: Optional[List[PhaseExtractedFields]] = None - - @property - def is_multi_phase(self) -> bool: - return self.phases is not None and len(self.phases) > 1 - - @property - def is_bundle(self) -> bool: - return self.group_type == "bundle" and self.phases is not None and len(self.phases) > 1 - - -EXTRACTION_PROMPT = """You are a structured information extractor for organizational decision records. - -Given a message (which may be in any language), extract the following fields. -IMPORTANT: All output values MUST be in English (translate if needed). - -Respond with a valid JSON object with these keys: -- "title": A short title for the decision (5-60 chars, in English) -- "rationale": The reasoning behind the decision (in English, empty string if not found) -- "problem": The problem being solved (in English, empty string if not found) -- "alternatives": List of alternatives considered (in English, empty list if none) -- "trade_offs": List of trade-offs mentioned (in English, empty list if none) -- "status_hint": One of "proposed", "accepted", "rejected" based on the tone/language -- "tags": List of relevant topic tags (in English, e.g. ["database", "migration"]) - -Rules: -- Translate ALL values to English -- Keep the title concise and descriptive -- If a field is not clearly present in the text, use empty string or empty list -- For status_hint: use "accepted" if the message indicates a finalized decision, "proposed" if tentative, "rejected" if something was decided against - -Message to extract from: -{text} - -JSON:""" - - -PHASE_EXTRACTION_PROMPT = """You are a structured information extractor for organizational decision records. - -Given a long message containing a multi-part reasoning process (which may be in any language), split it into LOGICAL PHASES and extract structured information for each phase. - -IMPORTANT: -- Split by LOGICAL REASONING PHASES, not by paragraph or character count. -- Each phase should represent a distinct sub-decision, conclusion, or reasoning step. -- All output values MUST be in English (translate if needed). -- Aim for 2-5 phases. Do not create more than 7 phases. -- If the text is actually a single decision (not multi-phase), return a single phase. - -Respond with a valid JSON object: -{{ - "group_title": "Overall title for the entire reasoning chain (5-60 chars, English)", - "status_hint": "proposed" or "accepted" or "rejected", - "tags": ["relevant", "topic", "tags"], - "phases": [ - {{ - "phase_title": "Short title for this phase (e.g., 'Target Market Analysis')", - "phase_decision": "The key decision or conclusion of this phase", - "phase_rationale": "Why this conclusion was reached", - "phase_problem": "The sub-problem this phase addresses", - "alternatives": ["alternatives considered in this phase"], - "trade_offs": ["trade-offs for this phase"], - "tags": ["phase-specific tags"] - }} - ] -}} - -Rules: -- Translate ALL values to English -- Each phase_decision should be self-contained and meaningful on its own -- phase_title should indicate the topic/aspect (e.g., "Positioning Strategy", "Pricing Model", "Go-to-Market Timeline") - -Message to extract from: -{text} - -JSON:""" - - -BUNDLE_SPLIT_PROMPT = """You are a structured information extractor for organizational decision records. - -Given a message describing a SINGLE decision with rich details (which may be in any language), split it into a CORE record plus DETAIL FACETS. This is NOT about sequential reasoning — it's about organizing the supporting material of one decision. - -IMPORTANT: -- The first item MUST be the "Core Decision" — a concise summary of the main decision. -- Subsequent items are detail facets: alternatives analysis, trade-offs, implementation plan, rationale deep-dive, etc. -- All output values MUST be in English (translate if needed). -- Aim for 2-4 facets total (including core). Do not create more than 5. -- Each facet should be self-contained and meaningful on its own. - -Respond with a valid JSON object: -{{ - "group_title": "Overall title for the decision (5-60 chars, English)", - "status_hint": "proposed" or "accepted" or "rejected", - "tags": ["relevant", "topic", "tags"], - "phases": [ - {{ - "phase_title": "Core Decision", - "phase_decision": "The main decision statement — concise", - "phase_rationale": "Brief summary of why", - "phase_problem": "The problem being solved", - "alternatives": [], - "trade_offs": [], - "tags": [] - }}, - {{ - "phase_title": "Alternatives Analysis", - "phase_decision": "Detailed comparison of alternatives considered", - "phase_rationale": "Why the chosen option was selected over others", - "phase_problem": "", - "alternatives": ["alt1", "alt2", "alt3"], - "trade_offs": ["trade-off for each"], - "tags": [] - }} - ] -}} - -Rules: -- Translate ALL values to English -- First facet is always "Core Decision" with the essential what/why -- Other facets organize the supporting detail (alternatives, trade-offs, implementation, evidence, etc.) -- Each phase_decision should be self-contained — readable without other facets - -Message to extract from: -{text} - -JSON:""" - - -class LLMExtractor: - """Extracts structured fields from text using Claude API. - - Supports phase-aware extraction for long reasoning chains. - """ - - def __init__( - self, - llm_provider: str = "anthropic", - anthropic_api_key: Optional[str] = None, - openai_api_key: Optional[str] = None, - google_api_key: Optional[str] = None, - model: str = "claude-sonnet-4-20250514", - ): - self._provider = llm_provider - self._model = model - self._llm = LLMClient( - provider=llm_provider, - model=model, - anthropic_api_key=anthropic_api_key, - openai_api_key=openai_api_key, - google_api_key=google_api_key, - ) - - @property - def is_available(self) -> bool: - """Check if LLM client is ready""" - return self._llm.is_available - - def _generate(self, prompt: str, max_tokens: int) -> str: - return self._llm.generate(prompt, max_tokens=max_tokens) - - def extract(self, text: str) -> ExtractionResult: - """Extract structured fields, auto-detecting split strategy. - - Split strategy: - - Short text (<=800 chars): single extraction, then bundle check - - Long text (>800 chars): phase extraction first, bundle fallback - - Args: - text: Input text (any language) - - Returns: - ExtractionResult (check .is_multi_phase or .is_bundle) - """ - if not self.is_available: - return ExtractionResult(single=ExtractedFields()) - - if len(text) <= PHASE_SPLIT_THRESHOLD: - fields = self._extract_single(text) - result = ExtractionResult( - group_title=fields.title, - status_hint=fields.status_hint, - tags=fields.tags, - single=fields, - ) - # Check if even short text has overflow details - if self._needs_bundle_split(text, fields): - try: - return self._extract_bundle(text) - except Exception as e: - logger.warning("Bundle extraction failed for short text: %s", e) - return result - - # Long text: try phase extraction first - try: - result = self._extract_phases(text) - if result.is_multi_phase: - result.group_type = "phase_chain" - return result - # Phase returned single — check if bundle needed - if self._needs_bundle_split(text, result.single): - try: - return self._extract_bundle(text) - except Exception as e: - logger.warning("Bundle extraction failed after phase: %s", e) - return result - except Exception as e: - logger.warning("Phase extraction failed: %s", e) - # Try bundle before falling back to single - if len(text) > BUNDLE_SPLIT_THRESHOLD: - try: - return self._extract_bundle(text) - except Exception as e2: - logger.warning("Bundle extraction also failed: %s", e2) - fields = self._extract_single(text) - return ExtractionResult( - group_title=fields.title, - status_hint=fields.status_hint, - tags=fields.tags, - single=fields, - ) - - def extract_single(self, text: str) -> ExtractedFields: - """Extract as single record (backward-compatible entry point).""" - if not self.is_available: - return ExtractedFields() - return self._extract_single(text) - - def _extract_single(self, text: str) -> ExtractedFields: - """Single-phase extraction (original logic).""" - try: - prompt = EXTRACTION_PROMPT.format(text=text) - raw = self._generate(prompt, max_tokens=512) - return self._parse_single_response(raw) - except Exception as e: - logger.warning("Single extraction failed: %s", e) - return ExtractedFields() - - def _extract_phases(self, text: str) -> ExtractionResult: - """Multi-phase extraction for long reasoning chains.""" - prompt = PHASE_EXTRACTION_PROMPT.format(text=text) - raw = self._generate(prompt, max_tokens=2048) - data = parse_llm_json(raw) - - phases_data = data.get("phases", []) - group_title = str(data.get("group_title", ""))[:60] - status_hint = str(data.get("status_hint", "")).lower() - tags = [str(t).lower() for t in data.get("tags", []) if t] - - # If LLM returned 0 or 1 phase, treat as single - if len(phases_data) <= 1: - p = phases_data[0] if phases_data else {} - return ExtractionResult( - group_title=group_title, - status_hint=status_hint, - tags=tags, - single=ExtractedFields( - title=str(p.get("phase_title", group_title))[:60], - rationale=str(p.get("phase_rationale", "")), - problem=str(p.get("phase_problem", "")), - alternatives=[str(a) for a in p.get("alternatives", []) if a], - trade_offs=[str(t) for t in p.get("trade_offs", []) if t], - status_hint=status_hint, - tags=tags, - ), - ) - - # Multi-phase - phases = [] - for p in phases_data[:7]: # cap at 7 - phases.append(PhaseExtractedFields( - phase_title=str(p.get("phase_title", ""))[:60], - phase_decision=str(p.get("phase_decision", "")), - phase_rationale=str(p.get("phase_rationale", "")), - phase_problem=str(p.get("phase_problem", "")), - alternatives=[str(a) for a in p.get("alternatives", []) if a], - trade_offs=[str(t) for t in p.get("trade_offs", []) if t], - tags=[str(t).lower() for t in p.get("tags", []) if t], - )) - - return ExtractionResult( - group_title=group_title, - status_hint=status_hint, - tags=tags, - phases=phases, - ) - - def _needs_bundle_split(self, text: str, fields: Optional[ExtractedFields]) -> bool: - """Check if content has detail overflow that warrants bundle splitting.""" - # Long text that exceeds single record capacity - if len(text) > BUNDLE_SPLIT_THRESHOLD: - return True - # Moderate text with many detail items in multiple categories - if fields and len(fields.alternatives) > 3 and len(fields.trade_offs) > 3: - return True - return False - - def _extract_bundle(self, text: str) -> ExtractionResult: - """Bundle extraction: split a single decision into detail facets.""" - prompt = BUNDLE_SPLIT_PROMPT.format(text=text) - raw = self._generate(prompt, max_tokens=2048) - data = parse_llm_json(raw) - - phases_data = data.get("phases", []) - group_title = str(data.get("group_title", ""))[:60] - status_hint = str(data.get("status_hint", "")).lower() - tags = [str(t).lower() for t in data.get("tags", []) if t] - - # If LLM returned 0 or 1 facet, not a real bundle - if len(phases_data) <= 1: - p = phases_data[0] if phases_data else {} - return ExtractionResult( - group_title=group_title, - status_hint=status_hint, - tags=tags, - single=ExtractedFields( - title=str(p.get("phase_title", group_title))[:60], - rationale=str(p.get("phase_rationale", "")), - problem=str(p.get("phase_problem", "")), - alternatives=[str(a) for a in p.get("alternatives", []) if a], - trade_offs=[str(t) for t in p.get("trade_offs", []) if t], - status_hint=status_hint, - tags=tags, - ), - ) - - # Multi-facet bundle — reuse PhaseExtractedFields structure - phases = [] - for p in phases_data[:5]: # cap at 5 facets - phases.append(PhaseExtractedFields( - phase_title=str(p.get("phase_title", ""))[:60], - phase_decision=str(p.get("phase_decision", "")), - phase_rationale=str(p.get("phase_rationale", "")), - phase_problem=str(p.get("phase_problem", "")), - alternatives=[str(a) for a in p.get("alternatives", []) if a], - trade_offs=[str(t) for t in p.get("trade_offs", []) if t], - tags=[str(t).lower() for t in p.get("tags", []) if t], - )) - - return ExtractionResult( - group_title=group_title, - group_type="bundle", - status_hint=status_hint, - tags=tags, - phases=phases, - ) - - def _parse_single_response(self, raw: str) -> ExtractedFields: - """Parse LLM JSON response into ExtractedFields.""" - data = parse_llm_json(raw) - if not data: - return ExtractedFields() - - return ExtractedFields( - title=str(data.get("title", ""))[:60], - rationale=str(data.get("rationale", "")), - problem=str(data.get("problem", "")), - alternatives=[str(a) for a in data.get("alternatives", []) if a], - trade_offs=[str(t) for t in data.get("trade_offs", []) if t], - status_hint=str(data.get("status_hint", "")).lower(), - tags=[str(t).lower() for t in data.get("tags", []) if t], - ) diff --git a/agents/scribe/pattern_parser.py b/agents/scribe/pattern_parser.py deleted file mode 100644 index 3ccd14c..0000000 --- a/agents/scribe/pattern_parser.py +++ /dev/null @@ -1,423 +0,0 @@ -""" -Pattern Parser - -Parses trigger patterns from patterns/capture-triggers.md. -Extracts phrases organized by category and priority. -""" - -import re -from pathlib import Path -from typing import List, Dict, Optional - - -# Domain mapping from category names -CATEGORY_TO_DOMAIN = { - # Architecture & Engineering - "architecture": "architecture", - "technical": "architecture", - "performance": "architecture", - "optimization": "architecture", - "technical_debt": "architecture", - "migration": "architecture", - "infrastructure": "architecture", - "api": "architecture", - "database": "architecture", - # Security & Compliance - "security": "security", - "compliance": "security", - "encryption": "security", - "authentication": "security", - "authorization": "security", - "vulnerability": "security", - # Product & Business - "product": "product", - "business": "product", - "feature": "product", - "roadmap": "product", - "mvp": "product", - "startup": "product", - # Executive & Strategic - "executive": "exec", - "strategic": "exec", - "funding": "exec", - "board": "exec", - "leadership": "exec", - # Operations & Deployment - "ops": "ops", - "operations": "ops", - "deployment": "ops", - "monitoring": "ops", - "observability": "ops", - "sre": "ops", - "oncall": "ops", - "on_call": "ops", - "runbook": "ops", - # Design & UX - "design": "design", - "ux": "design", - "ui": "design", - "accessibility": "design", - "design_system": "design", - # Data & Analytics - "data": "data", - "analytics": "data", - "ml": "data", - "machine_learning": "data", - "statistics": "data", - # HR & People - "hr": "hr", - "hiring": "hr", - "people": "hr", - "onboarding": "hr", - "culture": "hr", - "compensation": "hr", - # Marketing & Growth - "marketing": "marketing", - "growth": "marketing", - "campaign": "marketing", - "brand": "marketing", - # Incident & Postmortem - "incident": "incident", - "incident_response": "incident", - "outage": "incident", - "postmortem": "incident", - "post_mortem": "incident", - "failure_retro": "incident", - "failure_retrospective": "incident", - "outage_communication": "incident", - "rca": "incident", - "root_cause": "incident", - # Debugging & Troubleshooting - "debugging": "debugging", - "troubleshooting": "debugging", - "bug_fix": "debugging", - "performance_investigation": "debugging", - "regression": "debugging", - "hotfix": "debugging", - # QA & Testing - "qa": "qa", - "testing": "qa", - "quality_assurance": "qa", - "bug_triage": "qa", - "test_strategy": "qa", - # Legal & Compliance - "legal": "legal", - "regulatory": "legal", - "contract": "legal", - "ip": "legal", - "patent": "legal", - "privacy": "legal", - # Finance & Budget - "finance": "finance", - "budget": "finance", - "cost": "finance", - "pricing": "finance", - "revenue": "finance", - # Sales & Partnerships - "sales": "sales", - "partnership": "sales", - "deal": "sales", - "enterprise_sales": "sales", - # Customer Success - "customer_success": "customer_success", - "customer_escalation": "customer_escalation", - "churn": "customer_success", - "retention": "customer_success", - "support": "customer_success", - # Research & R&D - "research": "research", - "rnd": "research", - "r_and_d": "research", - "experiment": "research", - "prototype": "research", - "poc": "research", - # Risk Assessment - "risk": "risk", - "risk_assessment": "risk", - "mitigation": "risk", - "contingency": "risk", - # Cross-team & Process - "cross_team": "ops", - "process": "ops", - "coordination": "ops", -} - - -def _normalize_category(raw_category: str) -> str: - """Normalize category name to lowercase with underscores""" - # Remove special characters, convert to lowercase - normalized = re.sub(r'[^a-zA-Z0-9\s]', '', raw_category.lower()) - normalized = re.sub(r'\s+', '_', normalized.strip()) - return normalized - - -def _infer_domain(category: str) -> str: - """Infer domain from category name""" - category_lower = category.lower() - - # Check direct mapping - for key, domain in CATEGORY_TO_DOMAIN.items(): - if key in category_lower: - return domain - - return "general" - - -def _detect_priority(line: str, section_context: str) -> str: - """Detect priority from line content and section context""" - line_lower = line.lower() - section_lower = section_context.lower() - - # High priority indicators - high_indicators = [ - "high_confidence", "high-confidence", "high priority", - "always capture", "critical", "must capture", - "explicit decision", "trade-off", "security", "compliance" - ] - - # Medium priority indicators - medium_indicators = [ - "medium_confidence", "medium-confidence", "medium priority", - "usually capture", "context-dependent" - ] - - # Check section context first - for indicator in high_indicators: - if indicator in section_lower: - return "high" - - for indicator in medium_indicators: - if indicator in section_lower: - return "medium" - - # Check line content - for indicator in high_indicators: - if indicator in line_lower: - return "high" - - # Default to medium - return "medium" - - -def parse_capture_triggers(md_path: str) -> List[Dict]: - """ - Parse capture-triggers.md into structured pattern list. - - Args: - md_path: Path to capture-triggers.md file - - Returns: - List of dicts with keys: - - text: Pattern text - - category: Category name - - priority: "high", "medium", or "low" - - domain: Domain classification - - Example: - [ - { - "text": "We decided to use X instead of Y because...", - "category": "architecture_technical_decisions", - "priority": "high", - "domain": "architecture" - }, - ... - ] - """ - path = Path(md_path) - if not path.exists(): - raise FileNotFoundError(f"Pattern file not found: {md_path}") - - content = path.read_text(encoding='utf-8') - patterns = [] - - current_category = "general" - current_section = "" - current_priority = "medium" - - lines = content.split('\n') - - for line in lines: - line_stripped = line.strip() - - # Skip empty lines and comments - if not line_stripped or line_stripped.startswith('