diff --git a/cycode/cli/apps/ai_guardrails/command_utils.py b/cycode/cli/apps/ai_guardrails/command_utils.py index e010f0a2..edc3104a 100644 --- a/cycode/cli/apps/ai_guardrails/command_utils.py +++ b/cycode/cli/apps/ai_guardrails/command_utils.py @@ -12,24 +12,26 @@ console = Console() -def validate_and_parse_ide(ide: str) -> AIIDEType: - """Validate IDE parameter and convert to AIIDEType enum. +def validate_and_parse_ide(ide: str) -> Optional[AIIDEType]: + """Validate IDE parameter, returning None for 'all'. Args: - ide: IDE name string (e.g., 'cursor') + ide: IDE name string (e.g., 'cursor', 'claude-code', 'all') Returns: - AIIDEType enum value + AIIDEType enum value, or None if 'all' was specified Raises: typer.Exit: If IDE is invalid """ + if ide.lower() == 'all': + return None try: return AIIDEType(ide.lower()) except ValueError: valid_ides = ', '.join([ide_type.value for ide_type in AIIDEType]) console.print( - f'[red]Error:[/] Invalid IDE "{ide}". Supported IDEs: {valid_ides}', + f'[red]Error:[/] Invalid IDE "{ide}". Supported IDEs: {valid_ides}, all', style='bold red', ) raise typer.Exit(1) from None diff --git a/cycode/cli/apps/ai_guardrails/consts.py b/cycode/cli/apps/ai_guardrails/consts.py index 21d89a3f..8714ec10 100644 --- a/cycode/cli/apps/ai_guardrails/consts.py +++ b/cycode/cli/apps/ai_guardrails/consts.py @@ -2,12 +2,7 @@ Currently supports: - Cursor - -To add a new IDE (e.g., Claude Code): -1. Add new value to AIIDEType enum -2. Create _get__hooks_dir() function with platform-specific paths -3. Add entry to IDE_CONFIGS dict with IDE-specific hook event names -4. Unhide --ide option in commands (install, uninstall, status) +- Claude Code """ import platform @@ -20,6 +15,14 @@ class AIIDEType(str, Enum): """Supported AI IDE types.""" CURSOR = 'cursor' + CLAUDE_CODE = 'claude-code' + + +class PolicyMode(str, Enum): + """Policy enforcement mode for global mode and per-feature actions.""" + + BLOCK = 'block' + WARN = 'warn' class IDEConfig(NamedTuple): @@ -42,6 +45,14 @@ def _get_cursor_hooks_dir() -> Path: return Path.home() / '.config' / 'Cursor' +def _get_claude_code_hooks_dir() -> Path: + """Get Claude Code hooks directory. + + Claude Code uses ~/.claude on all platforms. + """ + return Path.home() / '.claude' + + # IDE-specific configurations IDE_CONFIGS: dict[AIIDEType, IDEConfig] = { AIIDEType.CURSOR: IDEConfig( @@ -51,6 +62,13 @@ def _get_cursor_hooks_dir() -> Path: hooks_file_name='hooks.json', hook_events=['beforeSubmitPrompt', 'beforeReadFile', 'beforeMCPExecution'], ), + AIIDEType.CLAUDE_CODE: IDEConfig( + name='Claude Code', + hooks_dir=_get_claude_code_hooks_dir(), + repo_hooks_subdir='.claude', + hooks_file_name='settings.json', + hook_events=['UserPromptSubmit', 'PreToolUse:Read', 'PreToolUse:mcp'], + ), } # Default IDE @@ -60,6 +78,47 @@ def _get_cursor_hooks_dir() -> Path: CYCODE_SCAN_PROMPT_COMMAND = 'cycode ai-guardrails scan' +def _get_cursor_hooks_config() -> dict: + """Get Cursor-specific hooks configuration.""" + config = IDE_CONFIGS[AIIDEType.CURSOR] + hooks = {event: [{'command': CYCODE_SCAN_PROMPT_COMMAND}] for event in config.hook_events} + + return { + 'version': 1, + 'hooks': hooks, + } + + +def _get_claude_code_hooks_config() -> dict: + """Get Claude Code-specific hooks configuration. + + Claude Code uses a different hook format with nested structure: + - hooks are arrays of objects with 'hooks' containing command arrays + - PreToolUse uses 'matcher' field to specify which tools to intercept + """ + command = f'{CYCODE_SCAN_PROMPT_COMMAND} --ide claude-code' + + return { + 'hooks': { + 'UserPromptSubmit': [ + { + 'hooks': [{'type': 'command', 'command': command}], + } + ], + 'PreToolUse': [ + { + 'matcher': 'Read', + 'hooks': [{'type': 'command', 'command': command}], + }, + { + 'matcher': 'mcp__.*', + 'hooks': [{'type': 'command', 'command': command}], + }, + ], + }, + } + + def get_hooks_config(ide: AIIDEType) -> dict: """Get the hooks configuration for a specific IDE. @@ -69,10 +128,6 @@ def get_hooks_config(ide: AIIDEType) -> dict: Returns: Dict with hooks configuration for the specified IDE """ - config = IDE_CONFIGS[ide] - hooks = {event: [{'command': CYCODE_SCAN_PROMPT_COMMAND}] for event in config.hook_events} - - return { - 'version': 1, - 'hooks': hooks, - } + if ide == AIIDEType.CLAUDE_CODE: + return _get_claude_code_hooks_config() + return _get_cursor_hooks_config() diff --git a/cycode/cli/apps/ai_guardrails/hooks_manager.py b/cycode/cli/apps/ai_guardrails/hooks_manager.py index 42f879f6..b8d43c43 100644 --- a/cycode/cli/apps/ai_guardrails/hooks_manager.py +++ b/cycode/cli/apps/ai_guardrails/hooks_manager.py @@ -59,9 +59,27 @@ def save_hooks_file(hooks_path: Path, hooks_config: dict) -> bool: def is_cycode_hook_entry(entry: dict) -> bool: - """Check if a hook entry is from cycode-cli.""" + """Check if a hook entry is from cycode-cli. + + Handles both Cursor format (flat) and Claude Code format (nested). + + Cursor format: {"command": "cycode ai-guardrails scan"} + Claude Code format: {"hooks": [{"type": "command", "command": "cycode ai-guardrails scan --ide claude-code"}]} + """ + # Check Cursor format (flat command) command = entry.get('command', '') - return CYCODE_SCAN_PROMPT_COMMAND in command + if CYCODE_SCAN_PROMPT_COMMAND in command: + return True + + # Check Claude Code format (nested hooks array) + hooks = entry.get('hooks', []) + for hook in hooks: + if isinstance(hook, dict): + hook_command = hook.get('command', '') + if CYCODE_SCAN_PROMPT_COMMAND in hook_command: + return True + + return False def install_hooks( @@ -185,7 +203,15 @@ def get_hooks_status(scope: str = 'user', repo_path: Optional[Path] = None, ide: ide_config = IDE_CONFIGS[ide] has_cycode_hooks = False for event in ide_config.hook_events: - entries = existing.get('hooks', {}).get(event, []) + # Handle event:matcher format + if ':' in event: + actual_event, matcher_prefix = event.split(':', 1) + all_entries = existing.get('hooks', {}).get(actual_event, []) + # Filter entries by matcher + entries = [e for e in all_entries if e.get('matcher', '').startswith(matcher_prefix)] + else: + entries = existing.get('hooks', {}).get(event, []) + cycode_entries = [e for e in entries if is_cycode_hook_entry(e)] if cycode_entries: has_cycode_hooks = True diff --git a/cycode/cli/apps/ai_guardrails/install_command.py b/cycode/cli/apps/ai_guardrails/install_command.py index 6186752d..4b1095ab 100644 --- a/cycode/cli/apps/ai_guardrails/install_command.py +++ b/cycode/cli/apps/ai_guardrails/install_command.py @@ -11,7 +11,7 @@ validate_and_parse_ide, validate_scope, ) -from cycode.cli.apps.ai_guardrails.consts import IDE_CONFIGS +from cycode.cli.apps.ai_guardrails.consts import IDE_CONFIGS, AIIDEType from cycode.cli.apps.ai_guardrails.hooks_manager import install_hooks from cycode.cli.utils.sentry import add_breadcrumb @@ -30,9 +30,9 @@ def install_command( str, typer.Option( '--ide', - help='IDE to install hooks for (e.g., "cursor"). Defaults to cursor.', + help='IDE to install hooks for (e.g., "cursor", "claude-code", or "all" for all IDEs). Defaults to cursor.', ), - ] = 'cursor', + ] = AIIDEType.CURSOR, repo_path: Annotated[ Optional[Path], typer.Option( @@ -54,6 +54,7 @@ def install_command( cycode ai-guardrails install # Install for all projects (user scope) cycode ai-guardrails install --scope repo # Install for current repo only cycode ai-guardrails install --ide cursor # Install for Cursor IDE + cycode ai-guardrails install --ide all # Install for all supported IDEs cycode ai-guardrails install --scope repo --repo-path /path/to/repo """ add_breadcrumb('ai-guardrails-install') @@ -62,17 +63,35 @@ def install_command( validate_scope(scope) repo_path = resolve_repo_path(scope, repo_path) ide_type = validate_and_parse_ide(ide) - ide_name = IDE_CONFIGS[ide_type].name - success, message = install_hooks(scope, repo_path, ide=ide_type) - if success: - console.print(f'[green]✓[/] {message}') + ides_to_install: list[AIIDEType] = list(AIIDEType) if ide_type is None else [ide_type] + + results: list[tuple[str, bool, str]] = [] + for current_ide in ides_to_install: + ide_name = IDE_CONFIGS[current_ide].name + success, message = install_hooks(scope, repo_path, ide=current_ide) + results.append((ide_name, success, message)) + + # Report results for each IDE + any_success = False + all_success = True + for _ide_name, success, message in results: + if success: + console.print(f'[green]✓[/] {message}') + any_success = True + else: + console.print(f'[red]✗[/] {message}', style='bold red') + all_success = False + + if any_success: console.print() console.print('[bold]Next steps:[/]') - console.print(f'1. Restart {ide_name} to activate the hooks') + successful_ides = [name for name, success, _ in results if success] + ide_list = ', '.join(successful_ides) + console.print(f'1. Restart {ide_list} to activate the hooks') console.print('2. (Optional) Customize policy in ~/.cycode/ai-guardrails.yaml') console.print() console.print('[dim]The hooks will scan prompts, file reads, and MCP tool calls for secrets.[/]') - else: - console.print(f'[red]✗[/] {message}', style='bold red') + + if not all_success: raise typer.Exit(1) diff --git a/cycode/cli/apps/ai_guardrails/scan/handlers.py b/cycode/cli/apps/ai_guardrails/scan/handlers.py index 95e9d606..32be1241 100644 --- a/cycode/cli/apps/ai_guardrails/scan/handlers.py +++ b/cycode/cli/apps/ai_guardrails/scan/handlers.py @@ -13,6 +13,7 @@ import typer +from cycode.cli.apps.ai_guardrails.consts import PolicyMode from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload from cycode.cli.apps.ai_guardrails.scan.policy import get_policy_value from cycode.cli.apps.ai_guardrails.scan.response_builders import get_response_builder @@ -46,7 +47,7 @@ def handle_before_submit_prompt(ctx: typer.Context, payload: AIHookPayload, poli ai_client.create_event(payload, AiHookEventType.PROMPT, AIHookOutcome.ALLOWED) return response_builder.allow_prompt() - mode = get_policy_value(policy, 'mode', default='block') + mode = get_policy_value(policy, 'mode', default=PolicyMode.BLOCK) prompt = payload.prompt or '' max_bytes = get_policy_value(policy, 'secrets', 'max_bytes', default=200000) timeout_ms = get_policy_value(policy, 'secrets', 'timeout_ms', default=30000) @@ -55,29 +56,26 @@ def handle_before_submit_prompt(ctx: typer.Context, payload: AIHookPayload, poli scan_id = None block_reason = None outcome = AIHookOutcome.ALLOWED + error_message = None try: violation_summary, scan_id = _scan_text_for_secrets(ctx, clipped, timeout_ms) - if ( - violation_summary - and get_policy_value(prompt_config, 'action', default='block') == 'block' - and mode == 'block' - ): - outcome = AIHookOutcome.BLOCKED + if violation_summary: block_reason = BlockReason.SECRETS_IN_PROMPT - user_message = f'{violation_summary}. Remove secrets before sending.' - response = response_builder.deny_prompt(user_message) - else: - if violation_summary: - outcome = AIHookOutcome.WARNED - response = response_builder.allow_prompt() - return response + action = get_policy_value(prompt_config, 'action', default=PolicyMode.BLOCK) + if action == PolicyMode.BLOCK and mode == PolicyMode.BLOCK: + outcome = AIHookOutcome.BLOCKED + user_message = f'{violation_summary}. Remove secrets before sending.' + return response_builder.deny_prompt(user_message) + outcome = AIHookOutcome.WARNED + return response_builder.allow_prompt() except Exception as e: outcome = ( AIHookOutcome.ALLOWED if get_policy_value(policy, 'fail_open', default=True) else AIHookOutcome.BLOCKED ) - block_reason = BlockReason.SCAN_FAILURE if outcome == AIHookOutcome.BLOCKED else None + block_reason = BlockReason.SCAN_FAILURE + error_message = str(e) raise e finally: ai_client.create_event( @@ -86,6 +84,7 @@ def handle_before_submit_prompt(ctx: typer.Context, payload: AIHookPayload, poli outcome, scan_id=scan_id, block_reason=block_reason, + error_message=error_message, ) @@ -106,38 +105,53 @@ def handle_before_read_file(ctx: typer.Context, payload: AIHookPayload, policy: ai_client.create_event(payload, AiHookEventType.FILE_READ, AIHookOutcome.ALLOWED) return response_builder.allow_permission() - mode = get_policy_value(policy, 'mode', default='block') + mode = get_policy_value(policy, 'mode', default=PolicyMode.BLOCK) file_path = payload.file_path or '' - action = get_policy_value(file_read_config, 'action', default='block') + action = get_policy_value(file_read_config, 'action', default=PolicyMode.BLOCK) scan_id = None block_reason = None outcome = AIHookOutcome.ALLOWED + error_message = None try: # Check path-based denylist first - if is_denied_path(file_path, policy) and action == 'block': - outcome = AIHookOutcome.BLOCKED + if is_denied_path(file_path, policy): block_reason = BlockReason.SENSITIVE_PATH - user_message = f'Cycode blocked sending {file_path} to the AI (sensitive path policy).' - return response_builder.deny_permission( + if mode == PolicyMode.BLOCK and action == PolicyMode.BLOCK: + outcome = AIHookOutcome.BLOCKED + user_message = f'Cycode blocked sending {file_path} to the AI (sensitive path policy).' + return response_builder.deny_permission( + user_message, + 'This file path is classified as sensitive; do not read/send it to the model.', + ) + # Warn mode - ask user for permission + outcome = AIHookOutcome.WARNED + user_message = f'Cycode flagged {file_path} as sensitive. Allow reading?' + return response_builder.ask_permission( user_message, - 'This file path is classified as sensitive; do not read/send it to the model.', + 'This file path is classified as sensitive; proceed with caution.', ) # Scan file content if enabled if get_policy_value(file_read_config, 'scan_content', default=True): violation_summary, scan_id = _scan_path_for_secrets(ctx, file_path, policy) - if violation_summary and action == 'block' and mode == 'block': - outcome = AIHookOutcome.BLOCKED + if violation_summary: block_reason = BlockReason.SECRETS_IN_FILE - user_message = f'Cycode blocked reading {file_path}. {violation_summary}' - return response_builder.deny_permission( + if mode == PolicyMode.BLOCK and action == PolicyMode.BLOCK: + outcome = AIHookOutcome.BLOCKED + user_message = f'Cycode blocked reading {file_path}. {violation_summary}' + return response_builder.deny_permission( + user_message, + 'Secrets detected; do not send this file to the model.', + ) + # Warn mode - ask user for permission + outcome = AIHookOutcome.WARNED + user_message = f'Cycode detected secrets in {file_path}. {violation_summary}' + return response_builder.ask_permission( user_message, - 'Secrets detected; do not send this file to the model.', + 'Possible secrets detected; proceed with caution.', ) - if violation_summary: - outcome = AIHookOutcome.WARNED return response_builder.allow_permission() return response_builder.allow_permission() @@ -145,7 +159,8 @@ def handle_before_read_file(ctx: typer.Context, payload: AIHookPayload, policy: outcome = ( AIHookOutcome.ALLOWED if get_policy_value(policy, 'fail_open', default=True) else AIHookOutcome.BLOCKED ) - block_reason = BlockReason.SCAN_FAILURE if outcome == AIHookOutcome.BLOCKED else None + block_reason = BlockReason.SCAN_FAILURE + error_message = str(e) raise e finally: ai_client.create_event( @@ -154,6 +169,7 @@ def handle_before_read_file(ctx: typer.Context, payload: AIHookPayload, policy: outcome, scan_id=scan_id, block_reason=block_reason, + error_message=error_message, ) @@ -175,26 +191,27 @@ def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, poli ai_client.create_event(payload, AiHookEventType.MCP_EXECUTION, AIHookOutcome.ALLOWED) return response_builder.allow_permission() - mode = get_policy_value(policy, 'mode', default='block') + mode = get_policy_value(policy, 'mode', default=PolicyMode.BLOCK) tool = payload.mcp_tool_name or 'unknown' args = payload.mcp_arguments or {} args_text = args if isinstance(args, str) else json.dumps(args) max_bytes = get_policy_value(policy, 'secrets', 'max_bytes', default=200000) timeout_ms = get_policy_value(policy, 'secrets', 'timeout_ms', default=30000) clipped = truncate_utf8(args_text, max_bytes) - action = get_policy_value(mcp_config, 'action', default='block') + action = get_policy_value(mcp_config, 'action', default=PolicyMode.BLOCK) scan_id = None block_reason = None outcome = AIHookOutcome.ALLOWED + error_message = None try: if get_policy_value(mcp_config, 'scan_arguments', default=True): violation_summary, scan_id = _scan_text_for_secrets(ctx, clipped, timeout_ms) if violation_summary: - if mode == 'block' and action == 'block': + block_reason = BlockReason.SECRETS_IN_MCP_ARGS + if mode == PolicyMode.BLOCK and action == PolicyMode.BLOCK: outcome = AIHookOutcome.BLOCKED - block_reason = BlockReason.SECRETS_IN_MCP_ARGS user_message = f'Cycode blocked MCP tool call "{tool}". {violation_summary}' return response_builder.deny_permission( user_message, @@ -211,7 +228,8 @@ def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, poli outcome = ( AIHookOutcome.ALLOWED if get_policy_value(policy, 'fail_open', default=True) else AIHookOutcome.BLOCKED ) - block_reason = BlockReason.SCAN_FAILURE if outcome == AIHookOutcome.BLOCKED else None + block_reason = BlockReason.SCAN_FAILURE + error_message = str(e) raise e finally: ai_client.create_event( @@ -220,6 +238,7 @@ def handle_before_mcp_execution(ctx: typer.Context, payload: AIHookPayload, poli outcome, scan_id=scan_id, block_reason=block_reason, + error_message=error_message, ) diff --git a/cycode/cli/apps/ai_guardrails/scan/payload.py b/cycode/cli/apps/ai_guardrails/scan/payload.py index 83787348..ce72a574 100644 --- a/cycode/cli/apps/ai_guardrails/scan/payload.py +++ b/cycode/cli/apps/ai_guardrails/scan/payload.py @@ -1,9 +1,120 @@ """Unified payload object for AI hook events from different tools.""" +import json +from collections.abc import Iterator from dataclasses import dataclass +from pathlib import Path from typing import Optional -from cycode.cli.apps.ai_guardrails.scan.types import CURSOR_EVENT_MAPPING +from cycode.cli.apps.ai_guardrails.consts import AIIDEType +from cycode.cli.apps.ai_guardrails.scan.types import ( + CLAUDE_CODE_EVENT_MAPPING, + CLAUDE_CODE_EVENT_NAMES, + CURSOR_EVENT_MAPPING, + CURSOR_EVENT_NAMES, + AiHookEventType, +) + + +def _reverse_readline(path: Path, buf_size: int = 8192) -> Iterator[str]: + """Read a file line by line from the end without loading entire file into memory. + + Yields lines in reverse order (last line first). + """ + with path.open('rb') as f: + f.seek(0, 2) # Seek to end + file_size = f.tell() + if file_size == 0: + return + + remaining = file_size + buffer = b'' + + while remaining > 0: + # Read a chunk from the end + read_size = min(buf_size, remaining) + remaining -= read_size + f.seek(remaining) + chunk = f.read(read_size) + buffer = chunk + buffer + + # Yield complete lines from buffer + while b'\n' in buffer: + # Find the last newline + newline_pos = buffer.rfind(b'\n') + if newline_pos == len(buffer) - 1: + # Trailing newline, look for previous one + newline_pos = buffer.rfind(b'\n', 0, newline_pos) + if newline_pos == -1: + break + # Yield the line after this newline + line = buffer[newline_pos + 1 :] + buffer = buffer[: newline_pos + 1] + if line.strip(): + yield line.decode('utf-8', errors='replace') + + # Yield any remaining content as the first line of the file + if buffer.strip(): + yield buffer.decode('utf-8', errors='replace') + + +def _extract_model(entry: dict) -> Optional[str]: + """Extract model from a transcript entry (top level or nested in message).""" + return entry.get('model') or (entry.get('message') or {}).get('model') + + +def _extract_generation_id(entry: dict) -> Optional[str]: + """Extract generation ID from a user-type transcript entry.""" + if entry.get('type') == 'user': + return entry.get('uuid') + return None + + +def _extract_from_claude_transcript( + transcript_path: str, +) -> tuple[Optional[str], Optional[str], Optional[str]]: + """Extract IDE version, model, and latest generation ID from Claude Code transcript file. + + The transcript is a JSONL file where each line is a JSON object. + We look for 'version' (IDE version), 'model', and 'uuid' (generation ID) fields. + The generation_id is the UUID of the latest 'user' type message. + + Scans from end to start since latest entries are at the end. + Uses reverse reading to avoid loading entire file into memory. + + Returns: + Tuple of (ide_version, model, generation_id), any may be None if not found. + """ + if not transcript_path: + return None, None, None + + path = Path(transcript_path) + if not path.exists(): + return None, None, None + + ide_version = None + model = None + generation_id = None + + try: + for line in _reverse_readline(path): + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + ide_version = ide_version or entry.get('version') + model = model or _extract_model(entry) + generation_id = generation_id or _extract_generation_id(entry) + + if ide_version and model and generation_id: + break + except json.JSONDecodeError: + continue + except OSError: + pass + + return ide_version, model, generation_id @dataclass @@ -18,7 +129,7 @@ class AIHookPayload: # User and IDE information ide_user_email: Optional[str] = None model: Optional[str] = None - ide_provider: str = None # e.g., 'cursor', 'claude-code' + ide_provider: str = None # AIIDEType value (e.g., 'cursor', 'claude-code') ide_version: Optional[str] = None # Event-specific data @@ -44,7 +155,7 @@ def from_cursor_payload(cls, payload: dict) -> 'AIHookPayload': generation_id=payload.get('generation_id'), ide_user_email=payload.get('user_email'), model=payload.get('model'), - ide_provider='cursor', + ide_provider=AIIDEType.CURSOR, ide_version=payload.get('cursor_version'), prompt=payload.get('prompt', ''), file_path=payload.get('file_path') or payload.get('path'), @@ -54,12 +165,95 @@ def from_cursor_payload(cls, payload: dict) -> 'AIHookPayload': ) @classmethod - def from_payload(cls, payload: dict, tool: str = 'cursor') -> 'AIHookPayload': + def from_claude_code_payload(cls, payload: dict) -> 'AIHookPayload': + """Create AIHookPayload from Claude Code IDE payload. + + Claude Code has a different structure: + - hook_event_name: 'UserPromptSubmit' or 'PreToolUse' + - For PreToolUse: tool_name determines if it's file read ('Read') or MCP ('mcp__*') + - tool_input contains tool arguments (e.g., file_path for Read tool) + - transcript_path points to JSONL file with version and model info + """ + hook_event_name = payload.get('hook_event_name', '') + tool_name = payload.get('tool_name', '') + tool_input = payload.get('tool_input') + + if hook_event_name == 'UserPromptSubmit': + canonical_event = AiHookEventType.PROMPT + elif hook_event_name == 'PreToolUse': + canonical_event = AiHookEventType.FILE_READ if tool_name == 'Read' else AiHookEventType.MCP_EXECUTION + else: + # Unknown event, use the raw event name + canonical_event = CLAUDE_CODE_EVENT_MAPPING.get(hook_event_name, hook_event_name) + + # Extract file_path from tool_input for Read tool + file_path = None + if tool_name == 'Read' and isinstance(tool_input, dict): + file_path = tool_input.get('file_path') + + # For MCP tools, the entire tool_input is the arguments + mcp_arguments = tool_input if tool_name.startswith('mcp__') else None + + # Extract MCP server and tool name from tool_name (format: mcp____) + mcp_server_name = None + mcp_tool_name = None + if tool_name.startswith('mcp__'): + parts = tool_name.split('__') + if len(parts) >= 2: + mcp_server_name = parts[1] + if len(parts) >= 3: + mcp_tool_name = parts[2] + + # Extract IDE version, model, and generation ID from transcript file + ide_version, model, generation_id = _extract_from_claude_transcript(payload.get('transcript_path')) + + return cls( + event_name=canonical_event, + conversation_id=payload.get('session_id'), + generation_id=generation_id, + ide_user_email=None, # Claude Code doesn't provide this in hook payload + model=model, + ide_provider=AIIDEType.CLAUDE_CODE, + ide_version=ide_version, + prompt=payload.get('prompt', ''), + file_path=file_path, + mcp_server_name=mcp_server_name, + mcp_tool_name=mcp_tool_name, + mcp_arguments=mcp_arguments, + ) + + @staticmethod + def is_payload_for_ide(payload: dict, ide: str) -> bool: + """Check if the payload's event name matches the expected IDE. + + This prevents double-processing when Cursor reads Claude Code hooks + or vice versa. If the payload's hook_event_name doesn't match the + expected IDE's event names, we should skip processing. + + Args: + payload: The raw payload from the IDE + ide: The IDE name or AIIDEType enum value + + Returns: + True if the payload matches the IDE, False otherwise. + """ + hook_event_name = payload.get('hook_event_name', '') + + if ide == AIIDEType.CLAUDE_CODE: + return hook_event_name in CLAUDE_CODE_EVENT_NAMES + if ide == AIIDEType.CURSOR: + return hook_event_name in CURSOR_EVENT_NAMES + + # Unknown IDE, allow processing + return True + + @classmethod + def from_payload(cls, payload: dict, tool: str = AIIDEType.CURSOR) -> 'AIHookPayload': """Create AIHookPayload from any tool's payload. Args: payload: The raw payload from the IDE - tool: The IDE/tool name (e.g., 'cursor') + tool: The IDE/tool name or AIIDEType enum value Returns: AIHookPayload instance @@ -67,6 +261,8 @@ def from_payload(cls, payload: dict, tool: str = 'cursor') -> 'AIHookPayload': Raises: ValueError: If the tool is not supported """ - if tool == 'cursor': + if tool == AIIDEType.CURSOR: return cls.from_cursor_payload(payload) - raise ValueError(f'Unsupported IDE/tool: {tool}.') + if tool == AIIDEType.CLAUDE_CODE: + return cls.from_claude_code_payload(payload) + raise ValueError(f'Unsupported IDE/tool: {tool}') diff --git a/cycode/cli/apps/ai_guardrails/scan/response_builders.py b/cycode/cli/apps/ai_guardrails/scan/response_builders.py index 867965c3..f0da71b7 100644 --- a/cycode/cli/apps/ai_guardrails/scan/response_builders.py +++ b/cycode/cli/apps/ai_guardrails/scan/response_builders.py @@ -7,6 +7,8 @@ from abc import ABC, abstractmethod +from cycode.cli.apps.ai_guardrails.consts import AIIDEType + class IDEResponseBuilder(ABC): """Abstract base class for IDE-specific response builders.""" @@ -62,17 +64,64 @@ def deny_prompt(self, user_message: str) -> dict: return {'continue': False, 'user_message': user_message} -# Registry of response builders by IDE name +class ClaudeCodeResponseBuilder(IDEResponseBuilder): + """Response builder for Claude Code IDE hooks. + + Claude Code hook response formats: + - UserPromptSubmit: {} for allow, {"decision": "block", "reason": str} for deny + - PreToolUse: hookSpecificOutput with permissionDecision (allow/deny/ask) + """ + + def allow_permission(self) -> dict: + """Allow file read or MCP execution.""" + return { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'allow', + } + } + + def deny_permission(self, user_message: str, agent_message: str) -> dict: + """Deny file read or MCP execution.""" + return { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'deny', + 'permissionDecisionReason': user_message, + } + } + + def ask_permission(self, user_message: str, agent_message: str) -> dict: + """Ask user for permission (warn mode).""" + return { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'ask', + 'permissionDecisionReason': user_message, + } + } + + def allow_prompt(self) -> dict: + """Allow prompt submission (empty response means allow).""" + return {} + + def deny_prompt(self, user_message: str) -> dict: + """Deny prompt submission.""" + return {'decision': 'block', 'reason': user_message} + + +# Registry of response builders by IDE type _RESPONSE_BUILDERS: dict[str, IDEResponseBuilder] = { - 'cursor': CursorResponseBuilder(), + AIIDEType.CURSOR: CursorResponseBuilder(), + AIIDEType.CLAUDE_CODE: ClaudeCodeResponseBuilder(), } -def get_response_builder(ide: str = 'cursor') -> IDEResponseBuilder: +def get_response_builder(ide: str = AIIDEType.CURSOR) -> IDEResponseBuilder: """Get the response builder for a specific IDE. Args: - ide: The IDE name (e.g., 'cursor', 'claude-code') + ide: The IDE name (e.g., 'cursor', 'claude-code') or AIIDEType enum Returns: IDEResponseBuilder instance for the specified IDE @@ -80,7 +129,10 @@ def get_response_builder(ide: str = 'cursor') -> IDEResponseBuilder: Raises: ValueError: If the IDE is not supported """ - builder = _RESPONSE_BUILDERS.get(ide.lower()) + # Normalize to AIIDEType if string passed + if isinstance(ide, str): + ide = ide.lower() + builder = _RESPONSE_BUILDERS.get(ide) if not builder: raise ValueError(f'Unsupported IDE: {ide}. Supported IDEs: {list(_RESPONSE_BUILDERS.keys())}') return builder diff --git a/cycode/cli/apps/ai_guardrails/scan/scan_command.py b/cycode/cli/apps/ai_guardrails/scan/scan_command.py index e08bb4de..73981831 100644 --- a/cycode/cli/apps/ai_guardrails/scan/scan_command.py +++ b/cycode/cli/apps/ai_guardrails/scan/scan_command.py @@ -16,6 +16,7 @@ import click import typer +from cycode.cli.apps.ai_guardrails.consts import AIIDEType from cycode.cli.apps.ai_guardrails.scan.handlers import get_handler_for_event from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload from cycode.cli.apps.ai_guardrails.scan.policy import load_policy @@ -69,7 +70,7 @@ def scan_command( help='IDE that sent the payload (e.g., "cursor"). Defaults to cursor.', hidden=True, ), - ] = 'cursor', + ] = AIIDEType.CURSOR, ) -> None: """Scan content from AI IDE hooks for secrets. @@ -96,6 +97,16 @@ def scan_command( output_json(response_builder.allow_prompt()) return + # Check if the payload matches the expected IDE - prevents double-processing + # when Cursor reads Claude Code hooks from ~/.claude/settings.json + if not AIHookPayload.is_payload_for_ide(payload, tool): + logger.debug( + 'Payload event does not match expected IDE, skipping', + extra={'hook_event_name': payload.get('hook_event_name'), 'expected_ide': tool}, + ) + output_json(response_builder.allow_prompt()) + return + unified_payload = AIHookPayload.from_payload(payload, tool=tool) event_name = unified_payload.event_name logger.debug('Processing AI guardrails hook', extra={'event_name': event_name, 'tool': tool}) diff --git a/cycode/cli/apps/ai_guardrails/scan/types.py b/cycode/cli/apps/ai_guardrails/scan/types.py index 095ca61b..585c7820 100644 --- a/cycode/cli/apps/ai_guardrails/scan/types.py +++ b/cycode/cli/apps/ai_guardrails/scan/types.py @@ -31,6 +31,17 @@ class AiHookEventType(StrEnum): 'beforeMCPExecution': AiHookEventType.MCP_EXECUTION, } +# Claude Code event mapping - note that PreToolUse requires tool_name inspection +# to determine the actual event type (file read vs MCP execution) +CLAUDE_CODE_EVENT_MAPPING = { + 'UserPromptSubmit': AiHookEventType.PROMPT, + 'PreToolUse': None, # Requires tool_name inspection to determine actual type +} + +# Set of known event names per IDE (for IDE detection) +CURSOR_EVENT_NAMES = set(CURSOR_EVENT_MAPPING.keys()) +CLAUDE_CODE_EVENT_NAMES = set(CLAUDE_CODE_EVENT_MAPPING.keys()) + class AIHookOutcome(StrEnum): """Outcome of an AI hook event evaluation.""" diff --git a/cycode/cli/apps/ai_guardrails/status_command.py b/cycode/cli/apps/ai_guardrails/status_command.py index 0a9801b5..14a31e7f 100644 --- a/cycode/cli/apps/ai_guardrails/status_command.py +++ b/cycode/cli/apps/ai_guardrails/status_command.py @@ -8,6 +8,7 @@ from rich.table import Table from cycode.cli.apps.ai_guardrails.command_utils import console, validate_and_parse_ide, validate_scope +from cycode.cli.apps.ai_guardrails.consts import IDE_CONFIGS, AIIDEType from cycode.cli.apps.ai_guardrails.hooks_manager import get_hooks_status from cycode.cli.utils.sentry import add_breadcrumb @@ -26,9 +27,9 @@ def status_command( str, typer.Option( '--ide', - help='IDE to check status for (e.g., "cursor"). Defaults to cursor.', + help='IDE to check status for (e.g., "cursor", "claude-code", or "all" for all IDEs). Defaults to cursor.', ), - ] = 'cursor', + ] = AIIDEType.CURSOR, repo_path: Annotated[ Optional[Path], typer.Option( @@ -50,6 +51,7 @@ def status_command( cycode ai-guardrails status --scope user # Show only user-level status cycode ai-guardrails status --scope repo # Show only repo-level status cycode ai-guardrails status --ide cursor # Check status for Cursor IDE + cycode ai-guardrails status --ide all # Check status for all supported IDEs """ add_breadcrumb('ai-guardrails-status') @@ -59,34 +61,41 @@ def status_command( repo_path = Path(os.getcwd()) ide_type = validate_and_parse_ide(ide) - scopes_to_check = ['user', 'repo'] if scope == 'all' else [scope] + ides_to_check: list[AIIDEType] = list(AIIDEType) if ide_type is None else [ide_type] - for check_scope in scopes_to_check: - status = get_hooks_status(check_scope, repo_path if check_scope == 'repo' else None, ide=ide_type) + scopes_to_check = ['user', 'repo'] if scope == 'all' else [scope] + for current_ide in ides_to_check: + ide_name = IDE_CONFIGS[current_ide].name console.print() - console.print(f'[bold]{check_scope.upper()} SCOPE[/]') - console.print(f'Path: {status["hooks_path"]}') + console.print(f'[bold cyan]═══ {ide_name} ═══[/]') + + for check_scope in scopes_to_check: + status = get_hooks_status(check_scope, repo_path if check_scope == 'repo' else None, ide=current_ide) + + console.print() + console.print(f'[bold]{check_scope.upper()} SCOPE[/]') + console.print(f'Path: {status["hooks_path"]}') - if not status['file_exists']: - console.print('[dim]No hooks.json file found[/]') - continue + if not status['file_exists']: + console.print('[dim]No hooks file found[/]') + continue - if status['cycode_installed']: - console.print('[green]✓ Cycode AI guardrails: INSTALLED[/]') - else: - console.print('[yellow]○ Cycode AI guardrails: NOT INSTALLED[/]') + if status['cycode_installed']: + console.print('[green]✓ Cycode AI guardrails: INSTALLED[/]') + else: + console.print('[yellow]○ Cycode AI guardrails: NOT INSTALLED[/]') - # Show hook details - table = Table(show_header=True, header_style='bold') - table.add_column('Hook Event') - table.add_column('Cycode Enabled') - table.add_column('Total Hooks') + # Show hook details + table = Table(show_header=True, header_style='bold') + table.add_column('Hook Event') + table.add_column('Cycode Enabled') + table.add_column('Total Hooks') - for event, info in status['hooks'].items(): - enabled = '[green]Yes[/]' if info['enabled'] else '[dim]No[/]' - table.add_row(event, enabled, str(info['total_entries'])) + for event, info in status['hooks'].items(): + enabled = '[green]Yes[/]' if info['enabled'] else '[dim]No[/]' + table.add_row(event, enabled, str(info['total_entries'])) - console.print(table) + console.print(table) console.print() diff --git a/cycode/cli/apps/ai_guardrails/uninstall_command.py b/cycode/cli/apps/ai_guardrails/uninstall_command.py index 23315693..acf3d0c7 100644 --- a/cycode/cli/apps/ai_guardrails/uninstall_command.py +++ b/cycode/cli/apps/ai_guardrails/uninstall_command.py @@ -11,7 +11,7 @@ validate_and_parse_ide, validate_scope, ) -from cycode.cli.apps.ai_guardrails.consts import IDE_CONFIGS +from cycode.cli.apps.ai_guardrails.consts import IDE_CONFIGS, AIIDEType from cycode.cli.apps.ai_guardrails.hooks_manager import uninstall_hooks from cycode.cli.utils.sentry import add_breadcrumb @@ -30,9 +30,9 @@ def uninstall_command( str, typer.Option( '--ide', - help='IDE to uninstall hooks from (e.g., "cursor"). Defaults to cursor.', + help='IDE to uninstall hooks from (e.g., "cursor", "claude-code", "all"). Defaults to cursor.', ), - ] = 'cursor', + ] = AIIDEType.CURSOR, repo_path: Annotated[ Optional[Path], typer.Option( @@ -54,6 +54,7 @@ def uninstall_command( cycode ai-guardrails uninstall # Remove user-level hooks cycode ai-guardrails uninstall --scope repo # Remove repo-level hooks cycode ai-guardrails uninstall --ide cursor # Uninstall from Cursor IDE + cycode ai-guardrails uninstall --ide all # Uninstall from all supported IDEs """ add_breadcrumb('ai-guardrails-uninstall') @@ -61,13 +62,31 @@ def uninstall_command( validate_scope(scope) repo_path = resolve_repo_path(scope, repo_path) ide_type = validate_and_parse_ide(ide) - ide_name = IDE_CONFIGS[ide_type].name - success, message = uninstall_hooks(scope, repo_path, ide=ide_type) - if success: - console.print(f'[green]✓[/] {message}') + ides_to_uninstall: list[AIIDEType] = list(AIIDEType) if ide_type is None else [ide_type] + + results: list[tuple[str, bool, str]] = [] + for current_ide in ides_to_uninstall: + ide_name = IDE_CONFIGS[current_ide].name + success, message = uninstall_hooks(scope, repo_path, ide=current_ide) + results.append((ide_name, success, message)) + + # Report results for each IDE + any_success = False + all_success = True + for _ide_name, success, message in results: + if success: + console.print(f'[green]✓[/] {message}') + any_success = True + else: + console.print(f'[red]✗[/] {message}', style='bold red') + all_success = False + + if any_success: console.print() - console.print(f'[dim]Restart {ide_name} for changes to take effect.[/]') - else: - console.print(f'[red]✗[/] {message}', style='bold red') + successful_ides = [name for name, success, _ in results if success] + ide_list = ', '.join(successful_ides) + console.print(f'[dim]Restart {ide_list} for changes to take effect.[/]') + + if not all_success: raise typer.Exit(1) diff --git a/cycode/cyclient/ai_security_manager_client.py b/cycode/cyclient/ai_security_manager_client.py index 627e2b33..1090ad8d 100644 --- a/cycode/cyclient/ai_security_manager_client.py +++ b/cycode/cyclient/ai_security_manager_client.py @@ -61,6 +61,7 @@ def create_event( outcome: 'AIHookOutcome', scan_id: Optional[str] = None, block_reason: Optional['BlockReason'] = None, + error_message: Optional[str] = None, ) -> None: """Create an AI hook event from hook payload.""" conversation_id = payload.conversation_id @@ -77,6 +78,7 @@ def create_event( 'cli_scan_id': scan_id, 'mcp_server_name': payload.mcp_server_name, 'mcp_tool_name': payload.mcp_tool_name, + 'error_message': error_message, } try: diff --git a/tests/cli/commands/ai_guardrails/scan/test_handlers.py b/tests/cli/commands/ai_guardrails/scan/test_handlers.py index 58dfe195..634469b7 100644 --- a/tests/cli/commands/ai_guardrails/scan/test_handlers.py +++ b/tests/cli/commands/ai_guardrails/scan/test_handlers.py @@ -135,8 +135,8 @@ def test_handle_before_submit_prompt_scan_failure_fail_open( mock_ctx.obj['ai_security_client'].create_event.assert_called_once() call_args = mock_ctx.obj['ai_security_client'].create_event.call_args assert call_args.args[2] == AIHookOutcome.ALLOWED - # When fail_open=True, no block_reason since action is allowed - assert call_args.kwargs['block_reason'] is None + # block_reason is set for tracking even when fail_open allows the action + assert call_args.kwargs['block_reason'] == BlockReason.SCAN_FAILURE @patch('cycode.cli.apps.ai_guardrails.scan.handlers._scan_text_for_secrets') diff --git a/tests/cli/commands/ai_guardrails/scan/test_payload.py b/tests/cli/commands/ai_guardrails/scan/test_payload.py index 9d14dda3..27c3010f 100644 --- a/tests/cli/commands/ai_guardrails/scan/test_payload.py +++ b/tests/cli/commands/ai_guardrails/scan/test_payload.py @@ -1,6 +1,7 @@ """Tests for AI hook payload normalization.""" import pytest +from pytest_mock import MockerFixture from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload from cycode.cli.apps.ai_guardrails.scan.types import AiHookEventType @@ -133,3 +134,243 @@ def test_from_cursor_payload_empty_fields() -> None: assert unified.conversation_id is None assert unified.prompt == '' # Default to empty string assert unified.ide_provider == 'cursor' + + +# Claude Code payload tests + + +def test_from_claude_code_payload_prompt_event() -> None: + """Test conversion of Claude Code UserPromptSubmit payload.""" + claude_payload = { + 'hook_event_name': 'UserPromptSubmit', + 'session_id': 'session-123', + 'prompt': 'Test prompt for Claude Code', + } + + unified = AIHookPayload.from_claude_code_payload(claude_payload) + + assert unified.event_name == AiHookEventType.PROMPT + assert unified.conversation_id == 'session-123' + assert unified.ide_provider == 'claude-code' + assert unified.prompt == 'Test prompt for Claude Code' + + +def test_from_claude_code_payload_file_read_event() -> None: + """Test conversion of Claude Code PreToolUse with Read tool.""" + claude_payload = { + 'hook_event_name': 'PreToolUse', + 'session_id': 'session-456', + 'tool_name': 'Read', + 'tool_input': {'file_path': '/path/to/secret.env'}, + } + + unified = AIHookPayload.from_claude_code_payload(claude_payload) + + assert unified.event_name == AiHookEventType.FILE_READ + assert unified.file_path == '/path/to/secret.env' + assert unified.ide_provider == 'claude-code' + assert unified.mcp_tool_name is None + + +def test_from_claude_code_payload_mcp_execution_event() -> None: + """Test conversion of Claude Code PreToolUse with MCP tool.""" + claude_payload = { + 'hook_event_name': 'PreToolUse', + 'session_id': 'session-789', + 'tool_name': 'mcp__gitlab__discussion_list', + 'tool_input': {'resource_type': 'merge_request', 'parent_id': 'org/repo', 'resource_id': '4'}, + } + + unified = AIHookPayload.from_payload(claude_payload, tool='claude-code') + + assert unified.event_name == AiHookEventType.MCP_EXECUTION + assert unified.mcp_server_name == 'gitlab' + assert unified.mcp_tool_name == 'discussion_list' + assert unified.mcp_arguments == {'resource_type': 'merge_request', 'parent_id': 'org/repo', 'resource_id': '4'} + assert unified.ide_provider == 'claude-code' + + +def test_from_claude_code_payload_empty_fields() -> None: + """Test handling of empty/missing fields for Claude Code.""" + claude_payload = { + 'hook_event_name': 'UserPromptSubmit', + # Most fields missing + } + + unified = AIHookPayload.from_claude_code_payload(claude_payload) + + assert unified.event_name == AiHookEventType.PROMPT + assert unified.conversation_id is None + assert unified.prompt == '' # Default to empty string + assert unified.ide_provider == 'claude-code' + + +# Claude Code transcript extraction tests + + +def test_from_claude_code_payload_extracts_from_transcript(mocker: MockerFixture) -> None: + """Test that version, model, and generation_id are extracted from transcript file.""" + transcript_content = ( + b'{"type":"user","version":"2.1.20","uuid":"user-uuid-1","message":{"role":"user","content":"hello"}}\n' + b'{"type":"assistant","message":{"model":"claude-opus-4-5-20251101","role":"assistant",' + b'"content":[{"type":"text","text":"Hi!"}]},"uuid":"assistant-uuid-1"}\n' + b'{"type":"user","version":"2.1.20","uuid":"user-uuid-2","message":{"role":"user","content":"test prompt"}}\n' + ) + mock_path = mocker.patch('cycode.cli.apps.ai_guardrails.scan.payload.Path') + mock_path.return_value.exists.return_value = True + mock_path.return_value.open.return_value.__enter__.return_value.seek = mocker.Mock() + mock_path.return_value.open.return_value.__enter__.return_value.tell.return_value = len(transcript_content) + mock_path.return_value.open.return_value.__enter__.return_value.read.return_value = transcript_content + + claude_payload = { + 'hook_event_name': 'UserPromptSubmit', + 'session_id': 'session-123', + 'prompt': 'test prompt', + 'transcript_path': '/mock/transcript.jsonl', + } + + unified = AIHookPayload.from_claude_code_payload(claude_payload) + + assert unified.ide_version == '2.1.20' + assert unified.model == 'claude-opus-4-5-20251101' + assert unified.generation_id == 'user-uuid-2' + + +def test_from_claude_code_payload_handles_missing_transcript(mocker: MockerFixture) -> None: + """Test that missing transcript file doesn't break payload parsing.""" + mock_path = mocker.patch('cycode.cli.apps.ai_guardrails.scan.payload.Path') + mock_path.return_value.exists.return_value = False + + claude_payload = { + 'hook_event_name': 'UserPromptSubmit', + 'session_id': 'session-123', + 'prompt': 'test', + 'transcript_path': '/nonexistent/path/transcript.jsonl', + } + + unified = AIHookPayload.from_claude_code_payload(claude_payload) + + assert unified.ide_version is None + assert unified.model is None + assert unified.generation_id is None + assert unified.conversation_id == 'session-123' + assert unified.prompt == 'test' + + +def test_from_claude_code_payload_handles_no_transcript_path() -> None: + """Test that absent transcript_path doesn't break payload parsing.""" + claude_payload = { + 'hook_event_name': 'UserPromptSubmit', + 'session_id': 'session-123', + 'prompt': 'test', + } + + unified = AIHookPayload.from_claude_code_payload(claude_payload) + + assert unified.ide_version is None + assert unified.model is None + assert unified.generation_id is None + + +def test_from_claude_code_payload_extracts_model_from_nested_message(mocker: MockerFixture) -> None: + """Test that model is extracted from nested message.model field.""" + transcript_content = ( + b'{"type":"assistant","message":{"model":"claude-sonnet-4-20250514",' + b'"role":"assistant","content":[]},"uuid":"uuid-1"}\n' + ) + + mock_path = mocker.patch('cycode.cli.apps.ai_guardrails.scan.payload.Path') + mock_path.return_value.exists.return_value = True + mock_path.return_value.open.return_value.__enter__.return_value.seek = mocker.Mock() + mock_path.return_value.open.return_value.__enter__.return_value.tell.return_value = len(transcript_content) + mock_path.return_value.open.return_value.__enter__.return_value.read.return_value = transcript_content + + claude_payload = { + 'hook_event_name': 'UserPromptSubmit', + 'prompt': 'test', + 'transcript_path': '/mock/transcript.jsonl', + } + + unified = AIHookPayload.from_claude_code_payload(claude_payload) + + assert unified.model == 'claude-sonnet-4-20250514' + + +def test_from_claude_code_payload_gets_latest_user_uuid(mocker: MockerFixture) -> None: + """Test that generation_id is the UUID of the latest user message.""" + transcript_content = b"""{"type":"user","uuid":"old-user-uuid","message":{"role":"user","content":"first"}} +{"type":"assistant","uuid":"assistant-uuid","message":{"role":"assistant","content":[]}} +{"type":"user","uuid":"latest-user-uuid","message":{"role":"user","content":"second"}} +{"type":"assistant","uuid":"last-assistant-uuid","message":{"role":"assistant","content":[]}} +""" + mock_path = mocker.patch('cycode.cli.apps.ai_guardrails.scan.payload.Path') + mock_path.return_value.exists.return_value = True + mock_path.return_value.open.return_value.__enter__.return_value.seek = mocker.Mock() + mock_path.return_value.open.return_value.__enter__.return_value.tell.return_value = len(transcript_content) + mock_path.return_value.open.return_value.__enter__.return_value.read.return_value = transcript_content + + claude_payload = { + 'hook_event_name': 'UserPromptSubmit', + 'prompt': 'test', + 'transcript_path': '/mock/transcript.jsonl', + } + + unified = AIHookPayload.from_claude_code_payload(claude_payload) + + assert unified.generation_id == 'latest-user-uuid' + + +# IDE detection tests + + +def test_is_payload_for_ide_claude_code_matches_claude_code() -> None: + """Test that Claude Code events match when expected IDE is claude-code.""" + payload = {'hook_event_name': 'UserPromptSubmit'} + assert AIHookPayload.is_payload_for_ide(payload, 'claude-code') is True + + payload = {'hook_event_name': 'PreToolUse'} + assert AIHookPayload.is_payload_for_ide(payload, 'claude-code') is True + + +def test_is_payload_for_ide_cursor_matches_cursor() -> None: + """Test that Cursor events match when expected IDE is cursor.""" + payload = {'hook_event_name': 'beforeSubmitPrompt'} + assert AIHookPayload.is_payload_for_ide(payload, 'cursor') is True + + payload = {'hook_event_name': 'beforeReadFile'} + assert AIHookPayload.is_payload_for_ide(payload, 'cursor') is True + + payload = {'hook_event_name': 'beforeMCPExecution'} + assert AIHookPayload.is_payload_for_ide(payload, 'cursor') is True + + +def test_is_payload_for_ide_claude_code_does_not_match_cursor() -> None: + """Test that Claude Code events don't match when expected IDE is cursor. + + This prevents double-processing when Cursor reads Claude Code hooks. + """ + payload = {'hook_event_name': 'UserPromptSubmit'} + assert AIHookPayload.is_payload_for_ide(payload, 'cursor') is False + + payload = {'hook_event_name': 'PreToolUse'} + assert AIHookPayload.is_payload_for_ide(payload, 'cursor') is False + + +def test_is_payload_for_ide_cursor_does_not_match_claude_code() -> None: + """Test that Cursor events don't match when expected IDE is claude-code.""" + payload = {'hook_event_name': 'beforeSubmitPrompt'} + assert AIHookPayload.is_payload_for_ide(payload, 'claude-code') is False + + payload = {'hook_event_name': 'beforeReadFile'} + assert AIHookPayload.is_payload_for_ide(payload, 'claude-code') is False + + +def test_is_payload_for_ide_empty_event_name() -> None: + """Test handling of empty or missing hook_event_name.""" + payload = {'hook_event_name': ''} + assert AIHookPayload.is_payload_for_ide(payload, 'cursor') is False + assert AIHookPayload.is_payload_for_ide(payload, 'claude-code') is False + + payload = {} + assert AIHookPayload.is_payload_for_ide(payload, 'cursor') is False + assert AIHookPayload.is_payload_for_ide(payload, 'claude-code') is False diff --git a/tests/cli/commands/ai_guardrails/scan/test_response_builders.py b/tests/cli/commands/ai_guardrails/scan/test_response_builders.py index 86e87ca7..45f80829 100644 --- a/tests/cli/commands/ai_guardrails/scan/test_response_builders.py +++ b/tests/cli/commands/ai_guardrails/scan/test_response_builders.py @@ -3,6 +3,7 @@ import pytest from cycode.cli.apps.ai_guardrails.scan.response_builders import ( + ClaudeCodeResponseBuilder, CursorResponseBuilder, IDEResponseBuilder, get_response_builder, @@ -77,3 +78,71 @@ def test_cursor_response_builder_is_singleton() -> None: builder2 = get_response_builder('cursor') assert builder1 is builder2 + + +# Claude Code response builder tests + + +def test_claude_code_response_builder_allow_permission() -> None: + """Test Claude Code allow permission response.""" + builder = ClaudeCodeResponseBuilder() + response = builder.allow_permission() + + assert response == { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'allow', + } + } + + +def test_claude_code_response_builder_deny_permission() -> None: + """Test Claude Code deny permission response with messages.""" + builder = ClaudeCodeResponseBuilder() + response = builder.deny_permission('User message', 'Agent message') + + assert response == { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'deny', + 'permissionDecisionReason': 'User message', + } + } + + +def test_claude_code_response_builder_ask_permission() -> None: + """Test Claude Code ask permission response for warnings.""" + builder = ClaudeCodeResponseBuilder() + response = builder.ask_permission('Warning message', 'Agent warning') + + assert response == { + 'hookSpecificOutput': { + 'hookEventName': 'PreToolUse', + 'permissionDecision': 'ask', + 'permissionDecisionReason': 'Warning message', + } + } + + +def test_claude_code_response_builder_allow_prompt() -> None: + """Test Claude Code allow prompt response (empty dict).""" + builder = ClaudeCodeResponseBuilder() + response = builder.allow_prompt() + + assert response == {} + + +def test_claude_code_response_builder_deny_prompt() -> None: + """Test Claude Code deny prompt response with message.""" + builder = ClaudeCodeResponseBuilder() + response = builder.deny_prompt('Secrets detected') + + assert response == {'decision': 'block', 'reason': 'Secrets detected'} + + +def test_get_response_builder_claude_code() -> None: + """Test getting Claude Code response builder.""" + builder = get_response_builder('claude-code') + + assert isinstance(builder, ClaudeCodeResponseBuilder) + assert isinstance(builder, IDEResponseBuilder) diff --git a/tests/cli/commands/ai_guardrails/scan/test_scan_command.py b/tests/cli/commands/ai_guardrails/scan/test_scan_command.py new file mode 100644 index 00000000..d1473c69 --- /dev/null +++ b/tests/cli/commands/ai_guardrails/scan/test_scan_command.py @@ -0,0 +1,138 @@ +"""Tests for AI guardrails scan command.""" + +import json +from io import StringIO +from unittest.mock import MagicMock + +import pytest +from pytest_mock import MockerFixture + +from cycode.cli.apps.ai_guardrails.scan.scan_command import scan_command + + +@pytest.fixture +def mock_ctx() -> MagicMock: + """Create a mock typer context.""" + ctx = MagicMock() + ctx.obj = {} + return ctx + + +@pytest.fixture +def mock_scan_command_deps(mocker: MockerFixture) -> dict[str, MagicMock]: + """Mock scan_command dependencies that should not be called on early exit.""" + return { + 'initialize_clients': mocker.patch('cycode.cli.apps.ai_guardrails.scan.scan_command._initialize_clients'), + 'load_policy': mocker.patch('cycode.cli.apps.ai_guardrails.scan.scan_command.load_policy'), + 'get_handler': mocker.patch('cycode.cli.apps.ai_guardrails.scan.scan_command.get_handler_for_event'), + } + + +def _assert_no_api_calls(mocks: dict[str, MagicMock]) -> None: + """Assert that no API-related functions were called.""" + mocks['initialize_clients'].assert_not_called() + mocks['load_policy'].assert_not_called() + mocks['get_handler'].assert_not_called() + + +class TestIdeMismatchSkipsProcessing: + """Tests that verify IDE mismatch causes early exit without API calls.""" + + def test_claude_code_payload_with_cursor_ide( + self, + mock_ctx: MagicMock, + mocker: MockerFixture, + capsys: pytest.CaptureFixture[str], + mock_scan_command_deps: dict[str, MagicMock], + ) -> None: + """Test Claude Code payload is skipped when --ide cursor is specified. + + When Cursor reads Claude Code hooks from ~/.claude/settings.json, it will invoke + the hook with Claude Code event names. The scan command should skip processing. + """ + payload = {'hook_event_name': 'UserPromptSubmit', 'session_id': 'session-123', 'prompt': 'test'} + mocker.patch('sys.stdin', StringIO(json.dumps(payload))) + + scan_command(mock_ctx, ide='cursor') + + _assert_no_api_calls(mock_scan_command_deps) + response = json.loads(capsys.readouterr().out) + assert response.get('continue') is True + + def test_cursor_payload_with_claude_code_ide( + self, + mock_ctx: MagicMock, + mocker: MockerFixture, + capsys: pytest.CaptureFixture[str], + mock_scan_command_deps: dict[str, MagicMock], + ) -> None: + """Test Cursor payload is skipped when --ide claude-code is specified.""" + payload = {'hook_event_name': 'beforeSubmitPrompt', 'conversation_id': 'conv-123', 'prompt': 'test'} + mocker.patch('sys.stdin', StringIO(json.dumps(payload))) + + scan_command(mock_ctx, ide='claude-code') + + _assert_no_api_calls(mock_scan_command_deps) + response = json.loads(capsys.readouterr().out) + assert response == {} # Claude Code allow_prompt returns empty dict + + +class TestInvalidPayloadSkipsProcessing: + """Tests that verify invalid payloads cause early exit without API calls.""" + + def test_empty_payload( + self, + mock_ctx: MagicMock, + mocker: MockerFixture, + capsys: pytest.CaptureFixture[str], + mock_scan_command_deps: dict[str, MagicMock], + ) -> None: + """Test empty payload skips processing.""" + mocker.patch('sys.stdin', StringIO('')) + + scan_command(mock_ctx, ide='cursor') + + mock_scan_command_deps['initialize_clients'].assert_not_called() + response = json.loads(capsys.readouterr().out) + assert response.get('continue') is True + + def test_invalid_json_payload( + self, + mock_ctx: MagicMock, + mocker: MockerFixture, + capsys: pytest.CaptureFixture[str], + mock_scan_command_deps: dict[str, MagicMock], + ) -> None: + """Test invalid JSON skips processing.""" + mocker.patch('sys.stdin', StringIO('not valid json {')) + + scan_command(mock_ctx, ide='cursor') + + mock_scan_command_deps['initialize_clients'].assert_not_called() + response = json.loads(capsys.readouterr().out) + assert response.get('continue') is True + + +class TestMatchingIdeProcessesPayload: + """Tests that verify matching IDE processes the payload normally.""" + + def test_claude_code_payload_with_claude_code_ide( + self, + mock_ctx: MagicMock, + mocker: MockerFixture, + mock_scan_command_deps: dict[str, MagicMock], + ) -> None: + """Test Claude Code payload is processed when --ide claude-code is specified.""" + payload = {'hook_event_name': 'UserPromptSubmit', 'session_id': 'session-123', 'prompt': 'test'} + mocker.patch('sys.stdin', StringIO(json.dumps(payload))) + + mock_scan_command_deps['load_policy'].return_value = {'fail_open': True} + mock_handler = MagicMock(return_value={'decision': 'allow'}) + mock_scan_command_deps['get_handler'].return_value = mock_handler + + scan_command(mock_ctx, ide='claude-code') + + mock_scan_command_deps['initialize_clients'].assert_called_once() + mock_scan_command_deps['load_policy'].assert_called_once() + mock_scan_command_deps['get_handler'].assert_called_once() + mock_handler.assert_called_once() diff --git a/tests/cli/commands/ai_guardrails/test_command_utils.py b/tests/cli/commands/ai_guardrails/test_command_utils.py index 4f0ef55e..5d8d224b 100644 --- a/tests/cli/commands/ai_guardrails/test_command_utils.py +++ b/tests/cli/commands/ai_guardrails/test_command_utils.py @@ -15,6 +15,9 @@ def test_validate_and_parse_ide_valid() -> None: assert validate_and_parse_ide('cursor') == AIIDEType.CURSOR assert validate_and_parse_ide('CURSOR') == AIIDEType.CURSOR assert validate_and_parse_ide('CuRsOr') == AIIDEType.CURSOR + assert validate_and_parse_ide('claude-code') == AIIDEType.CLAUDE_CODE + assert validate_and_parse_ide('Claude-Code') == AIIDEType.CLAUDE_CODE + assert validate_and_parse_ide('all') is None def test_validate_and_parse_ide_invalid() -> None: diff --git a/tests/cli/commands/ai_guardrails/test_hooks_manager.py b/tests/cli/commands/ai_guardrails/test_hooks_manager.py new file mode 100644 index 00000000..f0dec6f7 --- /dev/null +++ b/tests/cli/commands/ai_guardrails/test_hooks_manager.py @@ -0,0 +1,53 @@ +"""Tests for AI guardrails hooks manager.""" + +from cycode.cli.apps.ai_guardrails.hooks_manager import is_cycode_hook_entry + + +def test_is_cycode_hook_entry_cursor_format() -> None: + """Test detecting Cycode hook in Cursor format (flat command).""" + entry = {'command': 'cycode ai-guardrails scan'} + assert is_cycode_hook_entry(entry) is True + + entry = {'command': 'cycode ai-guardrails scan --some-flag'} + assert is_cycode_hook_entry(entry) is True + + +def test_is_cycode_hook_entry_claude_code_format() -> None: + """Test detecting Cycode hook in Claude Code format (nested).""" + entry = { + 'hooks': [{'type': 'command', 'command': 'cycode ai-guardrails scan --ide claude-code'}], + } + assert is_cycode_hook_entry(entry) is True + + entry = { + 'matcher': 'Read', + 'hooks': [{'type': 'command', 'command': 'cycode ai-guardrails scan --ide claude-code'}], + } + assert is_cycode_hook_entry(entry) is True + + +def test_is_cycode_hook_entry_non_cycode() -> None: + """Test that non-Cycode hooks are not detected.""" + # Cursor format + entry = {'command': 'some-other-command'} + assert is_cycode_hook_entry(entry) is False + + # Claude Code format + entry = { + 'hooks': [{'type': 'command', 'command': 'some-other-command'}], + } + assert is_cycode_hook_entry(entry) is False + + # Empty entry + entry = {} + assert is_cycode_hook_entry(entry) is False + + +def test_is_cycode_hook_entry_partial_match() -> None: + """Test partial command match.""" + # Should match if command contains 'cycode ai-guardrails scan' + entry = {'command': '/usr/local/bin/cycode ai-guardrails scan'} + assert is_cycode_hook_entry(entry) is True + + entry = {'command': 'cycode ai-guardrails scan --verbose'} + assert is_cycode_hook_entry(entry) is True