diff --git a/src/code_indexer/server/clients/claude_server_client.py b/src/code_indexer/server/clients/claude_server_client.py index f4cbe69a..d5a47c18 100644 --- a/src/code_indexer/server/clients/claude_server_client.py +++ b/src/code_indexer/server/clients/claude_server_client.py @@ -107,6 +107,7 @@ async def authenticate(self) -> str: # Claude Server returns "expires" (ISO datetime), standard returns "expires_in" (seconds) if "expires" in data: from dateutil.parser import parse as parse_datetime + self._jwt_expires = parse_datetime(data["expires"]) else: expires_in = data.get("expires_in", 3600) @@ -194,9 +195,7 @@ async def _make_authenticated_request( ) elif response.status_code == 401 and not retry_on_401: # Second 401 means auth truly failed - raise exception - raise ClaudeServerAuthError( - "Authentication failed after token refresh" - ) + raise ClaudeServerAuthError("Authentication failed after token refresh") return response @@ -250,9 +249,7 @@ async def register_repository( f"Repository registration failed: HTTP {response.status_code}" ) - async def create_job( - self, prompt: str, repositories: List[str] - ) -> Dict[str, Any]: + async def create_job(self, prompt: str, repositories: List[str]) -> Dict[str, Any]: """ Create a new job with the given prompt. @@ -276,13 +273,9 @@ async def create_job( if response.status_code in (200, 201): return response.json() elif response.status_code >= 500: - raise ClaudeServerError( - f"Claude Server error: HTTP {response.status_code}" - ) + raise ClaudeServerError(f"Claude Server error: HTTP {response.status_code}") else: - raise ClaudeServerError( - f"Job creation failed: HTTP {response.status_code}" - ) + raise ClaudeServerError(f"Job creation failed: HTTP {response.status_code}") async def start_job(self, job_id: str) -> Dict[str, Any]: """ diff --git a/src/code_indexer/server/config/delegation_config.py b/src/code_indexer/server/config/delegation_config.py index b2ea8456..01e89948 100644 --- a/src/code_indexer/server/config/delegation_config.py +++ b/src/code_indexer/server/config/delegation_config.py @@ -52,7 +52,9 @@ class ClaudeDelegationConfig: claude_server_credential_type: Literal["password", "api_key"] = "password" claude_server_credential: str = "" # Encrypted at rest skip_ssl_verify: bool = False # Allow self-signed certificates for E2E testing - cidx_callback_url: str = "" # Story #720: URL that Claude Server uses to POST callbacks + cidx_callback_url: str = ( + "" # Story #720: URL that Claude Server uses to POST callbacks + ) @property def is_configured(self) -> bool: diff --git a/src/code_indexer/server/mcp/handlers.py b/src/code_indexer/server/mcp/handlers.py index 944bef32..641eb562 100644 --- a/src/code_indexer/server/mcp/handlers.py +++ b/src/code_indexer/server/mcp/handlers.py @@ -8852,7 +8852,9 @@ def _get_delegation_function_repo_path() -> Optional[Path]: repo_path = golden_repo_manager.get_actual_repo_path(function_repo_alias) return Path(repo_path) if repo_path else None except Exception as e: - logger.warning(f"Function repository '{function_repo_alias}' not found: {e}") + logger.warning( + f"Function repository '{function_repo_alias}' not found: {e}" + ) return None except Exception as e: @@ -8974,7 +8976,9 @@ def _get_delegation_config(): return None -def _validate_function_parameters(target_function, parameters: Dict[str, Any]) -> Optional[str]: +def _validate_function_parameters( + target_function, parameters: Dict[str, Any] +) -> Optional[str]: """ Validate required parameters are present. 
@@ -8989,7 +8993,9 @@ def _validate_function_parameters(target_function, parameters: Dict[str, Any]) - return None -async def _ensure_repos_registered(client, required_repos: List[Dict[str, Any]]) -> List[str]: +async def _ensure_repos_registered( + client, required_repos: List[Dict[str, Any]] +) -> List[str]: """ Ensure required repositories are registered in Claude Server. @@ -9064,8 +9070,14 @@ async def handle_execute_delegation_function( repo_path = _get_delegation_function_repo_path() delegation_config = _get_delegation_config() - if repo_path is None or delegation_config is None or not delegation_config.is_configured: - return _mcp_response({"success": False, "error": "Claude Delegation not configured"}) + if ( + repo_path is None + or delegation_config is None + or not delegation_config.is_configured + ): + return _mcp_response( + {"success": False, "error": "Claude Delegation not configured"} + ) function_name = args.get("function_name", "") parameters = args.get("parameters", {}) @@ -9074,17 +9086,27 @@ async def handle_execute_delegation_function( # Load and find function loader = DelegationFunctionLoader() all_functions = loader.load_functions(repo_path) - target_function = next((f for f in all_functions if f.name == function_name), None) + target_function = next( + (f for f in all_functions if f.name == function_name), None + ) if target_function is None: - return _mcp_response({"success": False, "error": f"Function not found: {function_name}"}) + return _mcp_response( + {"success": False, "error": f"Function not found: {function_name}"} + ) # Access validation - effective_user = session_state.effective_user if session_state and session_state.is_impersonating else user + effective_user = ( + session_state.effective_user + if session_state and session_state.is_impersonating + else user + ) user_groups = _get_user_groups(effective_user) if not (user_groups & set(target_function.allowed_groups)): - return _mcp_response({"success": False, "error": "Access denied: insufficient permissions"}) + return _mcp_response( + {"success": False, "error": "Access denied: insufficient permissions"} + ) # Parameter validation param_error = _validate_function_parameters(target_function, parameters) @@ -9099,11 +9121,15 @@ async def handle_execute_delegation_function( password=delegation_config.claude_server_credential, skip_ssl_verify=delegation_config.skip_ssl_verify, ) as client: - repo_aliases = await _ensure_repos_registered(client, target_function.required_repos) + repo_aliases = await _ensure_repos_registered( + client, target_function.required_repos + ) # Render prompt and create job processor = PromptTemplateProcessor() - impersonation_user = target_function.impersonation_user or effective_user.username + impersonation_user = ( + target_function.impersonation_user or effective_user.username + ) rendered_prompt = processor.render( template=target_function.prompt_template, parameters=parameters, @@ -9111,19 +9137,27 @@ async def handle_execute_delegation_function( impersonation_user=impersonation_user, ) - job_result = await client.create_job(prompt=rendered_prompt, repositories=repo_aliases) + job_result = await client.create_job( + prompt=rendered_prompt, repositories=repo_aliases + ) # Claude Server returns camelCase "jobId" job_id = job_result.get("jobId") or job_result.get("job_id") if not job_id: - return _mcp_response({"success": False, "error": "Job created but no job_id returned"}) + return _mcp_response( + {"success": False, "error": "Job created but no job_id returned"} + ) # 
Story #720: Register callback URL with Claude Server for completion notification callback_base_url = _get_cidx_callback_base_url() if callback_base_url: - callback_url = f"{callback_base_url.rstrip('/')}/api/delegation/callback/{job_id}" + callback_url = ( + f"{callback_base_url.rstrip('/')}/api/delegation/callback/{job_id}" + ) try: await client.register_callback(job_id, callback_url) - logger.debug(f"Registered callback URL for job {job_id}: {callback_url}") + logger.debug( + f"Registered callback URL for job {job_id}: {callback_url}" + ) except Exception as callback_err: # Log but don't fail - callback registration is best-effort logger.warning( @@ -9142,10 +9176,15 @@ async def handle_execute_delegation_function( return _mcp_response({"success": True, "job_id": job_id}) except ClaudeServerError as e: - logger.error(f"Claude Server error: {e}", extra={"correlation_id": get_correlation_id()}) + logger.error( + f"Claude Server error: {e}", extra={"correlation_id": get_correlation_id()} + ) return _mcp_response({"success": False, "error": f"Claude Server error: {e}"}) except Exception as e: - logger.exception(f"Error in execute_delegation_function: {e}", extra={"correlation_id": get_correlation_id()}) + logger.exception( + f"Error in execute_delegation_function: {e}", + extra={"correlation_id": get_correlation_id()}, + ) return _mcp_response({"success": False, "error": str(e)}) @@ -9179,66 +9218,87 @@ async def handle_poll_delegation_job( delegation_config = _get_delegation_config() if delegation_config is None or not delegation_config.is_configured: - return _mcp_response({"success": False, "error": "Claude Delegation not configured"}) + return _mcp_response( + {"success": False, "error": "Claude Delegation not configured"} + ) job_id = args.get("job_id", "") if not job_id: - return _mcp_response({"success": False, "error": "Missing required parameter: job_id"}) + return _mcp_response( + {"success": False, "error": "Missing required parameter: job_id"} + ) # Story #720: Get timeout_seconds from args (default 45s, below MCP's 60s) # Also support legacy "timeout" parameter for backward compatibility timeout = args.get("timeout_seconds", args.get("timeout", 45)) if not isinstance(timeout, (int, float)): - return _mcp_response({ - "success": False, - "error": "timeout_seconds must be a number (recommended: 5-300)" - }) + return _mcp_response( + { + "success": False, + "error": "timeout_seconds must be a number (recommended: 5-300)", + } + ) # Minimum 0.01s (for testing), maximum 300s (5 minutes) # Recommended range for production: 5-300 seconds if timeout < 0.01 or timeout > 300: - return _mcp_response({ - "success": False, - "error": "timeout_seconds must be between 0.01 and 300" - }) + return _mcp_response( + { + "success": False, + "error": "timeout_seconds must be between 0.01 and 300", + } + ) # Check if job exists in tracker before waiting tracker = DelegationJobTracker.get_instance() job_exists = await tracker.has_job(job_id) if not job_exists: - return _mcp_response({ - "success": False, - "error": f"Job {job_id} not found or already completed", - }) + return _mcp_response( + { + "success": False, + "error": f"Job {job_id} not found or already completed", + } + ) # Wait for callback via DelegationJobTracker result = await tracker.wait_for_job(job_id, timeout=timeout) if result is None: # Timeout - job still exists, caller can try again - return _mcp_response({ - "status": "waiting", - "message": "Job still running, callback not yet received", - "continue_polling": True, - }) + return 
_mcp_response( + { + "status": "waiting", + "message": "Job still running, callback not yet received", + "continue_polling": True, + } + ) # Return result based on status from callback if result.status == "completed": - return _mcp_response({ - "status": "completed", - "result": result.output, - "continue_polling": False, - }) + return _mcp_response( + { + "status": "completed", + "result": result.output, + "continue_polling": False, + } + ) else: # Failed or other status - return _mcp_response({ - "status": "failed", - "error": result.error or result.output, - "continue_polling": False, - }) + return _mcp_response( + { + "status": "failed", + "error": result.error or result.output, + "continue_polling": False, + } + ) except Exception as e: - logger.error(f"Error waiting for delegation job {job_id}: {e}", extra={"correlation_id": get_correlation_id()}) - return _mcp_response({"success": False, "error": f"Error waiting for job completion: {str(e)}"}) + logger.error( + f"Error waiting for delegation job {job_id}: {e}", + extra={"correlation_id": get_correlation_id()}, + ) + return _mcp_response( + {"success": False, "error": f"Error waiting for job completion: {str(e)}"} + ) HANDLER_REGISTRY["poll_delegation_job"] = handle_poll_delegation_job diff --git a/src/code_indexer/server/mcp/session_registry.py b/src/code_indexer/server/mcp/session_registry.py index 5350ff01..ba3ba1c2 100644 --- a/src/code_indexer/server/mcp/session_registry.py +++ b/src/code_indexer/server/mcp/session_registry.py @@ -82,7 +82,9 @@ def __new__(cls) -> "SessionRegistry": # TTL cleanup attributes (Story #731) instance._cleanup_task: Optional[asyncio.Task] = None instance._ttl_seconds = DEFAULT_SESSION_TTL_SECONDS - instance._cleanup_interval_seconds = DEFAULT_CLEANUP_INTERVAL_SECONDS + instance._cleanup_interval_seconds = ( + DEFAULT_CLEANUP_INTERVAL_SECONDS + ) cls._instance = instance return cls._instance diff --git a/src/code_indexer/server/mcp/tools.py b/src/code_indexer/server/mcp/tools.py index 9ff0f149..39aeb06e 100644 --- a/src/code_indexer/server/mcp/tools.py +++ b/src/code_indexer/server/mcp/tools.py @@ -7410,7 +7410,13 @@ def filter_tools_by_role(user: User) -> List[Dict[str, Any]]: }, "phase": { "type": "string", - "enum": ["repo_registration", "repo_cloning", "cidx_indexing", "job_running", "done"], + "enum": [ + "repo_registration", + "repo_cloning", + "cidx_indexing", + "job_running", + "done", + ], "description": "Current phase of job execution", }, "progress": { diff --git a/src/code_indexer/server/query/semantic_query_manager.py b/src/code_indexer/server/query/semantic_query_manager.py index 9463d98d..a3aaf019 100644 --- a/src/code_indexer/server/query/semantic_query_manager.py +++ b/src/code_indexer/server/query/semantic_query_manager.py @@ -979,13 +979,15 @@ def _search_single_repository( # Story #725: Only warn when non-default filters are explicitly set # Note: accuracy="balanced" is the default, so we only warn if accuracy # is set to something other than "balanced" or None - has_non_default_filters = any([ - language, # None is default - exclude_language, # None is default - path_filter, # None is default - exclude_path, # None is default - accuracy and accuracy != "balanced" # "balanced" is default - ]) + has_non_default_filters = any( + [ + language, # None is default + exclude_language, # None is default + path_filter, # None is default + exclude_path, # None is default + accuracy and accuracy != "balanced", # "balanced" is default + ] + ) if search_mode in ["semantic", "hybrid"] and 
has_non_default_filters: self.logger.warning( f"Advanced filter parameters (language={language}, exclude_language={exclude_language}, " diff --git a/src/code_indexer/server/routers/delegation_callbacks.py b/src/code_indexer/server/routers/delegation_callbacks.py index 0094d4d6..3b4b9ec2 100644 --- a/src/code_indexer/server/routers/delegation_callbacks.py +++ b/src/code_indexer/server/routers/delegation_callbacks.py @@ -39,7 +39,9 @@ class CallbackPayload(BaseModel): Repository: Optional[str] = Field(None, description="Repository alias") CreatedAt: Optional[datetime] = Field(None, description="Job creation timestamp") StartedAt: Optional[datetime] = Field(None, description="Job start timestamp") - CompletedAt: Optional[datetime] = Field(None, description="Job completion timestamp") + CompletedAt: Optional[datetime] = Field( + None, description="Job completion timestamp" + ) ReferenceId: Optional[str] = Field(None, description="Reference ID for tracking") AffinityToken: Optional[str] = Field(None, description="Affinity token for routing") @@ -75,9 +77,7 @@ async def receive_delegation_callback( Returns: CallbackResponse indicating receipt and whether job was found """ - logger.info( - f"Received callback for job {job_id}: status={payload.Status}" - ) + logger.info(f"Received callback for job {job_id}: status={payload.Status}") tracker = DelegationJobTracker.get_instance() diff --git a/src/code_indexer/server/services/database_health_service.py b/src/code_indexer/server/services/database_health_service.py index 61140518..648554e6 100644 --- a/src/code_indexer/server/services/database_health_service.py +++ b/src/code_indexer/server/services/database_health_service.py @@ -28,6 +28,9 @@ class DatabaseHealthStatus(str, Enum): HEALTHY = "healthy" # Green - All 5 checks pass WARNING = "warning" # Yellow - Some checks pass, some fail ERROR = "error" # Red - Critical checks fail + NOT_INITIALIZED = ( + "not_initialized" # Blue/Gray - Lazy-loaded database not yet created + ) @dataclass @@ -54,6 +57,7 @@ def get_tooltip(self) -> str: Always shows display name and path. Unhealthy databases also show the failed condition. + NOT_INITIALIZED databases show they're optional and not yet created. 
""" # Always include path in tooltip base_tooltip = f"{self.display_name}\n{self.db_path}" @@ -61,6 +65,10 @@ def get_tooltip(self) -> str: if self.status == DatabaseHealthStatus.HEALTHY: return base_tooltip + # NOT_INITIALIZED databases get special message + if self.status == DatabaseHealthStatus.NOT_INITIALIZED: + return f"{base_tooltip}\nNot initialized (optional)" + # Find first failed check to include in tooltip for check_name, result in self.checks.items(): if not result.passed: @@ -83,6 +91,10 @@ def get_tooltip(self) -> str: "payload_cache.db": "Payload Cache", } +# Lazy-loaded databases (singleton pattern with get_instance()) +# These databases are only created when their features are first accessed +LAZY_LOADED_DATABASES = {"search_config.db", "file_content_limits.db"} + class DatabaseHealthService: """ @@ -203,6 +215,12 @@ def _check_connect(db_path: str) -> CheckResult: try: # Check if file exists first if not Path(db_path).exists(): + # Check if this is a lazy-loaded database + file_name = Path(db_path).name + if file_name in LAZY_LOADED_DATABASES: + return CheckResult( + passed=False, error_message="Not initialized (optional)" + ) return CheckResult( passed=False, error_message="Connection failed: file not found" ) @@ -314,7 +332,13 @@ def _determine_status(checks: Dict[str, CheckResult]) -> DatabaseHealthStatus: - GREEN (HEALTHY): All 5 checks pass - YELLOW (WARNING): Some checks pass, some fail (degraded but operational) - RED (ERROR): Critical checks fail (connect/read) + - BLUE/GRAY (NOT_INITIALIZED): Lazy-loaded database not yet created """ + # Check for lazy-loaded database not yet initialized + if "connect" in checks and not checks["connect"].passed: + if checks["connect"].error_message == "Not initialized (optional)": + return DatabaseHealthStatus.NOT_INITIALIZED + # Critical checks - if these fail, status is ERROR critical_checks = ["connect", "read"] for check_name in critical_checks: diff --git a/src/code_indexer/server/services/delegation_function_loader.py b/src/code_indexer/server/services/delegation_function_loader.py index 35585227..0d0c4a8c 100644 --- a/src/code_indexer/server/services/delegation_function_loader.py +++ b/src/code_indexer/server/services/delegation_function_loader.py @@ -111,9 +111,7 @@ def parse_function_file(self, file_path: Path) -> DelegationFunction: prompt_template=body, ) - def _parse_frontmatter( - self, content: str - ) -> tuple[Optional[Dict[str, Any]], str]: + def _parse_frontmatter(self, content: str) -> tuple[Optional[Dict[str, Any]], str]: """ Parse YAML frontmatter from markdown content. 
@@ -132,7 +130,7 @@ def _parse_frontmatter( return None, content frontmatter_str = content[3:end_index].strip() - body = content[end_index + 3:].strip() + body = content[end_index + 3 :].strip() try: frontmatter = yaml.safe_load(frontmatter_str) @@ -159,8 +157,4 @@ def filter_by_groups( if not user_groups: return [] - return [ - func - for func in functions - if set(func.allowed_groups) & user_groups - ] + return [func for func in functions if set(func.allowed_groups) & user_groups] diff --git a/src/code_indexer/server/services/delegation_job_tracker.py b/src/code_indexer/server/services/delegation_job_tracker.py index 34a68884..0837f028 100644 --- a/src/code_indexer/server/services/delegation_job_tracker.py +++ b/src/code_indexer/server/services/delegation_job_tracker.py @@ -125,9 +125,7 @@ async def complete_job(self, result: JobResult) -> bool: async with self._lock: future = self._pending_jobs.get(result.job_id) if future is None: - logger.warning( - f"complete_job called for unknown job: {result.job_id}" - ) + logger.warning(f"complete_job called for unknown job: {result.job_id}") return False if future.done(): @@ -209,7 +207,9 @@ async def wait_for_job( return result except asyncio.TimeoutError: # DO NOT remove on timeout - job is still valid, caller can retry - logger.debug(f"wait_for_job timed out for job: {job_id}, keeping in tracker") + logger.debug( + f"wait_for_job timed out for job: {job_id}, keeping in tracker" + ) return None except asyncio.CancelledError: # Shield was cancelled but Future may still be valid - propagate diff --git a/src/code_indexer/server/services/health_service.py b/src/code_indexer/server/services/health_service.py index fbf616b3..cb865deb 100644 --- a/src/code_indexer/server/services/health_service.py +++ b/src/code_indexer/server/services/health_service.py @@ -80,7 +80,9 @@ def __init__(self): # CPU history for sustained threshold detection (Story #727 AC4) # List of (timestamp, cpu_percent) tuples for rolling 60s window self._cpu_history: List[Tuple[float, float]] = [] - self._cpu_history_lock = threading.Lock() # Thread safety for concurrent requests + self._cpu_history_lock = ( + threading.Lock() + ) # Thread safety for concurrent requests except Exception as e: logger.error( @@ -379,6 +381,9 @@ def _collect_database_failures( f"{db_result.display_name} DB: {check.error_message or check_name}" ) break + # NOT_INITIALIZED and HEALTHY statuses don't affect overall health + # NOT_INITIALIZED databases are lazy-loaded and optional (not yet created) + # HEALTHY databases are fully operational return has_warning, has_error, reasons @@ -491,15 +496,13 @@ def _check_cpu_sustained(self, current_cpu: float) -> Tuple[bool, bool]: readings_60s = [c for t, c in history_snapshot] # Degraded: CPU >95% sustained for 30+ seconds - is_degraded = ( - len(readings_30s) >= MIN_CPU_READINGS_FOR_DEGRADED - and all(c > CPU_SUSTAINED_THRESHOLD for c in readings_30s) + is_degraded = len(readings_30s) >= MIN_CPU_READINGS_FOR_DEGRADED and all( + c > CPU_SUSTAINED_THRESHOLD for c in readings_30s ) # Unhealthy: CPU >95% sustained for 60+ seconds - is_unhealthy = ( - len(readings_60s) >= MIN_CPU_READINGS_FOR_UNHEALTHY - and all(c > CPU_SUSTAINED_THRESHOLD for c in readings_60s) + is_unhealthy = len(readings_60s) >= MIN_CPU_READINGS_FOR_UNHEALTHY and all( + c > CPU_SUSTAINED_THRESHOLD for c in readings_60s ) return is_degraded, is_unhealthy @@ -532,7 +535,9 @@ def _calculate_overall_status( failure_reasons.extend(db_reasons) # AC2: Volume health - vol_warn, vol_err, vol_reasons = 
self._collect_volume_failures(system_info.volumes) + vol_warn, vol_err, vol_reasons = self._collect_volume_failures( + system_info.volumes + ) has_warning = has_warning or vol_warn has_error = has_error or vol_err failure_reasons.extend(vol_reasons) @@ -568,7 +573,9 @@ def _calculate_overall_status( # AC5: Limit to MAX_FAILURE_REASONS with "+N more" indicator if len(failure_reasons) > MAX_FAILURE_REASONS: extra_count = len(failure_reasons) - MAX_FAILURE_REASONS - failure_reasons = failure_reasons[:MAX_FAILURE_REASONS] + [f"+{extra_count} more"] + failure_reasons = failure_reasons[:MAX_FAILURE_REASONS] + [ + f"+{extra_count} more" + ] return status, failure_reasons diff --git a/src/code_indexer/server/services/job_phase_detector.py b/src/code_indexer/server/services/job_phase_detector.py index c8b93218..02b64e82 100644 --- a/src/code_indexer/server/services/job_phase_detector.py +++ b/src/code_indexer/server/services/job_phase_detector.py @@ -76,9 +76,7 @@ def detect_phase(self, job_state: Dict[str, Any]) -> JobPhase: # All repos ready, job is running return JobPhase.JOB_RUNNING - def get_progress( - self, job_state: Dict[str, Any], phase: JobPhase - ) -> PhaseProgress: + def get_progress(self, job_state: Dict[str, Any], phase: JobPhase) -> PhaseProgress: """ Extract phase-specific progress metrics. @@ -100,15 +98,16 @@ def get_progress( ) return PhaseProgress( phase=phase, - progress={"repos_total": repos_total, "repos_registered": repos_registered}, + progress={ + "repos_total": repos_total, + "repos_registered": repos_registered, + }, message=f"Registering repositories ({repos_registered}/{repos_total})...", is_terminal=False, ) if phase == JobPhase.REPO_CLONING: - repos_cloned = sum( - 1 for repo in repositories if repo.get("cloned", False) - ) + repos_cloned = sum(1 for repo in repositories if repo.get("cloned", False)) return PhaseProgress( phase=phase, progress={"repos_total": repos_total, "repos_cloned": repos_cloned}, @@ -132,7 +131,10 @@ def get_progress( tool_use_count = job_state.get("tool_use_count", 0) return PhaseProgress( phase=phase, - progress={"exchange_count": exchange_count, "tool_use_count": tool_use_count}, + progress={ + "exchange_count": exchange_count, + "tool_use_count": tool_use_count, + }, message=f"Processing query ({exchange_count} exchanges, {tool_use_count} tool calls)...", is_terminal=False, ) diff --git a/src/code_indexer/server/services/maintenance_service.py b/src/code_indexer/server/services/maintenance_service.py index f9398f78..e1411e53 100644 --- a/src/code_indexer/server/services/maintenance_service.py +++ b/src/code_indexer/server/services/maintenance_service.py @@ -72,7 +72,9 @@ def enter_maintenance_mode(self) -> Dict[str, Any]: "maintenance_mode": True, "running_jobs": running_jobs, "queued_jobs": queued_jobs, - "entered_at": self._entered_at.isoformat() if self._entered_at else None, + "entered_at": ( + self._entered_at.isoformat() if self._entered_at else None + ), "message": f"Maintenance mode active. 
{running_jobs} running, {queued_jobs} queued.", } @@ -137,7 +139,9 @@ def get_status(self) -> Dict[str, Any]: "drained": drained, "running_jobs": running_jobs, "queued_jobs": queued_jobs, - "entered_at": self._entered_at.isoformat() if self._entered_at else None, + "entered_at": ( + self._entered_at.isoformat() if self._entered_at else None + ), } def is_drained(self) -> bool: diff --git a/src/code_indexer/server/services/prompt_template_processor.py b/src/code_indexer/server/services/prompt_template_processor.py index 46f81627..59f503f7 100644 --- a/src/code_indexer/server/services/prompt_template_processor.py +++ b/src/code_indexer/server/services/prompt_template_processor.py @@ -24,9 +24,9 @@ class PromptTemplateProcessor: 3. Substituting {{user_prompt}} placeholder with user's additional prompt """ - IMPERSONATION_INSTRUCTION = '''CRITICAL: As your FIRST action before any other operations, call the MCP tool `set_session_impersonation` with username "{impersonation_user}". All your subsequent queries to CIDX must use this impersonated identity. + IMPERSONATION_INSTRUCTION = """CRITICAL: As your FIRST action before any other operations, call the MCP tool `set_session_impersonation` with username "{impersonation_user}". All your subsequent queries to CIDX must use this impersonated identity. -''' +""" def render( self, diff --git a/src/code_indexer/server/web/routes.py b/src/code_indexer/server/web/routes.py index 5708d3a0..46104191 100644 --- a/src/code_indexer/server/web/routes.py +++ b/src/code_indexer/server/web/routes.py @@ -4626,7 +4626,9 @@ async def update_claude_delegation_config( return _create_login_redirect(request) if not validate_login_csrf_token(request, csrf_token): - return _create_config_page_response(request, session, error_message="Invalid CSRF token") + return _create_config_page_response( + request, session, error_message="Invalid CSRF token" + ) form_data = await request.form() config_service = get_config_service() @@ -4643,32 +4645,48 @@ async def update_claude_delegation_config( if not url or not username or not credential: return _create_config_page_response( - request, session, error_message="URL, username, and credential are required", - validation_errors={"claude_delegation": "Missing required fields"}) + request, + session, + error_message="URL, username, and credential are required", + validation_errors={"claude_delegation": "Missing required fields"}, + ) # Validate connectivity before saving cred_type = form_data.get("claude_server_credential_type", "password") - result = delegation_manager.validate_connectivity(url, username, credential, cred_type) + result = delegation_manager.validate_connectivity( + url, username, credential, cred_type + ) if not result.success: return _create_config_page_response( - request, session, error_message=f"Connection failed: {result.error_message}", - validation_errors={"claude_delegation": result.error_message}) + request, + session, + error_message=f"Connection failed: {result.error_message}", + validation_errors={"claude_delegation": result.error_message}, + ) # Save configuration with encrypted credential from ..config.delegation_config import DEFAULT_FUNCTION_REPO_ALIAS + cidx_callback_url = form_data.get("cidx_callback_url", "").strip() # Story #720 skip_ssl_verify = form_data.get("skip_ssl_verify", "false").lower() == "true" config = ClaudeDelegationConfig( - function_repo_alias=form_data.get("function_repo_alias", "").strip() or DEFAULT_FUNCTION_REPO_ALIAS, - claude_server_url=url, claude_server_username=username, - 
claude_server_credential_type=cred_type, claude_server_credential=credential, + function_repo_alias=form_data.get("function_repo_alias", "").strip() + or DEFAULT_FUNCTION_REPO_ALIAS, + claude_server_url=url, + claude_server_username=username, + claude_server_credential_type=cred_type, + claude_server_credential=credential, cidx_callback_url=cidx_callback_url, - skip_ssl_verify=skip_ssl_verify) + skip_ssl_verify=skip_ssl_verify, + ) delegation_manager.save_config(config) return _create_config_page_response( - request, session, success_message="Claude Delegation configuration saved and verified") + request, + session, + success_message="Claude Delegation configuration saved and verified", + ) @web_router.post("/config/{section}", response_class=HTMLResponse) diff --git a/src/code_indexer/server/web/static/admin.css b/src/code_indexer/server/web/static/admin.css index 6df76843..d01d2cb7 100644 --- a/src/code_indexer/server/web/static/admin.css +++ b/src/code_indexer/server/web/static/admin.css @@ -837,6 +837,10 @@ button.danger, .button.danger { fill: #ef4444; } +.hexagon.hex-not-initialized { + fill: #94a3b8; +} + /* AC2/AC3: Hover effect for tooltips (title attribute provides native tooltip) */ .honeycomb-svg polygon.hexagon:hover { stroke-width: 2; @@ -887,6 +891,10 @@ button.danger, .button.danger { background-color: #ef4444; } +.legend-dot.not-initialized { + background-color: #94a3b8; +} + /* ========================================================================== Story #712 AC5: Disk Metrics Complete Display ========================================================================== */ diff --git a/src/code_indexer/server/web/templates/partials/dashboard_health.html b/src/code_indexer/server/web/templates/partials/dashboard_health.html index a6ce9a4c..f198fc7c 100644 --- a/src/code_indexer/server/web/templates/partials/dashboard_health.html +++ b/src/code_indexer/server/web/templates/partials/dashboard_health.html @@ -39,7 +39,7 @@

                Database Health

                    {% set col = i % 4 %}
                    {% set x = 35 + col * 55 %}
                    {% set y = 40 + row * 65 %}
-                    {% set status_class = 'healthy' if db.status.value == 'healthy' else ('warning' if db.status.value == 'warning' else 'error') %}
+                    {% set status_class = 'healthy' if db.status.value == 'healthy' else ('warning' if db.status.value == 'warning' else ('not-initialized' if db.status.value == 'not_initialized' else 'error')) %}
@@ -55,6 +55,7 @@
                Database Health

                    Healthy
                    Warning
                    Error
+                    Not Initialized
                {% else %}
diff --git a/tests/server/multi/test_multi_result_aggregator.py b/tests/server/multi/test_multi_result_aggregator.py index e47d8696..67c77e7b 100644 --- a/tests/server/multi/test_multi_result_aggregator.py +++ b/tests/server/multi/test_multi_result_aggregator.py @@ -56,12 +56,10 @@ def test_enforces_per_repo_limit(self): """Per-repo limit is enforced independently.""" repo_results = { "repo1": [ - {"file": f"file{i}.py", "score": 0.9 - i * 0.01} - for i in range(20) + {"file": f"file{i}.py", "score": 0.9 - i * 0.01} for i in range(20) ], "repo2": [ - {"file": f"file{i}.py", "score": 0.85 - i * 0.01} - for i in range(15) + {"file": f"file{i}.py", "score": 0.85 - i * 0.01} for i in range(15) ], } @@ -119,8 +117,7 @@ def test_limit_smaller_than_results(self): """Limit smaller than number of results per repo.""" repo_results = { "repo1": [ - {"file": f"file{i}.py", "score": 0.9 - i * 0.1} - for i in range(10) + {"file": f"file{i}.py", "score": 0.9 - i * 0.1} for i in range(10) ], } @@ -217,13 +214,9 @@ def test_min_score_applied_before_limit(self): """Score filtering is applied before per-repo limit enforcement.""" repo_results = { "repo1": [ - {"file": f"high{i}.py", "score": 0.9 - i * 0.01} - for i in range(5) + {"file": f"high{i}.py", "score": 0.9 - i * 0.01} for i in range(5) ] - + [ - {"file": f"low{i}.py", "score": 0.5 - i * 0.01} - for i in range(10) - ], + + [{"file": f"low{i}.py", "score": 0.5 - i * 0.01} for i in range(10)], } # Limit is 3, min_score is 0.7 @@ -287,8 +280,7 @@ def test_min_score_with_limit(self): """Score filtering combined with per-repo limit works correctly.""" repo_results = { "repo1": [ - {"file": f"file{i}.py", "score": 0.95 - i * 0.05} - for i in range(10) + {"file": f"file{i}.py", "score": 0.95 - i * 0.05} for i in range(10) ], } diff --git a/tests/server/multi/test_multi_search_service.py b/tests/server/multi/test_multi_search_service.py index b2953df7..be5a07f3 100644 --- a/tests/server/multi/test_multi_search_service.py +++ b/tests/server/multi/test_multi_search_service.py @@ -41,9 +41,7 @@ async def test_semantic_search_uses_thread_pool(self): ) # Service should use thread pool for execution - with patch.object( - service.thread_executor, "submit" - ) as mock_submit: + with patch.object(service.thread_executor, "submit") as mock_submit: mock_future = Mock() mock_future.result.return_value = {"results": [], "error": None} mock_submit.return_value = mock_future @@ -327,7 +325,9 @@ async def test_timeout_error_includes_recommendations(self): # - Add --path-filter error_text = str(response.errors) # At minimum, should mention timeout - assert "timeout" in error_text.lower() or "timed out" in error_text.lower() + assert ( + "timeout" in error_text.lower() or "timed out" in error_text.lower() + ) except (AttributeError, NotImplementedError): pytest.fail("MultiSearchService actionable errors not implemented") @@ -336,7 +336,9 @@ async def test_timeout_error_lists_affected_repos(self): """Timeout error lists which repositories timed out vs completed.""" from code_indexer.server.multi.multi_search_service import MultiSearchService - config = MultiSearchConfig(max_workers=5, query_timeout_seconds=1) # Very short timeout + config = MultiSearchConfig( + max_workers=5, query_timeout_seconds=1 + ) # Very short timeout service = MultiSearchService(config) request = MultiSearchRequest( diff --git a/tests/server/routes/test_multi_query_routes.py b/tests/server/routes/test_multi_query_routes.py index f6373bd8..8aae52f2 100644 --- 
a/tests/server/routes/test_multi_query_routes.py +++ b/tests/server/routes/test_multi_query_routes.py @@ -222,9 +222,7 @@ async def mock_search(request): pytest.skip("Route not implemented yet") - def test_all_repos_fail_returns_errors( - self, mock_auth, mock_multi_search_service - ): + def test_all_repos_fail_returns_errors(self, mock_auth, mock_multi_search_service): """When all repos fail, returns empty results with errors.""" pytest.skip("Route not implemented yet") @@ -276,10 +274,9 @@ def test_repository_not_found_returns_error( """Non-existent repository returns error in errors field.""" pytest.skip("Route not implemented yet") - def test_service_exception_returns_500( - self, mock_auth, mock_multi_search_service - ): + def test_service_exception_returns_500(self, mock_auth, mock_multi_search_service): """Unexpected service exception returns 500 Internal Server Error.""" + async def mock_search(request): raise RuntimeError("Unexpected error") diff --git a/tests/server/services/test_database_health_service.py b/tests/server/services/test_database_health_service.py index 1a9fb5d3..edb48a83 100644 --- a/tests/server/services/test_database_health_service.py +++ b/tests/server/services/test_database_health_service.py @@ -37,7 +37,7 @@ def temp_server_dir(self) -> Generator[Path, None, None]: "logs.db": "CREATE TABLE IF NOT EXISTS logs (id INTEGER PRIMARY KEY)", "search_config.db": "CREATE TABLE IF NOT EXISTS config (id INTEGER PRIMARY KEY)", "file_content_limits.db": "CREATE TABLE IF NOT EXISTS limits (id INTEGER PRIMARY KEY)", - "scip_audit.db": "CREATE TABLE IF NOT EXISTS audit (id INTEGER PRIMARY KEY)", + "groups.db": "CREATE TABLE IF NOT EXISTS groups (id INTEGER PRIMARY KEY)", "payload_cache.db": "CREATE TABLE IF NOT EXISTS cache (id INTEGER PRIMARY KEY)", } @@ -80,7 +80,7 @@ def test_health_service_checks_all_8_databases(self, temp_server_dir: Path): "logs.db", "search_config.db", "file_content_limits.db", - "scip_audit.db", + "groups.db", "payload_cache.db", } actual_files = {result.file_name for result in health_results} @@ -110,7 +110,7 @@ def test_health_service_provides_display_names(self, temp_server_dir: Path): "logs.db": "Logs", "search_config.db": "Search Config", "file_content_limits.db": "File Limits", - "scip_audit.db": "SCIP Audit", + "groups.db": "Groups", "payload_cache.db": "Payload Cache", } @@ -352,7 +352,7 @@ def healthy_db_path(self) -> Generator[Path, None, None]: yield db_path def test_healthy_database_tooltip_shows_only_name(self, healthy_db_path: Path): - """AC2: Healthy database tooltip shows only database name.""" + """AC2: Healthy database tooltip shows database name and path.""" from code_indexer.server.services.database_health_service import ( DatabaseHealthService, DatabaseHealthStatus, @@ -364,10 +364,15 @@ def test_healthy_database_tooltip_shows_only_name(self, healthy_db_path: Path): assert result.status == DatabaseHealthStatus.HEALTHY tooltip = result.get_tooltip() - assert tooltip == "Main Server" + # Tooltip should contain display name and path (no error info for healthy DB) + assert "Main Server" in tooltip + assert str(healthy_db_path) in tooltip + # Should not contain error information for healthy database + assert "Connect:" not in tooltip + assert "failed" not in tooltip def test_unhealthy_database_tooltip_shows_failure(self, healthy_db_path: Path): - """AC3: Unhealthy database tooltip shows name AND failed condition.""" + """AC3: Unhealthy database tooltip shows name, path, AND failed condition.""" from 
code_indexer.server.services.database_health_service import ( DatabaseHealthService, DatabaseHealthStatus, @@ -380,8 +385,11 @@ def test_unhealthy_database_tooltip_shows_failure(self, healthy_db_path: Path): assert result.status == DatabaseHealthStatus.ERROR tooltip = result.get_tooltip() + # Tooltip should contain display name, path, and error info assert "OAuth" in tooltip - assert " - " in tooltip + assert str(healthy_db_path) in tooltip + # Should contain error information (check name + error message) + assert "Connect:" in tooltip or "failed" in tooltip # ============================================================================= @@ -455,3 +463,130 @@ def test_get_stats_partial_passes_user_role_to_repo_counts(self): assert ( "_get_repo_counts" in source and "user_role" in source ), "get_stats_partial must pass user_role to _get_repo_counts" + + +# ============================================================================= +# Lazy-Loaded Database Tests +# ============================================================================= + + +class TestLazyLoadedDatabases: + """Tests for graceful handling of lazy-loaded databases.""" + + def test_lazy_loaded_database_not_initialized_status(self): + """ + Lazy-loaded database that doesn't exist yet gets NOT_INITIALIZED status. + + Given a lazy-loaded database file (search_config.db or file_content_limits.db) + When the database file doesn't exist yet + Then the health check returns NOT_INITIALIZED status instead of ERROR + """ + from code_indexer.server.services.database_health_service import ( + DatabaseHealthService, + DatabaseHealthStatus, + ) + + with tempfile.TemporaryDirectory(prefix="cidx_lazy_test_") as tmp: + # Create non-existent path for lazy-loaded database + db_path = Path(tmp) / "search_config.db" + + result = DatabaseHealthService.check_database_health( + str(db_path), display_name="Search Config" + ) + + assert result.status == DatabaseHealthStatus.NOT_INITIALIZED + assert result.checks["connect"].passed is False + assert ( + result.checks["connect"].error_message == "Not initialized (optional)" + ) + + def test_lazy_loaded_database_initialized_is_healthy(self): + """ + Lazy-loaded database that exists and is healthy gets HEALTHY status. + + Given a lazy-loaded database file (search_config.db) + When the database file exists and all checks pass + Then the health check returns HEALTHY status + """ + from code_indexer.server.services.database_health_service import ( + DatabaseHealthService, + DatabaseHealthStatus, + ) + + with tempfile.TemporaryDirectory(prefix="cidx_lazy_test_") as tmp: + # Create lazy-loaded database + db_path = Path(tmp) / "search_config.db" + with sqlite3.connect(str(db_path)) as conn: + conn.execute("CREATE TABLE config (id INTEGER PRIMARY KEY)") + conn.commit() + + result = DatabaseHealthService.check_database_health( + str(db_path), display_name="Search Config" + ) + + assert result.status == DatabaseHealthStatus.HEALTHY + assert result.checks["connect"].passed is True + + def test_non_lazy_database_missing_is_error(self): + """ + Non-lazy-loaded database that doesn't exist gets ERROR status. 
+ + Given a non-lazy-loaded database (e.g., oauth.db) + When the database file doesn't exist + Then the health check returns ERROR status (not NOT_INITIALIZED) + """ + from code_indexer.server.services.database_health_service import ( + DatabaseHealthService, + DatabaseHealthStatus, + ) + + with tempfile.TemporaryDirectory(prefix="cidx_lazy_test_") as tmp: + # Create non-existent path for non-lazy database + db_path = Path(tmp) / "oauth.db" + + result = DatabaseHealthService.check_database_health( + str(db_path), display_name="OAuth" + ) + + assert result.status == DatabaseHealthStatus.ERROR + assert result.checks["connect"].passed is False + assert "file not found" in result.checks["connect"].error_message + + def test_lazy_loaded_database_tooltip(self): + """ + Lazy-loaded database tooltip shows 'Not initialized (optional)'. + + Given a lazy-loaded database that doesn't exist yet + When get_tooltip() is called + Then it shows the display name, path, and 'Not initialized (optional)' + """ + from code_indexer.server.services.database_health_service import ( + DatabaseHealthService, + ) + + with tempfile.TemporaryDirectory(prefix="cidx_lazy_test_") as tmp: + db_path = Path(tmp) / "file_content_limits.db" + + result = DatabaseHealthService.check_database_health( + str(db_path), display_name="File Limits" + ) + + tooltip = result.get_tooltip() + assert "File Limits" in tooltip + assert str(db_path) in tooltip + assert "Not initialized (optional)" in tooltip + + def test_both_lazy_databases_defined(self): + """ + Verify both lazy-loaded databases are defined in LAZY_LOADED_DATABASES. + + This test documents which databases are lazy-loaded and ensures + they're properly configured in the constant. + """ + from code_indexer.server.services.database_health_service import ( + LAZY_LOADED_DATABASES, + ) + + assert "search_config.db" in LAZY_LOADED_DATABASES + assert "file_content_limits.db" in LAZY_LOADED_DATABASES + assert len(LAZY_LOADED_DATABASES) == 2 diff --git a/tests/server/services/test_key_discovery_service.py b/tests/server/services/test_key_discovery_service.py index 0265b537..3548f365 100644 --- a/tests/server/services/test_key_discovery_service.py +++ b/tests/server/services/test_key_discovery_service.py @@ -144,7 +144,9 @@ def test_discover_keys_computes_fingerprint(self, tmp_path): # Mock _extract_key_info to return known fingerprint and key_type # (discover_existing_keys now calls _extract_key_info directly) with patch.object( - service, "_extract_key_info", return_value=("SHA256:abcdef123456", "ed25519") + service, + "_extract_key_info", + return_value=("SHA256:abcdef123456", "ed25519"), ) as mock_extract: keys = service.discover_existing_keys() @@ -399,7 +401,9 @@ def test_extract_key_info_subprocess_exception(self, tmp_path): service = KeyDiscoveryService(ssh_dir=ssh_dir) with patch.object( - kds_module.subprocess, "run", side_effect=subprocess.TimeoutExpired("cmd", 5) + kds_module.subprocess, + "run", + side_effect=subprocess.TimeoutExpired("cmd", 5), ): fingerprint, key_type = service._extract_key_info(pub_key_path) diff --git a/tests/server/web/test_auth.py b/tests/server/web/test_auth.py index 83b4db60..5e989b92 100644 --- a/tests/server/web/test_auth.py +++ b/tests/server/web/test_auth.py @@ -512,9 +512,9 @@ def test_login_missing_csrf_auto_recovery( ) # Bug #714: Auto-recovery redirects instead of 403 - assert response.status_code == 303, ( - f"Expected 303 redirect for CSRF auto-recovery, got {response.status_code}" - ) + assert ( + response.status_code == 303 + ), 
f"Expected 303 redirect for CSRF auto-recovery, got {response.status_code}" location = response.headers.get("location", "") assert "/login" in location and "info=session_expired" in location @@ -539,9 +539,9 @@ def test_login_invalid_csrf_auto_recovery( ) # Bug #714: Auto-recovery redirects instead of 403 - assert response.status_code == 303, ( - f"Expected 303 redirect for CSRF auto-recovery, got {response.status_code}" - ) + assert ( + response.status_code == 303 + ), f"Expected 303 redirect for CSRF auto-recovery, got {response.status_code}" location = response.headers.get("location", "") assert "/login" in location and "info=session_expired" in location @@ -590,12 +590,10 @@ def test_login_csrf_failure_auto_recovers( f"got {response.status_code}" ) location = response.headers.get("location", "") - assert "/login" in location, ( - f"Expected redirect to /login, got {location}" - ) - assert "info=session_expired" in location, ( - f"Expected info=session_expired in redirect URL, got {location}" - ) + assert "/login" in location, f"Expected redirect to /login, got {location}" + assert ( + "info=session_expired" in location + ), f"Expected info=session_expired in redirect URL, got {location}" def test_login_csrf_failure_sets_fresh_cookie( self, web_client: TestClient, admin_user: dict @@ -619,9 +617,9 @@ def test_login_csrf_failure_sets_fresh_cookie( ) # Should have new CSRF cookie set - assert "_csrf" in response.cookies, ( - "CSRF failure response should include fresh CSRF cookie" - ) + assert ( + "_csrf" in response.cookies + ), "CSRF failure response should include fresh CSRF cookie" def test_login_missing_csrf_auto_recovers( self, web_client: TestClient, admin_user: dict @@ -651,12 +649,10 @@ def test_login_missing_csrf_auto_recovers( f"got {response.status_code}" ) location = response.headers.get("location", "") - assert "/login" in location, ( - f"Expected redirect to /login, got {location}" - ) - assert "info=session_expired" in location, ( - f"Expected info=session_expired in redirect URL, got {location}" - ) + assert "/login" in location, f"Expected redirect to /login, got {location}" + assert ( + "info=session_expired" in location + ), f"Expected info=session_expired in redirect URL, got {location}" # ============================================================================= @@ -727,15 +723,15 @@ def test_login_page_generates_new_token_when_no_cookie( # Should have CSRF token in form csrf_token = web_infrastructure.extract_csrf_token(response.text) - assert csrf_token is not None, ( - "Login page should generate CSRF token when no cookie exists" - ) + assert ( + csrf_token is not None + ), "Login page should generate CSRF token when no cookie exists" # Should set new CSRF cookie csrf_cookie = response.cookies.get("_csrf") - assert csrf_cookie is not None, ( - "Login page should set CSRF cookie when no cookie exists" - ) + assert ( + csrf_cookie is not None + ), "Login page should set CSRF cookie when no cookie exists" def test_login_page_generates_new_token_when_cookie_expired( self, web_infrastructure: WebTestInfrastructure @@ -760,17 +756,17 @@ def test_login_page_generates_new_token_when_cookie_expired( # Should have CSRF token in form csrf_token = web_infrastructure.extract_csrf_token(response.text) - assert csrf_token is not None, ( - "Login page should generate CSRF token when cookie is invalid" - ) + assert ( + csrf_token is not None + ), "Login page should generate CSRF token when cookie is invalid" # The token should NOT be the invalid one we sent - assert csrf_token 
!= "invalid_expired_csrf_token_12345", ( - "Login page should not use invalid cookie value as CSRF token" - ) + assert ( + csrf_token != "invalid_expired_csrf_token_12345" + ), "Login page should not use invalid cookie value as CSRF token" # Should set new CSRF cookie csrf_cookie = response.cookies.get("_csrf") - assert csrf_cookie is not None, ( - "Login page should set new CSRF cookie when old one is invalid" - ) + assert ( + csrf_cookie is not None + ), "Login page should set new CSRF cookie when old one is invalid" diff --git a/tests/server/web/test_config_payload_cache.py b/tests/server/web/test_config_payload_cache.py index 99509add..d5530b0a 100644 --- a/tests/server/web/test_config_payload_cache.py +++ b/tests/server/web/test_config_payload_cache.py @@ -34,14 +34,14 @@ def test_payload_cache_fields_displayed(self, authenticated_client: TestClient): assert ( "payload max fetch size" in text_lower ), "Should show Payload Max Fetch Size field" - assert ( - "payload cache ttl" in text_lower - ), "Should show Payload Cache TTL field" + assert "payload cache ttl" in text_lower, "Should show Payload Cache TTL field" assert ( "payload cleanup interval" in text_lower ), "Should show Payload Cleanup Interval field" - def test_payload_cache_default_values_displayed(self, authenticated_client: TestClient): + def test_payload_cache_default_values_displayed( + self, authenticated_client: TestClient + ): """ Payload cache fields should show default values. @@ -59,9 +59,7 @@ def test_payload_cache_default_values_displayed(self, authenticated_client: Test assert "2000" in response.text, "Should show default preview size (2000)" assert "5000" in response.text, "Should show default max fetch size (5000)" assert "900" in response.text, "Should show default cache TTL (900)" - assert ( - "60" in response.text - ), "Should show default cleanup interval (60)" + assert "60" in response.text, "Should show default cleanup interval (60)" class TestPayloadCacheConfigEditing: diff --git a/tests/unit/server/auth/test_mcp_session_state.py b/tests/unit/server/auth/test_mcp_session_state.py index 9a8ac575..7fddb6bc 100644 --- a/tests/unit/server/auth/test_mcp_session_state.py +++ b/tests/unit/server/auth/test_mcp_session_state.py @@ -60,7 +60,9 @@ def test_session_state_initialization(self, admin_user: User): """Test that MCPSessionState initializes with correct default values.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) assert session.session_id == "session-123" assert session.authenticated_user == admin_user @@ -72,7 +74,9 @@ def test_effective_user_returns_authenticated_when_not_impersonating( """Test effective_user returns authenticated user when no impersonation is set.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) assert session.effective_user == admin_user assert session.effective_user.username == "admin_user" @@ -83,7 +87,9 @@ def test_effective_user_returns_impersonated_when_impersonating( """Test effective_user returns impersonated user when impersonation is set.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", 
authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) session.set_impersonation(target_user) assert session.effective_user == target_user @@ -97,7 +103,9 @@ def test_set_impersonation_stores_target_user( """Test set_impersonation stores the target user correctly.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) session.set_impersonation(target_user) assert session.impersonated_user == target_user @@ -110,7 +118,9 @@ def test_clear_impersonation_removes_impersonated_user( """Test clear_impersonation removes the impersonated user.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) session.set_impersonation(target_user) # Verify impersonation is set @@ -129,7 +139,9 @@ def test_is_impersonating_property_returns_false_when_not_impersonating( """Test is_impersonating returns False when no impersonation is active.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) assert session.is_impersonating is False @@ -139,7 +151,9 @@ def test_is_impersonating_property_returns_true_when_impersonating( """Test is_impersonating returns True when impersonation is active.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) session.set_impersonation(target_user) assert session.is_impersonating is True @@ -150,7 +164,9 @@ def test_impersonation_preserves_original_admin_permissions( """Test that impersonation doesn't affect the authenticated_user object.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) session.set_impersonation(target_user) # Original admin should still have admin role @@ -164,7 +180,9 @@ def test_multiple_impersonation_changes( """Test that impersonation can be changed multiple times.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) # First impersonation session.set_impersonation(target_user) @@ -208,7 +226,9 @@ def test_impersonation_uses_target_user_permissions( """Test that effective permissions come from impersonated user.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) # Admin has manage_users permission assert session.effective_user.has_permission("manage_users") is True @@ -227,7 +247,9 @@ def test_impersonation_constrains_to_target_permissions( 
"""Test that impersonation constrains to target user's permissions, not elevates.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) session.set_impersonation(sales_user) # Verify the effective user has ONLY the target user's permissions @@ -292,7 +314,9 @@ def test_can_impersonate_returns_true_for_admin(self, admin_user: User): """Test that admin users are allowed to impersonate.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) assert session.can_impersonate() is True @@ -300,7 +324,9 @@ def test_can_impersonate_returns_false_for_power_user(self, power_user: User): """Test that power users are NOT allowed to impersonate.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=power_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=power_user + ) assert session.can_impersonate() is False @@ -308,7 +334,9 @@ def test_can_impersonate_returns_false_for_normal_user(self, normal_user: User): """Test that normal users are NOT allowed to impersonate.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=normal_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=normal_user + ) assert session.can_impersonate() is False @@ -318,7 +346,9 @@ def test_try_set_impersonation_succeeds_for_admin( """Test try_set_impersonation succeeds for admin users.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) result = session.try_set_impersonation(target_user) assert result.success is True @@ -331,7 +361,9 @@ def test_try_set_impersonation_fails_for_power_user( """Test try_set_impersonation fails for power users.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=power_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=power_user + ) result = session.try_set_impersonation(target_user) assert result.success is False @@ -344,7 +376,9 @@ def test_try_set_impersonation_fails_for_normal_user( """Test try_set_impersonation fails for normal users.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=normal_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=normal_user + ) result = session.try_set_impersonation(target_user) assert result.success is False @@ -379,7 +413,9 @@ def test_to_dict_without_impersonation(self, admin_user: User): """Test to_dict when not impersonating.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) result = 
session.to_dict() assert result["session_id"] == "session-123" @@ -391,7 +427,9 @@ def test_to_dict_with_impersonation(self, admin_user: User, target_user: User): """Test to_dict when impersonating.""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) session.set_impersonation(target_user) result = session.to_dict() @@ -406,7 +444,9 @@ def test_to_dict_includes_effective_user_when_not_impersonating( """Test to_dict includes effective_user when not impersonating (HIGH 1 fix).""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) result = session.to_dict() # effective_user should be included and match authenticated_user @@ -419,7 +459,9 @@ def test_to_dict_includes_effective_user_when_impersonating( """Test to_dict includes effective_user when impersonating (HIGH 1 fix).""" from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) session.set_impersonation(target_user) result = session.to_dict() @@ -468,7 +510,9 @@ def test_concurrent_set_impersonation_is_thread_safe( import threading from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) errors = [] def set_impersonation_target(): @@ -505,7 +549,9 @@ def test_concurrent_read_write_is_thread_safe( import threading from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) errors = [] effective_users = [] @@ -546,7 +592,9 @@ def test_concurrent_clear_impersonation_is_thread_safe( import threading from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="session-123", authenticated_user=admin_user) + session = MCPSessionState( + session_id="session-123", authenticated_user=admin_user + ) session.set_impersonation(target_user) errors = [] diff --git a/tests/unit/server/auto_update/test_deployment_executor_drain.py b/tests/unit/server/auto_update/test_deployment_executor_drain.py index 7de4114d..13b45736 100644 --- a/tests/unit/server/auto_update/test_deployment_executor_drain.py +++ b/tests/unit/server/auto_update/test_deployment_executor_drain.py @@ -162,9 +162,11 @@ def test_restart_server_uses_maintenance_flow(self): with tempfile.TemporaryDirectory() as tmpdir: executor = DeploymentExecutor(repo_path=Path(tmpdir)) - with patch.object(executor, "_enter_maintenance_mode") as mock_enter, \ - patch.object(executor, "_wait_for_drain") as mock_drain, \ - patch("subprocess.run") as mock_run: + with ( + patch.object(executor, "_enter_maintenance_mode") as mock_enter, + patch.object(executor, "_wait_for_drain") as mock_drain, + patch("subprocess.run") as mock_run, + ): mock_enter.return_value = True mock_drain.return_value = True mock_run.return_value = 
MagicMock(returncode=0) @@ -184,9 +186,11 @@ def test_restart_server_proceeds_on_drain_timeout(self): with tempfile.TemporaryDirectory() as tmpdir: executor = DeploymentExecutor(repo_path=Path(tmpdir)) - with patch.object(executor, "_enter_maintenance_mode") as mock_enter, \ - patch.object(executor, "_wait_for_drain") as mock_drain, \ - patch("subprocess.run") as mock_run: + with ( + patch.object(executor, "_enter_maintenance_mode") as mock_enter, + patch.object(executor, "_wait_for_drain") as mock_drain, + patch("subprocess.run") as mock_run, + ): mock_enter.return_value = True mock_drain.return_value = False # Drain timeout exceeded mock_run.return_value = MagicMock(returncode=0) @@ -221,11 +225,17 @@ def test_force_restart_logs_running_jobs(self): }, ] - with patch.object(executor, "_enter_maintenance_mode") as mock_enter, \ - patch.object(executor, "_wait_for_drain") as mock_drain, \ - patch.object(executor, "_get_running_jobs_for_logging") as mock_get_jobs, \ - patch("subprocess.run") as mock_run, \ - patch("code_indexer.server.auto_update.deployment_executor.logger") as mock_logger: + with ( + patch.object(executor, "_enter_maintenance_mode") as mock_enter, + patch.object(executor, "_wait_for_drain") as mock_drain, + patch.object( + executor, "_get_running_jobs_for_logging" + ) as mock_get_jobs, + patch("subprocess.run") as mock_run, + patch( + "code_indexer.server.auto_update.deployment_executor.logger" + ) as mock_logger, + ): mock_enter.return_value = True mock_drain.return_value = False # Drain timeout exceeded mock_get_jobs.return_value = mock_jobs @@ -235,8 +245,13 @@ def test_force_restart_logs_running_jobs(self): assert result is True mock_logger.warning.assert_called() - warning_calls = [str(call) for call in mock_logger.warning.call_args_list] - assert any("job-123" in str(call) or "running" in str(call).lower() for call in warning_calls) + warning_calls = [ + str(call) for call in mock_logger.warning.call_args_list + ] + assert any( + "job-123" in str(call) or "running" in str(call).lower() + for call in warning_calls + ) def test_get_running_jobs_for_logging_fetches_from_drain_status(self): """_get_running_jobs_for_logging should fetch jobs from drain-status endpoint.""" @@ -304,10 +319,14 @@ def test_drain_success_logs_info_message(self): with tempfile.TemporaryDirectory() as tmpdir: executor = DeploymentExecutor(repo_path=Path(tmpdir)) - with patch.object(executor, "_enter_maintenance_mode") as mock_enter, \ - patch.object(executor, "_wait_for_drain") as mock_drain, \ - patch("subprocess.run") as mock_run, \ - patch("code_indexer.server.auto_update.deployment_executor.logger") as mock_logger: + with ( + patch.object(executor, "_enter_maintenance_mode") as mock_enter, + patch.object(executor, "_wait_for_drain") as mock_drain, + patch("subprocess.run") as mock_run, + patch( + "code_indexer.server.auto_update.deployment_executor.logger" + ) as mock_logger, + ): mock_enter.return_value = True mock_drain.return_value = True # Drain succeeds mock_run.return_value = MagicMock(returncode=0) diff --git a/tests/unit/server/cache/test_payload_cache_explicit_key.py b/tests/unit/server/cache/test_payload_cache_explicit_key.py index 6084527e..30c41108 100644 --- a/tests/unit/server/cache/test_payload_cache_explicit_key.py +++ b/tests/unit/server/cache/test_payload_cache_explicit_key.py @@ -103,7 +103,9 @@ async def test_store_with_key_preserves_total_size(self, cache, temp_db_path): assert row[0] == 12345 @pytest.mark.asyncio - async def 
test_store_with_key_updates_timestamp_on_replace(self, cache, temp_db_path): + async def test_store_with_key_updates_timestamp_on_replace( + self, cache, temp_db_path + ): """ store_with_key() updates created_at timestamp when updating existing key. @@ -277,7 +279,9 @@ async def test_has_key_returns_false_after_cleanup(self, cache): # Create cache with very short TTL with tempfile.TemporaryDirectory() as tmpdir: temp_path = Path(tmpdir) / "short_ttl_cache.db" - short_ttl_config = PayloadCacheConfig(cache_ttl_seconds=0) # Immediate expiry + short_ttl_config = PayloadCacheConfig( + cache_ttl_seconds=0 + ) # Immediate expiry short_ttl_cache = PayloadCache(db_path=temp_path, config=short_ttl_config) await short_ttl_cache.initialize() diff --git a/tests/unit/server/clients/test_claude_server_client.py b/tests/unit/server/clients/test_claude_server_client.py index 0b34e51e..de8dd30c 100644 --- a/tests/unit/server/clients/test_claude_server_client.py +++ b/tests/unit/server/clients/test_claude_server_client.py @@ -581,9 +581,7 @@ async def test_connection_error_does_not_expose_password( assert sensitive_password not in str(exc_info.value) @pytest.mark.asyncio - async def test_timeout_error_does_not_expose_password( - self, httpx_mock: HTTPXMock - ): + async def test_timeout_error_does_not_expose_password(self, httpx_mock: HTTPXMock): """ Timeout error exception should NOT contain password. @@ -814,7 +812,12 @@ async def test_get_job_status_returns_in_progress(self, httpx_mock: HTTPXMock): "job_id": "job-12345", "status": "in_progress", "repositories": [ - {"alias": "repo1", "registered": True, "cloned": True, "indexed": True} + { + "alias": "repo1", + "registered": True, + "cloned": True, + "indexed": True, + } ], "exchange_count": 5, "tool_use_count": 12, @@ -879,7 +882,9 @@ async def test_get_job_status_returns_completed(self, httpx_mock: HTTPXMock): assert "JWT tokens" in result["result"] @pytest.mark.asyncio - async def test_get_job_status_raises_not_found_error_on_404(self, httpx_mock: HTTPXMock): + async def test_get_job_status_raises_not_found_error_on_404( + self, httpx_mock: HTTPXMock + ): """ get_job_status() should raise ClaudeServerNotFoundError for non-existent job. @@ -971,7 +976,9 @@ async def test_get_job_conversation_returns_result(self, httpx_mock: HTTPXMock): assert "JWT tokens" in result["result"] @pytest.mark.asyncio - async def test_get_job_conversation_raises_not_found_error_on_404(self, httpx_mock: HTTPXMock): + async def test_get_job_conversation_raises_not_found_error_on_404( + self, httpx_mock: HTTPXMock + ): """ get_job_conversation() should raise ClaudeServerNotFoundError for non-existent job. 
@@ -1065,8 +1072,11 @@ async def test_register_callback_success(self, httpx_mock: HTTPXMock): callback_request = requests[-1] assert callback_request.url.path == "/jobs/job-12345/callbacks" import json + body = json.loads(callback_request.content) - assert body["url"] == "https://cidx.example.com/api/delegation/callback/job-12345" + assert ( + body["url"] == "https://cidx.example.com/api/delegation/callback/job-12345" + ) @pytest.mark.asyncio async def test_register_callback_raises_on_error(self, httpx_mock: HTTPXMock): diff --git a/tests/unit/server/clients/test_claude_server_client_pooling.py b/tests/unit/server/clients/test_claude_server_client_pooling.py index 26d68f70..e1f9878f 100644 --- a/tests/unit/server/clients/test_claude_server_client_pooling.py +++ b/tests/unit/server/clients/test_claude_server_client_pooling.py @@ -45,9 +45,9 @@ def test_client_creates_shared_http_client_in_init(self): ) assert hasattr(client, "_client"), "Missing _client attribute" - assert isinstance(client._client, httpx.AsyncClient), ( - "_client should be httpx.AsyncClient instance" - ) + assert isinstance( + client._client, httpx.AsyncClient + ), "_client should be httpx.AsyncClient instance" def test_client_respects_skip_ssl_verify_setting(self): """ @@ -96,12 +96,12 @@ def test_client_has_default_connection_limits(self): # httpx.AsyncClient stores limits in the transport pool pool = client._client._transport._pool - assert pool._max_connections == 10, ( - f"Expected max_connections=10, got {pool._max_connections}" - ) - assert pool._max_keepalive_connections == 5, ( - f"Expected max_keepalive_connections=5, got {pool._max_keepalive_connections}" - ) + assert ( + pool._max_connections == 10 + ), f"Expected max_connections=10, got {pool._max_connections}" + assert ( + pool._max_keepalive_connections == 5 + ), f"Expected max_keepalive_connections=5, got {pool._max_keepalive_connections}" def test_client_has_proper_timeout_configuration(self): """ @@ -123,7 +123,9 @@ def test_client_has_proper_timeout_configuration(self): timeout = client._client.timeout assert timeout.read == 30.0, f"Expected read timeout=30.0, got {timeout.read}" - assert timeout.connect == 10.0, f"Expected connect timeout=10.0, got {timeout.connect}" + assert ( + timeout.connect == 10.0 + ), f"Expected connect timeout=10.0, got {timeout.connect}" class TestClaudeServerClientPoolingReuse: @@ -181,9 +183,9 @@ async def test_multiple_requests_use_same_client_instance( await client.check_repository_exists("repo2") # Verify same client instance was used - assert client._client is original_client, ( - "Client instance should not change between requests" - ) + assert ( + client._client is original_client + ), "Client instance should not change between requests" @pytest.mark.asyncio async def test_no_new_async_client_per_request(self, httpx_mock: HTTPXMock): diff --git a/tests/unit/server/config/test_claude_delegation_config.py b/tests/unit/server/config/test_claude_delegation_config.py index f7c1997f..ff7f1edb 100644 --- a/tests/unit/server/config/test_claude_delegation_config.py +++ b/tests/unit/server/config/test_claude_delegation_config.py @@ -225,7 +225,10 @@ def test_validate_connectivity_invalid_credentials(self, tmp_path, httpx_mock): ) assert result.success is False - assert "401" in result.error_message or "unauthorized" in result.error_message.lower() + assert ( + "401" in result.error_message + or "unauthorized" in result.error_message.lower() + ) class TestConfigServiceDelegationIntegration: @@ -254,7 +257,9 @@ def 
test_get_all_settings_claude_delegation_defaults(self, tmp_path): assert delegation["claude_server_credential_type"] == "password" assert delegation["is_configured"] is False - def test_get_all_settings_claude_delegation_includes_cidx_callback_url(self, tmp_path): + def test_get_all_settings_claude_delegation_includes_cidx_callback_url( + self, tmp_path + ): """Test that cidx_callback_url is included in settings output (Story #720).""" from code_indexer.server.services.config_service import ConfigService @@ -289,7 +294,6 @@ def test_returns_callback_url_from_config(self, tmp_path): manager.save_config(config) # Mock the config service to use our temp directory - import pytest with pytest.MonkeyPatch.context() as mp: mock_service = ConfigService(server_dir_path=str(tmp_path)) mp.setattr( @@ -306,7 +310,6 @@ def test_returns_none_when_not_configured(self, tmp_path): from code_indexer.server.mcp.handlers import _get_cidx_callback_base_url from code_indexer.server.services.config_service import ConfigService - import pytest with pytest.MonkeyPatch.context() as mp: mock_service = ConfigService(server_dir_path=str(tmp_path)) mp.setattr( @@ -367,7 +370,10 @@ def test_validate_connectivity_rejects_file_scheme(self, tmp_path): ) assert result.success is False - assert "scheme" in result.error_message.lower() or "url" in result.error_message.lower() + assert ( + "scheme" in result.error_message.lower() + or "url" in result.error_message.lower() + ) def test_validate_connectivity_accepts_https_scheme(self, tmp_path, httpx_mock): """Test that https:// URLs are accepted.""" @@ -409,7 +415,10 @@ def test_validate_connectivity_rejects_invalid_credential_type(self, tmp_path): ) assert result.success is False - assert "credential" in result.error_message.lower() or "type" in result.error_message.lower() + assert ( + "credential" in result.error_message.lower() + or "type" in result.error_message.lower() + ) def test_validate_connectivity_accepts_password_type(self, tmp_path, httpx_mock): """Test that 'password' credential type is accepted.""" @@ -463,7 +472,6 @@ def test_error_message_does_not_contain_credential(self, tmp_path, httpx_mock): def test_error_message_does_not_contain_password(self, tmp_path, httpx_mock): """Test that password is not leaked in any error scenario.""" - import httpx from code_indexer.server.config.delegation_config import ClaudeDelegationManager @@ -514,10 +522,13 @@ def test_load_config_warns_if_permissions_not_600(self, tmp_path, caplog): # Check that a warning was logged about permissions permission_warnings = [ - record for record in caplog.records + record + for record in caplog.records if "permission" in record.message.lower() or "600" in record.message ] - assert len(permission_warnings) > 0, "Should warn about insecure file permissions" + assert ( + len(permission_warnings) > 0 + ), "Should warn about insecure file permissions" def test_load_config_no_warning_if_permissions_600(self, tmp_path, caplog): """Test that no warning is logged if config file permissions are 0600.""" @@ -546,10 +557,13 @@ def test_load_config_no_warning_if_permissions_600(self, tmp_path, caplog): # Check that no permission warning was logged permission_warnings = [ - record for record in caplog.records + record + for record in caplog.records if "permission" in record.message.lower() and "600" in record.message ] - assert len(permission_warnings) == 0, "Should not warn about secure file permissions" + assert ( + len(permission_warnings) == 0 + ), "Should not warn about secure file permissions" 
class TestDefaultFunctionRepoAliasConstant: @@ -560,7 +574,10 @@ def test_default_function_repo_alias_constant_exists(self): from code_indexer.server.config import delegation_config assert hasattr(delegation_config, "DEFAULT_FUNCTION_REPO_ALIAS") - assert delegation_config.DEFAULT_FUNCTION_REPO_ALIAS == "claude-delegation-functions-global" + assert ( + delegation_config.DEFAULT_FUNCTION_REPO_ALIAS + == "claude-delegation-functions-global" + ) def test_dataclass_uses_constant_for_default(self): """Test that ClaudeDelegationConfig uses the constant for its default.""" diff --git a/tests/unit/server/mcp/test_execute_delegation_function_handler.py b/tests/unit/server/mcp/test_execute_delegation_function_handler.py index f8430750..b9161724 100644 --- a/tests/unit/server/mcp/test_execute_delegation_function_handler.py +++ b/tests/unit/server/mcp/test_execute_delegation_function_handler.py @@ -12,7 +12,6 @@ from datetime import datetime, timezone from pathlib import Path -import httpx import pytest from pytest_httpx import HTTPXMock @@ -93,7 +92,11 @@ class TestExecuteDelegationFunctionHandler: @pytest.mark.asyncio async def test_handler_returns_job_id_on_success( - self, test_user, temp_function_repo, mock_delegation_config, httpx_mock: HTTPXMock + self, + test_user, + temp_function_repo, + mock_delegation_config, + httpx_mock: HTTPXMock, ): """Handler returns job_id on successful execution.""" from code_indexer.server.mcp.handlers import handle_execute_delegation_function @@ -140,7 +143,11 @@ async def test_handler_returns_job_id_on_success( ) response = await handle_execute_delegation_function( - {"function_name": "semantic-search", "parameters": {"query": "bugs"}, "prompt": "Find"}, + { + "function_name": "semantic-search", + "parameters": {"query": "bugs"}, + "prompt": "Find", + }, test_user, ) @@ -254,7 +261,11 @@ async def test_handler_returns_error_for_missing_required_parameter( ) response = await handle_execute_delegation_function( - {"function_name": "semantic-search", "parameters": {}, "prompt": "Test"}, + { + "function_name": "semantic-search", + "parameters": {}, + "prompt": "Test", + }, test_user, ) @@ -288,7 +299,11 @@ async def test_response_has_mcp_format(self, test_user): @pytest.mark.asyncio async def test_handler_returns_error_when_job_id_missing( - self, test_user, temp_function_repo, mock_delegation_config, httpx_mock: HTTPXMock + self, + test_user, + temp_function_repo, + mock_delegation_config, + httpx_mock: HTTPXMock, ): """ Handler returns error when create_job response has no job_id. @@ -333,7 +348,11 @@ async def test_handler_returns_error_when_job_id_missing( ) response = await handle_execute_delegation_function( - {"function_name": "semantic-search", "parameters": {"query": "bugs"}, "prompt": "Find"}, + { + "function_name": "semantic-search", + "parameters": {"query": "bugs"}, + "prompt": "Find", + }, test_user, ) @@ -358,7 +377,11 @@ def reset_tracker_singleton(self): @pytest.mark.asyncio async def test_handler_registers_callback_url_with_claude_server( - self, test_user, temp_function_repo, mock_delegation_config, httpx_mock: HTTPXMock + self, + test_user, + temp_function_repo, + mock_delegation_config, + httpx_mock: HTTPXMock, ): """ Handler registers callback URL with Claude Server after creating job. 
@@ -418,7 +441,11 @@ async def test_handler_registers_callback_url_with_claude_server( ) response = await handle_execute_delegation_function( - {"function_name": "semantic-search", "parameters": {"query": "bugs"}, "prompt": "Find"}, + { + "function_name": "semantic-search", + "parameters": {"query": "bugs"}, + "prompt": "Find", + }, test_user, ) @@ -435,7 +462,11 @@ async def test_handler_registers_callback_url_with_claude_server( @pytest.mark.asyncio async def test_handler_registers_job_in_tracker( - self, test_user, temp_function_repo, mock_delegation_config, httpx_mock: HTTPXMock + self, + test_user, + temp_function_repo, + mock_delegation_config, + httpx_mock: HTTPXMock, ): """ Handler registers job in DelegationJobTracker after starting job. @@ -496,7 +527,11 @@ async def test_handler_registers_job_in_tracker( ) response = await handle_execute_delegation_function( - {"function_name": "semantic-search", "parameters": {"query": "bugs"}, "prompt": "Find"}, + { + "function_name": "semantic-search", + "parameters": {"query": "bugs"}, + "prompt": "Find", + }, test_user, ) diff --git a/tests/unit/server/mcp/test_list_delegation_functions_handler.py b/tests/unit/server/mcp/test_list_delegation_functions_handler.py index 8134d6f3..6ff5cf2e 100644 --- a/tests/unit/server/mcp/test_list_delegation_functions_handler.py +++ b/tests/unit/server/mcp/test_list_delegation_functions_handler.py @@ -178,6 +178,7 @@ async def test_handler_filters_by_impersonated_user_groups( "code_indexer.server.mcp.handlers._get_delegation_function_repo_path", lambda: temp_function_repo, ) + # _get_user_groups is called with the effective user # Impersonated user belongs to 'admins' group # Admin user (if NOT impersonating) would belong to 'engineering' group diff --git a/tests/unit/server/mcp/test_poll_delegation_job_handler.py b/tests/unit/server/mcp/test_poll_delegation_job_handler.py index 1a9a6ec4..6ac94fc5 100644 --- a/tests/unit/server/mcp/test_poll_delegation_job_handler.py +++ b/tests/unit/server/mcp/test_poll_delegation_job_handler.py @@ -225,7 +225,10 @@ async def test_poll_returns_waiting_when_callback_not_received( data = json.loads(response["content"][0]["text"]) assert data["status"] == "waiting" - assert "still running" in data["message"].lower() or "not yet received" in data["message"].lower() + assert ( + "still running" in data["message"].lower() + or "not yet received" in data["message"].lower() + ) # Key fix: continue_polling should be True so caller can retry assert data["continue_polling"] is True @@ -315,7 +318,10 @@ async def test_poll_returns_error_for_job_not_in_tracker( data = json.loads(response["content"][0]["text"]) assert data["success"] is False - assert "not found" in data["error"].lower() or "already completed" in data["error"].lower() + assert ( + "not found" in data["error"].lower() + or "already completed" in data["error"].lower() + ) class TestPollDelegationJobTimeoutParameter: diff --git a/tests/unit/server/mcp/test_session_registry_ttl_cleanup.py b/tests/unit/server/mcp/test_session_registry_ttl_cleanup.py index 37f8c307..744f158b 100644 --- a/tests/unit/server/mcp/test_session_registry_ttl_cleanup.py +++ b/tests/unit/server/mcp/test_session_registry_ttl_cleanup.py @@ -43,7 +43,9 @@ def test_last_activity_is_set_on_session_creation(self, admin_user): from code_indexer.server.auth.mcp_session_state import MCPSessionState before_creation = datetime.now(timezone.utc) - session = MCPSessionState(session_id="test-session", authenticated_user=admin_user) + session = MCPSessionState( + 
session_id="test-session", authenticated_user=admin_user + ) after_creation = datetime.now(timezone.utc) # last_activity should be set during creation @@ -59,7 +61,9 @@ def test_touch_updates_last_activity(self, admin_user): from code_indexer.server.auth.mcp_session_state import MCPSessionState import time - session = MCPSessionState(session_id="test-session", authenticated_user=admin_user) + session = MCPSessionState( + session_id="test-session", authenticated_user=admin_user + ) original_activity = session.last_activity # Wait a small amount to ensure timestamp changes @@ -76,7 +80,9 @@ def test_touch_is_thread_safe(self, admin_user): import threading from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="test-session", authenticated_user=admin_user) + session = MCPSessionState( + session_id="test-session", authenticated_user=admin_user + ) errors: List[Exception] = [] def touch_repeatedly(): @@ -103,7 +109,9 @@ def test_last_activity_property_is_thread_safe(self, admin_user): import threading from code_indexer.server.auth.mcp_session_state import MCPSessionState - session = MCPSessionState(session_id="test-session", authenticated_user=admin_user) + session = MCPSessionState( + session_id="test-session", authenticated_user=admin_user + ) errors: List[Exception] = [] activities: List[datetime] = [] @@ -160,7 +168,9 @@ def fresh_registry(self): registry.clear_all() return registry - def test_get_or_create_session_touches_existing_session(self, admin_user, fresh_registry): + def test_get_or_create_session_touches_existing_session( + self, admin_user, fresh_registry + ): """Test that get_or_create_session() calls touch() on existing session (AC2).""" import time @@ -199,7 +209,9 @@ def test_get_session_touches_existing_session(self, admin_user, fresh_registry): # last_activity should be updated assert session_again.last_activity > original_activity - def test_get_session_returns_none_for_nonexistent_without_touching(self, fresh_registry): + def test_get_session_returns_none_for_nonexistent_without_touching( + self, fresh_registry + ): """Test that get_session() returns None for non-existent session (no touch needed).""" result = fresh_registry.get_session("nonexistent-session-xyz") assert result is None @@ -267,15 +279,21 @@ def test_cleanup_keeps_active_sessions(self, admin_user, fresh_registry): assert removed_count == 0 assert fresh_registry.get_session(session_id) is not None - def test_cleanup_returns_count_of_removed_sessions(self, admin_user, fresh_registry): + def test_cleanup_returns_count_of_removed_sessions( + self, admin_user, fresh_registry + ): """Test that cleanup_stale_sessions() returns correct count (AC3, AC5).""" # Create multiple sessions for i in range(5): - session = fresh_registry.get_or_create_session(f"cleanup-count-{i}", admin_user) + session = fresh_registry.get_or_create_session( + f"cleanup-count-{i}", admin_user + ) # Make first 3 stale if i < 3: with session._lock: - session._last_activity = datetime.now(timezone.utc) - timedelta(hours=2) + session._last_activity = datetime.now(timezone.utc) - timedelta( + hours=2 + ) # Set TTL to 1 hour fresh_registry._ttl_seconds = 3600 @@ -288,7 +306,9 @@ def test_cleanup_returns_count_of_removed_sessions(self, admin_user, fresh_regis # 2 active sessions should remain assert fresh_registry.session_count() == 2 - def test_cleanup_logs_when_sessions_removed(self, admin_user, fresh_registry, caplog): + def test_cleanup_logs_when_sessions_removed( + self, admin_user, 
fresh_registry, caplog + ): """Test that cleanup logs the count of removed sessions (AC5).""" # Create a stale session session_id = "log-test-session" @@ -306,7 +326,9 @@ def test_cleanup_logs_when_sessions_removed(self, admin_user, fresh_registry, ca assert removed_count == 1 assert "Cleaned up 1 stale MCP sessions" in caplog.text - def test_cleanup_does_not_log_when_no_sessions_removed(self, admin_user, fresh_registry, caplog): + def test_cleanup_does_not_log_when_no_sessions_removed( + self, admin_user, fresh_registry, caplog + ): """Test that cleanup does not log when no sessions are removed (AC5).""" # Create an active session fresh_registry.get_or_create_session("active-no-log", admin_user) @@ -378,7 +400,9 @@ async def test_stop_background_cleanup_cancels_task(self, fresh_registry): assert task.cancelled() or task.done() @pytest.mark.asyncio - async def test_background_cleanup_uses_configured_ttl(self, admin_user, fresh_registry): + async def test_background_cleanup_uses_configured_ttl( + self, admin_user, fresh_registry + ): """Test that background cleanup uses configured TTL value (AC3, AC4).""" # Start cleanup with short TTL (1 second) and very short interval fresh_registry.start_background_cleanup( @@ -427,7 +451,9 @@ async def test_start_background_cleanup_is_idempotent(self, fresh_registry): await asyncio.sleep(0.01) @pytest.mark.asyncio - async def test_start_background_cleanup_logs_configuration(self, fresh_registry, caplog): + async def test_start_background_cleanup_logs_configuration( + self, fresh_registry, caplog + ): """Test that start_background_cleanup logs the configuration (AC5).""" with caplog.at_level(logging.INFO): fresh_registry.start_background_cleanup( @@ -471,6 +497,8 @@ def test_default_ttl_is_one_hour(self): def test_default_cleanup_interval_is_fifteen_minutes(self): """Test that default cleanup interval is 15 minutes (900 seconds) as per AC4.""" - from code_indexer.server.mcp.session_registry import DEFAULT_CLEANUP_INTERVAL_SECONDS + from code_indexer.server.mcp.session_registry import ( + DEFAULT_CLEANUP_INTERVAL_SECONDS, + ) assert DEFAULT_CLEANUP_INTERVAL_SECONDS == 900 # 15 minutes diff --git a/tests/unit/server/query/test_semantic_query_manager_warning_log_conditions.py b/tests/unit/server/query/test_semantic_query_manager_warning_log_conditions.py index 64da643c..2e0fbd3e 100644 --- a/tests/unit/server/query/test_semantic_query_manager_warning_log_conditions.py +++ b/tests/unit/server/query/test_semantic_query_manager_warning_log_conditions.py @@ -400,9 +400,7 @@ def test_warning_for_hybrid_mode_with_default_params( mock_search_service.return_value = mock_service # Also mock FTS search - with patch.object( - query_manager, "_execute_fts_search", return_value=[] - ): + with patch.object(query_manager, "_execute_fts_search", return_value=[]): with caplog.at_level(logging.WARNING): query_manager._search_single_repository( repo_path=mock_non_composite_repo, diff --git a/tests/unit/server/repositories/test_orphaned_job_cleanup.py b/tests/unit/server/repositories/test_orphaned_job_cleanup.py index 3b3f0730..755cc4e6 100644 --- a/tests/unit/server/repositories/test_orphaned_job_cleanup.py +++ b/tests/unit/server/repositories/test_orphaned_job_cleanup.py @@ -244,7 +244,9 @@ def test_cleanup_orphaned_jobs_with_no_orphaned_jobs_returns_zero( ) -> None: """When cleanup_orphaned_jobs() is called with no orphans, it returns 0.""" from code_indexer.server.storage.database_manager import DatabaseSchema - from code_indexer.server.storage.sqlite_backends import 
BackgroundJobsSqliteBackend + from code_indexer.server.storage.sqlite_backends import ( + BackgroundJobsSqliteBackend, + ) db_path = tmp_path / "clean_test.db" schema = DatabaseSchema(str(db_path)) @@ -275,8 +277,12 @@ class TestBackgroundJobManagerOrphanedJobCleanup: def test_manager_cleans_orphaned_jobs_on_sqlite_load(self, tmp_path: Path) -> None: """When BackgroundJobManager initializes with SQLite, orphaned jobs are cleaned up.""" from code_indexer.server.storage.database_manager import DatabaseSchema - from code_indexer.server.storage.sqlite_backends import BackgroundJobsSqliteBackend - from code_indexer.server.repositories.background_jobs import BackgroundJobManager + from code_indexer.server.storage.sqlite_backends import ( + BackgroundJobsSqliteBackend, + ) + from code_indexer.server.repositories.background_jobs import ( + BackgroundJobManager, + ) # Setup: Create database with orphaned jobs db_path = tmp_path / "manager_test.db" @@ -331,8 +337,12 @@ def test_manager_logs_orphaned_job_cleanup_count( """When BackgroundJobManager cleans orphaned jobs, it logs the count.""" import logging from code_indexer.server.storage.database_manager import DatabaseSchema - from code_indexer.server.storage.sqlite_backends import BackgroundJobsSqliteBackend - from code_indexer.server.repositories.background_jobs import BackgroundJobManager + from code_indexer.server.storage.sqlite_backends import ( + BackgroundJobsSqliteBackend, + ) + from code_indexer.server.repositories.background_jobs import ( + BackgroundJobManager, + ) # Setup: Create database with orphaned jobs db_path = tmp_path / "log_test.db" @@ -379,8 +389,12 @@ def test_manager_shows_zero_running_pending_after_restart( ) -> None: """After restart with orphaned jobs, running and pending counts should be zero.""" from code_indexer.server.storage.database_manager import DatabaseSchema - from code_indexer.server.storage.sqlite_backends import BackgroundJobsSqliteBackend - from code_indexer.server.repositories.background_jobs import BackgroundJobManager + from code_indexer.server.storage.sqlite_backends import ( + BackgroundJobsSqliteBackend, + ) + from code_indexer.server.repositories.background_jobs import ( + BackgroundJobManager, + ) # Setup: Create database with orphaned jobs db_path = tmp_path / "counts_test.db" @@ -425,8 +439,12 @@ def test_manager_preserves_completed_jobs_during_cleanup( ) -> None: """When manager initializes, completed jobs from before restart are preserved.""" from code_indexer.server.storage.database_manager import DatabaseSchema - from code_indexer.server.storage.sqlite_backends import BackgroundJobsSqliteBackend - from code_indexer.server.repositories.background_jobs import BackgroundJobManager + from code_indexer.server.storage.sqlite_backends import ( + BackgroundJobsSqliteBackend, + ) + from code_indexer.server.repositories.background_jobs import ( + BackgroundJobManager, + ) # Setup: Create database with mixed job states db_path = tmp_path / "preserve_test.db" diff --git a/tests/unit/server/routers/test_delegation_callbacks.py b/tests/unit/server/routers/test_delegation_callbacks.py index e097d1db..b074bdd3 100644 --- a/tests/unit/server/routers/test_delegation_callbacks.py +++ b/tests/unit/server/routers/test_delegation_callbacks.py @@ -77,9 +77,7 @@ def test_callback_receives_completed_job(self, client, reset_tracker_singleton): tracker = DelegationJobTracker.get_instance() # Register the job - asyncio.get_event_loop().run_until_complete( - tracker.register_job("job-12345") - ) + 
asyncio.get_event_loop().run_until_complete(tracker.register_job("job-12345")) response = client.post( "/api/delegation/callback/job-12345", @@ -115,9 +113,7 @@ def test_callback_receives_failed_job(self, client, reset_tracker_singleton): import asyncio tracker = DelegationJobTracker.get_instance() - asyncio.get_event_loop().run_until_complete( - tracker.register_job("job-99999") - ) + asyncio.get_event_loop().run_until_complete(tracker.register_job("job-99999")) response = client.post( "/api/delegation/callback/job-99999", @@ -176,9 +172,7 @@ def test_callback_uses_job_id_from_path(self, client, reset_tracker_singleton): import asyncio tracker = DelegationJobTracker.get_instance() - asyncio.get_event_loop().run_until_complete( - tracker.register_job("path-job-id") - ) + asyncio.get_event_loop().run_until_complete(tracker.register_job("path-job-id")) response = client.post( "/api/delegation/callback/path-job-id", @@ -254,9 +248,7 @@ def test_callback_handles_missing_optional_fields( import asyncio tracker = DelegationJobTracker.get_instance() - asyncio.get_event_loop().run_until_complete( - tracker.register_job("minimal-job") - ) + asyncio.get_event_loop().run_until_complete(tracker.register_job("minimal-job")) # Minimal payload - only JobId, Status, Output response = client.post( diff --git a/tests/unit/server/services/test_delegation_job_tracker.py b/tests/unit/server/services/test_delegation_job_tracker.py index 572bf0bd..4669b3d2 100644 --- a/tests/unit/server/services/test_delegation_job_tracker.py +++ b/tests/unit/server/services/test_delegation_job_tracker.py @@ -459,7 +459,11 @@ async def test_multiple_concurrent_jobs(self): await tracker.register_job("job-B") result_a = JobResult( - job_id="job-A", status="completed", output="Result A", exit_code=0, error=None + job_id="job-A", + status="completed", + output="Result A", + exit_code=0, + error=None, ) result_b = JobResult( job_id="job-B", status="failed", output="", exit_code=1, error="Error B" @@ -718,7 +722,9 @@ async def test_wait_for_job_returns_cached_result_immediately(self, payload_cach assert "Cached result" in result.output @pytest.mark.asyncio - async def test_wait_for_job_falls_back_to_future_when_not_cached(self, payload_cache): + async def test_wait_for_job_falls_back_to_future_when_not_cached( + self, payload_cache + ): """ wait_for_job() waits on Future when result not in cache. 
diff --git a/tests/unit/server/services/test_health_service_cpu_thresholds.py b/tests/unit/server/services/test_health_service_cpu_thresholds.py index b3bb46f7..ab3454dd 100644 --- a/tests/unit/server/services/test_health_service_cpu_thresholds.py +++ b/tests/unit/server/services/test_health_service_cpu_thresholds.py @@ -50,15 +50,14 @@ def test_single_cpu_spike_does_not_trigger_warning(self, mock_db_health): """A single CPU spike >95% should NOT trigger warning.""" from code_indexer.server.services.health_service import HealthCheckService - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 98.0 # High CPU spike mock_disk.return_value = MagicMock( @@ -81,17 +80,15 @@ def test_cpu_sustained_30_seconds_returns_degraded(self, mock_db_health): """CPU >95% sustained for 30 seconds should return DEGRADED.""" from code_indexer.server.services.health_service import HealthCheckService - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io, patch( - "time.time" - ) as mock_time: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + patch("time.time") as mock_time, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 98.0 # High CPU mock_disk.return_value = MagicMock( @@ -120,17 +117,15 @@ def test_cpu_sustained_60_seconds_returns_unhealthy(self, mock_db_health): """CPU >95% sustained for 60 seconds should return UNHEALTHY.""" from code_indexer.server.services.health_service import HealthCheckService - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io, patch( - "time.time" - ) as mock_time: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + patch("time.time") as mock_time, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 98.0 # High CPU mock_disk.return_value = MagicMock( @@ -159,15 +154,14 @@ def test_cpu_history_has_attribute(self, mock_db_health): """HealthCheckService should have _cpu_history attribute.""" from code_indexer.server.services.health_service import HealthCheckService 
- with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 mock_disk.return_value = MagicMock( diff --git a/tests/unit/server/services/test_health_service_database_aggregation.py b/tests/unit/server/services/test_health_service_database_aggregation.py index 367b2577..3fe21c01 100644 --- a/tests/unit/server/services/test_health_service_database_aggregation.py +++ b/tests/unit/server/services/test_health_service_database_aggregation.py @@ -33,15 +33,14 @@ def healthy_db_results(): @pytest.fixture def mock_healthy_system(): """Fixture providing healthy system metrics mocks.""" - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 mock_disk.return_value = MagicMock( diff --git a/tests/unit/server/services/test_health_service_failure_reasons.py b/tests/unit/server/services/test_health_service_failure_reasons.py index 3814895f..4c2c249a 100644 --- a/tests/unit/server/services/test_health_service_failure_reasons.py +++ b/tests/unit/server/services/test_health_service_failure_reasons.py @@ -47,7 +47,6 @@ def test_failure_reasons_field_exists_in_response(self): def test_failure_reasons_defaults_to_empty_list(self): """failure_reasons should default to empty list.""" from code_indexer.server.models.api_models import HealthCheckResponse - from pydantic import Field field_info = HealthCheckResponse.model_fields.get("failure_reasons") assert field_info is not None @@ -76,15 +75,14 @@ def test_healthy_status_has_empty_failure_reasons(self): ] mock_db_cls.return_value = mock_db - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 mock_disk.return_value = MagicMock( @@ -127,15 +125,14 @@ def test_failure_reasons_lists_failing_indicators(self): ] mock_db_cls.return_value = mock_db 
- with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 mock_disk.return_value = MagicMock( @@ -185,15 +182,14 @@ def test_failure_reasons_limited_to_3_with_more_indicator(self): mock_db.get_all_database_health.return_value = db_results mock_db_cls.return_value = mock_db - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 mock_disk.return_value = MagicMock( @@ -239,15 +235,14 @@ def test_storage_service_error_appears_in_failure_reasons(self): ] mock_db_cls.return_value = mock_db - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): # Set low memory to trigger healthy system metrics mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 @@ -296,17 +291,15 @@ def test_database_service_error_appears_in_failure_reasons(self): ] mock_db_cls.return_value = mock_db - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io, patch( - "sqlite3.connect" - ) as mock_sqlite: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + patch("sqlite3.connect") as mock_sqlite, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 mock_disk.return_value = MagicMock( @@ -354,15 +347,14 @@ def test_degraded_service_error_appears_in_failure_reasons(self): ] mock_db_cls.return_value = mock_db - with patch("psutil.virtual_memory") as 
mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 # Set disk space in warning range (80-90% used = DEGRADED) diff --git a/tests/unit/server/services/test_health_service_ram_thresholds.py b/tests/unit/server/services/test_health_service_ram_thresholds.py index ad055907..e29bf5eb 100644 --- a/tests/unit/server/services/test_health_service_ram_thresholds.py +++ b/tests/unit/server/services/test_health_service_ram_thresholds.py @@ -57,15 +57,14 @@ def test_ram_at_80_percent_returns_degraded(self, mock_db_health): """RAM >= 80% usage should return DEGRADED (yellow).""" from code_indexer.server.services.health_service import HealthCheckService - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=80.0) # Warning threshold mock_cpu.return_value = 30.0 mock_disk.return_value = MagicMock( @@ -91,15 +90,14 @@ def test_ram_at_90_percent_returns_unhealthy(self, mock_db_health): """RAM >= 90% usage should return UNHEALTHY (red).""" from code_indexer.server.services.health_service import HealthCheckService - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=90.0) # Critical threshold mock_cpu.return_value = 30.0 mock_disk.return_value = MagicMock( diff --git a/tests/unit/server/services/test_health_service_volume_aggregation.py b/tests/unit/server/services/test_health_service_volume_aggregation.py index a238d535..f7ad5950 100644 --- a/tests/unit/server/services/test_health_service_volume_aggregation.py +++ b/tests/unit/server/services/test_health_service_volume_aggregation.py @@ -49,15 +49,14 @@ def test_volume_with_warning_percent_returns_degraded(self, mock_db_health): """When ANY volume has 80-90% used, Server Status should be DEGRADED.""" from code_indexer.server.services.health_service import HealthCheckService - with patch("psutil.virtual_memory") as mock_mem, patch( - 
"psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 @@ -96,15 +95,14 @@ def test_volume_with_less_than_1gb_free_returns_unhealthy(self, mock_db_health): """When ANY volume has <1GB free, Server Status should be UNHEALTHY.""" from code_indexer.server.services.health_service import HealthCheckService - with patch("psutil.virtual_memory") as mock_mem, patch( - "psutil.cpu_percent" - ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch( - "psutil.disk_partitions" - ) as mock_parts, patch( - "psutil.disk_io_counters" - ) as mock_disk_io, patch( - "psutil.net_io_counters" - ) as mock_net_io: + with ( + patch("psutil.virtual_memory") as mock_mem, + patch("psutil.cpu_percent") as mock_cpu, + patch("psutil.disk_usage") as mock_disk, + patch("psutil.disk_partitions") as mock_parts, + patch("psutil.disk_io_counters") as mock_disk_io, + patch("psutil.net_io_counters") as mock_net_io, + ): mock_mem.return_value = MagicMock(percent=50.0) mock_cpu.return_value = 30.0 diff --git a/tests/unit/server/services/test_job_phase_detector.py b/tests/unit/server/services/test_job_phase_detector.py index b180e3d6..377578e9 100644 --- a/tests/unit/server/services/test_job_phase_detector.py +++ b/tests/unit/server/services/test_job_phase_detector.py @@ -89,7 +89,12 @@ def test_detect_phase_cidx_indexing_when_cloned_not_indexed(self): "status": "in_progress", "repositories": [ {"alias": "repo1", "registered": True, "cloned": True, "indexed": True}, - {"alias": "repo2", "registered": True, "cloned": True, "indexed": False}, + { + "alias": "repo2", + "registered": True, + "cloned": True, + "indexed": False, + }, ], } @@ -255,7 +260,12 @@ def test_get_progress_cidx_indexing_shows_counts(self): "status": "in_progress", "repositories": [ {"alias": "repo1", "registered": True, "cloned": True, "indexed": True}, - {"alias": "repo2", "registered": True, "cloned": True, "indexed": False}, + { + "alias": "repo2", + "registered": True, + "cloned": True, + "indexed": False, + }, ], } @@ -305,7 +315,10 @@ def test_get_progress_done_completed_includes_result(self): progress = detector.get_progress(job_state, JobPhase.DONE) - assert progress.progress.get("result") == "The authentication system uses JWT tokens..." + assert ( + progress.progress.get("result") + == "The authentication system uses JWT tokens..." 
+ ) assert progress.is_terminal is True def test_get_progress_done_failed_includes_error(self): diff --git a/tests/unit/server/services/test_maintenance_service.py b/tests/unit/server/services/test_maintenance_service.py index a3fcec8b..92cc7aa9 100644 --- a/tests/unit/server/services/test_maintenance_service.py +++ b/tests/unit/server/services/test_maintenance_service.py @@ -5,7 +5,6 @@ """ import pytest -import threading from unittest.mock import MagicMock @@ -276,7 +275,9 @@ def test_refresh_scheduler_job_submission_rejected_during_maintenance(self): get_maintenance_state, ) from code_indexer.global_repos.refresh_scheduler import RefreshScheduler - from code_indexer.server.repositories.background_jobs import BackgroundJobManager + from code_indexer.server.repositories.background_jobs import ( + BackgroundJobManager, + ) from code_indexer.server.jobs.exceptions import MaintenanceModeError from unittest.mock import MagicMock, patch import tempfile diff --git a/tests/unit/server/services/test_prompt_template_processor.py b/tests/unit/server/services/test_prompt_template_processor.py index c5147abf..c949f278 100644 --- a/tests/unit/server/services/test_prompt_template_processor.py +++ b/tests/unit/server/services/test_prompt_template_processor.py @@ -6,8 +6,6 @@ Tests follow TDD methodology - tests written FIRST before implementation. """ -import pytest - class TestPromptTemplateProcessorRender: """Tests for PromptTemplateProcessor.render() method.""" @@ -374,9 +372,9 @@ def test_render_impersonation_instruction_precedes_template_content(self): template_pos = result.find("TEMPLATE_MARKER_UNIQUE_12345") assert impersonation_pos == 0, "Impersonation instruction must be at position 0" - assert template_pos > impersonation_pos, ( - "Template content must come after impersonation instruction" - ) + assert ( + template_pos > impersonation_pos + ), "Template content must come after impersonation instruction" class TestPromptTemplateProcessorSpaceVariantPlaceholders: