-
Notifications
You must be signed in to change notification settings - Fork 62
CM-62984: Add Codex CLI support to ai-guardrails #436
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
220829c
0a5b30d
c765e3b
52f18d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| """Codex CLI config.toml management for AI guardrails. | ||
|
|
||
| Codex requires `[features] codex_hooks = true` in its `config.toml` for hook | ||
| scripts to be invoked. This module merges that flag into the user-scope | ||
| (`~/.codex/config.toml`) or repo-scope (`<repo>/.codex/config.toml`) file while | ||
| preserving any existing keys. | ||
| """ | ||
|
|
||
| import sys | ||
| from pathlib import Path | ||
| from typing import Optional | ||
|
|
||
| import tomli_w | ||
|
|
||
| if sys.version_info >= (3, 11): | ||
| import tomllib | ||
| else: # pragma: no cover - py<3.11 fallback | ||
| import tomli as tomllib | ||
|
|
||
| from cycode.logger import get_logger | ||
|
|
||
| logger = get_logger('AI Guardrails Codex Config') | ||
|
|
||
| CODEX_CONFIG_FILE_NAME = 'config.toml' | ||
| CODEX_CONFIG_DIR_NAME = '.codex' | ||
|
|
||
|
|
||
| def get_codex_config_path(scope: str, repo_path: Optional[Path] = None) -> Path: | ||
| """Get the Codex config.toml path for the given scope.""" | ||
| if scope == 'repo' and repo_path: | ||
| return repo_path / CODEX_CONFIG_DIR_NAME / CODEX_CONFIG_FILE_NAME | ||
| return Path.home() / CODEX_CONFIG_DIR_NAME / CODEX_CONFIG_FILE_NAME | ||
|
|
||
|
|
||
| def enable_codex_hooks_feature(scope: str = 'user', repo_path: Optional[Path] = None) -> tuple[bool, str]: | ||
| """Ensure `[features] codex_hooks = true` is set in Codex's config.toml. | ||
|
|
||
| Preserves existing keys. Creates the file (and parent dir) if missing. | ||
|
|
||
| Returns: | ||
| Tuple of (success, message). | ||
| """ | ||
| config_path = get_codex_config_path(scope, repo_path) | ||
|
|
||
| config: dict = {} | ||
| if config_path.exists(): | ||
| try: | ||
| with config_path.open('rb') as f: | ||
| config = tomllib.load(f) | ||
| except Exception as e: | ||
| logger.error('Failed to parse Codex config.toml', exc_info=e) | ||
| return False, f'Failed to parse existing Codex config at {config_path}' | ||
|
|
||
| features = config.get('features') | ||
| if not isinstance(features, dict): | ||
| features = {} | ||
| features['codex_hooks'] = True | ||
| config['features'] = features | ||
|
|
||
| try: | ||
| config_path.parent.mkdir(parents=True, exist_ok=True) | ||
| with config_path.open('wb') as f: | ||
| tomli_w.dump(config, f) | ||
| return True, f'Enabled codex_hooks in {config_path}' | ||
| except Exception as e: | ||
| logger.error('Failed to write Codex config.toml', exc_info=e) | ||
| return False, f'Failed to write Codex config at {config_path}' | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| Currently supports: | ||
| - Cursor | ||
| - Claude Code | ||
| - Codex | ||
| """ | ||
|
|
||
| import platform | ||
|
|
@@ -17,6 +18,7 @@ class AIIDEType(str, Enum): | |
|
|
||
| CURSOR = 'cursor' | ||
| CLAUDE_CODE = 'claude-code' | ||
| CODEX = 'codex' | ||
|
|
||
|
|
||
| class PolicyMode(str, Enum): | ||
|
|
@@ -61,6 +63,14 @@ def _get_claude_code_hooks_dir() -> Path: | |
| return Path.home() / '.claude' | ||
|
|
||
|
|
||
| def _get_codex_hooks_dir() -> Path: | ||
| """Get Codex hooks directory. | ||
|
|
||
| Codex uses ~/.codex on all platforms. | ||
| """ | ||
| return Path.home() / '.codex' | ||
|
|
||
|
|
||
| # IDE-specific configurations | ||
| IDE_CONFIGS: dict[AIIDEType, IDEConfig] = { | ||
| AIIDEType.CURSOR: IDEConfig( | ||
|
|
@@ -77,6 +87,13 @@ def _get_claude_code_hooks_dir() -> Path: | |
| hooks_file_name='settings.json', | ||
| hook_events=['UserPromptSubmit', 'PreToolUse:Read', 'PreToolUse:mcp'], | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add bash? |
||
| ), | ||
| AIIDEType.CODEX: IDEConfig( | ||
| name='Codex', | ||
| hooks_dir=_get_codex_hooks_dir(), | ||
| repo_hooks_subdir='.codex', | ||
| hooks_file_name='hooks.json', | ||
| hook_events=['UserPromptSubmit', 'PreToolUse:Bash'], | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why only bash? |
||
| ), | ||
| } | ||
|
|
||
| # Default IDE | ||
|
|
@@ -140,6 +157,48 @@ def _get_claude_code_hooks_config(async_mode: bool = False) -> dict: | |
| } | ||
|
|
||
|
|
||
| def _get_codex_hooks_config(async_mode: bool = False) -> dict: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks pretty much the same as claude, can we reduce code duplication here |
||
| """Get Codex-specific hooks configuration. | ||
|
|
||
| Codex uses the same nested hook-entry format as Claude Code: | ||
| - events are keyed by name | ||
| - each entry has an optional 'matcher' (regex on tool name / start source) | ||
| and a 'hooks' array with {type, command, ...} | ||
|
|
||
| Codex only supports intercepting Bash for PreToolUse today; MCP and file | ||
| reads are not exposed to hooks. | ||
| """ | ||
| command = f'{CYCODE_SCAN_PROMPT_COMMAND} --ide {AIIDEType.CODEX.value}' | ||
| session_start_command = f'{CYCODE_SESSION_START_COMMAND} --ide {AIIDEType.CODEX.value}' | ||
|
|
||
| hook_entry = {'type': 'command', 'command': command} | ||
| if async_mode: | ||
| hook_entry['async'] = True | ||
| hook_entry['timeout'] = 20 | ||
|
|
||
| return { | ||
| 'hooks': { | ||
| 'SessionStart': [ | ||
| { | ||
| 'matcher': 'startup', | ||
| 'hooks': [{'type': 'command', 'command': session_start_command}], | ||
| } | ||
| ], | ||
| 'UserPromptSubmit': [ | ||
| { | ||
| 'hooks': [deepcopy(hook_entry)], | ||
| } | ||
| ], | ||
| 'PreToolUse': [ | ||
| { | ||
| 'matcher': 'Bash', | ||
| 'hooks': [deepcopy(hook_entry)], | ||
| }, | ||
| ], | ||
| }, | ||
| } | ||
|
|
||
|
|
||
| def get_hooks_config(ide: AIIDEType, async_mode: bool = False) -> dict: | ||
| """Get the hooks configuration for a specific IDE. | ||
|
|
||
|
|
@@ -152,4 +211,6 @@ def get_hooks_config(ide: AIIDEType, async_mode: bool = False) -> dict: | |
| """ | ||
| if ide == AIIDEType.CLAUDE_CODE: | ||
| return _get_claude_code_hooks_config(async_mode=async_mode) | ||
| if ide == AIIDEType.CODEX: | ||
| return _get_codex_hooks_config(async_mode=async_mode) | ||
| return _get_cursor_hooks_config(async_mode=async_mode) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -256,6 +256,72 @@ def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, poli | |
| ) | ||
|
|
||
|
|
||
| def handle_before_command_exec(ctx: typer.Context, payload: AIHookPayload, policy: dict) -> dict: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pretty similar to other handlers like mcp, if we could reduce duplicate code it would be nice |
||
| """ | ||
| Handle PreToolUse:Bash (CommandExec) hook. | ||
|
|
||
| Scans the shell command the agent is about to run for secrets before | ||
| execution. Returns deny_permission to block, ask_permission to warn, | ||
| allow_permission to allow. | ||
| """ | ||
| ai_client = ctx.obj['ai_security_client'] | ||
| ide = payload.ide_provider | ||
| response_builder = get_response_builder(ide) | ||
|
|
||
| command_config = get_policy_value(policy, 'command_exec', default={}) | ||
| if not get_policy_value(command_config, 'enabled', default=True): | ||
| ai_client.create_event(payload, AiHookEventType.COMMAND_EXEC, AIHookOutcome.ALLOWED) | ||
| return response_builder.allow_permission() | ||
|
|
||
| mode = get_policy_value(policy, 'mode', default=PolicyMode.BLOCK) | ||
| command = payload.command or '' | ||
| max_bytes = get_policy_value(policy, 'secrets', 'max_bytes', default=200000) | ||
| timeout_ms = get_policy_value(policy, 'secrets', 'timeout_ms', default=30000) | ||
| clipped = truncate_utf8(command, max_bytes) | ||
| action = get_policy_value(command_config, 'action', default=PolicyMode.BLOCK) | ||
|
|
||
| scan_id = None | ||
| block_reason = None | ||
| outcome = AIHookOutcome.ALLOWED | ||
| error_message = None | ||
|
|
||
| try: | ||
| if get_policy_value(command_config, 'scan_arguments', default=True): | ||
| violation_summary, scan_id = _scan_text_for_secrets(ctx, clipped, timeout_ms) | ||
| if violation_summary: | ||
| block_reason = BlockReason.SECRETS_IN_COMMAND | ||
| if mode == PolicyMode.BLOCK and action == PolicyMode.BLOCK: | ||
| outcome = AIHookOutcome.BLOCKED | ||
| user_message = f'Cycode blocked shell command execution. {violation_summary}' | ||
| return response_builder.deny_permission( | ||
| user_message, | ||
| 'Do not embed secrets in shell commands. Use environment variables or secret references.', | ||
| ) | ||
| outcome = AIHookOutcome.WARNED | ||
| return response_builder.ask_permission( | ||
| f'{violation_summary} in shell command. Allow execution?', | ||
| 'Possible secrets detected in command; proceed with caution.', | ||
| ) | ||
|
|
||
| return response_builder.allow_permission() | ||
| except Exception as e: | ||
| outcome = ( | ||
| AIHookOutcome.ALLOWED if get_policy_value(policy, 'fail_open', default=True) else AIHookOutcome.BLOCKED | ||
| ) | ||
| block_reason = BlockReason.SCAN_FAILURE | ||
| error_message = str(e) | ||
| raise e | ||
| finally: | ||
| ai_client.create_event( | ||
| payload, | ||
| AiHookEventType.COMMAND_EXEC, | ||
| outcome, | ||
| scan_id=scan_id, | ||
| block_reason=block_reason, | ||
| error_message=error_message, | ||
| ) | ||
|
|
||
|
|
||
| def get_handler_for_event(event_type: str) -> Optional[Callable[[typer.Context, AIHookPayload, dict], dict]]: | ||
| """Get the appropriate handler function for a canonical event type. | ||
|
|
||
|
|
@@ -269,6 +335,7 @@ def get_handler_for_event(event_type: str) -> Optional[Callable[[typer.Context, | |
| AiHookEventType.PROMPT.value: handle_before_submit_prompt, | ||
| AiHookEventType.FILE_READ.value: handle_before_read_file, | ||
| AiHookEventType.MCP_EXECUTION.value: handle_before_mcp_execution, | ||
| AiHookEventType.COMMAND_EXEC.value: handle_before_command_exec, | ||
| } | ||
| return handlers.get(event_type) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,8 @@ | |
| from cycode.cli.apps.ai_guardrails.scan.types import ( | ||
| CLAUDE_CODE_EVENT_MAPPING, | ||
| CLAUDE_CODE_EVENT_NAMES, | ||
| CODEX_EVENT_MAPPING, | ||
| CODEX_EVENT_NAMES, | ||
| CURSOR_EVENT_MAPPING, | ||
| CURSOR_EVENT_NAMES, | ||
| AiHookEventType, | ||
|
|
@@ -141,6 +143,7 @@ class AIHookPayload: | |
| mcp_server_name: Optional[str] = None # For mcp_execution events | ||
| mcp_tool_name: Optional[str] = None # For mcp_execution events | ||
| mcp_arguments: Optional[dict] = None # For mcp_execution events | ||
| command: Optional[str] = None # For command_exec events (e.g., Codex PreToolUse:Bash) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not specific to codex |
||
|
|
||
| @classmethod | ||
| def from_cursor_payload(cls, payload: dict) -> 'AIHookPayload': | ||
|
|
@@ -229,6 +232,44 @@ def from_claude_code_payload(cls, payload: dict) -> 'AIHookPayload': | |
| mcp_arguments=mcp_arguments, | ||
| ) | ||
|
|
||
| @classmethod | ||
| def from_codex_payload(cls, payload: dict) -> 'AIHookPayload': | ||
| """Create AIHookPayload from Codex CLI payload. | ||
|
|
||
| Codex payload shape (per https://developers.openai.com/codex/hooks): | ||
| - hook_event_name: 'UserPromptSubmit' | 'PreToolUse' | 'PostToolUse' | 'SessionStart' | 'Stop' | ||
| - session_id, turn_id, cwd, model delivered at top level | ||
| - For UserPromptSubmit: prompt field | ||
| - For PreToolUse: tool_name (currently only 'Bash') and tool_input.command | ||
| """ | ||
| hook_event_name = payload.get('hook_event_name', '') | ||
| tool_name = payload.get('tool_name', '') | ||
| tool_input = payload.get('tool_input') | ||
|
|
||
| if hook_event_name == 'UserPromptSubmit': | ||
| canonical_event = AiHookEventType.PROMPT | ||
| elif hook_event_name == 'PreToolUse' and tool_name == 'Bash': | ||
| canonical_event = AiHookEventType.COMMAND_EXEC | ||
| else: | ||
| # Unknown or unsupported event combination; fall back to raw event name | ||
| canonical_event = CODEX_EVENT_MAPPING.get(hook_event_name, hook_event_name) | ||
|
Comment on lines
+253
to
+255
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what about read file / mcp? |
||
|
|
||
| command = None | ||
| if tool_name == 'Bash' and isinstance(tool_input, dict): | ||
| command = tool_input.get('command') | ||
|
|
||
| return cls( | ||
| event_name=canonical_event, | ||
| conversation_id=payload.get('session_id'), | ||
| generation_id=payload.get('turn_id'), | ||
| ide_user_email=None, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a way to get it? |
||
| model=payload.get('model'), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we get this on every hook? |
||
| ide_provider=AIIDEType.CODEX.value, | ||
| ide_version=payload.get('codex_version'), | ||
| prompt=payload.get('prompt', ''), | ||
| command=command, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def is_payload_for_ide(payload: dict, ide: str) -> bool: | ||
| """Check if the payload's event name matches the expected IDE. | ||
|
|
@@ -250,6 +291,8 @@ def is_payload_for_ide(payload: dict, ide: str) -> bool: | |
| return hook_event_name in CLAUDE_CODE_EVENT_NAMES | ||
| if ide == AIIDEType.CURSOR: | ||
| return hook_event_name in CURSOR_EVENT_NAMES | ||
| if ide == AIIDEType.CODEX: | ||
| return hook_event_name in CODEX_EVENT_NAMES | ||
|
|
||
| # Unknown IDE, allow processing | ||
| return True | ||
|
|
@@ -272,4 +315,6 @@ def from_payload(cls, payload: dict, tool: str = AIIDEType.CURSOR.value) -> 'AIH | |
| return cls.from_cursor_payload(payload) | ||
| if tool == AIIDEType.CLAUDE_CODE: | ||
| return cls.from_claude_code_payload(payload) | ||
| if tool == AIIDEType.CODEX: | ||
| return cls.from_codex_payload(payload) | ||
| raise ValueError(f'Unsupported IDE/tool: {tool}') | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so in codex you need two files to enable hooks? .codex/config.toml to enable it globally and then .codex/hooks.json to actually register it?