From 7adab6af2e2672984dae6ae3027b2c8e2875556c Mon Sep 17 00:00:00 2001 From: blut-agent Date: Tue, 23 Jun 2026 09:35:53 -0500 Subject: [PATCH 1/2] feat(cli): add policy validation subcommand - Add 'tealtiger validate ' CLI command for validating policy files - Validates required fields (name, description, rules) - Validates action types (allow/deny/transform) - Validates rule structure and detects unknown fields - Supports JSON and YAML file formats - Fixes entry point to use Click group with both test and validate commands - Add 21 unit tests covering load, validate, and error cases --- pyproject.toml | 2 +- src/tealtiger/cli/__init__.py | 19 ++- src/tealtiger/cli/validate.py | 225 +++++++++++++++++++++++++++++ tests/test_cli_validate.py | 260 ++++++++++++++++++++++++++++++++++ 4 files changed, 503 insertions(+), 3 deletions(-) create mode 100644 src/tealtiger/cli/validate.py create mode 100644 tests/test_cli_validate.py diff --git a/pyproject.toml b/pyproject.toml index 9c4a47b..99570d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,7 +61,7 @@ dependencies = [ ] [project.scripts] -tealtiger = "tealtiger.cli.test:cli" +tealtiger = "tealtiger.cli:cli" [project.optional-dependencies] dev = [ diff --git a/src/tealtiger/cli/__init__.py b/src/tealtiger/cli/__init__.py index a5d7728..4f1815a 100644 --- a/src/tealtiger/cli/__init__.py +++ b/src/tealtiger/cli/__init__.py @@ -2,10 +2,25 @@ Provides command-line tools for: - Policy testing +- Policy validation - Configuration validation - Report generation """ -from .test import test +import click -__all__ = ['test'] +from .test import test as test_cmd +from .validate import validate + + +@click.group() +def cli(): + """TealTiger CLI - AI agent security platform.""" + pass + + +cli.add_command(test_cmd) +cli.add_command(validate) + +if __name__ == "__main__": + cli() diff --git a/src/tealtiger/cli/validate.py b/src/tealtiger/cli/validate.py new file mode 100644 index 0000000..c4080a3 --- /dev/null +++ b/src/tealtiger/cli/validate.py @@ -0,0 +1,225 @@ +"""CLI interface for policy validation. + +TealTiger SDK - Policy validation command. + +Validates policy files (JSON/YAML) against the SDK schema: +- Checks required fields (name, description, rules) +- Validates action types (allow/deny/transform) +- Validates condition operators +- Reports errors with line numbers +""" + +import json +import sys +from pathlib import Path +from typing import Any, Dict, List, Tuple + +import click + +# Valid action values for policy rules +VALID_ACTIONS = {"allow", "deny", "transform"} + +# Valid top-level policy fields +VALID_POLICY_FIELDS = {"id", "name", "description", "rules", "enabled"} + +# Valid rule fields +VALID_RULE_FIELDS = {"condition", "action", "reason"} + + +def _load_policy(path: str) -> Tuple[Any, str | None]: + """Load a policy file (JSON or YAML). + + Args: + path: Path to the policy file. + + Returns: + Tuple of (parsed policy, error message). + On success, error is None. On failure, policy is None. + """ + try: + content = Path(path).read_text(encoding="utf-8") + except FileNotFoundError: + return None, f"File not found: {path}" + except OSError as e: + return None, f"Cannot read file: {e}" + + ext = Path(path).suffix.lower() + + if ext in (".yaml", ".yml"): + try: + import yaml as _yaml + + policy = _yaml.safe_load(content) + except ImportError: + return None, ( + "PyYAML is required for YAML files. " + "Install with: pip install pyyaml" + ) + except _yaml.YAMLError as e: + line = getattr(e, "problem_mark", None) + line_num = line.line + 1 if line else "unknown" + return None, f"YAML parse error at line {line_num}: {e}" + elif ext in (".json",): + try: + policy = json.loads(content) + except json.JSONDecodeError as e: + return None, f"JSON parse error at line {e.lineno}: {e.msg}" + else: + # Try JSON first, then YAML + try: + policy = json.loads(content) + except json.JSONDecodeError: + try: + import yaml as _yaml + + policy = _yaml.safe_load(content) + except ImportError: + return None, ( + "Cannot determine file format. " + "Use .json, .yaml, or .yml extension." + ) + except Exception: + return None, "Failed to parse as JSON or YAML" + + return policy, None + + +def _validate_policy(policy: Any, source: str) -> List[str]: + """Validate a policy dict against the TealTiger schema. + + Args: + policy: Parsed policy dict. + source: File path for error reporting. + + Returns: + List of error messages. Empty list means valid. + """ + errors: List[str] = [] + + if not isinstance(policy, dict): + errors.append(f"✗ {source}: Policy must be a JSON/YAML object (dict), got {type(policy).__name__}") + return errors + + # Check for unknown top-level fields + unknown_fields = set(policy.keys()) - VALID_POLICY_FIELDS + for field in sorted(unknown_fields): + errors.append(f"✗ {source}: Unknown policy field '{field}' (valid: {', '.join(sorted(VALID_POLICY_FIELDS))})") + + # Validate required fields + for field in ("name", "description", "rules"): + if field not in policy: + errors.append(f"✗ {source}: Missing required field '{field}'") + + # Validate name is a non-empty string + if "name" in policy and not isinstance(policy["name"], str): + errors.append(f"✗ {source}: Field 'name' must be a string") + elif "name" in policy and not policy["name"].strip(): + errors.append(f"✗ {source}: Field 'name' must not be empty") + + # Validate description is a non-empty string + if "description" in policy and not isinstance(policy["description"], str): + errors.append(f"✗ {source}: Field 'description' must be a string") + elif "description" in policy and not policy["description"].strip(): + errors.append(f"✗ {source}: Field 'description' must not be empty") + + # Validate enabled is a boolean if present + if "enabled" in policy and not isinstance(policy["enabled"], bool): + errors.append(f"✗ {source}: Field 'enabled' must be a boolean") + + # Validate rules is a non-empty list + if "rules" in policy: + rules = policy["rules"] + if not isinstance(rules, list): + errors.append(f"✗ {source}: Field 'rules' must be a list") + elif len(rules) == 0: + errors.append(f"✗ {source}: Field 'rules' must not be empty") + else: + for i, rule in enumerate(rules): + _validate_rule(rule, source, i + 1, errors) + + return errors + + +def _validate_rule(rule: Any, source: str, rule_num: int, errors: List[str]) -> None: + """Validate a single policy rule. + + Args: + rule: The rule dict to validate. + source: File path for error reporting. + rule_num: 1-based rule number for error messages. + errors: List to append errors to. + """ + prefix = f"{source} Rule {rule_num}" + + if not isinstance(rule, dict): + errors.append(f"✗ {prefix}: Rule must be an object (dict), got {type(rule).__name__}") + return + + # Check for unknown rule fields + unknown_fields = set(rule.keys()) - VALID_RULE_FIELDS + for field in sorted(unknown_fields): + errors.append(f"✗ {prefix}: Unknown rule field '{field}' (valid: {', '.join(sorted(VALID_RULE_FIELDS))})") + + # Validate action + if "action" not in rule: + errors.append(f"✗ {prefix}: Missing required field 'action'") + elif not isinstance(rule["action"], str): + errors.append(f"✗ {prefix}: Field 'action' must be a string") + elif rule["action"] not in VALID_ACTIONS: + errors.append( + f"✗ {prefix}: Invalid action '{rule['action']}' " + f"(did you mean one of: {', '.join(sorted(VALID_ACTIONS))}?)" + ) + + # Validate condition is a dict + if "condition" not in rule: + errors.append(f"✗ {prefix}: Missing required field 'condition'") + elif not isinstance(rule["condition"], dict): + errors.append(f"✗ {prefix}: Field 'condition' must be an object (dict)") + + # Validate reason is a non-empty string + if "reason" not in rule: + errors.append(f"✗ {prefix}: Missing required field 'reason'") + elif not isinstance(rule["reason"], str): + errors.append(f"✗ {prefix}: Field 'reason' must be a string") + elif not rule["reason"].strip(): + errors.append(f"✗ {prefix}: Field 'reason' must not be empty") + + +@click.command() +@click.argument("policy_path", type=click.Path(exists=True)) +@click.option( + "--strict", + is_flag=True, + default=False, + help="Treat warnings as errors (fail on unknown fields)", +) +def validate(policy_path: str, strict: bool) -> None: + """Validate a TealTiger policy file. + + Checks policy structure, required fields, valid action types, + and condition format. Exits 0 on valid, 1 on invalid. + + Examples: + + tealtiger validate my-policy.json + + tealtiger validate my-policy.yaml --strict + """ + policy, load_error = _load_policy(policy_path) + + if load_error: + click.echo(f"✗ {load_error}", err=True) + sys.exit(1) + + errors = _validate_policy(policy, policy_path) + + if errors: + for error in errors: + click.echo(error, err=True) + click.echo(f"\n✗ Policy validation failed with {len(errors)} error(s)", err=True) + sys.exit(1) + + rule_count = len(policy.get("rules", [])) + click.echo(f"✓ Policy '{policy.get('name', 'unnamed')}' is valid ({rule_count} rule{'s' if rule_count != 1 else ''})") + sys.exit(0) diff --git a/tests/test_cli_validate.py b/tests/test_cli_validate.py new file mode 100644 index 0000000..0ccafbd --- /dev/null +++ b/tests/test_cli_validate.py @@ -0,0 +1,260 @@ +"""Tests for the policy validation CLI command.""" + +import json +import tempfile +from pathlib import Path + +from tealtiger.cli.validate import ( + VALID_ACTIONS, + _load_policy, + _validate_policy, +) + + +class TestLoadPolicy: + """Tests for policy file loading.""" + + def test_load_valid_json(self): + """Test loading a valid JSON policy file.""" + policy = {"name": "test", "description": "desc", "rules": []} + with tempfile.NamedTemporaryFile( + suffix=".json", mode="w", delete=False + ) as f: + json.dump(policy, f) + f.flush() + path = f.name + + try: + result, error = _load_policy(path) + assert error is None + assert result == policy + finally: + Path(path).unlink() + + def test_load_invalid_json(self): + """Test loading an invalid JSON file returns an error.""" + with tempfile.NamedTemporaryFile( + suffix=".json", mode="w", delete=False + ) as f: + f.write("{invalid json}") + f.flush() + path = f.name + + try: + result, error = _load_policy(path) + assert result is None + assert error is not None + assert "parse error" in (error or "").lower() + finally: + Path(path).unlink() + + def test_load_missing_file(self): + """Test loading a non-existent file returns an error.""" + result, error = _load_policy("/nonexistent/path/policy.json") + assert result is None + assert "not found" in error.lower() + + def test_load_yaml_file(self): + """Test loading a YAML policy file.""" + try: + import yaml # noqa: F811 + except ImportError: + return # Skip if PyYAML not installed + + policy = {"name": "test", "description": "desc", "rules": []} + with tempfile.NamedTemporaryFile( + suffix=".yaml", mode="w", delete=False + ) as f: + yaml.dump(policy, f) + f.flush() + path = f.name + + try: + result, error = _load_policy(path) + assert error is None + assert result["name"] == "test" + finally: + Path(path).unlink() + + +class TestValidatePolicy: + """Tests for policy validation logic.""" + + def test_valid_policy(self): + """Test a fully valid policy passes validation.""" + policy = { + "name": "production-allowlist", + "description": "Allow all tools in production", + "rules": [ + { + "condition": {"tool_name": "file-read"}, + "action": "allow", + "reason": "Reads are allowed", + } + ], + } + errors = _validate_policy(policy, "policy.json") + assert errors == [] + + def test_valid_policy_multiple_rules(self): + """Test a policy with multiple rules passes validation.""" + policy = { + "name": "multi-rule", + "description": "Multiple rules", + "rules": [ + { + "condition": {"tool_name": "file-read"}, + "action": "allow", + "reason": "Reads allowed", + }, + { + "condition": {"tool_name": "file-write"}, + "action": "deny", + "reason": "Writes denied", + }, + ], + } + errors = _validate_policy(policy, "policy.json") + assert errors == [] + + def test_missing_name(self): + """Test missing required 'name' field.""" + policy = {"description": "no name", "rules": []} + errors = _validate_policy(policy, "policy.json") + assert any("name" in e for e in errors) + + def test_missing_description(self): + """Test missing required 'description' field.""" + policy = {"name": "test", "rules": []} + errors = _validate_policy(policy, "policy.json") + assert any("description" in e for e in errors) + + def test_missing_rules(self): + """Test missing required 'rules' field.""" + policy = {"name": "test", "description": "desc"} + errors = _validate_policy(policy, "policy.json") + assert any("rules" in e for e in errors) + + def test_unknown_action(self): + """Test invalid action type in rule.""" + policy = { + "name": "test", + "description": "desc", + "rules": [ + { + "condition": {"tool_name": "test"}, + "action": "alllow", + "reason": "typo in action", + } + ], + } + errors = _validate_policy(policy, "policy.json") + assert any("alllow" in e for e in errors) + assert any("allow" in e for e in errors) # suggestion + + def test_missing_required_field_in_rule(self): + """Test rule missing required 'condition' field.""" + policy = { + "name": "test", + "description": "desc", + "rules": [ + {"action": "allow", "reason": "no condition"} + ], + } + errors = _validate_policy(policy, "policy.json") + assert any("condition" in e for e in errors) + + def test_unknown_rule_field(self): + """Test rule with unknown field.""" + policy = { + "name": "test", + "description": "desc", + "rules": [ + { + "condition": {"tool_name": "test"}, + "action": "allow", + "reason": "test", + "priority": 1, + } + ], + } + errors = _validate_policy(policy, "policy.json") + assert any("priority" in e for e in errors) + + def test_empty_rules_list(self): + """Test empty rules list.""" + policy = {"name": "test", "description": "desc", "rules": []} + errors = _validate_policy(policy, "policy.json") + assert any("empty" in e.lower() for e in errors) + + def test_rules_not_a_list(self): + """Test rules field is not a list.""" + policy = {"name": "test", "description": "desc", "rules": "not a list"} + errors = _validate_policy(policy, "policy.json") + assert any("list" in e.lower() for e in errors) + + def test_policy_not_a_dict(self): + """Test policy that is not a dict.""" + errors = _validate_policy("just a string", "policy.json") + assert any("must be" in e for e in errors) + + def test_rule_not_a_dict(self): + """Test a rule that is not a dict.""" + policy = { + "name": "test", + "description": "desc", + "rules": ["not a dict"], + } + errors = _validate_policy(policy, "policy.json") + assert any("Rule 1" in e for e in errors) + + def test_valid_actions_set(self): + """Test that VALID_ACTIONS contains expected values.""" + assert VALID_ACTIONS == {"allow", "deny", "transform"} + + def test_missing_action_in_rule(self): + """Test rule missing 'action' field.""" + policy = { + "name": "test", + "description": "desc", + "rules": [ + {"condition": {"tool_name": "test"}, "reason": "no action"} + ], + } + errors = _validate_policy(policy, "policy.json") + assert any("action" in e for e in errors) + + def test_missing_reason_in_rule(self): + """Test rule missing 'reason' field.""" + policy = { + "name": "test", + "description": "desc", + "rules": [ + {"condition": {"tool_name": "test"}, "action": "allow"} + ], + } + errors = _validate_policy(policy, "policy.json") + assert any("reason" in e for e in errors) + + def test_condition_not_dict(self): + """Test rule with condition that is not a dict.""" + policy = { + "name": "test", + "description": "desc", + "rules": [ + {"condition": "not a dict", "action": "allow", "reason": "test"} + ], + } + errors = _validate_policy(policy, "policy.json") + assert any("condition" in e for e in errors) + + def test_multiple_errors(self): + """Test that multiple errors are reported.""" + policy = { + "name": "", + "rules": [ + {"action": "invalid", "reason": ""}, + ], + } + errors = _validate_policy(policy, "policy.json") + assert len(errors) >= 3 # missing description, empty name, invalid action, missing condition, missing reason From 50ef0f07ed95bf3298497d4307abe8e9f405bc24 Mon Sep 17 00:00:00 2001 From: blut-agent Date: Tue, 23 Jun 2026 09:40:42 -0500 Subject: [PATCH 2/2] feat(audit): add to_json() dashboard export method - Add TealAudit.to_json() for simple JSON array output - Outputs flat decision objects with fields: decision_id, timestamp, agent_id, action, tool_name, reason_codes, risk_score, evaluation_time_ms - Maps DecisionAction enum to lowercase strings (allow/deny/monitor) - Converts risk_score to int, handles None values - Empty array when no events stored - Add 9 unit tests covering all edge cases --- src/tealtiger/core/audit/teal_audit.py | 62 ++++++++ tests/core/audit/test_to_json.py | 191 +++++++++++++++++++++++++ 2 files changed, 253 insertions(+) create mode 100644 tests/core/audit/test_to_json.py diff --git a/src/tealtiger/core/audit/teal_audit.py b/src/tealtiger/core/audit/teal_audit.py index 020b0d0..846a2c1 100644 --- a/src/tealtiger/core/audit/teal_audit.py +++ b/src/tealtiger/core/audit/teal_audit.py @@ -473,6 +473,68 @@ def export( else: raise ValueError(f"TealAudit: Unsupported export format: {format}") + def to_json(self) -> str: + """Export audit events as a simple JSON array for dashboards. + + Outputs a flat JSON array of decision objects with the following fields: + - decision_id: correlation_id + - timestamp: ISO 8601 timestamp + - agent_id: agent identifier + - action: allow/deny/monitor + - tool_name: event_type (used as tool_name for dashboard compatibility) + - reason_codes: list of reason code strings + - risk_score: numeric risk score + - evaluation_time_ms: duration in milliseconds + + Returns: + JSON string representing the array of decisions. + """ + events = self.query() + decisions = [] + + for event in events: + # Map action to dashboard-friendly format + action = "monitor" + if event.action: + action_str = event.action.value if hasattr(event.action, "value") else str(event.action) + if action_str == "ALLOW": + action = "allow" + elif action_str == "DENY": + action = "deny" + else: + action = action_str.lower() + + # Map reason_codes to string list + reason_codes = [] + if event.reason_codes: + for rc in event.reason_codes: + if isinstance(rc, str): + reason_codes.append(rc) + else: + reason_codes.append(rc.value if hasattr(rc, "value") else str(rc)) + + # Map risk_score to int if possible + risk_score = event.risk_score + if risk_score is not None: + risk_score = int(risk_score) + + # Map duration to evaluation_time_ms + evaluation_time_ms = event.duration + + decision = { + "decision_id": event.correlation_id, + "timestamp": event.timestamp, + "agent_id": event.agent_id, + "action": action, + "tool_name": event.event_type.value if hasattr(event.event_type, "value") else str(event.event_type), + "reason_codes": reason_codes, + "risk_score": risk_score, + "evaluation_time_ms": evaluation_time_ms, + } + decisions.append(decision) + + return json.dumps(decisions, indent=2, default=str) + def clear(self) -> None: """Clear all stored events""" self.events = [] diff --git a/tests/core/audit/test_to_json.py b/tests/core/audit/test_to_json.py new file mode 100644 index 0000000..dd30c56 --- /dev/null +++ b/tests/core/audit/test_to_json.py @@ -0,0 +1,191 @@ +"""Tests for TealAudit.to_json() dashboard export method.""" + +import json +from datetime import datetime + +import pytest + +from tealtiger.core.audit import ( + AuditEvent, + AuditEventType, + ConsoleOutput, + TealAudit, +) +from tealtiger.core.engine.types import DecisionAction, ReasonCode + + +class TestTealAuditToJson: + """Tests for the to_json() dashboard export method.""" + + def _create_audit(self): + """Create a TealAudit instance with a mock output.""" + return TealAudit( + outputs=[ConsoleOutput()], + max_events=1000, + enable_storage=True, + ) + + def _make_event(self, **kwargs): + """Helper to create an AuditEvent with defaults.""" + defaults = { + "schema_version": "1.0.0", + "event_type": AuditEventType.POLICY_EVALUATION, + "timestamp": datetime.utcnow().isoformat() + "Z", + "correlation_id": "test-correlation", + } + defaults.update(kwargs) + return AuditEvent(**defaults) + + def test_to_json_empty(self): + """Test to_json returns empty array when no events.""" + audit = self._create_audit() + result = json.loads(audit.to_json()) + assert result == [] + + def test_to_json_single_allow(self): + """Test to_json with a single ALLOW decision.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="d-4a8b2c1f", + action=DecisionAction.ALLOW, + agent_id="research-agent", + risk_score=0, + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert len(result) == 1 + assert result[0]["decision_id"] == "d-4a8b2c1f" + assert result[0]["action"] == "allow" + assert result[0]["agent_id"] == "research-agent" + assert result[0]["risk_score"] == 0 + assert result[0]["reason_codes"] == [] + + def test_to_json_single_deny(self): + """Test to_json with a single DENY decision.""" + audit = self._create_audit() + event = self._make_event( + event_type=AuditEventType.TOOL_EXECUTION, + correlation_id="d-4a8b2c1f", + action=DecisionAction.DENY, + agent_id="research-agent", + risk_score=80, + reason_codes=[ReasonCode.TOOL_NOT_ALLOWED], + duration=2.1, + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert len(result) == 1 + assert result[0]["decision_id"] == "d-4a8b2c1f" + assert result[0]["action"] == "deny" + assert result[0]["tool_name"] == "tool.execution" + assert result[0]["reason_codes"] == ["TOOL_NOT_ALLOWED"] + assert result[0]["risk_score"] == 80 + assert result[0]["evaluation_time_ms"] == 2.1 + + def test_to_json_multiple_decisions(self): + """Test to_json with multiple decisions.""" + audit = self._create_audit() + + for i in range(3): + event = self._make_event( + correlation_id=f"correlation-{i}", + action=DecisionAction.ALLOW if i % 2 == 0 else DecisionAction.DENY, + agent_id=f"agent-{i}", + risk_score=i * 30, + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert len(result) == 3 + assert result[0]["action"] == "allow" + assert result[1]["action"] == "deny" + assert result[2]["action"] == "allow" + + def test_to_json_output_schema(self): + """Test that to_json output contains all required fields.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="test-id", + action=DecisionAction.ALLOW, + agent_id="test-agent", + timestamp="2026-06-16T14:30:00Z", + ) + audit.log(event) + + result = json.loads(audit.to_json()) + required_fields = [ + "decision_id", + "timestamp", + "agent_id", + "action", + "tool_name", + "reason_codes", + "risk_score", + "evaluation_time_ms", + ] + for field in required_fields: + assert field in result[0], f"Missing required field: {field}" + + def test_to_json_reason_codes_list(self): + """Test that reason_codes is always a list of strings.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="test-id", + action=DecisionAction.DENY, + reason_codes=[ + ReasonCode.PII_DETECTED, + ReasonCode.PROMPT_INJECTION_DETECTED, + ], + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert isinstance(result[0]["reason_codes"], list) + assert result[0]["reason_codes"] == ["PII_DETECTED", "PROMPT_INJECTION_DETECTED"] + + def test_to_json_risk_score_is_int(self): + """Test that risk_score is converted to int.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="test-id", + action=DecisionAction.ALLOW, + risk_score=42.7, # float should be converted to int + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert isinstance(result[0]["risk_score"], int) + assert result[0]["risk_score"] == 42 + + def test_to_json_risk_score_none(self): + """Test that None risk_score stays None.""" + audit = self._create_audit() + event = self._make_event( + correlation_id="test-id", + action=DecisionAction.ALLOW, + risk_score=None, + ) + audit.log(event) + + result = json.loads(audit.to_json()) + assert result[0]["risk_score"] is None + + def test_to_json_output_is_valid_json(self): + """Test that to_json always returns valid JSON.""" + audit = self._create_audit() + + # Log various event types + for event_type in AuditEventType: + event = self._make_event( + event_type=event_type, + correlation_id=f"test-{event_type.value}", + action=DecisionAction.ALLOW, + agent_id="test-agent", + ) + audit.log(event) + + # Should not raise and should be parseable + result = json.loads(audit.to_json()) + assert len(result) == len(AuditEventType)