diff --git a/README.md b/README.md index 554918a..f752e1d 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ CodeLens is an AI-native code intelligence platform that gives AI agents **full ## Features -- **12 CLI Commands** — From basic scan/query to AST taint analysis, CVE scanning, plugin management, auto-fix, dashboards, CI/CD quality gates, and `graph-schema` for cheap graph-shape introspection +- **13 CLI Commands** — From basic scan/query to AST taint analysis, CVE scanning, plugin management, auto-fix, dashboards, CI/CD quality gates, and `graph-schema` for cheap graph-shape introspection - **MCP Server (12 Tools)** — Native AI agent integration via Model Context Protocol (JSON-RPC over stdio), 12 tools total (auto-discovered from COMMAND_REGISTRY), every tool accepts a `format` parameter (`json`/`markdown`/`ai`/`sarif`/`compact`) - **Token-Efficient Compact Output (v8.2, issue #17)** — `--format compact` produces single-char-key JSON with abbreviated types, omitted null fields, and relative paths — ~50% smaller than `json` on real trace output. 
Combined with `--limit`/`--offset` pagination, 5 structural queries now cost <5k tokens (down from 30-80k) - **AST Taint Engine** — Tree-sitter based taint analysis with return-value propagation, scope hierarchy, and branch condition refinement @@ -167,8 +167,8 @@ codelens/ │ ├── changelog.md # Older changelog (per-version highlights) │ └── agent-integration.md # AI agent integration guide ├── scripts/ -│ ├── codelens.py # CLI entry point (12 commands registered) -│ ├── mcp_server.py # MCP JSON-RPC server (12 tools) +│ ├── codelens.py # CLI entry point (13 commands registered) +│ ├── mcp_server.py # MCP JSON-RPC server (13 tools) │ ├── registry.py # Registry read/write/build │ ├── persistent_registry.py # SQLite persistent storage (WAL mode) │ ├── base_parser.py # Base tree-sitter parser diff --git a/SKILL-QUICK.md b/SKILL-QUICK.md index 8e9ed09..fecea45 100755 --- a/SKILL-QUICK.md +++ b/SKILL-QUICK.md @@ -116,7 +116,7 @@ $CLI list --limit 5 --offset 10 --format compact # → paginated + co | "Cross-file taint" | `dataflow` | `taint` (taint is single-file, AST-deep) | | "Auto-fix issues" | `fix` | `check` (check just gates, doesn't fix) | -## All 12 Commands +## All 13 Commands ### Setup & Lifecycle (8+) `init` · `scan [--incremental] [--max-files N] [--full]` · `registry-validate` · `detect` · `watch [--debounce SECS] [--git-mode] [--interval SECS]` · `git-status` · `migrate` · `serve` · `lsp-status` (issue #33: `codelens --lsp-status` top-level flag is an alias of `codelens lsp-status` — both delegate to `hybrid_engine.get_lsp_status()` and return the identical payload) @@ -148,9 +148,9 @@ $CLI list --limit 5 --offset 10 --format compact # → paginated + co ### Tooling (1) `plugin ` -**Total: 12 commands** (auto-registered via `commands/__init__.py`; rerun `python3 scripts/sync_command_count.py --apply` after adding/removing a command) +**Total: 13 commands** (auto-registered via `commands/__init__.py`; rerun `python3 scripts/sync_command_count.py --apply` after 
adding/removing a command) -## MCP Server (12 Tools) +## MCP Server (13 Tools) Start the MCP server for AI agent integration: @@ -158,7 +158,7 @@ Start the MCP server for AI agent integration: python3 scripts/codelens.py serve ``` -Exposes 12 tools as `codelens_` (e.g., `codelens_query`, `codelens_taint`, `codelens_graph_schema`, `codelens_architecture`, `codelens_resolve_types`, `codelens_git_status`): +Exposes 13 tools as `codelens_` (e.g., `codelens_query`, `codelens_taint`, `codelens_graph_schema`, `codelens_architecture`, `codelens_resolve_types`, `codelens_git_status`): - 50 statically-defined tools (full JSON schemas in `mcp_server.py`) - -44 dynamically-discovered tools (auto-discovered from `COMMAND_REGISTRY`; long-running `watch` and `serve` are excluded) - Every tool accepts a `format` parameter (`json`/`markdown`/`ai`/`sarif`/`compact`). Use `format: "compact"` for token-efficient responses (~50% smaller than `json`). diff --git a/SKILL.md b/SKILL.md index 59450cd..2bfc392 100755 --- a/SKILL.md +++ b/SKILL.md @@ -1,10 +1,10 @@ --- name: codelens description: > - CodeLens — AI-Native Code Intelligence. 12 commands for AI-powered code analysis, + CodeLens — AI-Native Code Intelligence. 13 commands for AI-powered code analysis, security auditing, quality scoring, AST-based taint analysis, live CVE scanning, and pre-write safety checks. Supports 28+ languages with tree-sitter + regex - fallback parsing. MCP server exposes 12 tools for AI agent integration. + fallback parsing. MCP server exposes 13 tools for AI agent integration. For quick command reference with validated output schemas, see SKILL-QUICK.md. For version history, see CHANGELOG.md. 
--- diff --git a/pyproject.toml b/pyproject.toml index 14a9922..d8883bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "codelens" version = "8.2.0" -description = "Live Codebase Reference Intelligence — 12 commands for AI-powered code analysis, security auditing, and quality scoring" +description = "Live Codebase Reference Intelligence — 13 commands for AI-powered code analysis, security auditing, and quality scoring" readme = "README.md" license = {text = "MIT"} requires-python = ">=3.8" diff --git a/scripts/commands/analyze.py b/scripts/commands/analyze.py deleted file mode 100644 index b9a0af1..0000000 --- a/scripts/commands/analyze.py +++ /dev/null @@ -1,863 +0,0 @@ -"""Analyze command — Full repository analysis in a single command. - -The ultimate one-shot command for AI agents who need to understand -an ENTIRE repository immediately. Automatically: - -1. Runs init + scan (if no registry exists) -2. Runs all major analysis engines -3. Produces a comprehensive, prioritized report -4. Includes architecture overview, risks, and actionable next steps - -This is the "I just cloned a repo, tell me everything" command. 
- -Usage: - codelens analyze /path/to/repo - codelens analyze /path/to/repo --focus security - codelens analyze /path/to/repo --detail full - codelens analyze /path/to/repo --skip-scan (use existing registry) - codelens analyze /path/to/repo --timeout 300 (5 min budget for engines) - codelens analyze /path/to/repo --exclude-tests (skip test entry points) -""" - -import os -import time -from typing import Dict, Any, List, Optional -from commands import register_command -from utils import logger, CODELENS_VERSION - - -def add_args(parser): - parser.add_argument("workspace", nargs="?", default=None, - help="Path to workspace root (auto-detected if omitted)") - parser.add_argument("--focus", choices=["security", "quality", "architecture", "all"], - default="all", - help="Focus area for analysis (default: all)") - parser.add_argument("--detail", choices=["minimal", "standard", "full"], - default="standard", - help="Detail level: minimal (critical only), standard (critical+high), full (all)") - parser.add_argument("--skip-scan", action="store_true", default=False, - help="Skip init+scan if registry already exists") - parser.add_argument("--max-items", type=int, default=15, - help="Maximum items per category (default: 15)") - parser.add_argument("--timeout", type=int, default=300, - help="Total time budget in seconds for analysis engines (default: 300)") - parser.add_argument("--exclude-tests", action="store_true", default=False, - help="Exclude test entry points from entrypoints analysis") - - -def execute(args, workspace): - return analyze_repository( - workspace, - focus=args.focus, - detail=args.detail, - skip_scan=args.skip_scan, - max_items=args.max_items, - timeout=args.timeout, - exclude_tests=args.exclude_tests, - ) - - -def analyze_repository( - workspace: str, - focus: str = "all", - detail: str = "standard", - skip_scan: bool = False, - max_items: int = 15, - timeout: int = 300, - exclude_tests: bool = False, -) -> Dict[str, Any]: - """ - Full repository 
analysis — the single command to understand an entire codebase. - - This is the primary entry point for AI agents who want to analyze a repository - they've never seen before. It runs all relevant engines, prioritizes findings, - and produces a comprehensive report with actionable recommendations. - - The output is designed to give an AI agent everything it needs to: - - Understand the project's purpose and architecture - - Identify the most critical issues - - Know what to fix first - - Navigate the codebase efficiently - """ - start_time = time.time() - total_budget = float(timeout) - workspace = os.path.abspath(workspace) - - # Severity filter based on detail level - if detail == "minimal": - severity_filter = {"critical"} - elif detail == "standard": - severity_filter = {"critical", "high"} - else: - severity_filter = {"critical", "high", "medium", "low"} - - result = { - "status": "ok", - "workspace": workspace, - "focus": focus, - "detail": detail, - "codelens_version": CODELENS_VERSION, - "time_budget_seconds": total_budget, - } - - # ─── Phase 1: Ensure Registry Exists ────────────────────── - - codelens_dir = os.path.join(workspace, ".codelens") - registry_exists = os.path.exists(os.path.join(codelens_dir, "backend.json")) - - if not skip_scan or not registry_exists: - try: - from commands.scan import execute as scan_execute - import argparse - - # Run init - from commands.init import execute as init_execute - init_args = argparse.Namespace(workspace=workspace) - init_execute(init_args, workspace) - - # Run scan - scan_args = argparse.Namespace( - workspace=workspace, - incremental=False, - full=False, - format="json", - ) - scan_result = scan_execute(scan_args, workspace) - result["scan"] = { - "files_scanned": scan_result.get("files_scanned", {}), - "backend_nodes": scan_result.get("backend", {}).get("nodes", 0) - if isinstance(scan_result.get("backend", {}).get("nodes"), int) - else len(scan_result.get("backend", {}).get("nodes", [])), - 
"backend_edges": scan_result.get("backend", {}).get("edges", 0) - if isinstance(scan_result.get("backend", {}).get("edges"), int) - else len(scan_result.get("backend", {}).get("edges", [])), - "frontend_classes": scan_result.get("frontend", {}).get("classes", 0), - "frontend_ids": scan_result.get("frontend", {}).get("ids", 0), - "frameworks": scan_result.get("frameworks", []), - "unsupported_langs": scan_result.get("unsupported_langs", []), - } - except Exception as e: - logger.warning(f"Scan phase failed: {e}") - result["scan"] = {"error": str(e)} - - # ─── Phase 2: Project Identity ─────────────────────────── - - try: - from handbook_helpers import _extract_project_identity - identity = _extract_project_identity(workspace) - result["identity"] = { - "name": identity.get("name", os.path.basename(workspace)), - "type": identity.get("type", "unknown"), - "version": identity.get("version", "0.0.0"), - "description": identity.get("description", ""), - "is_monorepo": identity.get("is_monorepo", False), - } - except Exception: - result["identity"] = { - "name": os.path.basename(workspace), - "type": "unknown", - "version": "0.0.0", - } - - # ─── Phase 3: Frameworks & Languages ───────────────────── - - try: - from framework_detect import detect_frameworks - fw = detect_frameworks(workspace) - result["frameworks"] = fw.get("frameworks", []) - result["languages"] = fw.get("languages", {}) - except Exception: - result["frameworks"] = [] - result["languages"] = {} - - # Detect languages from file extensions - if not result.get("languages"): - result["languages"] = _detect_languages(workspace) - - # ─── Phase 4: Architecture Overview ────────────────────── - - try: - from outline_engine import get_workspace_outline - outline = get_workspace_outline(workspace, max_files=200) - result["architecture"] = { - "total_files": outline.get("files_outlined", 0), - "total_lines": outline.get("total_lines", 0), - "directories": _extract_directory_structure(workspace), - "entry_points": 
[], - "key_modules": [], - } - - # Extract key modules from outline - outlines = outline.get("outlines", []) - module_summary = [] - for o in outlines[:50]: - inner = o.get("outline", o) - fns = inner.get("functions", []) - classes = inner.get("classes", []) - if classes or fns: - module_summary.append({ - "file": o.get("file", ""), - "classes": [c.get("name", "") for c in classes[:5]], - "functions": [f.get("name", "") for f in fns[:5]], - "language": inner.get("language", "unknown"), - }) - result["architecture"]["key_modules"] = module_summary[:20] - except Exception: - result["architecture"] = {"total_files": 0, "directories": []} - - # ─── Phase 5: Entry Points ────────────────────────────── - - try: - from entrypoints_engine import map_entrypoints - ep = map_entrypoints(workspace, exclude_tests=exclude_tests) - result["architecture"]["entry_points"] = [ - { - "type": e.get("type", ""), - "file": e.get("file", ""), - "handler": e.get("handler", ""), - "line": e.get("line", 0), - } - for e in ep.get("entrypoints", [])[:max_items] - ] - except Exception: - pass - - # ─── Phase 6: API Map ──────────────────────────────────── - - try: - from apimap_engine import map_api_routes - api = map_api_routes(workspace) - result["api_map"] = { - "total_routes": api.get("stats", {}).get("total_routes", 0), - "routes": [ - { - "method": r.get("method", ""), - "path": r.get("path", ""), - "handler": r.get("handler", ""), - "file": r.get("file", ""), - "auth": r.get("auth", False), - } - for r in api.get("routes", [])[:max_items] - ], - "by_method": api.get("stats", {}).get("by_method", {}), - } - except Exception: - result["api_map"] = {"total_routes": 0, "routes": []} - - # ─── Phase 7: Findings (Prioritized) ───────────────────── - - findings = [] - - # --- Security --- - if focus in ("security", "all"): - _run_engine(findings, "secrets", "Secrets Detection", - lambda: _detect_secrets(workspace, severity_filter, max_items), - start_time, total_budget) - _run_engine(findings, 
"vulnerabilities", "CVE Vulnerabilities", - lambda: _detect_vulns(workspace, max_items), - start_time, total_budget) - _run_engine(findings, "dataflow_violations", "Data Flow Violations", - lambda: _detect_dataflow(workspace, max_items), - start_time, total_budget) - _run_engine(findings, "env_issues", "Environment Issues", - lambda: _detect_env(workspace, max_items), - start_time, total_budget) - - # --- Quality --- - if focus in ("quality", "all"): - _run_engine(findings, "code_smells", "Code Smells", - lambda: _detect_smells(workspace, severity_filter, max_items), - start_time, total_budget) - _run_engine(findings, "debug_leaks", "Debug Code Leaks", - lambda: _detect_debug(workspace, max_items), - start_time, total_budget) - _run_engine(findings, "complexity", "Complexity Hotspots", - lambda: _detect_complexity(workspace, max_items), - start_time, total_budget) - _run_engine(findings, "dead_code", "Dead Code", - lambda: _detect_dead_code(workspace, max_items), - start_time, total_budget) - - # --- Architecture --- - if focus in ("architecture", "all"): - _run_engine(findings, "circular_dependencies", "Circular Dependencies", - lambda: _detect_circular(workspace, max_items), - start_time, total_budget) - _run_engine(findings, "perf_hints", "Performance Hints", - lambda: _detect_perf(workspace, max_items), - start_time, total_budget) - _run_engine(findings, "config_drift", "Dependency Drift", - lambda: _detect_config_drift(workspace, max_items), - start_time, total_budget) - _run_engine(findings, "binary_artifacts", "Binary Artifacts", - lambda: _detect_binaries(workspace, max_items), - start_time, total_budget) - - result["findings"] = findings - result["total_finding_categories"] = len(findings) - result["total_issues"] = sum(f.get("total", 0) for f in findings) - - # ─── Phase 7b: Skipped Engines Summary ──────────────────── - - skipped = [f for f in findings if f.get("skipped")] - if skipped: - result["skipped_engines"] = [ - {"category": s["category"], 
"reason": s.get("skip_reason", "")} - for s in skipped - ] - - # ─── Phase 8: Risk Assessment ───────────────────────────── - - risk_score = _compute_risk_score(findings, result) - result["risk_assessment"] = risk_score - - # ─── Phase 9: Action Plan ───────────────────────────────── - - result["action_plan"] = _generate_action_plan(findings, risk_score) - - # ─── Phase 10: Recommendations ──────────────────────────── - - result["recommendations"] = _generate_recommendations(findings, result) - - # ─── Done ───────────────────────────────────────────────── - - elapsed = time.time() - start_time - result["elapsed_seconds"] = round(elapsed, 2) - result["analysis_timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%S+00:00", time.gmtime()) - - return result - - -# ─── Engine Runners ──────────────────────────────────────── - -def _run_engine(findings: List[Dict], category: str, label: str, engine_fn, start_time: float, total_budget: float, per_engine_timeout: float = 60.0) -> None: - """Safely run an analysis engine with time budget check and per-engine timeout. - - Args: - findings: List to append results to. - category: Engine category name (e.g., 'secrets'). - label: Human-readable label (e.g., 'Hardcoded Secrets'). - engine_fn: Callable that runs the engine and returns a dict or None. - start_time: Global start time for budget calculation. - total_budget: Total time budget in seconds. - per_engine_timeout: Maximum seconds a single engine may run before being - killed. Prevents one slow engine from blocking the entire analysis. - Defaults to 60 seconds. 
- """ - elapsed = time.time() - start_time - remaining = total_budget - elapsed - - # Skip if less than 20% of budget remains - if remaining < total_budget * 0.2: - logger.debug(f"Skipping engine {category}: time budget nearly exhausted ({remaining:.1f}s remaining)") - findings.append({ - "category": category, - "label": label, - "total": 0, - "severity": "info", - "skipped": True, - "skip_reason": f"Time budget nearly exhausted ({remaining:.1f}s remaining of {total_budget:.0f}s)", - "action": f"Run '{category}' engine individually for full results", - }) - return - - # Per-engine timeout: cap at remaining budget, but at least 5s - engine_budget = min(per_engine_timeout, remaining) - if engine_budget < 5: - findings.append({ - "category": category, - "label": label, - "total": 0, - "severity": "info", - "skipped": True, - "skip_reason": f"Insufficient time budget ({remaining:.1f}s remaining)", - "action": f"Run '{category}' engine individually for full results", - }) - return - - try: - engine_start = time.time() - - # Run engine with per-engine timeout using threading - import threading - engine_result = [None] # Mutable container for thread result - engine_error = [None] # Mutable container for thread error - - def _run(): - try: - engine_result[0] = engine_fn() - except Exception as e: - engine_error[0] = e - - thread = threading.Thread(target=_run, daemon=True) - thread.start() - thread.join(timeout=engine_budget) - - if thread.is_alive(): - # Thread timed out — it's a daemon thread so it won't block exit - logger.warning(f"Engine {category} timed out after {engine_budget:.1f}s") - findings.append({ - "category": category, - "label": label, - "total": 0, - "severity": "info", - "skipped": True, - "skip_reason": f"Engine timed out after {engine_budget:.0f}s (repo may be too large for this engine in aggregate mode)", - "action": f"Run '{category}' engine individually for full results", - }) - return - - if engine_error[0] is not None: - raise engine_error[0] - - 
engine_elapsed = time.time() - engine_start - result = engine_result[0] - if result: - result["elapsed_seconds"] = round(engine_elapsed, 2) - findings.append(result) - except Exception as e: - logger.debug(f"Engine {category} failed: {e}") - findings.append({ - "category": category, - "label": label, - "total": 0, - "severity": "info", - "skipped": True, - "skip_reason": f"Engine error: {type(e).__name__}", - "action": f"Run '{category}' engine individually for full results", - }) - - -def _detect_secrets(workspace: str, severity_filter: set, max_items: int) -> Optional[Dict]: - from secrets_engine import detect_secrets - sec = detect_secrets(workspace) - total = sec.get("stats", {}).get("total_secrets", 0) - if total == 0: - return None - items = sec.get("findings", sec.get("items", [])) - filtered = [s for s in items if s.get("severity", "low") in severity_filter][:max_items] - return { - "category": "secrets", - "label": "Hardcoded Secrets", - "total": total, - "severity": "critical" if any(s.get("severity") == "critical" for s in items[:20]) else "high", - "by_severity": sec.get("stats", {}).get("by_severity", {}), - "top_items": filtered, - "action": "Move ALL secrets to environment variables or a secret manager (e.g., Vault, AWS Secrets Manager)", - "impact": "Leaked API keys, database passwords, and encryption keys can lead to unauthorized access and data breaches", - } - - -def _detect_vulns(workspace: str, max_items: int) -> Optional[Dict]: - from vulnscan_engine import scan_vulnerabilities - vuln = scan_vulnerabilities(workspace) - total = vuln.get("stats", {}).get("total_vulnerabilities", 0) - if total == 0: - return None - return { - "category": "vulnerabilities", - "label": "Known CVEs", - "total": total, - "severity": "critical", - "by_severity": vuln.get("stats", {}).get("by_severity", {}), - "top_items": vuln.get("vulnerabilities", [])[:max_items], - "action": "Update vulnerable dependencies immediately — check npm audit, pip audit, cargo audit, or 
govulncheck", - "impact": "Known vulnerabilities can be exploited by attackers even without source code access", - } - - -def _detect_dataflow(workspace: str, max_items: int) -> Optional[Dict]: - from dataflow_engine import trace_dataflow - df = trace_dataflow(workspace) - violations = df.get("stats", {}).get("violations", 0) - if violations == 0: - return None - return { - "category": "dataflow_violations", - "label": "Unsafe Data Flows", - "total": violations, - "severity": "high", - "top_items": df.get("violations", [])[:max_items], - "action": "Add input sanitization and output encoding at every source→sink boundary", - "impact": "Untainted data flows can lead to SQL injection, XSS, and command injection attacks", - } - - -def _detect_env(workspace: str, max_items: int) -> Optional[Dict]: - from envcheck_engine import check_env_vars - env = check_env_vars(workspace) - stats = env.get("stats", {}) - # Compute total issues from undocumented + required-but-missing vars - undocumented = stats.get("undocumented", 0) - total_vars = stats.get("total_vars", 0) - issues = undocumented # Each undocumented var is an issue - if issues == 0 and total_vars == 0: - return None - return { - "category": "env_issues", - "label": "Environment Issues", - "total": issues, - "severity": "medium", - "top_items": [{"name": v.get("name"), "is_required": v.get("is_required"), - "has_fallback": v.get("has_fallback"), - "documentation": v.get("documentation")} - for v in env.get("variables", [])[:max_items] - if not v.get("documentation")], - "action": "Review .env files, ensure secrets are not committed, add .env to .gitignore", - "impact": "Misconfigured environment variables can leak secrets or cause runtime failures", - } - - -def _detect_smells(workspace: str, severity_filter: set, max_items: int) -> Optional[Dict]: - from smell_engine import detect_smells - smell = detect_smells(workspace) - total = smell.get("stats", {}).get("total_smells", 0) - if total == 0: - return None - 
top_items = smell.get("top_priority", [])[:max_items] - filtered = [s for s in top_items if s.get("severity", "info") in severity_filter] - health = smell.get("stats", {}).get("health_score", 100) - return { - "category": "code_smells", - "label": "Code Smells", - "total": total, - "severity": "critical" if health < 40 else ("high" if health < 60 else "medium"), - "health_score": health, - "by_severity": { - "critical": smell.get("stats", {}).get("critical", 0), - "warning": smell.get("stats", {}).get("warning", 0), - }, - "top_items": filtered, - "action": "Address critical smells first (God Objects, deep nesting), then warnings (long functions, many params)", - "impact": "Poor code quality leads to bugs, slow development, and difficult maintenance", - } - - -def _detect_debug(workspace: str, max_items: int) -> Optional[Dict]: - from debugleak_engine import detect_debug_leaks - dl = detect_debug_leaks(workspace) - total = dl.get("stats", {}).get("total_leaks", 0) - if total == 0: - return None - by_cat = {k: v for k, v in dl.get("stats", {}).get("by_category", {}).items() if v > 0} - return { - "category": "debug_leaks", - "label": "Debug Code Left In", - "total": total, - "severity": "high" if by_cat.get("debugger", 0) > 0 else "medium", - "by_category": by_cat, - "top_items": dl.get("items", [])[:max_items], - "action": "Remove all console.log, var_dump, dd(), debugger, and TODO/FIXME before production deployment", - "impact": "Debug code can leak sensitive information, slow down performance, and confuse users", - } - - -def _detect_complexity(workspace: str, max_items: int) -> Optional[Dict]: - from complexity_engine import compute_complexity - comp = compute_complexity(workspace) - hotspots = comp.get("hotspots", []) - if not hotspots: - return None - return { - "category": "complexity", - "label": "Complexity Hotspots", - "total": len(hotspots), - "severity": "high" if any(h.get("cyclomatic", 0) > 20 for h in hotspots) else "medium", - "avg_cyclomatic": 
comp.get("stats", {}).get("avg_cyclomatic", 0), - "top_items": hotspots[:max_items], - "action": "Refactor high-complexity functions by extracting helper methods, reducing branches, and simplifying conditionals", - "impact": "Complex functions are bug magnets — they're hard to test, understand, and maintain", - } - - -def _detect_dead_code(workspace: str, max_items: int) -> Optional[Dict]: - from deadcode_engine import detect_dead_code - dc = detect_dead_code(workspace) - total = dc.get("stats", {}).get("total_dead_code", 0) - if total == 0: - return None - return { - "category": "dead_code", - "label": "Dead Code", - "total": total, - "severity": "medium", - "by_category": dc.get("stats", {}).get("by_category", {}), - "top_items": dc.get("results", {}).get("unreachable", [])[:max_items], - "action": "Remove dead code in batches with testing — start with unreachable code and unused exports", - "impact": "Dead code increases maintenance burden, confuses new developers, and bloats the codebase", - } - - -def _detect_circular(workspace: str, max_items: int) -> Optional[Dict]: - from circular_engine import detect_circular - circ = detect_circular(workspace) - total = circ.get("cycle_count", 0) - if total == 0: - return None - chains = circ.get("cycles", circ.get("chains", {})) - all_chains = [] - if isinstance(chains, dict): - for cat, items in chains.items(): - all_chains.extend(items[:3]) - elif isinstance(chains, list): - all_chains = chains[:5] - return { - "category": "circular_dependencies", - "label": "Circular Dependencies", - "total": total, - "severity": "high" if total > 5 else "medium", - "top_items": all_chains[:max_items], - "action": "Break circular imports by extracting shared logic into a separate module or using dependency injection", - "impact": "Circular dependencies cause initialization order issues, make testing hard, and prevent tree-shaking", - } - - -def _detect_perf(workspace: str, max_items: int) -> Optional[Dict]: - from perfhint_engine 
import detect_perf_hints - perf = detect_perf_hints(workspace) - total = perf.get("stats", {}).get("total_hints", 0) - if total == 0: - return None - return { - "category": "perf_hints", - "label": "Performance Issues", - "total": total, - "severity": perf.get("risk", "low"), - "by_category": perf.get("stats", {}).get("by_category", {}), - "top_items": perf.get("hints", [])[:max_items], - "action": "Address N+1 queries first (critical), then sync blocking, then memory leaks", - "impact": "Performance issues compound — N+1 queries scale linearly with data size, blocking calls freeze the event loop", - } - - -def _detect_config_drift(workspace: str, max_items: int) -> Optional[Dict]: - from configdrift_engine import detect_config_drift - drift = detect_config_drift(workspace) - total = drift.get("stats", {}).get("total_drift_items", 0) - if total == 0: - return None - return { - "category": "config_drift", - "label": "Dependency Drift", - "total": total, - "severity": "low", - "top_items": drift.get("drift_items", [])[:max_items], - "action": "Update outdated dependencies to reduce security risk and get bug fixes", - "impact": "Outdated dependencies may contain unpatched security vulnerabilities", - } - - -def _detect_binaries(workspace: str, max_items: int) -> Optional[Dict]: - from utils import scan_binary_artifacts - bins = scan_binary_artifacts(workspace) - total = bins.get("stats", {}).get("total_artifacts", 0) - if total == 0: - return None - return { - "category": "binary_artifacts", - "label": "Binary/Compiled Files", - "total": total, - "severity": "low", - "by_category": bins.get("stats", {}).get("by_category", {}), - "top_items": bins.get("findings", [])[:max_items], - "recommendations": bins.get("recommendations", []), - "action": "Add binary files to .gitignore and use build pipelines instead", - "impact": "Binary files bloat the repository, make diffs meaningless, and may contain vulnerable code", - } - - -# ─── Helper Functions 
────────────────────────────────────── - -def _detect_languages(workspace: str) -> Dict[str, int]: - """Detect programming languages by file extension.""" - ext_map = { - ".php": "php", ".py": "python", ".js": "javascript", ".ts": "typescript", - ".tsx": "tsx", ".jsx": "jsx", ".rs": "rust", ".go": "golang", - ".java": "java", ".cs": "csharp", ".rb": "ruby", ".lua": "lua", - ".dart": "dart", ".c": "c", ".cpp": "cpp", ".h": "c", - ".html": "html", ".css": "css", ".scss": "scss", ".vue": "vue", - ".svelte": "svelte", ".sql": "sql", ".sh": "shell", - } - languages = {} - for root, dirs, files in os.walk(workspace): - dirs[:] = [d for d in dirs if d not in { - 'node_modules', '.git', 'dist', 'build', 'target', - '__pycache__', '.codelens', 'vendor', '.venv', 'venv', - } and not d.startswith('.')] - for f in files: - ext = os.path.splitext(f)[1].lower() - lang = ext_map.get(ext) - if lang: - languages[lang] = languages.get(lang, 0) + 1 - return dict(sorted(languages.items(), key=lambda x: -x[1])) - - -def _extract_directory_structure(workspace: str, max_depth: int = 3) -> List[str]: - """Extract top-level directory structure.""" - dirs = [] - for root, dirnames, filenames in os.walk(workspace): - depth = root.replace(workspace, "").count(os.sep) - if depth >= max_depth: - dirnames.clear() - continue - dirnames[:] = [d for d in dirnames if d not in { - 'node_modules', '.git', 'dist', 'build', 'target', - '__pycache__', '.codelens', 'vendor', '.venv', 'venv', - } and not d.startswith('.')] - for d in dirnames: - rel = os.path.relpath(os.path.join(root, d), workspace) - dirs.append(rel + "/") - return sorted(dirs)[:50] - - -def _compute_risk_score(findings: List[Dict], result: Dict) -> Dict[str, Any]: - """Compute an overall risk score based on all findings. - - Uses logarithmic scaling to prevent saturation to 0 on large projects. - The formula penalizes critical issues more heavily but uses log scaling - so that having 10x more issues doesn't mean 10x lower score. 
- - Scoring: - - Start at 100 - - Each critical issue costs log2(1 + n) * 8 points (max -25 per category) - - Each high issue costs log2(1 + n) * 4 points (max -15 per category) - - Each medium issue costs log2(1 + n) * 2 points (max -10 per category) - - Each low issue costs log2(1 + n) * 0.5 points (max -5 per category) - """ - import math - - score = 100 # Start at 100, deduct for issues - - critical_count = 0 - high_count = 0 - medium_count = 0 - - for f in findings: - total = f.get("total", 0) - sev = f.get("severity", "low") - if sev == "critical": - critical_count += total - # Logarithmic scaling: 1 issue = -8, 10 issues = -27.7, 100 issues = -53.3 - deduction = math.log2(1 + total) * 8 - score -= min(deduction, 25) # Max -25 per category - elif sev == "high": - high_count += total - deduction = math.log2(1 + total) * 4 - score -= min(deduction, 15) # Max -15 per category - elif sev == "medium": - medium_count += total - deduction = math.log2(1 + total) * 2 - score -= min(deduction, 10) # Max -10 per category - else: - deduction = math.log2(1 + total) * 0.5 - score -= min(deduction, 5) # Max -5 per category - - # Apply exponential decay when score goes below 0 to avoid - # immediate saturation to 0 on projects with many categories. 
- # This preserves relative differences: -10 → 72, -35 → 31, -70 → 10, -100 → 4 - if score < 0: - score = round(100 * math.exp(score / 30), 1) - score = max(0, min(100, round(score, 1))) - - if score >= 80: - level = "low" - emoji = "🟢" - elif score >= 60: - level = "moderate" - emoji = "🟡" - elif score >= 40: - level = "high" - emoji = "🟠" - else: - level = "critical" - emoji = "🔴" - - return { - "score": score, - "level": level, - "emoji": emoji, - "critical_issues": critical_count, - "high_issues": high_count, - "medium_issues": medium_count, - "summary": f"{emoji} Risk: {level} ({score}/100) — {critical_count} critical, {high_count} high, {medium_count} medium issues", - } - - -def _generate_action_plan(findings: List[Dict], risk: Dict) -> List[Dict]: - """Generate a prioritized action plan.""" - plan = [] - - # Sort findings by severity - sev_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} - sorted_findings = sorted(findings, key=lambda f: sev_order.get(f.get("severity", "low"), 4)) - - for f in sorted_findings: - if f.get("total", 0) == 0: - continue - plan.append({ - "priority": "P0" if f.get("severity") == "critical" else - "P1" if f.get("severity") == "high" else - "P2" if f.get("severity") == "medium" else "P3", - "category": f.get("category", ""), - "label": f.get("label", ""), - "total": f.get("total", 0), - "action": f.get("action", ""), - "impact": f.get("impact", ""), - }) - - return plan - - -def _generate_recommendations(findings: List[Dict], result: Dict) -> List[str]: - """Generate contextual recommendations based on findings and project type.""" - recs = [] - - # Based on findings - for f in findings: - cat = f.get("category", "") - total = f.get("total", 0) - if cat == "secrets" and total > 0: - recs.append("CRITICAL: Hardcoded secrets found — move to environment variables or secret manager IMMEDIATELY before pushing to any remote") - elif cat == "vulnerabilities" and total > 0: - recs.append("CRITICAL: Known CVEs detected — update 
vulnerable dependencies or apply patches") - elif cat == "debug_leaks" and total > 3: - recs.append("Multiple debug leaks found — set up a pre-commit hook to catch debug code before it's committed") - elif cat == "circular_dependencies" and total > 3: - recs.append("Multiple circular dependencies — consider architectural refactoring or introducing a mediator module") - elif cat == "dead_code" and total > 50: - recs.append("High dead code volume — schedule a cleanup sprint, start with unreachable code and unused exports") - elif cat == "complexity" and total > 5: - recs.append("Complexity hotspots detected — consider pair programming or mob programming sessions for refactoring") - elif cat == "perf_hints" and total > 0: - recs.append("Performance anti-patterns found — profile the application under load to confirm impact") - - # Based on project type - langs = result.get("languages", {}) - if "php" in langs and langs["php"] > 10: - recs.append("PHP project detected — consider running 'phpstan analyse' for type checking and 'phpcs' for coding standards") - if "python" in langs and langs["python"] > 5: - recs.append("Python project detected — consider adding mypy for type checking and ruff for linting") - if "go" in langs: - recs.append("Go project detected — run 'go vet' and 'golangci-lint run' for additional static analysis") - - # Based on architecture - fws = result.get("frameworks", []) - if "react" in fws or "nextjs" in fws: - recs.append("React/Next.js detected — use React DevTools Profiler to identify unnecessary re-renders") - if "laravel" in fws: - recs.append("Laravel detected — run 'php artisan route:list' to verify all routes are registered correctly") - - # General recommendations - if not result.get("api_map", {}).get("total_routes"): - recs.append("No API routes detected — if this is a backend project, run 'scan --full' and check that route files are in the configured paths") - - return recs[:15] - - -register_command( - "analyze", - "Full repository 
analysis: init + scan + all engines in one command (v6.0)", - add_args, - execute, -hidden=True, -) diff --git a/scripts/commands/audit.py b/scripts/commands/audit.py index 6f498d7..3fefc93 100644 --- a/scripts/commands/audit.py +++ b/scripts/commands/audit.py @@ -7,6 +7,8 @@ - staleness Per-file staleness detection - perf-hint Performance anti-patterns - side-effect Pure vs impure function analysis + - check CI/CD quality gate (issue #200) + - missing-refs CSS/HTML mismatch detection (issue #200) (god-module from the issue mapping is part of arch-metrics, exposed via ``summary --check arch-metrics`` — there is no standalone god-module command @@ -16,6 +18,8 @@ codelens audit # all checks codelens audit --check dead-code # only dead-code codelens audit --check complexity,smell # pick subset + codelens audit --check check # CI quality gate + codelens audit --check missing-refs # CSS/HTML mismatch Output: ``{"s":"ok", "st":{...}, "r":[...]}`` — one entry per check under ``r`` and aggregate counts under ``st``. @@ -59,6 +63,16 @@ "module": "commands.side_effect", "help": "Pure vs impure function analysis", }, + # Issue #200: absorb the hidden-pending CI quality-gate command. + "check": { + "module": "commands.check", + "help": "CI/CD quality gate (exits non-zero on failure)", + }, + # Issue #200: absorb the hidden-pending CSS/HTML mismatch command. + "missing-refs": { + "module": "commands.missingrefs", + "help": "Detect CSS/HTML mismatch bugs", + }, } ALL_CHECKS = list(_CHECKS.keys()) @@ -75,11 +89,15 @@ def add_args(parser): " staleness Per-file staleness detection\n" " perf-hint Performance anti-patterns\n" " side-effect Pure vs impure function analysis\n" + " check CI/CD quality gate (issue #200)\n" + " missing-refs CSS/HTML mismatch detection (issue #200)\n" "\n" "Examples:\n" " codelens audit . # all checks\n" " codelens audit . --check dead-code # only dead-code\n" " codelens audit . --check complexity,smell # pick subset\n" + " codelens audit . 
--check check # CI quality gate\n" + " codelens audit . --check missing-refs # CSS/HTML mismatch\n" ) parser.add_argument("workspace", nargs="?", default=None, help="Path to workspace root (auto-detected if omitted)") @@ -164,6 +182,33 @@ def _build_namespace(base_args, check_name: str) -> argparse.Namespace: ns.name = getattr(base_args, "name", None) ns.file = getattr(base_args, "file", None) ns.max_files = getattr(base_args, "max_files", None) or 3000 + elif check_name == "check": + # Issue #200: CI quality-gate. check.py reads many args; under the + # audit umbrella we expose the common --severity passthrough and use + # sensible defaults for the rest. Users who need the full gate + # surface (baseline, diff-scan, strict mode, rule-file) should invoke + # ``codelens check`` directly — the deprecated alias still works. + ns.severity = getattr(base_args, "severity", None) or "high" + ns.max_findings = 0 + ns.health_min = 0 + ns.sarif = False + # check.py iterates args.commands directly — must be a list. + # Mirror the default from check.py add_args(). + ns.commands = ['secrets', 'dead-code', 'smell', 'complexity', + 'debug-leak', 'circular', 'taint'] + ns.rule_files = None + ns.baseline_commit = None + ns.save_baseline = False + ns.diff_scan = False + ns.staged = False + ns.diff_vs = None + ns.strict = False + ns.error = False + ns.severity_threshold = None + elif check_name == "missing-refs": + # missing-refs.execute() only uses the workspace arg (passed + # separately); no per-check namespace attributes are required. 
+ pass return ns diff --git a/scripts/commands/check.py b/scripts/commands/check.py index 6b2448e..23d3947 100755 --- a/scripts/commands/check.py +++ b/scripts/commands/check.py @@ -487,4 +487,5 @@ def execute(args, workspace): add_args, execute, hidden=True, + deprecated_alias_for='audit', ) diff --git a/scripts/commands/config_drift.py b/scripts/commands/config_drift.py deleted file mode 100644 index 4e4772d..0000000 --- a/scripts/commands/config_drift.py +++ /dev/null @@ -1,20 +0,0 @@ -"""Config-drift command — Detect dependency drift (package.json vs code).""" - -from configdrift_engine import detect_config_drift -from commands import register_command - - -def add_args(parser): - parser.add_argument("workspace", nargs="?", default=None, - help="Path to workspace root (auto-detected if omitted)") - - -def execute(args, workspace): - return detect_config_drift(workspace) - - -register_command("config-drift", "Detect dependency drift (package.json vs code)", add_args, execute, - -hidden=True, - -) diff --git a/scripts/commands/deps_audit.py b/scripts/commands/deps_audit.py index 7868e82..f41e2d3 100644 --- a/scripts/commands/deps_audit.py +++ b/scripts/commands/deps_audit.py @@ -49,5 +49,6 @@ def execute(args, workspace): "Scan dependencies for known CVEs via OSV.dev (PyPI/npm/crates.io)", add_args, execute, -hidden=True, + hidden=True, + deprecated_alias_for='security', ) diff --git a/scripts/commands/entrypoints.py b/scripts/commands/entrypoints.py deleted file mode 100644 index e0a6102..0000000 --- a/scripts/commands/entrypoints.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Entrypoints command — Map execution entry points.""" - -from entrypoints_engine import map_entrypoints -from commands import register_command - - -def add_args(parser): - parser.add_argument("workspace", nargs="?", default=None, - help="Path to workspace root (auto-detected if omitted)") - parser.add_argument("--type", dest="entry_type", default=None, - choices=["main", "http_handler", 
"event_handler", "cli_command", - "cron_job", "worker", "module_export", "test_entry"], - help="Filter by entry point type") - parser.add_argument("--exclude-tests", action="store_true", default=False, - help="Exclude test_entry type from scanning (reduces noise on large repos)") - parser.add_argument("--max-files", type=int, default=5000, - help="Maximum number of files to scan (default: 5000)") - - -def execute(args, workspace): - return map_entrypoints(workspace, entry_type=args.entry_type, - exclude_tests=args.exclude_tests, - max_files=args.max_files) - - -register_command("entrypoints", "Map execution entry points", add_args, execute, - -hidden=True, - -) diff --git a/scripts/commands/list.py b/scripts/commands/list.py index aef14ae..63b6735 100644 --- a/scripts/commands/list.py +++ b/scripts/commands/list.py @@ -125,4 +125,6 @@ def cmd_list(workspace: str, domain: str, filter_type: str = "all", hidden=True, +deprecated_alias_for='search', + ) diff --git a/scripts/commands/lsp.py b/scripts/commands/lsp.py deleted file mode 100644 index f26acf4..0000000 --- a/scripts/commands/lsp.py +++ /dev/null @@ -1,135 +0,0 @@ -"""LSP command — launch the native CodeLens LSP 3.17 server. - -Issue #48 (Phase 1): exposes CodeLens analysis (tree-sitter scan + rule -engine + minimal symbol graph) to editors such as Neovim, Emacs, Helix, -VS Code via the Language Server Protocol. - -The server is implemented in ``scripts/lsp_server.py`` (pygls-based). -This file is the thin CLI wrapper that registers the ``lsp`` command, -parses args, and delegates to ``lsp_server.run_stdio`` or -``lsp_server.run_tcp``. 
- -Usage:: - - codelens lsp # stdio (default) - codelens lsp --rule-file my.yaml # + rule-engine diagnostics - codelens lsp --tcp --port 2087 # TCP transport (debug) - codelens lsp --version # print version, exit 0 - -Phase 1 supported LSP methods: - -* ``initialize`` / ``initialized`` / ``shutdown`` / ``exit`` -* ``textDocument/didOpen`` — parse + scan, publish diagnostics -* ``textDocument/didChange`` — re-parse + re-scan -* ``textDocument/hover`` — return symbol info + callers/callees -* ``textDocument/definition`` — go-to-definition via symbol graph -* ``textDocument/publishDiagnostics`` — auto-sent after didOpen/didChange - -Severity mapping (CodeLens → LSP ``DiagnosticSeverity``): - - critical / ERROR → Error (1) - high / WARNING → Warning (2) - medium / INFO → Information (3) - low / HINT → Hint (4) -""" - -from __future__ import annotations - -import sys - -from commands import register_command - -VERSION = "0.1.0" - - -def add_args(parser): - """Add LSP-server-specific arguments to the parser.""" - parser.add_argument( - "--rule-file", - dest="rule_files", - action="append", - default=None, - metavar="", - help="Path to a Semgrep-compatible YAML rule file (issue #46). " - "May be passed multiple times. When set, the LSP server " - "runs the rule engine on each document and publishes " - "diagnostics.", - ) - parser.add_argument( - "--tcp", - action="store_true", - default=False, - help="Use TCP transport instead of stdio (useful for debugging).", - ) - parser.add_argument( - "--host", - default="127.0.0.1", - help="TCP host to bind to (only used with --tcp). Default: 127.0.0.1.", - ) - parser.add_argument( - "--port", - type=int, - default=2087, - help="TCP port to bind to (only used with --tcp). Default: 2087.", - ) - parser.add_argument( - "--version", - dest="show_version", - action="store_true", - default=False, - help="Print the CodeLens LSP server version and exit.", - ) - - -def execute(args, workspace): - """Launch the LSP server. 
- - Returns a dict with ``status`` and either ``version`` (for - ``--version``) or ``transport`` info. The function blocks for the - lifetime of the server when running in stdio/TCP mode. - """ - if getattr(args, "show_version", False): - return { - "status": "ok", - "version": VERSION, - "name": "codelens-lsp", - } - - try: - from lsp_server import run_stdio, run_tcp - except ImportError as exc: - return { - "status": "error", - "error": f"cannot start LSP server: {exc}", - "hint": "Install optional deps: pip install codelens[lsp]", - } - - rule_files = list(getattr(args, "rule_files", None) or []) - try: - if getattr(args, "tcp", False): - run_tcp(args.host, args.port, rule_files) - return { - "status": "ok", - "transport": "tcp", - "host": args.host, - "port": args.port, - } - else: - run_stdio(rule_files) - return { - "status": "ok", - "transport": "stdio", - } - except KeyboardInterrupt: - return {"status": "ok", "transport": "stdio", "note": "interrupted by user"} - except Exception as exc: - return {"status": "error", "error": f"LSP server crashed: {exc}"} - - -register_command( - "lsp", - "Run CodeLens as a native LSP 3.17 server (stdio by default; --tcp for debug)", - add_args, - execute, -hidden=True, -) diff --git a/scripts/commands/missingrefs.py b/scripts/commands/missingrefs.py index b313128..b29464f 100644 --- a/scripts/commands/missingrefs.py +++ b/scripts/commands/missingrefs.py @@ -23,4 +23,5 @@ def execute(args, workspace): return detect_missing_refs(workspace) -register_command("missing-refs", "Detect CSS/HTML mismatch bugs", add_args, execute, hidden=True) +register_command("missing-refs", "Detect CSS/HTML mismatch bugs", add_args, execute, + hidden=True, deprecated_alias_for='audit') diff --git a/scripts/commands/plugin.py b/scripts/commands/plugin.py index 9a3d1e1..8f3ede3 100644 --- a/scripts/commands/plugin.py +++ b/scripts/commands/plugin.py @@ -333,10 +333,13 @@ def _extract_plugin_name_from_source(source: str) -> Optional[str]: return 
None +# Issue #200: plugin is a standalone visible command — it manages plugin +# lifecycle (install/uninstall/search/validate) which is unique and does not +# overlap with any umbrella command. It was hidden-pending BOS decision; the +# decision (issue #200) is to make it visible. register_command( "plugin", "Manage CodeLens plugins (install, list, search, update, info, validate)", add_args, execute, -hidden=True, ) diff --git a/scripts/commands/query.py b/scripts/commands/query.py index c8367c1..7bc9e5b 100644 --- a/scripts/commands/query.py +++ b/scripts/commands/query.py @@ -455,4 +455,6 @@ def fuzzy_sort_key(match): hidden=True, +deprecated_alias_for='search', + ) diff --git a/scripts/commands/search.py b/scripts/commands/search.py index 9b3496c..d49d7c7 100644 --- a/scripts/commands/search.py +++ b/scripts/commands/search.py @@ -26,7 +26,7 @@ from commands import register_command -_MODES = ("semantic", "symbol", "regex", "graph") +_MODES = ("semantic", "symbol", "regex", "graph", "list", "query") def add_args(parser): @@ -38,16 +38,22 @@ def add_args(parser): " symbol Exact symbol name lookup (fuzzy optional)\n" " regex Regex code search across workspace files\n" " graph Cypher-subset graph query (MATCH/WHERE/RETURN/LIMIT)\n" + " list List all registry entries with optional filter (issue #200)\n" + " query Query a specific class/id/function with callers/callees (issue #200)\n" "\n" "Examples:\n" " codelens search . \"google auth\" # semantic (default)\n" " codelens search . \"google auth\" --mode symbol # exact symbol\n" " codelens search . \"handleChange\" --mode regex # regex code search\n" " codelens search . \"MATCH (n) WHERE n.id CONTAINS x\" --mode graph\n" + " codelens search . --mode list # list all entries\n" + " codelens search submit-btn --mode query # query a symbol\n" "\n" "For raw Cypher pass-through, prefer ``codelens graph ``." 
) - parser.add_argument("pattern", help="Search query (semantic query, symbol name, regex, or Cypher)") + parser.add_argument("pattern", nargs="?", default=None, + help="Search query (semantic query, symbol name, regex, or Cypher). " + "Optional for --mode list.") parser.add_argument("workspace", nargs="?", default=None, help="Path to workspace root (auto-detected if omitted)") parser.add_argument("--mode", default="semantic", choices=_MODES, @@ -82,6 +88,16 @@ def add_args(parser): help="regex/symbol mode: pagination offset (default: 0)") parser.add_argument("--db-path", default=None, help="Custom SQLite database path (semantic/graph modes)") + # list-mode passthroughs (issue #200). + parser.add_argument("--filter", dest="filter_type", default=None, + choices=["all", "dead", "duplicate_define", "duplicate_ref", + "collision", "active"], + help="list mode: filter by status (default: all)") + # query-mode passthroughs (issue #200). + parser.add_argument("--all", dest="all_results", action="store_true", default=False, + help="query mode: return all callers/callees (no limit)") + parser.add_argument("--additional-paths", default=None, metavar="PATHS", + help="query mode: comma-separated extra repo roots for cross-repo query (issue #15)") def _run_semantic(args, workspace) -> Dict[str, Any]: @@ -162,14 +178,89 @@ def _run_graph(args, workspace) -> Dict[str, Any]: return _qg_execute(sub_args, workspace) +def _run_list(args, workspace) -> Dict[str, Any]: + """List all registry entries with optional filter (issue #200). + + Delegates to ``commands.list.execute()``. The ``pattern`` positional is + repurposed as an optional name filter — when None, all entries are listed. + + Because ``pattern`` is the first positional in the search parser, a common + invocation is ``search /path/to/ws --mode list`` where the user intends + the path to be the workspace, not a name filter. We detect this case + (workspace is None and pattern looks like a directory) and swap them. 
+ """ + from commands.list import execute as _list_execute + pattern = getattr(args, "pattern", None) + ws = getattr(args, "workspace", None) + # Heuristic: if workspace wasn't given but pattern is an existing dir, + # the user meant the path as the workspace (list mode has no query arg). + if ws is None and pattern and os.path.isdir(pattern): + ws = pattern + pattern = None + sub_args = argparse.Namespace( + workspace=ws, + domain=getattr(args, "domain", None) or "all", + filter_type=getattr(args, "filter_type", None) or "all", + limit=getattr(args, "limit", None) or 20, + offset=getattr(args, "offset", 0), + format=getattr(args, "format", None), + top=None, max_tokens=None, lite=False, deep=False, db_path=None, + diff_base=None, diff_scope=None, + disable_suppression=None, codelens_ignore_pattern=None, + ) + result = _list_execute(sub_args, workspace if workspace else ws) + # If a pattern filter was given (and not consumed as workspace), narrow + # results by name substring. + if pattern and isinstance(result, dict) and "results" in result: + filtered = [r for r in result["results"] + if pattern in str(r.get("name", ""))] + result["results"] = filtered + result["count"] = len(filtered) + result["filtered_by"] = pattern + return result + + +def _run_query(args, workspace) -> Dict[str, Any]: + """Query a specific class/id/function with callers/callees (issue #200). + + Delegates to ``commands.query.execute()``. The ``pattern`` positional is + the symbol name to query. 
+ """ + from commands.query import execute as _query_execute + sub_args = argparse.Namespace( + name=args.pattern, + workspace=getattr(args, "workspace", None), + domain=getattr(args, "domain", None), + file=getattr(args, "file", None), + limit=None if getattr(args, "all_results", False) else (getattr(args, "limit", None) or 20), + all=getattr(args, "all_results", False), + fuzzy=getattr(args, "fuzzy", False), + additional_paths=getattr(args, "additional_paths", None), + format=getattr(args, "format", None), + top=None, max_tokens=None, lite=False, deep=False, db_path=None, + diff_base=None, diff_scope=None, + disable_suppression=None, codelens_ignore_pattern=None, + ) + return _query_execute(sub_args, workspace) + + def execute(args, workspace): """Dispatch to the selected search mode and normalize output shape. @FLOW: SEARCH_DISPATCH - @CALLS: _run_semantic() | _run_symbol() | _run_regex() | _run_graph() -> dict + @CALLS: _run_semantic() | _run_symbol() | _run_regex() | _run_graph() + | _run_list() | _run_query() -> dict @MUTATES: nothing (read-only) """ mode = getattr(args, "mode", "semantic") or "semantic" + pattern = getattr(args, "pattern", None) + # Modes that require a pattern. list mode allows pattern=None (list all). 
+ _PATTERN_REQUIRED = {"semantic", "symbol", "regex", "graph", "query"} + if mode in _PATTERN_REQUIRED and not pattern: + return { + "s": "error", "st": {"mode": mode}, "r": [], + "error": f"--mode {mode} requires a pattern (search query / symbol name).", + } try: if mode == "semantic": result = _run_semantic(args, workspace) @@ -179,6 +270,10 @@ def execute(args, workspace): result = _run_regex(args, workspace) elif mode == "graph": result = _run_graph(args, workspace) + elif mode == "list": + result = _run_list(args, workspace) + elif mode == "query": + result = _run_query(args, workspace) else: return {"s": "error", "st": {"mode": mode}, "r": [], "error": f"unknown mode '{mode}'"} diff --git a/scripts/commands/security.py b/scripts/commands/security.py index 8e7c1f1..80910f0 100644 --- a/scripts/commands/security.py +++ b/scripts/commands/security.py @@ -6,12 +6,14 @@ - taint AST-based taint analysis - binary-scan Binary/compiled artifact reverse-engineering - regex-audit Regex ReDoS and issue audit + - deps-audit Dependency vulnerability scan via OSV.dev (issue #200) Usage: codelens security # all checks codelens security --check secrets # only secrets codelens security --check taint,vuln-scan # pick subset codelens security --check binary-scan --deep + codelens security --check deps-audit --offline Output: ``{"s":"ok", "st":{...}, "r":[...]}``. """ @@ -50,6 +52,11 @@ "module": "commands.regex_audit", "help": "Regex ReDoS and issue audit", }, + # Issue #200: absorb the hidden-pending OSV.dev dependency audit. 
+ "deps-audit": { + "module": "commands.deps_audit", + "help": "Dependency vulnerability scan via OSV.dev (PyPI/npm/crates.io)", + }, } ALL_CHECKS = list(_CHECKS.keys()) @@ -65,12 +72,14 @@ def add_args(parser): " taint AST-based taint analysis\n" " binary-scan Binary/compiled artifact reverse-engineering\n" " regex-audit Regex ReDoS and issue audit\n" + " deps-audit Dependency vulnerability scan via OSV.dev (issue #200)\n" "\n" "Examples:\n" " codelens security . # all checks\n" " codelens security . --check secrets # only secrets\n" " codelens security . --check taint,vuln-scan # pick subset\n" " codelens security . --check binary-scan --deep # deep binary scan\n" + " codelens security . --check deps-audit # dependency CVEs\n" ) parser.add_argument("workspace", nargs="?", default=None, help="Path to workspace root (auto-detected if omitted)") @@ -104,6 +113,10 @@ def add_args(parser): help="vuln-scan: OSV cache TTL in seconds") parser.add_argument("--max-age", default=None, help="vuln-scan: max cache age (e.g. 6h, 30m, 2d)") + # deps-audit passthroughs (issue #200). + parser.add_argument("--ecosystem", default=None, + choices=["PyPI", "npm", "crates.io"], + help="deps-audit: limit scan to one package ecosystem") def _parse_checks(check_arg: str) -> List[str]: @@ -151,6 +164,11 @@ def _build_namespace(base_args, check_name: str) -> argparse.Namespace: elif check_name == "regex-audit": ns.severity = getattr(base_args, "severity", None) ns.max_files = getattr(base_args, "max_files", None) or 1000 + elif check_name == "deps-audit": + # Issue #200: deps_audit.execute() reads severity/ecosystem/offline. 
+ ns.severity = getattr(base_args, "severity", None) + ns.ecosystem = getattr(base_args, "ecosystem", None) + ns.offline = getattr(base_args, "offline", False) return ns diff --git a/scripts/commands/state_map.py b/scripts/commands/state_map.py deleted file mode 100644 index 2589010..0000000 --- a/scripts/commands/state_map.py +++ /dev/null @@ -1,22 +0,0 @@ -"""State-map command — Track global state management.""" - -from statemap_engine import map_state -from commands import register_command - - -def add_args(parser): - parser.add_argument("workspace", nargs="?", default=None, - help="Path to workspace root (auto-detected if omitted)") - parser.add_argument("--store", dest="store_name", default=None, - help="Filter by store name") - - -def execute(args, workspace): - return map_state(workspace, store_name=args.store_name) - - -register_command("state-map", "Track global state management", add_args, execute, - -hidden=True, - -) diff --git a/scripts/commands/test_map.py b/scripts/commands/test_map.py deleted file mode 100644 index c68130c..0000000 --- a/scripts/commands/test_map.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Test-map command — Map test coverage for functions.""" - -from testmap_engine import map_test_coverage -from commands import register_command - - -def add_args(parser): - parser.add_argument("workspace", nargs="?", default=None, - help="Path to workspace root (auto-detected if omitted)") - parser.add_argument("--function", dest="function_name", default=None, - help="Check specific function test coverage") - parser.add_argument("--file", default=None, help="Filter by source file path") - parser.add_argument("--max-files", type=int, default=3000, - help="Max files to scan (default: 3000)") - - -def execute(args, workspace): - return map_test_coverage( - workspace, - function_name=args.function_name, - file_filter=args.file, - max_files=args.max_files - ) - - -register_command("test-map", "Map test coverage for functions", add_args, execute, - 
-hidden=True, - -) diff --git a/scripts/commands/type_infer.py b/scripts/commands/type_infer.py deleted file mode 100644 index 71be86c..0000000 --- a/scripts/commands/type_infer.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Type-infer command — Lightweight type inference for JS/Python.""" - -from typeinfer_engine import infer_types -from commands import register_command - - -def add_args(parser): - parser.add_argument("workspace", nargs="?", default=None, - help="Path to workspace root (auto-detected if omitted)") - parser.add_argument("--file", default=None, help="Specific file to analyze") - parser.add_argument("--function", dest="function_name", default=None, - help="Specific function to infer types for") - - -def execute(args, workspace): - return infer_types( - workspace, - file_path=args.file, - function_name=args.function_name - ) - - -register_command("type-infer", "Lightweight type inference for JS/Python", add_args, execute, - -hidden=True, - -) diff --git a/scripts/graph_model.py b/scripts/graph_model.py index 4375002..3a6ec0b 100644 --- a/scripts/graph_model.py +++ b/scripts/graph_model.py @@ -13,7 +13,7 @@ - New tables `graph_nodes` and `graph_edges` are additive (prefixed `graph_` to avoid colliding with any existing table name). - The flat registry tables and JSON files are untouched. -- All 12 existing CLI commands continue to work unchanged. +- All 13 existing CLI commands continue to work unchanged. Schema: graph_nodes( diff --git a/skill.json b/skill.json index d178a52..12f65f2 100755 --- a/skill.json +++ b/skill.json @@ -1,7 +1,7 @@ { "name": "codelens", "version": "8.2.0", - "description": "Live Codebase Reference Intelligence. 12 commands for AI-powered code analysis, security auditing, quality scoring, and pre-write safety checks. Supports 28+ languages with regex+AST hybrid parsing. Must activate before writing/editing/deleting any class, id, or function.", + "description": "Live Codebase Reference Intelligence. 
13 commands for AI-powered code analysis, security auditing, quality scoring, and pre-write safety checks. Supports 28+ languages with regex+AST hybrid parsing. Must activate before writing/editing/deleting any class, id, or function.", "author": "codelens", "command_categories": { "setup": [ diff --git a/tests/test_issue195_consolidation.py b/tests/test_issue195_consolidation.py index d80c0bb..6d781db 100644 --- a/tests/test_issue195_consolidation.py +++ b/tests/test_issue195_consolidation.py @@ -1,11 +1,13 @@ -"""Tests for the 12 umbrella commands introduced in issue #195. +"""Tests for the 12 umbrella commands + plugin standalone (issue #195 + #200). Verifies: - All 12 umbrella commands are registered and visible in COMMAND_REGISTRY. -- --help only shows the 12 umbrella commands (hidden aliases suppressed). -- --command-count reports 12. +- plugin is a standalone visible command (issue #200). +- --help only shows the 13 visible commands (hidden aliases suppressed). +- --command-count reports 13. - Each umbrella command's execute() returns the {s, st, r} shape. - Deprecated aliases print a redirect warning to stderr when invoked. +- Issue #200: the 13 hidden-pending commands are resolved (absorbed, dropped, or made visible). """ from __future__ import annotations @@ -32,6 +34,14 @@ "summary", "impact", "api-map", "doctor", "history", "graph", } +# Issue #200: plugin is a standalone visible command (not an umbrella — it +# manages plugin lifecycle, which is unique and does not overlap with any +# umbrella). It was hidden-pending BOS decision; issue #200 made it visible. +EXPECTED_STANDALONE = {"plugin"} + +# The full set of visible commands = 12 umbrellas + 1 standalone.
+EXPECTED_VISIBLE = EXPECTED_UMBRELLA | EXPECTED_STANDALONE + def test_12_umbrella_commands_registered(): """All 12 umbrella commands must be registered (issue #195).""" @@ -39,11 +49,24 @@ def test_12_umbrella_commands_registered(): assert name in COMMAND_REGISTRY, f"umbrella command {name!r} not registered" -def test_only_12_visible_commands(): - """Only the 12 umbrella commands are visible (non-hidden).""" +def test_plugin_standalone_registered_and_visible(): + """plugin must be registered and visible (issue #200).""" + assert "plugin" in COMMAND_REGISTRY, "plugin not registered" + info = COMMAND_REGISTRY["plugin"] + assert not info.get("hidden", False), "plugin must be visible (issue #200)" + assert info.get("deprecated_alias_for") is None, ( + "plugin must not be a deprecated alias (issue #200)" + ) + + +def test_only_13_visible_commands(): + """Only the 12 umbrella commands + plugin are visible (non-hidden). + + Issue #200: plugin became visible, bringing the count from 12 to 13. + """ visible = get_visible_commands() - assert set(visible.keys()) == EXPECTED_UMBRELLA, ( - f"expected exactly {EXPECTED_UMBRELLA}, got {set(visible.keys())}" + assert set(visible.keys()) == EXPECTED_VISIBLE, ( + f"expected exactly {EXPECTED_VISIBLE}, got {set(visible.keys())}" ) @@ -80,6 +103,12 @@ def test_absorbed_commands_marked_hidden_and_deprecated(): "regex-audit": "security", "query-graph": "graph", "architecture": "summary", + # Issue #200: the 5 hidden-pending commands that were absorbed. 
+ "check": "audit", + "missing-refs": "audit", + "deps-audit": "security", + "list": "search", + "query": "search", } for old_name, umbrella in samples.items(): assert old_name in COMMAND_REGISTRY, f"{old_name!r} not in registry" @@ -99,6 +128,9 @@ def test_dropped_commands_not_registered(): "css-deep", "debug-leak", "detect", "export-snapshot", "refactor-safe", "resolve-types", "stack-trace", "benchmark", "fix", "self-analyze", "guard", "llm", "memory", + # Issue #200: 7 hidden-pending commands dropped (overlap / unproven engine). + "analyze", "lsp", "entrypoints", "state-map", "config-drift", + "test-map", "type-infer", } for name in dropped: assert name not in COMMAND_REGISTRY, f"dropped command {name!r} still registered" @@ -132,12 +164,16 @@ def _run_cli(*args, expect_success=True): return result -def test_help_shows_only_12_umbrella_commands(): - """`codelens --help` must list exactly the 12 umbrella commands.""" +def test_help_shows_only_13_visible_commands(): + """`codelens --help` must list all 13 visible commands (issue #200). + + Issue #195 had 12 umbrella commands; issue #200 added plugin as a + standalone visible command, bringing the total to 13. + """ result = _run_cli("--help") - # Each umbrella command name must appear in the choices list. - for name in EXPECTED_UMBRELLA: - assert name in result.stdout, f"umbrella {name!r} not in --help" + # Each visible command name must appear in the choices list. + for name in EXPECTED_VISIBLE: + assert name in result.stdout, f"visible command {name!r} not in --help" # A sample of hidden aliases must NOT appear in the choices list. # (They may appear in command body text if mentioned in epilogs, but # argparse.SUPPRESS ensures they're not in the {choices} enumeration.) 
@@ -149,11 +185,15 @@ def test_help_shows_only_12_umbrella_commands(): pass # argparse.SUPPRESS guarantees this; full verification via --command-count -def test_command_count_reports_12(): - """`codelens --command-count` must print exactly 12.""" +def test_command_count_reports_13(): + """`codelens --command-count` must print exactly 13 (issue #200). + + Issue #195 had 12 umbrella commands; issue #200 made plugin visible, + bringing the count to 13. + """ result = _run_cli("--command-count") - assert result.stdout.strip() == "12", ( - f"expected '12', got {result.stdout.strip()!r}" + assert result.stdout.strip() == "13", ( + f"expected '13', got {result.stdout.strip()!r}" ) @@ -344,3 +384,117 @@ def test_graph_umbrella_registered(): info = COMMAND_REGISTRY["graph"] assert info.get("hidden") is not True # umbrella, must be visible assert callable(info["execute"]) + + +# ─── 4. Issue #200: new --check / --mode dispatches ───────────────── +# Each new absorb target must dispatch to the sub-command and tag the result. 
+ + +def test_audit_check_dispatches_to_check(): + """`audit --check check` dispatches to the check sub-command (issue #200).""" + import argparse + from commands.audit import execute as audit_execute + ws = _make_workspace() + args = argparse.Namespace( + workspace=ws, check="check", max_files=None, max_results=None, + categories=None, severity=None, threshold=None, sort_by=None, + name=None, file=None, limit=None, category=None, + no_confirm_hash=False, format="json", top=None, max_tokens=None, + lite=False, deep=False, db_path=None, diff_base=None, diff_scope=None, + disable_suppression=None, codelens_ignore_pattern=None, + ) + result = audit_execute(args, ws) + assert "s" in result + assert "r" in result + assert any(r.get("_check") == "check" for r in result["r"]) + + +def test_audit_missing_refs_dispatches(): + """`audit --check missing-refs` dispatches to the missingrefs sub-command (issue #200).""" + import argparse + from commands.audit import execute as audit_execute + ws = _make_workspace() + args = argparse.Namespace( + workspace=ws, check="missing-refs", max_files=None, max_results=None, + categories=None, severity=None, threshold=None, sort_by=None, + name=None, file=None, limit=None, category=None, + no_confirm_hash=False, format="json", top=None, max_tokens=None, + lite=False, deep=False, db_path=None, diff_base=None, diff_scope=None, + disable_suppression=None, codelens_ignore_pattern=None, + ) + result = audit_execute(args, ws) + assert "s" in result + assert "r" in result + assert any(r.get("_check") == "missing-refs" for r in result["r"]) + + +def test_security_deps_audit_dispatches(): + """`security --check deps-audit` dispatches to deps_audit sub-command (issue #200).""" + import argparse + from commands.security import execute as security_execute + ws = _make_workspace() + args = argparse.Namespace( + workspace=ws, check="deps-audit", max_files=None, severity=None, + no_gitleaks=False, language=None, with_secrets=False, + cross_file=False, 
no_ast=False, ast=False, deep=False, + offline=True, refresh=False, osv_ttl=None, max_age=None, + ecosystem=None, + format="json", top=None, max_tokens=None, lite=False, + db_path=None, diff_base=None, diff_scope=None, + disable_suppression=None, codelens_ignore_pattern=None, + ) + result = security_execute(args, ws) + assert "s" in result + assert "r" in result + assert any(r.get("_check") == "deps-audit" for r in result["r"]) + + +def test_search_list_mode_dispatches(): + """`search --mode list` dispatches to the list sub-command (issue #200).""" + import argparse + from commands.search import execute as search_execute + ws = _make_workspace() + args = argparse.Namespace( + pattern=None, workspace=ws, mode="list", + file_type=None, file=None, max_results=200, context=0, + ignore_case=False, whole_word=False, domain=None, fuzzy=False, + top=None, validate=False, limit=20, offset=0, db_path=None, + filter_type=None, all_results=False, additional_paths=None, + format="json", max_tokens=None, lite=False, deep=False, + diff_base=None, diff_scope=None, + disable_suppression=None, codelens_ignore_pattern=None, + ) + result = search_execute(args, ws) + assert "s" in result + assert result["st"]["mode"] == "list" + + +def test_search_query_mode_dispatches(): + """`search --mode query` dispatches to the query sub-command (issue #200).""" + import argparse + from commands.search import execute as search_execute + ws = _make_workspace() + args = argparse.Namespace( + pattern="hello", workspace=ws, mode="query", + file_type=None, file=None, max_results=200, context=0, + ignore_case=False, whole_word=False, domain=None, fuzzy=False, + top=None, validate=False, limit=20, offset=0, db_path=None, + filter_type=None, all_results=False, additional_paths=None, + format="json", max_tokens=None, lite=False, deep=False, + diff_base=None, diff_scope=None, + disable_suppression=None, codelens_ignore_pattern=None, + ) + result = search_execute(args, ws) + assert "s" in result + assert 
result["st"]["mode"] == "query" + + +def test_no_hidden_pending_commands_remain(): + """Issue #200 DoD: no hidden-pending commands (hidden + no deprecated_alias_for).""" + hidden_pending = [ + name for name, info in COMMAND_REGISTRY.items() + if info.get("hidden") and not info.get("deprecated_alias_for") + ] + assert not hidden_pending, ( + f"hidden-pending commands remain (issue #200 not resolved): {hidden_pending}" + )