Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions src/code_indexer/server/clients/claude_server_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ async def authenticate(self) -> str:
# Claude Server returns "expires" (ISO datetime), standard returns "expires_in" (seconds)
if "expires" in data:
from dateutil.parser import parse as parse_datetime

self._jwt_expires = parse_datetime(data["expires"])
else:
expires_in = data.get("expires_in", 3600)
Expand Down Expand Up @@ -194,9 +195,7 @@ async def _make_authenticated_request(
)
elif response.status_code == 401 and not retry_on_401:
# Second 401 means auth truly failed - raise exception
raise ClaudeServerAuthError(
"Authentication failed after token refresh"
)
raise ClaudeServerAuthError("Authentication failed after token refresh")

return response

Expand Down Expand Up @@ -250,9 +249,7 @@ async def register_repository(
f"Repository registration failed: HTTP {response.status_code}"
)

async def create_job(
self, prompt: str, repositories: List[str]
) -> Dict[str, Any]:
async def create_job(self, prompt: str, repositories: List[str]) -> Dict[str, Any]:
"""
Create a new job with the given prompt.

Expand All @@ -276,13 +273,9 @@ async def create_job(
if response.status_code in (200, 201):
return response.json()
elif response.status_code >= 500:
raise ClaudeServerError(
f"Claude Server error: HTTP {response.status_code}"
)
raise ClaudeServerError(f"Claude Server error: HTTP {response.status_code}")
else:
raise ClaudeServerError(
f"Job creation failed: HTTP {response.status_code}"
)
raise ClaudeServerError(f"Job creation failed: HTTP {response.status_code}")

async def start_job(self, job_id: str) -> Dict[str, Any]:
"""
Expand Down
4 changes: 3 additions & 1 deletion src/code_indexer/server/config/delegation_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class ClaudeDelegationConfig:
claude_server_credential_type: Literal["password", "api_key"] = "password"
claude_server_credential: str = "" # Encrypted at rest
skip_ssl_verify: bool = False # Allow self-signed certificates for E2E testing
cidx_callback_url: str = "" # Story #720: URL that Claude Server uses to POST callbacks
cidx_callback_url: str = (
"" # Story #720: URL that Claude Server uses to POST callbacks
)

@property
def is_configured(self) -> bool:
Expand Down
156 changes: 108 additions & 48 deletions src/code_indexer/server/mcp/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8852,7 +8852,9 @@ def _get_delegation_function_repo_path() -> Optional[Path]:
repo_path = golden_repo_manager.get_actual_repo_path(function_repo_alias)
return Path(repo_path) if repo_path else None
except Exception as e:
logger.warning(f"Function repository '{function_repo_alias}' not found: {e}")
logger.warning(
f"Function repository '{function_repo_alias}' not found: {e}"
)
return None

except Exception as e:
Expand Down Expand Up @@ -8974,7 +8976,9 @@ def _get_delegation_config():
return None


def _validate_function_parameters(target_function, parameters: Dict[str, Any]) -> Optional[str]:
def _validate_function_parameters(
target_function, parameters: Dict[str, Any]
) -> Optional[str]:
"""
Validate required parameters are present.

Expand All @@ -8989,7 +8993,9 @@ def _validate_function_parameters(target_function, parameters: Dict[str, Any]) -
return None


async def _ensure_repos_registered(client, required_repos: List[Dict[str, Any]]) -> List[str]:
async def _ensure_repos_registered(
client, required_repos: List[Dict[str, Any]]
) -> List[str]:
"""
Ensure required repositories are registered in Claude Server.

Expand Down Expand Up @@ -9064,8 +9070,14 @@ async def handle_execute_delegation_function(
repo_path = _get_delegation_function_repo_path()
delegation_config = _get_delegation_config()

if repo_path is None or delegation_config is None or not delegation_config.is_configured:
return _mcp_response({"success": False, "error": "Claude Delegation not configured"})
if (
repo_path is None
or delegation_config is None
or not delegation_config.is_configured
):
return _mcp_response(
{"success": False, "error": "Claude Delegation not configured"}
)

function_name = args.get("function_name", "")
parameters = args.get("parameters", {})
Expand All @@ -9074,17 +9086,27 @@ async def handle_execute_delegation_function(
# Load and find function
loader = DelegationFunctionLoader()
all_functions = loader.load_functions(repo_path)
target_function = next((f for f in all_functions if f.name == function_name), None)
target_function = next(
(f for f in all_functions if f.name == function_name), None
)

if target_function is None:
return _mcp_response({"success": False, "error": f"Function not found: {function_name}"})
return _mcp_response(
{"success": False, "error": f"Function not found: {function_name}"}
)

# Access validation
effective_user = session_state.effective_user if session_state and session_state.is_impersonating else user
effective_user = (
session_state.effective_user
if session_state and session_state.is_impersonating
else user
)
user_groups = _get_user_groups(effective_user)

if not (user_groups & set(target_function.allowed_groups)):
return _mcp_response({"success": False, "error": "Access denied: insufficient permissions"})
return _mcp_response(
{"success": False, "error": "Access denied: insufficient permissions"}
)

# Parameter validation
param_error = _validate_function_parameters(target_function, parameters)
Expand All @@ -9099,31 +9121,43 @@ async def handle_execute_delegation_function(
password=delegation_config.claude_server_credential,
skip_ssl_verify=delegation_config.skip_ssl_verify,
) as client:
repo_aliases = await _ensure_repos_registered(client, target_function.required_repos)
repo_aliases = await _ensure_repos_registered(
client, target_function.required_repos
)

# Render prompt and create job
processor = PromptTemplateProcessor()
impersonation_user = target_function.impersonation_user or effective_user.username
impersonation_user = (
target_function.impersonation_user or effective_user.username
)
rendered_prompt = processor.render(
template=target_function.prompt_template,
parameters=parameters,
user_prompt=user_prompt,
impersonation_user=impersonation_user,
)

job_result = await client.create_job(prompt=rendered_prompt, repositories=repo_aliases)
job_result = await client.create_job(
prompt=rendered_prompt, repositories=repo_aliases
)
# Claude Server returns camelCase "jobId"
job_id = job_result.get("jobId") or job_result.get("job_id")
if not job_id:
return _mcp_response({"success": False, "error": "Job created but no job_id returned"})
return _mcp_response(
{"success": False, "error": "Job created but no job_id returned"}
)

# Story #720: Register callback URL with Claude Server for completion notification
callback_base_url = _get_cidx_callback_base_url()
if callback_base_url:
callback_url = f"{callback_base_url.rstrip('/')}/api/delegation/callback/{job_id}"
callback_url = (
f"{callback_base_url.rstrip('/')}/api/delegation/callback/{job_id}"
)
try:
await client.register_callback(job_id, callback_url)
logger.debug(f"Registered callback URL for job {job_id}: {callback_url}")
logger.debug(
f"Registered callback URL for job {job_id}: {callback_url}"
)
except Exception as callback_err:
# Log but don't fail - callback registration is best-effort
logger.warning(
Expand All @@ -9142,10 +9176,15 @@ async def handle_execute_delegation_function(
return _mcp_response({"success": True, "job_id": job_id})

except ClaudeServerError as e:
logger.error(f"Claude Server error: {e}", extra={"correlation_id": get_correlation_id()})
logger.error(
f"Claude Server error: {e}", extra={"correlation_id": get_correlation_id()}
)
return _mcp_response({"success": False, "error": f"Claude Server error: {e}"})
except Exception as e:
logger.exception(f"Error in execute_delegation_function: {e}", extra={"correlation_id": get_correlation_id()})
logger.exception(
f"Error in execute_delegation_function: {e}",
extra={"correlation_id": get_correlation_id()},
)
return _mcp_response({"success": False, "error": str(e)})


Expand Down Expand Up @@ -9179,66 +9218,87 @@ async def handle_poll_delegation_job(
delegation_config = _get_delegation_config()

if delegation_config is None or not delegation_config.is_configured:
return _mcp_response({"success": False, "error": "Claude Delegation not configured"})
return _mcp_response(
{"success": False, "error": "Claude Delegation not configured"}
)

job_id = args.get("job_id", "")
if not job_id:
return _mcp_response({"success": False, "error": "Missing required parameter: job_id"})
return _mcp_response(
{"success": False, "error": "Missing required parameter: job_id"}
)

# Story #720: Get timeout_seconds from args (default 45s, below MCP's 60s)
# Also support legacy "timeout" parameter for backward compatibility
timeout = args.get("timeout_seconds", args.get("timeout", 45))
if not isinstance(timeout, (int, float)):
return _mcp_response({
"success": False,
"error": "timeout_seconds must be a number (recommended: 5-300)"
})
return _mcp_response(
{
"success": False,
"error": "timeout_seconds must be a number (recommended: 5-300)",
}
)
# Minimum 0.01s (for testing), maximum 300s (5 minutes)
# Recommended range for production: 5-300 seconds
if timeout < 0.01 or timeout > 300:
return _mcp_response({
"success": False,
"error": "timeout_seconds must be between 0.01 and 300"
})
return _mcp_response(
{
"success": False,
"error": "timeout_seconds must be between 0.01 and 300",
}
)

# Check if job exists in tracker before waiting
tracker = DelegationJobTracker.get_instance()
job_exists = await tracker.has_job(job_id)
if not job_exists:
return _mcp_response({
"success": False,
"error": f"Job {job_id} not found or already completed",
})
return _mcp_response(
{
"success": False,
"error": f"Job {job_id} not found or already completed",
}
)

# Wait for callback via DelegationJobTracker
result = await tracker.wait_for_job(job_id, timeout=timeout)

if result is None:
# Timeout - job still exists, caller can try again
return _mcp_response({
"status": "waiting",
"message": "Job still running, callback not yet received",
"continue_polling": True,
})
return _mcp_response(
{
"status": "waiting",
"message": "Job still running, callback not yet received",
"continue_polling": True,
}
)

# Return result based on status from callback
if result.status == "completed":
return _mcp_response({
"status": "completed",
"result": result.output,
"continue_polling": False,
})
return _mcp_response(
{
"status": "completed",
"result": result.output,
"continue_polling": False,
}
)
else:
# Failed or other status
return _mcp_response({
"status": "failed",
"error": result.error or result.output,
"continue_polling": False,
})
return _mcp_response(
{
"status": "failed",
"error": result.error or result.output,
"continue_polling": False,
}
)

except Exception as e:
logger.error(f"Error waiting for delegation job {job_id}: {e}", extra={"correlation_id": get_correlation_id()})
return _mcp_response({"success": False, "error": f"Error waiting for job completion: {str(e)}"})
logger.error(
f"Error waiting for delegation job {job_id}: {e}",
extra={"correlation_id": get_correlation_id()},
)
return _mcp_response(
{"success": False, "error": f"Error waiting for job completion: {str(e)}"}
)


HANDLER_REGISTRY["poll_delegation_job"] = handle_poll_delegation_job
4 changes: 3 additions & 1 deletion src/code_indexer/server/mcp/session_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def __new__(cls) -> "SessionRegistry":
# TTL cleanup attributes (Story #731)
instance._cleanup_task: Optional[asyncio.Task] = None
instance._ttl_seconds = DEFAULT_SESSION_TTL_SECONDS
instance._cleanup_interval_seconds = DEFAULT_CLEANUP_INTERVAL_SECONDS
instance._cleanup_interval_seconds = (
DEFAULT_CLEANUP_INTERVAL_SECONDS
)
cls._instance = instance
return cls._instance

Expand Down
8 changes: 7 additions & 1 deletion src/code_indexer/server/mcp/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -7410,7 +7410,13 @@ def filter_tools_by_role(user: User) -> List[Dict[str, Any]]:
},
"phase": {
"type": "string",
"enum": ["repo_registration", "repo_cloning", "cidx_indexing", "job_running", "done"],
"enum": [
"repo_registration",
"repo_cloning",
"cidx_indexing",
"job_running",
"done",
],
"description": "Current phase of job execution",
},
"progress": {
Expand Down
16 changes: 9 additions & 7 deletions src/code_indexer/server/query/semantic_query_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,13 +979,15 @@ def _search_single_repository(
# Story #725: Only warn when non-default filters are explicitly set
# Note: accuracy="balanced" is the default, so we only warn if accuracy
# is set to something other than "balanced" or None
has_non_default_filters = any([
language, # None is default
exclude_language, # None is default
path_filter, # None is default
exclude_path, # None is default
accuracy and accuracy != "balanced" # "balanced" is default
])
has_non_default_filters = any(
[
language, # None is default
exclude_language, # None is default
path_filter, # None is default
exclude_path, # None is default
accuracy and accuracy != "balanced", # "balanced" is default
]
)
if search_mode in ["semantic", "hybrid"] and has_non_default_filters:
self.logger.warning(
f"Advanced filter parameters (language={language}, exclude_language={exclude_language}, "
Expand Down
8 changes: 4 additions & 4 deletions src/code_indexer/server/routers/delegation_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class CallbackPayload(BaseModel):
Repository: Optional[str] = Field(None, description="Repository alias")
CreatedAt: Optional[datetime] = Field(None, description="Job creation timestamp")
StartedAt: Optional[datetime] = Field(None, description="Job start timestamp")
CompletedAt: Optional[datetime] = Field(None, description="Job completion timestamp")
CompletedAt: Optional[datetime] = Field(
None, description="Job completion timestamp"
)
ReferenceId: Optional[str] = Field(None, description="Reference ID for tracking")
AffinityToken: Optional[str] = Field(None, description="Affinity token for routing")

Expand Down Expand Up @@ -75,9 +77,7 @@ async def receive_delegation_callback(
Returns:
CallbackResponse indicating receipt and whether job was found
"""
logger.info(
f"Received callback for job {job_id}: status={payload.Status}"
)
logger.info(f"Received callback for job {job_id}: status={payload.Status}")

tracker = DelegationJobTracker.get_instance()

Expand Down
Loading
Loading