@@ -55,6 +55,7 @@ Database Health
Healthy
Warning
Error
+ Not Initialized
{% else %}
diff --git a/tests/server/multi/test_multi_result_aggregator.py b/tests/server/multi/test_multi_result_aggregator.py
index e47d8696..67c77e7b 100644
--- a/tests/server/multi/test_multi_result_aggregator.py
+++ b/tests/server/multi/test_multi_result_aggregator.py
@@ -56,12 +56,10 @@ def test_enforces_per_repo_limit(self):
"""Per-repo limit is enforced independently."""
repo_results = {
"repo1": [
- {"file": f"file{i}.py", "score": 0.9 - i * 0.01}
- for i in range(20)
+ {"file": f"file{i}.py", "score": 0.9 - i * 0.01} for i in range(20)
],
"repo2": [
- {"file": f"file{i}.py", "score": 0.85 - i * 0.01}
- for i in range(15)
+ {"file": f"file{i}.py", "score": 0.85 - i * 0.01} for i in range(15)
],
}
@@ -119,8 +117,7 @@ def test_limit_smaller_than_results(self):
"""Limit smaller than number of results per repo."""
repo_results = {
"repo1": [
- {"file": f"file{i}.py", "score": 0.9 - i * 0.1}
- for i in range(10)
+ {"file": f"file{i}.py", "score": 0.9 - i * 0.1} for i in range(10)
],
}
@@ -217,13 +214,9 @@ def test_min_score_applied_before_limit(self):
"""Score filtering is applied before per-repo limit enforcement."""
repo_results = {
"repo1": [
- {"file": f"high{i}.py", "score": 0.9 - i * 0.01}
- for i in range(5)
+ {"file": f"high{i}.py", "score": 0.9 - i * 0.01} for i in range(5)
]
- + [
- {"file": f"low{i}.py", "score": 0.5 - i * 0.01}
- for i in range(10)
- ],
+ + [{"file": f"low{i}.py", "score": 0.5 - i * 0.01} for i in range(10)],
}
# Limit is 3, min_score is 0.7
@@ -287,8 +280,7 @@ def test_min_score_with_limit(self):
"""Score filtering combined with per-repo limit works correctly."""
repo_results = {
"repo1": [
- {"file": f"file{i}.py", "score": 0.95 - i * 0.05}
- for i in range(10)
+ {"file": f"file{i}.py", "score": 0.95 - i * 0.05} for i in range(10)
],
}
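
The tests above pin down the aggregator's ordering contract: min_score filtering runs first, and the per-repo limit is then applied to each repository independently. A minimal sketch of that contract, assuming a hypothetical free function (the real MultiResultAggregator API is not shown in this diff):

from typing import Optional


def aggregate_per_repo(
    repo_results: dict[str, list[dict]],
    limit: int,
    min_score: Optional[float] = None,
) -> dict[str, list[dict]]:
    aggregated: dict[str, list[dict]] = {}
    for repo, results in repo_results.items():
        if min_score is not None:
            # Filter first, so low-score hits never consume limit slots
            results = [r for r in results if r["score"] >= min_score]
        # Sort defensively, then enforce the limit independently per repo
        results = sorted(results, key=lambda r: r["score"], reverse=True)
        aggregated[repo] = results[:limit]
    return aggregated

With limit=3 and min_score=0.7, the low*.py entries in the third test are dropped before truncation, so only high*.py files can occupy the three slots.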
diff --git a/tests/server/multi/test_multi_search_service.py b/tests/server/multi/test_multi_search_service.py
index b2953df7..be5a07f3 100644
--- a/tests/server/multi/test_multi_search_service.py
+++ b/tests/server/multi/test_multi_search_service.py
@@ -41,9 +41,7 @@ async def test_semantic_search_uses_thread_pool(self):
)
# Service should use thread pool for execution
- with patch.object(
- service.thread_executor, "submit"
- ) as mock_submit:
+ with patch.object(service.thread_executor, "submit") as mock_submit:
mock_future = Mock()
mock_future.result.return_value = {"results": [], "error": None}
mock_submit.return_value = mock_future
@@ -327,7 +325,9 @@ async def test_timeout_error_includes_recommendations(self):
# - Add --path-filter
error_text = str(response.errors)
# At minimum, should mention timeout
- assert "timeout" in error_text.lower() or "timed out" in error_text.lower()
+ assert (
+ "timeout" in error_text.lower() or "timed out" in error_text.lower()
+ )
except (AttributeError, NotImplementedError):
pytest.fail("MultiSearchService actionable errors not implemented")
@@ -336,7 +336,9 @@ async def test_timeout_error_lists_affected_repos(self):
"""Timeout error lists which repositories timed out vs completed."""
from code_indexer.server.multi.multi_search_service import MultiSearchService
- config = MultiSearchConfig(max_workers=5, query_timeout_seconds=1) # Very short timeout
+ config = MultiSearchConfig(
+ max_workers=5, query_timeout_seconds=1
+ ) # Very short timeout
service = MultiSearchService(config)
request = MultiSearchRequest(
diff --git a/tests/server/routes/test_multi_query_routes.py b/tests/server/routes/test_multi_query_routes.py
index f6373bd8..8aae52f2 100644
--- a/tests/server/routes/test_multi_query_routes.py
+++ b/tests/server/routes/test_multi_query_routes.py
@@ -222,9 +222,7 @@ async def mock_search(request):
pytest.skip("Route not implemented yet")
- def test_all_repos_fail_returns_errors(
- self, mock_auth, mock_multi_search_service
- ):
+ def test_all_repos_fail_returns_errors(self, mock_auth, mock_multi_search_service):
"""When all repos fail, returns empty results with errors."""
pytest.skip("Route not implemented yet")
@@ -276,10 +274,9 @@ def test_repository_not_found_returns_error(
"""Non-existent repository returns error in errors field."""
pytest.skip("Route not implemented yet")
- def test_service_exception_returns_500(
- self, mock_auth, mock_multi_search_service
- ):
+ def test_service_exception_returns_500(self, mock_auth, mock_multi_search_service):
"""Unexpected service exception returns 500 Internal Server Error."""
+
async def mock_search(request):
raise RuntimeError("Unexpected error")
diff --git a/tests/server/services/test_database_health_service.py b/tests/server/services/test_database_health_service.py
index 1a9fb5d3..edb48a83 100644
--- a/tests/server/services/test_database_health_service.py
+++ b/tests/server/services/test_database_health_service.py
@@ -37,7 +37,7 @@ def temp_server_dir(self) -> Generator[Path, None, None]:
"logs.db": "CREATE TABLE IF NOT EXISTS logs (id INTEGER PRIMARY KEY)",
"search_config.db": "CREATE TABLE IF NOT EXISTS config (id INTEGER PRIMARY KEY)",
"file_content_limits.db": "CREATE TABLE IF NOT EXISTS limits (id INTEGER PRIMARY KEY)",
- "scip_audit.db": "CREATE TABLE IF NOT EXISTS audit (id INTEGER PRIMARY KEY)",
+ "groups.db": "CREATE TABLE IF NOT EXISTS groups (id INTEGER PRIMARY KEY)",
"payload_cache.db": "CREATE TABLE IF NOT EXISTS cache (id INTEGER PRIMARY KEY)",
}
@@ -80,7 +80,7 @@ def test_health_service_checks_all_8_databases(self, temp_server_dir: Path):
"logs.db",
"search_config.db",
"file_content_limits.db",
- "scip_audit.db",
+ "groups.db",
"payload_cache.db",
}
actual_files = {result.file_name for result in health_results}
@@ -110,7 +110,7 @@ def test_health_service_provides_display_names(self, temp_server_dir: Path):
"logs.db": "Logs",
"search_config.db": "Search Config",
"file_content_limits.db": "File Limits",
- "scip_audit.db": "SCIP Audit",
+ "groups.db": "Groups",
"payload_cache.db": "Payload Cache",
}
@@ -352,7 +352,7 @@ def healthy_db_path(self) -> Generator[Path, None, None]:
yield db_path
def test_healthy_database_tooltip_shows_only_name(self, healthy_db_path: Path):
- """AC2: Healthy database tooltip shows only database name."""
+ """AC2: Healthy database tooltip shows database name and path."""
from code_indexer.server.services.database_health_service import (
DatabaseHealthService,
DatabaseHealthStatus,
@@ -364,10 +364,15 @@ def test_healthy_database_tooltip_shows_only_name(self, healthy_db_path: Path):
assert result.status == DatabaseHealthStatus.HEALTHY
tooltip = result.get_tooltip()
- assert tooltip == "Main Server"
+ # Tooltip should contain display name and path (no error info for healthy DB)
+ assert "Main Server" in tooltip
+ assert str(healthy_db_path) in tooltip
+ # Should not contain error information for healthy database
+ assert "Connect:" not in tooltip
+ assert "failed" not in tooltip
def test_unhealthy_database_tooltip_shows_failure(self, healthy_db_path: Path):
- """AC3: Unhealthy database tooltip shows name AND failed condition."""
+ """AC3: Unhealthy database tooltip shows name, path, AND failed condition."""
from code_indexer.server.services.database_health_service import (
DatabaseHealthService,
DatabaseHealthStatus,
@@ -380,8 +385,11 @@ def test_unhealthy_database_tooltip_shows_failure(self, healthy_db_path: Path):
assert result.status == DatabaseHealthStatus.ERROR
tooltip = result.get_tooltip()
+ # Tooltip should contain display name, path, and error info
assert "OAuth" in tooltip
- assert " - " in tooltip
+ assert str(healthy_db_path) in tooltip
+ # Should contain error information (check name + error message)
+ assert "Connect:" in tooltip or "failed" in tooltip
# =============================================================================
@@ -455,3 +463,130 @@ def test_get_stats_partial_passes_user_role_to_repo_counts(self):
assert (
"_get_repo_counts" in source and "user_role" in source
), "get_stats_partial must pass user_role to _get_repo_counts"
+
+
+# =============================================================================
+# Lazy-Loaded Database Tests
+# =============================================================================
+
+
+class TestLazyLoadedDatabases:
+ """Tests for graceful handling of lazy-loaded databases."""
+
+ def test_lazy_loaded_database_not_initialized_status(self):
+ """
+ Lazy-loaded database that doesn't exist yet gets NOT_INITIALIZED status.
+
+ Given a lazy-loaded database file (search_config.db or file_content_limits.db)
+ When the database file doesn't exist yet
+ Then the health check returns NOT_INITIALIZED status instead of ERROR
+ """
+ from code_indexer.server.services.database_health_service import (
+ DatabaseHealthService,
+ DatabaseHealthStatus,
+ )
+
+ with tempfile.TemporaryDirectory(prefix="cidx_lazy_test_") as tmp:
+ # Create non-existent path for lazy-loaded database
+ db_path = Path(tmp) / "search_config.db"
+
+ result = DatabaseHealthService.check_database_health(
+ str(db_path), display_name="Search Config"
+ )
+
+ assert result.status == DatabaseHealthStatus.NOT_INITIALIZED
+ assert result.checks["connect"].passed is False
+ assert (
+ result.checks["connect"].error_message == "Not initialized (optional)"
+ )
+
+ def test_lazy_loaded_database_initialized_is_healthy(self):
+ """
+ Lazy-loaded database that exists and is healthy gets HEALTHY status.
+
+ Given a lazy-loaded database file (search_config.db)
+ When the database file exists and all checks pass
+ Then the health check returns HEALTHY status
+ """
+ from code_indexer.server.services.database_health_service import (
+ DatabaseHealthService,
+ DatabaseHealthStatus,
+ )
+
+ with tempfile.TemporaryDirectory(prefix="cidx_lazy_test_") as tmp:
+ # Create lazy-loaded database
+ db_path = Path(tmp) / "search_config.db"
+ with sqlite3.connect(str(db_path)) as conn:
+ conn.execute("CREATE TABLE config (id INTEGER PRIMARY KEY)")
+ conn.commit()
+
+ result = DatabaseHealthService.check_database_health(
+ str(db_path), display_name="Search Config"
+ )
+
+ assert result.status == DatabaseHealthStatus.HEALTHY
+ assert result.checks["connect"].passed is True
+
+ def test_non_lazy_database_missing_is_error(self):
+ """
+ Non-lazy-loaded database that doesn't exist gets ERROR status.
+
+ Given a non-lazy-loaded database (e.g., oauth.db)
+ When the database file doesn't exist
+ Then the health check returns ERROR status (not NOT_INITIALIZED)
+ """
+ from code_indexer.server.services.database_health_service import (
+ DatabaseHealthService,
+ DatabaseHealthStatus,
+ )
+
+ with tempfile.TemporaryDirectory(prefix="cidx_lazy_test_") as tmp:
+ # Create non-existent path for non-lazy database
+ db_path = Path(tmp) / "oauth.db"
+
+ result = DatabaseHealthService.check_database_health(
+ str(db_path), display_name="OAuth"
+ )
+
+ assert result.status == DatabaseHealthStatus.ERROR
+ assert result.checks["connect"].passed is False
+ assert "file not found" in result.checks["connect"].error_message
+
+ def test_lazy_loaded_database_tooltip(self):
+ """
+ Lazy-loaded database tooltip shows 'Not initialized (optional)'.
+
+ Given a lazy-loaded database that doesn't exist yet
+ When get_tooltip() is called
+ Then it shows the display name, path, and 'Not initialized (optional)'
+ """
+ from code_indexer.server.services.database_health_service import (
+ DatabaseHealthService,
+ )
+
+ with tempfile.TemporaryDirectory(prefix="cidx_lazy_test_") as tmp:
+ db_path = Path(tmp) / "file_content_limits.db"
+
+ result = DatabaseHealthService.check_database_health(
+ str(db_path), display_name="File Limits"
+ )
+
+ tooltip = result.get_tooltip()
+ assert "File Limits" in tooltip
+ assert str(db_path) in tooltip
+ assert "Not initialized (optional)" in tooltip
+
+ def test_both_lazy_databases_defined(self):
+ """
+ Verify both lazy-loaded databases are defined in LAZY_LOADED_DATABASES.
+
+ This test documents which databases are lazy-loaded and ensures
+ they're properly configured in the constant.
+ """
+ from code_indexer.server.services.database_health_service import (
+ LAZY_LOADED_DATABASES,
+ )
+
+ assert "search_config.db" in LAZY_LOADED_DATABASES
+ assert "file_content_limits.db" in LAZY_LOADED_DATABASES
+ assert len(LAZY_LOADED_DATABASES) == 2
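
The Given/When/Then docstrings above fully specify the missing-file branch. A sketch of that branch, assuming LAZY_LOADED_DATABASES is a set of file names; the real check_database_health also runs connect/read checks that are omitted here:

from pathlib import Path

LAZY_LOADED_DATABASES = frozenset({"search_config.db", "file_content_limits.db"})


def classify_missing_file(db_path: str) -> str:
    path = Path(db_path)
    if path.exists():
        return "HEALTHY"  # still subject to the remaining connect/read checks
    if path.name in LAZY_LOADED_DATABASES:
        # Optional databases are created on first use, so absence is benign
        return "NOT_INITIALIZED"  # connect message: "Not initialized (optional)"
    return "ERROR"  # connect message contains "file not found"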
diff --git a/tests/server/services/test_key_discovery_service.py b/tests/server/services/test_key_discovery_service.py
index 0265b537..3548f365 100644
--- a/tests/server/services/test_key_discovery_service.py
+++ b/tests/server/services/test_key_discovery_service.py
@@ -144,7 +144,9 @@ def test_discover_keys_computes_fingerprint(self, tmp_path):
# Mock _extract_key_info to return known fingerprint and key_type
# (discover_existing_keys now calls _extract_key_info directly)
with patch.object(
- service, "_extract_key_info", return_value=("SHA256:abcdef123456", "ed25519")
+ service,
+ "_extract_key_info",
+ return_value=("SHA256:abcdef123456", "ed25519"),
) as mock_extract:
keys = service.discover_existing_keys()
@@ -399,7 +401,9 @@ def test_extract_key_info_subprocess_exception(self, tmp_path):
service = KeyDiscoveryService(ssh_dir=ssh_dir)
with patch.object(
- kds_module.subprocess, "run", side_effect=subprocess.TimeoutExpired("cmd", 5)
+ kds_module.subprocess,
+ "run",
+ side_effect=subprocess.TimeoutExpired("cmd", 5),
):
fingerprint, key_type = service._extract_key_info(pub_key_path)
diff --git a/tests/server/web/test_auth.py b/tests/server/web/test_auth.py
index 83b4db60..5e989b92 100644
--- a/tests/server/web/test_auth.py
+++ b/tests/server/web/test_auth.py
@@ -512,9 +512,9 @@ def test_login_missing_csrf_auto_recovery(
)
# Bug #714: Auto-recovery redirects instead of 403
- assert response.status_code == 303, (
- f"Expected 303 redirect for CSRF auto-recovery, got {response.status_code}"
- )
+ assert (
+ response.status_code == 303
+ ), f"Expected 303 redirect for CSRF auto-recovery, got {response.status_code}"
location = response.headers.get("location", "")
assert "/login" in location and "info=session_expired" in location
@@ -539,9 +539,9 @@ def test_login_invalid_csrf_auto_recovery(
)
# Bug #714: Auto-recovery redirects instead of 403
- assert response.status_code == 303, (
- f"Expected 303 redirect for CSRF auto-recovery, got {response.status_code}"
- )
+ assert (
+ response.status_code == 303
+ ), f"Expected 303 redirect for CSRF auto-recovery, got {response.status_code}"
location = response.headers.get("location", "")
assert "/login" in location and "info=session_expired" in location
@@ -590,12 +590,10 @@ def test_login_csrf_failure_auto_recovers(
f"got {response.status_code}"
)
location = response.headers.get("location", "")
- assert "/login" in location, (
- f"Expected redirect to /login, got {location}"
- )
- assert "info=session_expired" in location, (
- f"Expected info=session_expired in redirect URL, got {location}"
- )
+ assert "/login" in location, f"Expected redirect to /login, got {location}"
+ assert (
+ "info=session_expired" in location
+ ), f"Expected info=session_expired in redirect URL, got {location}"
def test_login_csrf_failure_sets_fresh_cookie(
self, web_client: TestClient, admin_user: dict
@@ -619,9 +617,9 @@ def test_login_csrf_failure_sets_fresh_cookie(
)
# Should have new CSRF cookie set
- assert "_csrf" in response.cookies, (
- "CSRF failure response should include fresh CSRF cookie"
- )
+ assert (
+ "_csrf" in response.cookies
+ ), "CSRF failure response should include fresh CSRF cookie"
def test_login_missing_csrf_auto_recovers(
self, web_client: TestClient, admin_user: dict
@@ -651,12 +649,10 @@ def test_login_missing_csrf_auto_recovers(
f"got {response.status_code}"
)
location = response.headers.get("location", "")
- assert "/login" in location, (
- f"Expected redirect to /login, got {location}"
- )
- assert "info=session_expired" in location, (
- f"Expected info=session_expired in redirect URL, got {location}"
- )
+ assert "/login" in location, f"Expected redirect to /login, got {location}"
+ assert (
+ "info=session_expired" in location
+ ), f"Expected info=session_expired in redirect URL, got {location}"
# =============================================================================
@@ -727,15 +723,15 @@ def test_login_page_generates_new_token_when_no_cookie(
# Should have CSRF token in form
csrf_token = web_infrastructure.extract_csrf_token(response.text)
- assert csrf_token is not None, (
- "Login page should generate CSRF token when no cookie exists"
- )
+ assert (
+ csrf_token is not None
+ ), "Login page should generate CSRF token when no cookie exists"
# Should set new CSRF cookie
csrf_cookie = response.cookies.get("_csrf")
- assert csrf_cookie is not None, (
- "Login page should set CSRF cookie when no cookie exists"
- )
+ assert (
+ csrf_cookie is not None
+ ), "Login page should set CSRF cookie when no cookie exists"
def test_login_page_generates_new_token_when_cookie_expired(
self, web_infrastructure: WebTestInfrastructure
@@ -760,17 +756,17 @@ def test_login_page_generates_new_token_when_cookie_expired(
# Should have CSRF token in form
csrf_token = web_infrastructure.extract_csrf_token(response.text)
- assert csrf_token is not None, (
- "Login page should generate CSRF token when cookie is invalid"
- )
+ assert (
+ csrf_token is not None
+ ), "Login page should generate CSRF token when cookie is invalid"
# The token should NOT be the invalid one we sent
- assert csrf_token != "invalid_expired_csrf_token_12345", (
- "Login page should not use invalid cookie value as CSRF token"
- )
+ assert (
+ csrf_token != "invalid_expired_csrf_token_12345"
+ ), "Login page should not use invalid cookie value as CSRF token"
# Should set new CSRF cookie
csrf_cookie = response.cookies.get("_csrf")
- assert csrf_cookie is not None, (
- "Login page should set new CSRF cookie when old one is invalid"
- )
+ assert (
+ csrf_cookie is not None
+ ), "Login page should set new CSRF cookie when old one is invalid"
diff --git a/tests/server/web/test_config_payload_cache.py b/tests/server/web/test_config_payload_cache.py
index 99509add..d5530b0a 100644
--- a/tests/server/web/test_config_payload_cache.py
+++ b/tests/server/web/test_config_payload_cache.py
@@ -34,14 +34,14 @@ def test_payload_cache_fields_displayed(self, authenticated_client: TestClient):
assert (
"payload max fetch size" in text_lower
), "Should show Payload Max Fetch Size field"
- assert (
- "payload cache ttl" in text_lower
- ), "Should show Payload Cache TTL field"
+ assert "payload cache ttl" in text_lower, "Should show Payload Cache TTL field"
assert (
"payload cleanup interval" in text_lower
), "Should show Payload Cleanup Interval field"
- def test_payload_cache_default_values_displayed(self, authenticated_client: TestClient):
+ def test_payload_cache_default_values_displayed(
+ self, authenticated_client: TestClient
+ ):
"""
Payload cache fields should show default values.
@@ -59,9 +59,7 @@ def test_payload_cache_default_values_displayed(self, authenticated_client: Test
assert "2000" in response.text, "Should show default preview size (2000)"
assert "5000" in response.text, "Should show default max fetch size (5000)"
assert "900" in response.text, "Should show default cache TTL (900)"
- assert (
- "60" in response.text
- ), "Should show default cleanup interval (60)"
+ assert "60" in response.text, "Should show default cleanup interval (60)"
class TestPayloadCacheConfigEditing:
diff --git a/tests/unit/server/auth/test_mcp_session_state.py b/tests/unit/server/auth/test_mcp_session_state.py
index 9a8ac575..7fddb6bc 100644
--- a/tests/unit/server/auth/test_mcp_session_state.py
+++ b/tests/unit/server/auth/test_mcp_session_state.py
@@ -60,7 +60,9 @@ def test_session_state_initialization(self, admin_user: User):
"""Test that MCPSessionState initializes with correct default values."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
assert session.session_id == "session-123"
assert session.authenticated_user == admin_user
@@ -72,7 +74,9 @@ def test_effective_user_returns_authenticated_when_not_impersonating(
"""Test effective_user returns authenticated user when no impersonation is set."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
assert session.effective_user == admin_user
assert session.effective_user.username == "admin_user"
@@ -83,7 +87,9 @@ def test_effective_user_returns_impersonated_when_impersonating(
"""Test effective_user returns impersonated user when impersonation is set."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
session.set_impersonation(target_user)
assert session.effective_user == target_user
@@ -97,7 +103,9 @@ def test_set_impersonation_stores_target_user(
"""Test set_impersonation stores the target user correctly."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
session.set_impersonation(target_user)
assert session.impersonated_user == target_user
@@ -110,7 +118,9 @@ def test_clear_impersonation_removes_impersonated_user(
"""Test clear_impersonation removes the impersonated user."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
session.set_impersonation(target_user)
# Verify impersonation is set
@@ -129,7 +139,9 @@ def test_is_impersonating_property_returns_false_when_not_impersonating(
"""Test is_impersonating returns False when no impersonation is active."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
assert session.is_impersonating is False
@@ -139,7 +151,9 @@ def test_is_impersonating_property_returns_true_when_impersonating(
"""Test is_impersonating returns True when impersonation is active."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
session.set_impersonation(target_user)
assert session.is_impersonating is True
@@ -150,7 +164,9 @@ def test_impersonation_preserves_original_admin_permissions(
"""Test that impersonation doesn't affect the authenticated_user object."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
session.set_impersonation(target_user)
# Original admin should still have admin role
@@ -164,7 +180,9 @@ def test_multiple_impersonation_changes(
"""Test that impersonation can be changed multiple times."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
# First impersonation
session.set_impersonation(target_user)
@@ -208,7 +226,9 @@ def test_impersonation_uses_target_user_permissions(
"""Test that effective permissions come from impersonated user."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
# Admin has manage_users permission
assert session.effective_user.has_permission("manage_users") is True
@@ -227,7 +247,9 @@ def test_impersonation_constrains_to_target_permissions(
"""Test that impersonation constrains to target user's permissions, not elevates."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
session.set_impersonation(sales_user)
# Verify the effective user has ONLY the target user's permissions
@@ -292,7 +314,9 @@ def test_can_impersonate_returns_true_for_admin(self, admin_user: User):
"""Test that admin users are allowed to impersonate."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
assert session.can_impersonate() is True
@@ -300,7 +324,9 @@ def test_can_impersonate_returns_false_for_power_user(self, power_user: User):
"""Test that power users are NOT allowed to impersonate."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=power_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=power_user
+ )
assert session.can_impersonate() is False
@@ -308,7 +334,9 @@ def test_can_impersonate_returns_false_for_normal_user(self, normal_user: User):
"""Test that normal users are NOT allowed to impersonate."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=normal_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=normal_user
+ )
assert session.can_impersonate() is False
@@ -318,7 +346,9 @@ def test_try_set_impersonation_succeeds_for_admin(
"""Test try_set_impersonation succeeds for admin users."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
result = session.try_set_impersonation(target_user)
assert result.success is True
@@ -331,7 +361,9 @@ def test_try_set_impersonation_fails_for_power_user(
"""Test try_set_impersonation fails for power users."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=power_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=power_user
+ )
result = session.try_set_impersonation(target_user)
assert result.success is False
@@ -344,7 +376,9 @@ def test_try_set_impersonation_fails_for_normal_user(
"""Test try_set_impersonation fails for normal users."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=normal_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=normal_user
+ )
result = session.try_set_impersonation(target_user)
assert result.success is False
@@ -379,7 +413,9 @@ def test_to_dict_without_impersonation(self, admin_user: User):
"""Test to_dict when not impersonating."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
result = session.to_dict()
assert result["session_id"] == "session-123"
@@ -391,7 +427,9 @@ def test_to_dict_with_impersonation(self, admin_user: User, target_user: User):
"""Test to_dict when impersonating."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
session.set_impersonation(target_user)
result = session.to_dict()
@@ -406,7 +444,9 @@ def test_to_dict_includes_effective_user_when_not_impersonating(
"""Test to_dict includes effective_user when not impersonating (HIGH 1 fix)."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
result = session.to_dict()
# effective_user should be included and match authenticated_user
@@ -419,7 +459,9 @@ def test_to_dict_includes_effective_user_when_impersonating(
"""Test to_dict includes effective_user when impersonating (HIGH 1 fix)."""
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
session.set_impersonation(target_user)
result = session.to_dict()
@@ -468,7 +510,9 @@ def test_concurrent_set_impersonation_is_thread_safe(
import threading
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
errors = []
def set_impersonation_target():
@@ -505,7 +549,9 @@ def test_concurrent_read_write_is_thread_safe(
import threading
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
errors = []
effective_users = []
@@ -546,7 +592,9 @@ def test_concurrent_clear_impersonation_is_thread_safe(
import threading
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="session-123", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="session-123", authenticated_user=admin_user
+ )
session.set_impersonation(target_user)
errors = []
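
Nearly every test in this file constructs the same object, so the contract reads straight off the assertions: effective_user falls back to authenticated_user, impersonation state is lock-protected, and only admins may impersonate. A sketch of that contract; attribute names mirror the assertions, but the bodies and the role check are assumptions:

import threading


class MCPSessionStateSketch:
    def __init__(self, session_id: str, authenticated_user):
        self.session_id = session_id
        self.authenticated_user = authenticated_user
        self.impersonated_user = None
        self._lock = threading.Lock()

    @property
    def effective_user(self):
        with self._lock:
            # Impersonated user wins when set; otherwise the authenticated user
            return self.impersonated_user or self.authenticated_user

    @property
    def is_impersonating(self) -> bool:
        with self._lock:
            return self.impersonated_user is not None

    def can_impersonate(self) -> bool:
        # Admin-only per the tests; the real check may inspect permissions
        return self.authenticated_user.role == "admin"

    def set_impersonation(self, target_user) -> None:
        with self._lock:
            self.impersonated_user = target_user

    def clear_impersonation(self) -> None:
        with self._lock:
            self.impersonated_user = None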
diff --git a/tests/unit/server/auto_update/test_deployment_executor_drain.py b/tests/unit/server/auto_update/test_deployment_executor_drain.py
index 7de4114d..13b45736 100644
--- a/tests/unit/server/auto_update/test_deployment_executor_drain.py
+++ b/tests/unit/server/auto_update/test_deployment_executor_drain.py
@@ -162,9 +162,11 @@ def test_restart_server_uses_maintenance_flow(self):
with tempfile.TemporaryDirectory() as tmpdir:
executor = DeploymentExecutor(repo_path=Path(tmpdir))
- with patch.object(executor, "_enter_maintenance_mode") as mock_enter, \
- patch.object(executor, "_wait_for_drain") as mock_drain, \
- patch("subprocess.run") as mock_run:
+ with (
+ patch.object(executor, "_enter_maintenance_mode") as mock_enter,
+ patch.object(executor, "_wait_for_drain") as mock_drain,
+ patch("subprocess.run") as mock_run,
+ ):
mock_enter.return_value = True
mock_drain.return_value = True
mock_run.return_value = MagicMock(returncode=0)
@@ -184,9 +186,11 @@ def test_restart_server_proceeds_on_drain_timeout(self):
with tempfile.TemporaryDirectory() as tmpdir:
executor = DeploymentExecutor(repo_path=Path(tmpdir))
- with patch.object(executor, "_enter_maintenance_mode") as mock_enter, \
- patch.object(executor, "_wait_for_drain") as mock_drain, \
- patch("subprocess.run") as mock_run:
+ with (
+ patch.object(executor, "_enter_maintenance_mode") as mock_enter,
+ patch.object(executor, "_wait_for_drain") as mock_drain,
+ patch("subprocess.run") as mock_run,
+ ):
mock_enter.return_value = True
mock_drain.return_value = False # Drain timeout exceeded
mock_run.return_value = MagicMock(returncode=0)
@@ -221,11 +225,17 @@ def test_force_restart_logs_running_jobs(self):
},
]
- with patch.object(executor, "_enter_maintenance_mode") as mock_enter, \
- patch.object(executor, "_wait_for_drain") as mock_drain, \
- patch.object(executor, "_get_running_jobs_for_logging") as mock_get_jobs, \
- patch("subprocess.run") as mock_run, \
- patch("code_indexer.server.auto_update.deployment_executor.logger") as mock_logger:
+ with (
+ patch.object(executor, "_enter_maintenance_mode") as mock_enter,
+ patch.object(executor, "_wait_for_drain") as mock_drain,
+ patch.object(
+ executor, "_get_running_jobs_for_logging"
+ ) as mock_get_jobs,
+ patch("subprocess.run") as mock_run,
+ patch(
+ "code_indexer.server.auto_update.deployment_executor.logger"
+ ) as mock_logger,
+ ):
mock_enter.return_value = True
mock_drain.return_value = False # Drain timeout exceeded
mock_get_jobs.return_value = mock_jobs
@@ -235,8 +245,13 @@ def test_force_restart_logs_running_jobs(self):
assert result is True
mock_logger.warning.assert_called()
- warning_calls = [str(call) for call in mock_logger.warning.call_args_list]
- assert any("job-123" in str(call) or "running" in str(call).lower() for call in warning_calls)
+ warning_calls = [
+ str(call) for call in mock_logger.warning.call_args_list
+ ]
+ assert any(
+ "job-123" in str(call) or "running" in str(call).lower()
+ for call in warning_calls
+ )
def test_get_running_jobs_for_logging_fetches_from_drain_status(self):
"""_get_running_jobs_for_logging should fetch jobs from drain-status endpoint."""
@@ -304,10 +319,14 @@ def test_drain_success_logs_info_message(self):
with tempfile.TemporaryDirectory() as tmpdir:
executor = DeploymentExecutor(repo_path=Path(tmpdir))
- with patch.object(executor, "_enter_maintenance_mode") as mock_enter, \
- patch.object(executor, "_wait_for_drain") as mock_drain, \
- patch("subprocess.run") as mock_run, \
- patch("code_indexer.server.auto_update.deployment_executor.logger") as mock_logger:
+ with (
+ patch.object(executor, "_enter_maintenance_mode") as mock_enter,
+ patch.object(executor, "_wait_for_drain") as mock_drain,
+ patch("subprocess.run") as mock_run,
+ patch(
+ "code_indexer.server.auto_update.deployment_executor.logger"
+ ) as mock_logger,
+ ):
mock_enter.return_value = True
mock_drain.return_value = True # Drain succeeds
mock_run.return_value = MagicMock(returncode=0)
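
The drain tests encode one flow: enter maintenance mode, wait (bounded) for running jobs to drain, log any still-running jobs on timeout, and restart regardless. A sketch of that flow; the method names come from the patch targets above, while the bodies and the restart command are assumptions:

import logging
import subprocess

logger = logging.getLogger(__name__)


def restart_server(executor) -> bool:
    executor._enter_maintenance_mode()    # stop accepting new work
    drained = executor._wait_for_drain()  # bounded wait for running jobs
    if not drained:
        # Drain timeout: log still-running jobs, then force the restart anyway
        for job in executor._get_running_jobs_for_logging():
            logger.warning("Restarting with job still running: %s", job.get("job_id"))
    # Restart command is hypothetical; the tests only patch subprocess.run
    result = subprocess.run(["systemctl", "restart", "cidx-server"])
    return result.returncode == 0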
diff --git a/tests/unit/server/cache/test_payload_cache_explicit_key.py b/tests/unit/server/cache/test_payload_cache_explicit_key.py
index 6084527e..30c41108 100644
--- a/tests/unit/server/cache/test_payload_cache_explicit_key.py
+++ b/tests/unit/server/cache/test_payload_cache_explicit_key.py
@@ -103,7 +103,9 @@ async def test_store_with_key_preserves_total_size(self, cache, temp_db_path):
assert row[0] == 12345
@pytest.mark.asyncio
- async def test_store_with_key_updates_timestamp_on_replace(self, cache, temp_db_path):
+ async def test_store_with_key_updates_timestamp_on_replace(
+ self, cache, temp_db_path
+ ):
"""
store_with_key() updates created_at timestamp when updating existing key.
@@ -277,7 +279,9 @@ async def test_has_key_returns_false_after_cleanup(self, cache):
# Create cache with very short TTL
with tempfile.TemporaryDirectory() as tmpdir:
temp_path = Path(tmpdir) / "short_ttl_cache.db"
- short_ttl_config = PayloadCacheConfig(cache_ttl_seconds=0) # Immediate expiry
+ short_ttl_config = PayloadCacheConfig(
+ cache_ttl_seconds=0
+ ) # Immediate expiry
short_ttl_cache = PayloadCache(db_path=temp_path, config=short_ttl_config)
await short_ttl_cache.initialize()
diff --git a/tests/unit/server/clients/test_claude_server_client.py b/tests/unit/server/clients/test_claude_server_client.py
index 0b34e51e..de8dd30c 100644
--- a/tests/unit/server/clients/test_claude_server_client.py
+++ b/tests/unit/server/clients/test_claude_server_client.py
@@ -581,9 +581,7 @@ async def test_connection_error_does_not_expose_password(
assert sensitive_password not in str(exc_info.value)
@pytest.mark.asyncio
- async def test_timeout_error_does_not_expose_password(
- self, httpx_mock: HTTPXMock
- ):
+ async def test_timeout_error_does_not_expose_password(self, httpx_mock: HTTPXMock):
"""
Timeout error exception should NOT contain password.
@@ -814,7 +812,12 @@ async def test_get_job_status_returns_in_progress(self, httpx_mock: HTTPXMock):
"job_id": "job-12345",
"status": "in_progress",
"repositories": [
- {"alias": "repo1", "registered": True, "cloned": True, "indexed": True}
+ {
+ "alias": "repo1",
+ "registered": True,
+ "cloned": True,
+ "indexed": True,
+ }
],
"exchange_count": 5,
"tool_use_count": 12,
@@ -879,7 +882,9 @@ async def test_get_job_status_returns_completed(self, httpx_mock: HTTPXMock):
assert "JWT tokens" in result["result"]
@pytest.mark.asyncio
- async def test_get_job_status_raises_not_found_error_on_404(self, httpx_mock: HTTPXMock):
+ async def test_get_job_status_raises_not_found_error_on_404(
+ self, httpx_mock: HTTPXMock
+ ):
"""
get_job_status() should raise ClaudeServerNotFoundError for non-existent job.
@@ -971,7 +976,9 @@ async def test_get_job_conversation_returns_result(self, httpx_mock: HTTPXMock):
assert "JWT tokens" in result["result"]
@pytest.mark.asyncio
- async def test_get_job_conversation_raises_not_found_error_on_404(self, httpx_mock: HTTPXMock):
+ async def test_get_job_conversation_raises_not_found_error_on_404(
+ self, httpx_mock: HTTPXMock
+ ):
"""
get_job_conversation() should raise ClaudeServerNotFoundError for non-existent job.
@@ -1065,8 +1072,11 @@ async def test_register_callback_success(self, httpx_mock: HTTPXMock):
callback_request = requests[-1]
assert callback_request.url.path == "/jobs/job-12345/callbacks"
import json
+
body = json.loads(callback_request.content)
- assert body["url"] == "https://cidx.example.com/api/delegation/callback/job-12345"
+ assert (
+ body["url"] == "https://cidx.example.com/api/delegation/callback/job-12345"
+ )
@pytest.mark.asyncio
async def test_register_callback_raises_on_error(self, httpx_mock: HTTPXMock):
diff --git a/tests/unit/server/clients/test_claude_server_client_pooling.py b/tests/unit/server/clients/test_claude_server_client_pooling.py
index 26d68f70..e1f9878f 100644
--- a/tests/unit/server/clients/test_claude_server_client_pooling.py
+++ b/tests/unit/server/clients/test_claude_server_client_pooling.py
@@ -45,9 +45,9 @@ def test_client_creates_shared_http_client_in_init(self):
)
assert hasattr(client, "_client"), "Missing _client attribute"
- assert isinstance(client._client, httpx.AsyncClient), (
- "_client should be httpx.AsyncClient instance"
- )
+ assert isinstance(
+ client._client, httpx.AsyncClient
+ ), "_client should be httpx.AsyncClient instance"
def test_client_respects_skip_ssl_verify_setting(self):
"""
@@ -96,12 +96,12 @@ def test_client_has_default_connection_limits(self):
# httpx.AsyncClient stores limits in the transport pool
pool = client._client._transport._pool
- assert pool._max_connections == 10, (
- f"Expected max_connections=10, got {pool._max_connections}"
- )
- assert pool._max_keepalive_connections == 5, (
- f"Expected max_keepalive_connections=5, got {pool._max_keepalive_connections}"
- )
+ assert (
+ pool._max_connections == 10
+ ), f"Expected max_connections=10, got {pool._max_connections}"
+ assert (
+ pool._max_keepalive_connections == 5
+ ), f"Expected max_keepalive_connections=5, got {pool._max_keepalive_connections}"
def test_client_has_proper_timeout_configuration(self):
"""
@@ -123,7 +123,9 @@ def test_client_has_proper_timeout_configuration(self):
timeout = client._client.timeout
assert timeout.read == 30.0, f"Expected read timeout=30.0, got {timeout.read}"
- assert timeout.connect == 10.0, f"Expected connect timeout=10.0, got {timeout.connect}"
+ assert (
+ timeout.connect == 10.0
+ ), f"Expected connect timeout=10.0, got {timeout.connect}"
class TestClaudeServerClientPoolingReuse:
@@ -181,9 +183,9 @@ async def test_multiple_requests_use_same_client_instance(
await client.check_repository_exists("repo2")
# Verify same client instance was used
- assert client._client is original_client, (
- "Client instance should not change between requests"
- )
+ assert (
+ client._client is original_client
+ ), "Client instance should not change between requests"
@pytest.mark.asyncio
async def test_no_new_async_client_per_request(self, httpx_mock: HTTPXMock):
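
The pooling tests assert concrete numbers, which makes the intended construction easy to sketch: one shared httpx.AsyncClient built in __init__ and reused across requests. Constructor arguments beyond the asserted limits and timeouts are assumptions:

import httpx


class PooledClientSketch:
    def __init__(self, base_url: str, skip_ssl_verify: bool = False):
        # Single shared client; building an AsyncClient per request would
        # defeat connection reuse, which the tests explicitly guard against
        self._client = httpx.AsyncClient(
            base_url=base_url,
            verify=not skip_ssl_verify,
            limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
            timeout=httpx.Timeout(30.0, connect=10.0),  # read=30s, connect=10s
        )

    async def aclose(self) -> None:
        await self._client.aclose()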
diff --git a/tests/unit/server/config/test_claude_delegation_config.py b/tests/unit/server/config/test_claude_delegation_config.py
index f7c1997f..ff7f1edb 100644
--- a/tests/unit/server/config/test_claude_delegation_config.py
+++ b/tests/unit/server/config/test_claude_delegation_config.py
@@ -225,7 +225,10 @@ def test_validate_connectivity_invalid_credentials(self, tmp_path, httpx_mock):
)
assert result.success is False
- assert "401" in result.error_message or "unauthorized" in result.error_message.lower()
+ assert (
+ "401" in result.error_message
+ or "unauthorized" in result.error_message.lower()
+ )
class TestConfigServiceDelegationIntegration:
@@ -254,7 +257,9 @@ def test_get_all_settings_claude_delegation_defaults(self, tmp_path):
assert delegation["claude_server_credential_type"] == "password"
assert delegation["is_configured"] is False
- def test_get_all_settings_claude_delegation_includes_cidx_callback_url(self, tmp_path):
+ def test_get_all_settings_claude_delegation_includes_cidx_callback_url(
+ self, tmp_path
+ ):
"""Test that cidx_callback_url is included in settings output (Story #720)."""
from code_indexer.server.services.config_service import ConfigService
@@ -289,7 +294,6 @@ def test_returns_callback_url_from_config(self, tmp_path):
manager.save_config(config)
# Mock the config service to use our temp directory
- import pytest
with pytest.MonkeyPatch.context() as mp:
mock_service = ConfigService(server_dir_path=str(tmp_path))
mp.setattr(
@@ -306,7 +310,6 @@ def test_returns_none_when_not_configured(self, tmp_path):
from code_indexer.server.mcp.handlers import _get_cidx_callback_base_url
from code_indexer.server.services.config_service import ConfigService
- import pytest
with pytest.MonkeyPatch.context() as mp:
mock_service = ConfigService(server_dir_path=str(tmp_path))
mp.setattr(
@@ -367,7 +370,10 @@ def test_validate_connectivity_rejects_file_scheme(self, tmp_path):
)
assert result.success is False
- assert "scheme" in result.error_message.lower() or "url" in result.error_message.lower()
+ assert (
+ "scheme" in result.error_message.lower()
+ or "url" in result.error_message.lower()
+ )
def test_validate_connectivity_accepts_https_scheme(self, tmp_path, httpx_mock):
"""Test that https:// URLs are accepted."""
@@ -409,7 +415,10 @@ def test_validate_connectivity_rejects_invalid_credential_type(self, tmp_path):
)
assert result.success is False
- assert "credential" in result.error_message.lower() or "type" in result.error_message.lower()
+ assert (
+ "credential" in result.error_message.lower()
+ or "type" in result.error_message.lower()
+ )
def test_validate_connectivity_accepts_password_type(self, tmp_path, httpx_mock):
"""Test that 'password' credential type is accepted."""
@@ -463,7 +472,6 @@ def test_error_message_does_not_contain_credential(self, tmp_path, httpx_mock):
def test_error_message_does_not_contain_password(self, tmp_path, httpx_mock):
"""Test that password is not leaked in any error scenario."""
- import httpx
from code_indexer.server.config.delegation_config import ClaudeDelegationManager
@@ -514,10 +522,13 @@ def test_load_config_warns_if_permissions_not_600(self, tmp_path, caplog):
# Check that a warning was logged about permissions
permission_warnings = [
- record for record in caplog.records
+ record
+ for record in caplog.records
if "permission" in record.message.lower() or "600" in record.message
]
- assert len(permission_warnings) > 0, "Should warn about insecure file permissions"
+ assert (
+ len(permission_warnings) > 0
+ ), "Should warn about insecure file permissions"
def test_load_config_no_warning_if_permissions_600(self, tmp_path, caplog):
"""Test that no warning is logged if config file permissions are 0600."""
@@ -546,10 +557,13 @@ def test_load_config_no_warning_if_permissions_600(self, tmp_path, caplog):
# Check that no permission warning was logged
permission_warnings = [
- record for record in caplog.records
+ record
+ for record in caplog.records
if "permission" in record.message.lower() and "600" in record.message
]
- assert len(permission_warnings) == 0, "Should not warn about secure file permissions"
+ assert (
+ len(permission_warnings) == 0
+ ), "Should not warn about secure file permissions"
class TestDefaultFunctionRepoAliasConstant:
@@ -560,7 +574,10 @@ def test_default_function_repo_alias_constant_exists(self):
from code_indexer.server.config import delegation_config
assert hasattr(delegation_config, "DEFAULT_FUNCTION_REPO_ALIAS")
- assert delegation_config.DEFAULT_FUNCTION_REPO_ALIAS == "claude-delegation-functions-global"
+ assert (
+ delegation_config.DEFAULT_FUNCTION_REPO_ALIAS
+ == "claude-delegation-functions-global"
+ )
def test_dataclass_uses_constant_for_default(self):
"""Test that ClaudeDelegationConfig uses the constant for its default."""
diff --git a/tests/unit/server/mcp/test_execute_delegation_function_handler.py b/tests/unit/server/mcp/test_execute_delegation_function_handler.py
index f8430750..b9161724 100644
--- a/tests/unit/server/mcp/test_execute_delegation_function_handler.py
+++ b/tests/unit/server/mcp/test_execute_delegation_function_handler.py
@@ -12,7 +12,6 @@
from datetime import datetime, timezone
from pathlib import Path
-import httpx
import pytest
from pytest_httpx import HTTPXMock
@@ -93,7 +92,11 @@ class TestExecuteDelegationFunctionHandler:
@pytest.mark.asyncio
async def test_handler_returns_job_id_on_success(
- self, test_user, temp_function_repo, mock_delegation_config, httpx_mock: HTTPXMock
+ self,
+ test_user,
+ temp_function_repo,
+ mock_delegation_config,
+ httpx_mock: HTTPXMock,
):
"""Handler returns job_id on successful execution."""
from code_indexer.server.mcp.handlers import handle_execute_delegation_function
@@ -140,7 +143,11 @@ async def test_handler_returns_job_id_on_success(
)
response = await handle_execute_delegation_function(
- {"function_name": "semantic-search", "parameters": {"query": "bugs"}, "prompt": "Find"},
+ {
+ "function_name": "semantic-search",
+ "parameters": {"query": "bugs"},
+ "prompt": "Find",
+ },
test_user,
)
@@ -254,7 +261,11 @@ async def test_handler_returns_error_for_missing_required_parameter(
)
response = await handle_execute_delegation_function(
- {"function_name": "semantic-search", "parameters": {}, "prompt": "Test"},
+ {
+ "function_name": "semantic-search",
+ "parameters": {},
+ "prompt": "Test",
+ },
test_user,
)
@@ -288,7 +299,11 @@ async def test_response_has_mcp_format(self, test_user):
@pytest.mark.asyncio
async def test_handler_returns_error_when_job_id_missing(
- self, test_user, temp_function_repo, mock_delegation_config, httpx_mock: HTTPXMock
+ self,
+ test_user,
+ temp_function_repo,
+ mock_delegation_config,
+ httpx_mock: HTTPXMock,
):
"""
Handler returns error when create_job response has no job_id.
@@ -333,7 +348,11 @@ async def test_handler_returns_error_when_job_id_missing(
)
response = await handle_execute_delegation_function(
- {"function_name": "semantic-search", "parameters": {"query": "bugs"}, "prompt": "Find"},
+ {
+ "function_name": "semantic-search",
+ "parameters": {"query": "bugs"},
+ "prompt": "Find",
+ },
test_user,
)
@@ -358,7 +377,11 @@ def reset_tracker_singleton(self):
@pytest.mark.asyncio
async def test_handler_registers_callback_url_with_claude_server(
- self, test_user, temp_function_repo, mock_delegation_config, httpx_mock: HTTPXMock
+ self,
+ test_user,
+ temp_function_repo,
+ mock_delegation_config,
+ httpx_mock: HTTPXMock,
):
"""
Handler registers callback URL with Claude Server after creating job.
@@ -418,7 +441,11 @@ async def test_handler_registers_callback_url_with_claude_server(
)
response = await handle_execute_delegation_function(
- {"function_name": "semantic-search", "parameters": {"query": "bugs"}, "prompt": "Find"},
+ {
+ "function_name": "semantic-search",
+ "parameters": {"query": "bugs"},
+ "prompt": "Find",
+ },
test_user,
)
@@ -435,7 +462,11 @@ async def test_handler_registers_callback_url_with_claude_server(
@pytest.mark.asyncio
async def test_handler_registers_job_in_tracker(
- self, test_user, temp_function_repo, mock_delegation_config, httpx_mock: HTTPXMock
+ self,
+ test_user,
+ temp_function_repo,
+ mock_delegation_config,
+ httpx_mock: HTTPXMock,
):
"""
Handler registers job in DelegationJobTracker after starting job.
@@ -496,7 +527,11 @@ async def test_handler_registers_job_in_tracker(
)
response = await handle_execute_delegation_function(
- {"function_name": "semantic-search", "parameters": {"query": "bugs"}, "prompt": "Find"},
+ {
+ "function_name": "semantic-search",
+ "parameters": {"query": "bugs"},
+ "prompt": "Find",
+ },
test_user,
)
diff --git a/tests/unit/server/mcp/test_list_delegation_functions_handler.py b/tests/unit/server/mcp/test_list_delegation_functions_handler.py
index 8134d6f3..6ff5cf2e 100644
--- a/tests/unit/server/mcp/test_list_delegation_functions_handler.py
+++ b/tests/unit/server/mcp/test_list_delegation_functions_handler.py
@@ -178,6 +178,7 @@ async def test_handler_filters_by_impersonated_user_groups(
"code_indexer.server.mcp.handlers._get_delegation_function_repo_path",
lambda: temp_function_repo,
)
+
# _get_user_groups is called with the effective user
# Impersonated user belongs to 'admins' group
# Admin user (if NOT impersonating) would belong to 'engineering' group
diff --git a/tests/unit/server/mcp/test_poll_delegation_job_handler.py b/tests/unit/server/mcp/test_poll_delegation_job_handler.py
index 1a9a6ec4..6ac94fc5 100644
--- a/tests/unit/server/mcp/test_poll_delegation_job_handler.py
+++ b/tests/unit/server/mcp/test_poll_delegation_job_handler.py
@@ -225,7 +225,10 @@ async def test_poll_returns_waiting_when_callback_not_received(
data = json.loads(response["content"][0]["text"])
assert data["status"] == "waiting"
- assert "still running" in data["message"].lower() or "not yet received" in data["message"].lower()
+ assert (
+ "still running" in data["message"].lower()
+ or "not yet received" in data["message"].lower()
+ )
# Key fix: continue_polling should be True so caller can retry
assert data["continue_polling"] is True
@@ -315,7 +318,10 @@ async def test_poll_returns_error_for_job_not_in_tracker(
data = json.loads(response["content"][0]["text"])
assert data["success"] is False
- assert "not found" in data["error"].lower() or "already completed" in data["error"].lower()
+ assert (
+ "not found" in data["error"].lower()
+ or "already completed" in data["error"].lower()
+ )
class TestPollDelegationJobTimeoutParameter:
diff --git a/tests/unit/server/mcp/test_session_registry_ttl_cleanup.py b/tests/unit/server/mcp/test_session_registry_ttl_cleanup.py
index 37f8c307..744f158b 100644
--- a/tests/unit/server/mcp/test_session_registry_ttl_cleanup.py
+++ b/tests/unit/server/mcp/test_session_registry_ttl_cleanup.py
@@ -43,7 +43,9 @@ def test_last_activity_is_set_on_session_creation(self, admin_user):
from code_indexer.server.auth.mcp_session_state import MCPSessionState
before_creation = datetime.now(timezone.utc)
- session = MCPSessionState(session_id="test-session", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="test-session", authenticated_user=admin_user
+ )
after_creation = datetime.now(timezone.utc)
# last_activity should be set during creation
@@ -59,7 +61,9 @@ def test_touch_updates_last_activity(self, admin_user):
from code_indexer.server.auth.mcp_session_state import MCPSessionState
import time
- session = MCPSessionState(session_id="test-session", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="test-session", authenticated_user=admin_user
+ )
original_activity = session.last_activity
# Wait a small amount to ensure timestamp changes
@@ -76,7 +80,9 @@ def test_touch_is_thread_safe(self, admin_user):
import threading
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="test-session", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="test-session", authenticated_user=admin_user
+ )
errors: List[Exception] = []
def touch_repeatedly():
@@ -103,7 +109,9 @@ def test_last_activity_property_is_thread_safe(self, admin_user):
import threading
from code_indexer.server.auth.mcp_session_state import MCPSessionState
- session = MCPSessionState(session_id="test-session", authenticated_user=admin_user)
+ session = MCPSessionState(
+ session_id="test-session", authenticated_user=admin_user
+ )
errors: List[Exception] = []
activities: List[datetime] = []
@@ -160,7 +168,9 @@ def fresh_registry(self):
registry.clear_all()
return registry
- def test_get_or_create_session_touches_existing_session(self, admin_user, fresh_registry):
+ def test_get_or_create_session_touches_existing_session(
+ self, admin_user, fresh_registry
+ ):
"""Test that get_or_create_session() calls touch() on existing session (AC2)."""
import time
@@ -199,7 +209,9 @@ def test_get_session_touches_existing_session(self, admin_user, fresh_registry):
# last_activity should be updated
assert session_again.last_activity > original_activity
- def test_get_session_returns_none_for_nonexistent_without_touching(self, fresh_registry):
+ def test_get_session_returns_none_for_nonexistent_without_touching(
+ self, fresh_registry
+ ):
"""Test that get_session() returns None for non-existent session (no touch needed)."""
result = fresh_registry.get_session("nonexistent-session-xyz")
assert result is None
@@ -267,15 +279,21 @@ def test_cleanup_keeps_active_sessions(self, admin_user, fresh_registry):
assert removed_count == 0
assert fresh_registry.get_session(session_id) is not None
- def test_cleanup_returns_count_of_removed_sessions(self, admin_user, fresh_registry):
+ def test_cleanup_returns_count_of_removed_sessions(
+ self, admin_user, fresh_registry
+ ):
"""Test that cleanup_stale_sessions() returns correct count (AC3, AC5)."""
# Create multiple sessions
for i in range(5):
- session = fresh_registry.get_or_create_session(f"cleanup-count-{i}", admin_user)
+ session = fresh_registry.get_or_create_session(
+ f"cleanup-count-{i}", admin_user
+ )
# Make first 3 stale
if i < 3:
with session._lock:
- session._last_activity = datetime.now(timezone.utc) - timedelta(hours=2)
+ session._last_activity = datetime.now(timezone.utc) - timedelta(
+ hours=2
+ )
# Set TTL to 1 hour
fresh_registry._ttl_seconds = 3600
@@ -288,7 +306,9 @@ def test_cleanup_returns_count_of_removed_sessions(self, admin_user, fresh_regis
# 2 active sessions should remain
assert fresh_registry.session_count() == 2
- def test_cleanup_logs_when_sessions_removed(self, admin_user, fresh_registry, caplog):
+ def test_cleanup_logs_when_sessions_removed(
+ self, admin_user, fresh_registry, caplog
+ ):
"""Test that cleanup logs the count of removed sessions (AC5)."""
# Create a stale session
session_id = "log-test-session"
@@ -306,7 +326,9 @@ def test_cleanup_logs_when_sessions_removed(self, admin_user, fresh_registry, ca
assert removed_count == 1
assert "Cleaned up 1 stale MCP sessions" in caplog.text
- def test_cleanup_does_not_log_when_no_sessions_removed(self, admin_user, fresh_registry, caplog):
+ def test_cleanup_does_not_log_when_no_sessions_removed(
+ self, admin_user, fresh_registry, caplog
+ ):
"""Test that cleanup does not log when no sessions are removed (AC5)."""
# Create an active session
fresh_registry.get_or_create_session("active-no-log", admin_user)
@@ -378,7 +400,9 @@ async def test_stop_background_cleanup_cancels_task(self, fresh_registry):
assert task.cancelled() or task.done()
@pytest.mark.asyncio
- async def test_background_cleanup_uses_configured_ttl(self, admin_user, fresh_registry):
+ async def test_background_cleanup_uses_configured_ttl(
+ self, admin_user, fresh_registry
+ ):
"""Test that background cleanup uses configured TTL value (AC3, AC4)."""
# Start cleanup with short TTL (1 second) and very short interval
fresh_registry.start_background_cleanup(
@@ -427,7 +451,9 @@ async def test_start_background_cleanup_is_idempotent(self, fresh_registry):
await asyncio.sleep(0.01)
@pytest.mark.asyncio
- async def test_start_background_cleanup_logs_configuration(self, fresh_registry, caplog):
+ async def test_start_background_cleanup_logs_configuration(
+ self, fresh_registry, caplog
+ ):
"""Test that start_background_cleanup logs the configuration (AC5)."""
with caplog.at_level(logging.INFO):
fresh_registry.start_background_cleanup(
@@ -471,6 +497,8 @@ def test_default_ttl_is_one_hour(self):
def test_default_cleanup_interval_is_fifteen_minutes(self):
"""Test that default cleanup interval is 15 minutes (900 seconds) as per AC4."""
- from code_indexer.server.mcp.session_registry import DEFAULT_CLEANUP_INTERVAL_SECONDS
+ from code_indexer.server.mcp.session_registry import (
+ DEFAULT_CLEANUP_INTERVAL_SECONDS,
+ )
assert DEFAULT_CLEANUP_INTERVAL_SECONDS == 900 # 15 minutes
diff --git a/tests/unit/server/query/test_semantic_query_manager_warning_log_conditions.py b/tests/unit/server/query/test_semantic_query_manager_warning_log_conditions.py
index 64da643c..2e0fbd3e 100644
--- a/tests/unit/server/query/test_semantic_query_manager_warning_log_conditions.py
+++ b/tests/unit/server/query/test_semantic_query_manager_warning_log_conditions.py
@@ -400,9 +400,7 @@ def test_warning_for_hybrid_mode_with_default_params(
mock_search_service.return_value = mock_service
# Also mock FTS search
- with patch.object(
- query_manager, "_execute_fts_search", return_value=[]
- ):
+ with patch.object(query_manager, "_execute_fts_search", return_value=[]):
with caplog.at_level(logging.WARNING):
query_manager._search_single_repository(
repo_path=mock_non_composite_repo,
diff --git a/tests/unit/server/repositories/test_orphaned_job_cleanup.py b/tests/unit/server/repositories/test_orphaned_job_cleanup.py
index 3b3f0730..755cc4e6 100644
--- a/tests/unit/server/repositories/test_orphaned_job_cleanup.py
+++ b/tests/unit/server/repositories/test_orphaned_job_cleanup.py
@@ -244,7 +244,9 @@ def test_cleanup_orphaned_jobs_with_no_orphaned_jobs_returns_zero(
) -> None:
"""When cleanup_orphaned_jobs() is called with no orphans, it returns 0."""
from code_indexer.server.storage.database_manager import DatabaseSchema
- from code_indexer.server.storage.sqlite_backends import BackgroundJobsSqliteBackend
+ from code_indexer.server.storage.sqlite_backends import (
+ BackgroundJobsSqliteBackend,
+ )
db_path = tmp_path / "clean_test.db"
schema = DatabaseSchema(str(db_path))
@@ -275,8 +277,12 @@ class TestBackgroundJobManagerOrphanedJobCleanup:
def test_manager_cleans_orphaned_jobs_on_sqlite_load(self, tmp_path: Path) -> None:
"""When BackgroundJobManager initializes with SQLite, orphaned jobs are cleaned up."""
from code_indexer.server.storage.database_manager import DatabaseSchema
- from code_indexer.server.storage.sqlite_backends import BackgroundJobsSqliteBackend
- from code_indexer.server.repositories.background_jobs import BackgroundJobManager
+ from code_indexer.server.storage.sqlite_backends import (
+ BackgroundJobsSqliteBackend,
+ )
+ from code_indexer.server.repositories.background_jobs import (
+ BackgroundJobManager,
+ )
# Setup: Create database with orphaned jobs
db_path = tmp_path / "manager_test.db"
@@ -331,8 +337,12 @@ def test_manager_logs_orphaned_job_cleanup_count(
"""When BackgroundJobManager cleans orphaned jobs, it logs the count."""
import logging
from code_indexer.server.storage.database_manager import DatabaseSchema
- from code_indexer.server.storage.sqlite_backends import BackgroundJobsSqliteBackend
- from code_indexer.server.repositories.background_jobs import BackgroundJobManager
+ from code_indexer.server.storage.sqlite_backends import (
+ BackgroundJobsSqliteBackend,
+ )
+ from code_indexer.server.repositories.background_jobs import (
+ BackgroundJobManager,
+ )
# Setup: Create database with orphaned jobs
db_path = tmp_path / "log_test.db"
@@ -379,8 +389,12 @@ def test_manager_shows_zero_running_pending_after_restart(
) -> None:
"""After restart with orphaned jobs, running and pending counts should be zero."""
from code_indexer.server.storage.database_manager import DatabaseSchema
- from code_indexer.server.storage.sqlite_backends import BackgroundJobsSqliteBackend
- from code_indexer.server.repositories.background_jobs import BackgroundJobManager
+ from code_indexer.server.storage.sqlite_backends import (
+ BackgroundJobsSqliteBackend,
+ )
+ from code_indexer.server.repositories.background_jobs import (
+ BackgroundJobManager,
+ )
# Setup: Create database with orphaned jobs
db_path = tmp_path / "counts_test.db"
@@ -425,8 +439,12 @@ def test_manager_preserves_completed_jobs_during_cleanup(
) -> None:
"""When manager initializes, completed jobs from before restart are preserved."""
from code_indexer.server.storage.database_manager import DatabaseSchema
- from code_indexer.server.storage.sqlite_backends import BackgroundJobsSqliteBackend
- from code_indexer.server.repositories.background_jobs import BackgroundJobManager
+ from code_indexer.server.storage.sqlite_backends import (
+ BackgroundJobsSqliteBackend,
+ )
+ from code_indexer.server.repositories.background_jobs import (
+ BackgroundJobManager,
+ )
# Setup: Create database with mixed job states
db_path = tmp_path / "preserve_test.db"
diff --git a/tests/unit/server/routers/test_delegation_callbacks.py b/tests/unit/server/routers/test_delegation_callbacks.py
index e097d1db..b074bdd3 100644
--- a/tests/unit/server/routers/test_delegation_callbacks.py
+++ b/tests/unit/server/routers/test_delegation_callbacks.py
@@ -77,9 +77,7 @@ def test_callback_receives_completed_job(self, client, reset_tracker_singleton):
tracker = DelegationJobTracker.get_instance()
# Register the job
- asyncio.get_event_loop().run_until_complete(
- tracker.register_job("job-12345")
- )
+ asyncio.get_event_loop().run_until_complete(tracker.register_job("job-12345"))
response = client.post(
"/api/delegation/callback/job-12345",
@@ -115,9 +113,7 @@ def test_callback_receives_failed_job(self, client, reset_tracker_singleton):
import asyncio
tracker = DelegationJobTracker.get_instance()
- asyncio.get_event_loop().run_until_complete(
- tracker.register_job("job-99999")
- )
+ asyncio.get_event_loop().run_until_complete(tracker.register_job("job-99999"))
response = client.post(
"/api/delegation/callback/job-99999",
@@ -176,9 +172,7 @@ def test_callback_uses_job_id_from_path(self, client, reset_tracker_singleton):
import asyncio
tracker = DelegationJobTracker.get_instance()
- asyncio.get_event_loop().run_until_complete(
- tracker.register_job("path-job-id")
- )
+ asyncio.get_event_loop().run_until_complete(tracker.register_job("path-job-id"))
response = client.post(
"/api/delegation/callback/path-job-id",
@@ -254,9 +248,7 @@ def test_callback_handles_missing_optional_fields(
import asyncio
tracker = DelegationJobTracker.get_instance()
- asyncio.get_event_loop().run_until_complete(
- tracker.register_job("minimal-job")
- )
+ asyncio.get_event_loop().run_until_complete(tracker.register_job("minimal-job"))
# Minimal payload - only JobId, Status, Output
response = client.post(
diff --git a/tests/unit/server/services/test_delegation_job_tracker.py b/tests/unit/server/services/test_delegation_job_tracker.py
index 572bf0bd..4669b3d2 100644
--- a/tests/unit/server/services/test_delegation_job_tracker.py
+++ b/tests/unit/server/services/test_delegation_job_tracker.py
@@ -459,7 +459,11 @@ async def test_multiple_concurrent_jobs(self):
await tracker.register_job("job-B")
result_a = JobResult(
- job_id="job-A", status="completed", output="Result A", exit_code=0, error=None
+ job_id="job-A",
+ status="completed",
+ output="Result A",
+ exit_code=0,
+ error=None,
)
result_b = JobResult(
job_id="job-B", status="failed", output="", exit_code=1, error="Error B"
@@ -718,7 +722,9 @@ async def test_wait_for_job_returns_cached_result_immediately(self, payload_cach
assert "Cached result" in result.output
@pytest.mark.asyncio
- async def test_wait_for_job_falls_back_to_future_when_not_cached(self, payload_cache):
+ async def test_wait_for_job_falls_back_to_future_when_not_cached(
+ self, payload_cache
+ ):
"""
wait_for_job() waits on the Future when the result is not in the cache.
diff --git a/tests/unit/server/services/test_health_service_cpu_thresholds.py b/tests/unit/server/services/test_health_service_cpu_thresholds.py
index b3bb46f7..ab3454dd 100644
--- a/tests/unit/server/services/test_health_service_cpu_thresholds.py
+++ b/tests/unit/server/services/test_health_service_cpu_thresholds.py
@@ -50,15 +50,14 @@ def test_single_cpu_spike_does_not_trigger_warning(self, mock_db_health):
"""A single CPU spike >95% should NOT trigger warning."""
from code_indexer.server.services.health_service import HealthCheckService
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 98.0 # High CPU spike
mock_disk.return_value = MagicMock(
@@ -81,17 +80,15 @@ def test_cpu_sustained_30_seconds_returns_degraded(self, mock_db_health):
"""CPU >95% sustained for 30 seconds should return DEGRADED."""
from code_indexer.server.services.health_service import HealthCheckService
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io, patch(
- "time.time"
- ) as mock_time:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ patch("time.time") as mock_time,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 98.0 # High CPU
mock_disk.return_value = MagicMock(
@@ -120,17 +117,15 @@ def test_cpu_sustained_60_seconds_returns_unhealthy(self, mock_db_health):
"""CPU >95% sustained for 60 seconds should return UNHEALTHY."""
from code_indexer.server.services.health_service import HealthCheckService
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io, patch(
- "time.time"
- ) as mock_time:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ patch("time.time") as mock_time,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 98.0 # High CPU
mock_disk.return_value = MagicMock(
@@ -159,15 +154,14 @@ def test_cpu_history_has_attribute(self, mock_db_health):
"""HealthCheckService should have _cpu_history attribute."""
from code_indexer.server.services.health_service import HealthCheckService
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
mock_disk.return_value = MagicMock(
diff --git a/tests/unit/server/services/test_health_service_database_aggregation.py b/tests/unit/server/services/test_health_service_database_aggregation.py
index 367b2577..3fe21c01 100644
--- a/tests/unit/server/services/test_health_service_database_aggregation.py
+++ b/tests/unit/server/services/test_health_service_database_aggregation.py
@@ -33,15 +33,14 @@ def healthy_db_results():
@pytest.fixture
def mock_healthy_system():
"""Fixture providing healthy system metrics mocks."""
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
mock_disk.return_value = MagicMock(
diff --git a/tests/unit/server/services/test_health_service_failure_reasons.py b/tests/unit/server/services/test_health_service_failure_reasons.py
index 3814895f..4c2c249a 100644
--- a/tests/unit/server/services/test_health_service_failure_reasons.py
+++ b/tests/unit/server/services/test_health_service_failure_reasons.py
@@ -47,7 +47,6 @@ def test_failure_reasons_field_exists_in_response(self):
def test_failure_reasons_defaults_to_empty_list(self):
"""failure_reasons should default to empty list."""
from code_indexer.server.models.api_models import HealthCheckResponse
- from pydantic import Field
field_info = HealthCheckResponse.model_fields.get("failure_reasons")
assert field_info is not None
@@ -76,15 +75,14 @@ def test_healthy_status_has_empty_failure_reasons(self):
]
mock_db_cls.return_value = mock_db
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
mock_disk.return_value = MagicMock(
@@ -127,15 +125,14 @@ def test_failure_reasons_lists_failing_indicators(self):
]
mock_db_cls.return_value = mock_db
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
mock_disk.return_value = MagicMock(
@@ -185,15 +182,14 @@ def test_failure_reasons_limited_to_3_with_more_indicator(self):
mock_db.get_all_database_health.return_value = db_results
mock_db_cls.return_value = mock_db
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
mock_disk.return_value = MagicMock(
@@ -239,15 +235,14 @@ def test_storage_service_error_appears_in_failure_reasons(self):
]
mock_db_cls.return_value = mock_db
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
# Keep memory usage moderate (50%) so system metrics report healthy
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
@@ -296,17 +291,15 @@ def test_database_service_error_appears_in_failure_reasons(self):
]
mock_db_cls.return_value = mock_db
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io, patch(
- "sqlite3.connect"
- ) as mock_sqlite:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ patch("sqlite3.connect") as mock_sqlite,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
mock_disk.return_value = MagicMock(
@@ -354,15 +347,14 @@ def test_degraded_service_error_appears_in_failure_reasons(self):
]
mock_db_cls.return_value = mock_db
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
# Set disk space in warning range (80-90% used = DEGRADED)
diff --git a/tests/unit/server/services/test_health_service_ram_thresholds.py b/tests/unit/server/services/test_health_service_ram_thresholds.py
index ad055907..e29bf5eb 100644
--- a/tests/unit/server/services/test_health_service_ram_thresholds.py
+++ b/tests/unit/server/services/test_health_service_ram_thresholds.py
@@ -57,15 +57,14 @@ def test_ram_at_80_percent_returns_degraded(self, mock_db_health):
"""RAM >= 80% usage should return DEGRADED (yellow)."""
from code_indexer.server.services.health_service import HealthCheckService
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=80.0) # Warning threshold
mock_cpu.return_value = 30.0
mock_disk.return_value = MagicMock(
@@ -91,15 +90,14 @@ def test_ram_at_90_percent_returns_unhealthy(self, mock_db_health):
"""RAM >= 90% usage should return UNHEALTHY (red)."""
from code_indexer.server.services.health_service import HealthCheckService
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=90.0) # Critical threshold
mock_cpu.return_value = 30.0
mock_disk.return_value = MagicMock(
diff --git a/tests/unit/server/services/test_health_service_volume_aggregation.py b/tests/unit/server/services/test_health_service_volume_aggregation.py
index a238d535..f7ad5950 100644
--- a/tests/unit/server/services/test_health_service_volume_aggregation.py
+++ b/tests/unit/server/services/test_health_service_volume_aggregation.py
@@ -49,15 +49,14 @@ def test_volume_with_warning_percent_returns_degraded(self, mock_db_health):
"""When ANY volume has 80-90% used, Server Status should be DEGRADED."""
from code_indexer.server.services.health_service import HealthCheckService
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
@@ -96,15 +95,14 @@ def test_volume_with_less_than_1gb_free_returns_unhealthy(self, mock_db_health):
"""When ANY volume has <1GB free, Server Status should be UNHEALTHY."""
from code_indexer.server.services.health_service import HealthCheckService
- with patch("psutil.virtual_memory") as mock_mem, patch(
- "psutil.cpu_percent"
- ) as mock_cpu, patch("psutil.disk_usage") as mock_disk, patch(
- "psutil.disk_partitions"
- ) as mock_parts, patch(
- "psutil.disk_io_counters"
- ) as mock_disk_io, patch(
- "psutil.net_io_counters"
- ) as mock_net_io:
+ with (
+ patch("psutil.virtual_memory") as mock_mem,
+ patch("psutil.cpu_percent") as mock_cpu,
+ patch("psutil.disk_usage") as mock_disk,
+ patch("psutil.disk_partitions") as mock_parts,
+ patch("psutil.disk_io_counters") as mock_disk_io,
+ patch("psutil.net_io_counters") as mock_net_io,
+ ):
mock_mem.return_value = MagicMock(percent=50.0)
mock_cpu.return_value = 30.0
diff --git a/tests/unit/server/services/test_job_phase_detector.py b/tests/unit/server/services/test_job_phase_detector.py
index b180e3d6..377578e9 100644
--- a/tests/unit/server/services/test_job_phase_detector.py
+++ b/tests/unit/server/services/test_job_phase_detector.py
@@ -89,7 +89,12 @@ def test_detect_phase_cidx_indexing_when_cloned_not_indexed(self):
"status": "in_progress",
"repositories": [
{"alias": "repo1", "registered": True, "cloned": True, "indexed": True},
- {"alias": "repo2", "registered": True, "cloned": True, "indexed": False},
+ {
+ "alias": "repo2",
+ "registered": True,
+ "cloned": True,
+ "indexed": False,
+ },
],
}
@@ -255,7 +260,12 @@ def test_get_progress_cidx_indexing_shows_counts(self):
"status": "in_progress",
"repositories": [
{"alias": "repo1", "registered": True, "cloned": True, "indexed": True},
- {"alias": "repo2", "registered": True, "cloned": True, "indexed": False},
+ {
+ "alias": "repo2",
+ "registered": True,
+ "cloned": True,
+ "indexed": False,
+ },
],
}
@@ -305,7 +315,10 @@ def test_get_progress_done_completed_includes_result(self):
progress = detector.get_progress(job_state, JobPhase.DONE)
- assert progress.progress.get("result") == "The authentication system uses JWT tokens..."
+ assert (
+ progress.progress.get("result")
+ == "The authentication system uses JWT tokens..."
+ )
assert progress.is_terminal is True
def test_get_progress_done_failed_includes_error(self):
diff --git a/tests/unit/server/services/test_maintenance_service.py b/tests/unit/server/services/test_maintenance_service.py
index a3fcec8b..92cc7aa9 100644
--- a/tests/unit/server/services/test_maintenance_service.py
+++ b/tests/unit/server/services/test_maintenance_service.py
@@ -5,7 +5,6 @@
"""
import pytest
-import threading
from unittest.mock import MagicMock
@@ -276,7 +275,9 @@ def test_refresh_scheduler_job_submission_rejected_during_maintenance(self):
get_maintenance_state,
)
from code_indexer.global_repos.refresh_scheduler import RefreshScheduler
- from code_indexer.server.repositories.background_jobs import BackgroundJobManager
+ from code_indexer.server.repositories.background_jobs import (
+ BackgroundJobManager,
+ )
from code_indexer.server.jobs.exceptions import MaintenanceModeError
from unittest.mock import MagicMock, patch
import tempfile
diff --git a/tests/unit/server/services/test_prompt_template_processor.py b/tests/unit/server/services/test_prompt_template_processor.py
index c5147abf..c949f278 100644
--- a/tests/unit/server/services/test_prompt_template_processor.py
+++ b/tests/unit/server/services/test_prompt_template_processor.py
@@ -6,8 +6,6 @@
Tests follow TDD methodology - tests are written FIRST, before implementation.
"""
-import pytest
-
class TestPromptTemplateProcessorRender:
"""Tests for PromptTemplateProcessor.render() method."""
@@ -374,9 +372,9 @@ def test_render_impersonation_instruction_precedes_template_content(self):
template_pos = result.find("TEMPLATE_MARKER_UNIQUE_12345")
assert impersonation_pos == 0, "Impersonation instruction must be at position 0"
- assert template_pos > impersonation_pos, (
- "Template content must come after impersonation instruction"
- )
+ assert (
+ template_pos > impersonation_pos
+ ), "Template content must come after impersonation instruction"
class TestPromptTemplateProcessorSpaceVariantPlaceholders: