Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cortex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
)
from cortex.update_checker import UpdateChannel, should_notify_update
from cortex.updater import Updater, UpdateStatus
from cortex.utils.retry import DEFAULT_MAX_RETRIES
from cortex.validators import validate_api_key, validate_install_request
from cortex.version_manager import get_version_string

Expand Down Expand Up @@ -1445,6 +1446,7 @@ def install(
dry_run: bool = False,
parallel: bool = False,
json_output: bool = False,
max_retries: int = DEFAULT_MAX_RETRIES,
) -> int:
"""Install software using the LLM-powered package manager."""
# Initialize installation history
Expand Down Expand Up @@ -1670,6 +1672,7 @@ def parallel_log_callback(message: str, level: str = "info"):
timeout=300,
stop_on_error=True,
progress_callback=progress_callback,
max_retries=max_retries,
)

result = coordinator.execute()
Expand Down
40 changes: 38 additions & 2 deletions cortex/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
from enum import Enum
from typing import Any

from cortex.utils.retry import (
DEFAULT_MAX_RETRIES,
ErrorCategory,
RetryStrategy,
SmartRetry,
load_strategies_from_env,
)
from cortex.validators import DANGEROUS_PATTERNS

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,13 +67,15 @@ def __init__(
enable_rollback: bool = False,
log_file: str | None = None,
progress_callback: Callable[[int, int, InstallationStep], None] | None = None,
max_retries: int = DEFAULT_MAX_RETRIES,
):
"""Initialize an installation run with optional logging and rollback."""
self.timeout = timeout
self.stop_on_error = stop_on_error
self.enable_rollback = enable_rollback
self.log_file = log_file
self.progress_callback = progress_callback
self.max_retries = max_retries

if descriptions and len(descriptions) != len(commands):
raise ValueError("Number of descriptions must match number of commands")
Expand All @@ -90,6 +99,7 @@ def from_plan(
enable_rollback: bool | None = None,
log_file: str | None = None,
progress_callback: Callable[[int, int, InstallationStep], None] | None = None,
max_retries: int = 5,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Use DEFAULT_MAX_RETRIES constant instead of hardcoded value.

Line 102 uses a hardcoded 5 while __init__ at line 70 uses the imported DEFAULT_MAX_RETRIES constant. This inconsistency could cause confusion and maintenance issues if the default value changes.

🔧 Proposed fix
-        max_retries: int = 5,
+        max_retries: int = DEFAULT_MAX_RETRIES,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
max_retries: int = 5,
max_retries: int = DEFAULT_MAX_RETRIES,
🤖 Prompt for AI Agents
In `@cortex/coordinator.py` at line 102, Replace the hardcoded default value 5 for
the max_retries parameter with the shared constant DEFAULT_MAX_RETRIES to keep
defaults consistent; update the function/method signature that currently
declares max_retries: int = 5 to use max_retries: int = DEFAULT_MAX_RETRIES and
ensure any nearby references (e.g., in the same class/method as __init__ or
other methods) use the same DEFAULT_MAX_RETRIES symbol so the module
consistently relies on the imported constant.

) -> "InstallationCoordinator":
"""Create a coordinator from a structured plan produced by an LLM.

Expand Down Expand Up @@ -124,6 +134,7 @@ def from_plan(
),
log_file=log_file,
progress_callback=progress_callback,
max_retries=max_retries,
)

for rollback_cmd in rollback_commands:
Expand Down Expand Up @@ -174,14 +185,39 @@ def _execute_command(self, step: InstallationStep) -> bool:
self._log(f"Command blocked: {step.command} - {error}")
return False

try:
def run_cmd() -> subprocess.CompletedProcess[str]:
# Use shell=True carefully - commands are validated first
# For complex shell commands (pipes, redirects), shell=True is needed
# Simple commands could use shlex.split() with shell=False
result = subprocess.run(
return subprocess.run(
step.command, shell=True, capture_output=True, text=True, timeout=self.timeout
)

def status_callback(msg: str) -> None:
self._log(msg)
# Only print to stdout if no progress callback is configured to avoid duplicates
if self.progress_callback is None:
print(msg)

# Load strategies and apply CLI override for network errors
strategies = load_strategies_from_env()
if ErrorCategory.NETWORK_ERROR in strategies:
# Create a new instance to avoid mutating the shared default object
original_strategy = strategies[ErrorCategory.NETWORK_ERROR]
strategies[ErrorCategory.NETWORK_ERROR] = RetryStrategy(
max_retries=self.max_retries,
backoff_factor=original_strategy.backoff_factor,
description=original_strategy.description,
)

retry_handler = SmartRetry(
strategies=strategies,
status_callback=status_callback,
)

try:
result = retry_handler.run(run_cmd)

step.return_code = result.returncode
step.output = result.stdout
step.error = result.stderr
Expand Down
222 changes: 222 additions & 0 deletions cortex/utils/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import logging
import os
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

from cortex.error_parser import ErrorCategory, ErrorParser

logger = logging.getLogger(__name__)

# Default number of retries (re-runs after the first attempt) for network errors.
# It is the `max_retries` of the NETWORK_ERROR entry in DEFAULT_STRATEGIES below.
DEFAULT_MAX_RETRIES = 5


@dataclass
class RetryStrategy:
    """Retry policy applied to one category of error.

    SmartRetry consults one of these per failed attempt to decide whether to
    run the command again and how long to wait first.
    """

    # Maximum number of retries, i.e. re-runs after the initial attempt.
    max_retries: int
    # Base delay in seconds; SmartRetry doubles the delay on each successive retry.
    backoff_factor: float
    # Human-readable summary of the policy (informational only).
    description: str


# Default strategies for each retryable error category.
# A category with no entry here (and no user-supplied entry) has no strategy,
# so SmartRetry treats it as non-retryable and fails fast.
DEFAULT_STRATEGIES: dict[ErrorCategory, RetryStrategy] = {
    ErrorCategory.NETWORK_ERROR: RetryStrategy(
        max_retries=DEFAULT_MAX_RETRIES,
        backoff_factor=1.0,
        description="Network issues - retry aggressively with short backoff",
    ),
    ErrorCategory.LOCK_ERROR: RetryStrategy(
        max_retries=3,
        backoff_factor=5.0,
        description="Lock contention - wait longer between retries",
    ),
    ErrorCategory.UNKNOWN: RetryStrategy(
        max_retries=2,
        backoff_factor=2.0,
        description="Unknown errors - conservative retry",
    ),
}

# Permanent error categories that should never be retried.
# SmartRetry checks this set before looking up a strategy, so a strategy
# configured for one of these categories is ignored.
PERMANENT_ERRORS: set[ErrorCategory] = {
    ErrorCategory.PERMISSION_DENIED,
    ErrorCategory.PACKAGE_NOT_FOUND,
    ErrorCategory.CONFIGURATION_ERROR,
    ErrorCategory.DEPENDENCY_MISSING,
    ErrorCategory.CONFLICT,
    ErrorCategory.DISK_SPACE,
}


def _read_env_number(name: str, cast: Callable[[str], Any], fallback: Any) -> Any:
    """Return environment variable *name* converted with *cast*, or *fallback*.

    An unset, empty, or whitespace-only variable counts as "not provided" and
    yields *fallback*.

    Raises:
        ValueError: If the variable is set to something *cast* cannot convert.
            The message names the offending variable.
    """
    raw = os.getenv(name, "").strip()
    if not raw:
        return fallback
    try:
        return cast(raw)
    except ValueError as err:
        raise ValueError(f"Invalid value for {name}: {raw!r}") from err


def load_strategies_from_env() -> dict[ErrorCategory, RetryStrategy]:
    """
    Load retry strategies from environment variables, falling back to defaults.

    A category is overridden when at least one of its two variables is set to a
    non-empty value; the other field then falls back to that category's entry
    in DEFAULT_STRATEGIES, so the defaults are defined in exactly one place.

    Environment variables:
        CORTEX_RETRY_NETWORK_MAX: Max retries for network errors (default: 5)
        CORTEX_RETRY_NETWORK_BACKOFF: Backoff factor for network errors (default: 1.0)
        CORTEX_RETRY_LOCK_MAX: Max retries for lock errors (default: 3)
        CORTEX_RETRY_LOCK_BACKOFF: Backoff factor for lock errors (default: 5.0)
        CORTEX_RETRY_UNKNOWN_MAX: Max retries for unknown errors (default: 2)
        CORTEX_RETRY_UNKNOWN_BACKOFF: Backoff factor for unknown errors (default: 2.0)

    Returns:
        A new dict mapping each category to its strategy. The dict is a fresh
        copy, but categories that were not overridden still reference the
        RetryStrategy objects stored in DEFAULT_STRATEGIES.

    Raises:
        ValueError: If a variable holds a value that is not a valid int
            (``*_MAX``) or float (``*_BACKOFF``).
    """
    strategies = dict(DEFAULT_STRATEGIES)

    # (category, env-var infix, description used for the overriding strategy)
    overrides = (
        (ErrorCategory.NETWORK_ERROR, "NETWORK", "Network issues (user-configured)"),
        (ErrorCategory.LOCK_ERROR, "LOCK", "Lock contention (user-configured)"),
        (ErrorCategory.UNKNOWN, "UNKNOWN", "Unknown errors (user-configured)"),
    )

    for category, env_key, description in overrides:
        max_var = f"CORTEX_RETRY_{env_key}_MAX"
        backoff_var = f"CORTEX_RETRY_{env_key}_BACKOFF"

        # Leave the default untouched unless the user actually provided a value.
        if not (os.getenv(max_var, "").strip() or os.getenv(backoff_var, "").strip()):
            continue

        default = DEFAULT_STRATEGIES[category]
        strategies[category] = RetryStrategy(
            max_retries=_read_env_number(max_var, int, default.max_retries),
            backoff_factor=_read_env_number(backoff_var, float, default.backoff_factor),
            description=description,
        )

    return strategies


class SmartRetry:
    """
    Implements smart retry logic with exponential backoff.

    Uses ErrorParser to distinguish between transient and permanent errors and
    supports a different retry strategy per error category. A single attempt
    counter is shared across categories: if consecutive failures fall into
    different categories, the count so far is compared against the
    ``max_retries`` of whichever strategy matches the latest failure.
    """

    def __init__(
        self,
        strategies: dict[ErrorCategory, RetryStrategy] | None = None,
        status_callback: Callable[[str], None] | None = None,
    ):
        """
        Initialize SmartRetry with optional custom strategies.

        Args:
            strategies: Custom retry strategies per error category.
                If None, loads from environment or uses defaults.
            status_callback: Optional callback for status messages; it is
                called with the same text that is logged before each retry.

        Raises:
            ValueError: If any strategy has a negative ``max_retries`` or a
                non-positive ``backoff_factor``.
        """
        self.strategies = strategies if strategies is not None else load_strategies_from_env()

        # Validate strategies up front so a bad configuration fails before any command runs.
        for category, strategy in self.strategies.items():
            if strategy.max_retries < 0:
                raise ValueError(f"Strategy for {category.name}: max_retries must be non-negative")
            if strategy.backoff_factor <= 0:
                raise ValueError(f"Strategy for {category.name}: backoff_factor must be positive")

        self.status_callback = status_callback
        self.error_parser = ErrorParser()

    def run(self, func: Callable[[], Any]) -> Any:
        """
        Run a function with smart retry logic.

        Args:
            func: The function to execute. Expected to return a result object
                that has `returncode`, `stdout`, and `stderr` attributes
                (like subprocess.CompletedProcess), or raise an exception.

        Returns:
            The result of the first successful call (``returncode == 0``), or
            the result of the most recent call when the failure is permanent
            or the retries are exhausted.

        Raises:
            Exception: Whatever ``func`` raised on its most recent call, when
                that error is permanent or the retries are exhausted.
        """
        attempt = 0
        # Only the outcome of the most recent call is kept. Exactly one of these
        # is set after each attempt, so a stale exception from an earlier attempt
        # can never mask a later result (or vice versa).
        last_exception: Exception | None = None
        last_result: Any = None
        current_strategy: RetryStrategy | None = None

        while True:
            try:
                result = func()
                last_result = result
                last_exception = None

                # If result indicates success (returncode 0), return immediately
                if hasattr(result, "returncode") and result.returncode == 0:
                    return result

                # Otherwise classify the failure from stderr (empty if absent)
                error_msg = getattr(result, "stderr", None) or ""

                category = self._get_error_category(error_msg)
                current_strategy = self._get_strategy(category)

                if current_strategy is None:
                    # Permanent error - fail fast
                    return result

            except Exception as e:
                last_exception = e
                last_result = None
                category = self._get_error_category(str(e))
                current_strategy = self._get_strategy(category)

                if current_strategy is None:
                    # Permanent error - fail fast
                    raise

            # Check if we've exhausted retries for this strategy
            if current_strategy is None or attempt >= current_strategy.max_retries:
                break

            attempt += 1
            # Exponential backoff: factor, 2*factor, 4*factor, ...
            sleep_time = current_strategy.backoff_factor * (2 ** (attempt - 1))

            category_name = category.name if category else "UNKNOWN"
            msg = (
                f"⚠️ {category_name} detected. "
                f"Retrying in {sleep_time}s... (Retry {attempt}/{current_strategy.max_retries})"
            )
            logger.warning(msg)
            if self.status_callback:
                self.status_callback(msg)

            time.sleep(sleep_time)

        # Retries exhausted: surface the outcome of the final attempt.
        if last_exception is not None:
            raise last_exception
        return last_result

    def _get_error_category(self, error_message: str) -> ErrorCategory | None:
        """Classify the error message into a category.

        Returns:
            ``ErrorCategory.UNKNOWN`` for an empty message, ``None`` when the
            parser reports the error as not fixable (treated as permanent),
            otherwise the parser's primary category.
        """
        if not error_message:
            logger.warning("Retry: Empty error message detected. Assuming UNKNOWN (transient).")
            return ErrorCategory.UNKNOWN

        analysis = self.error_parser.parse_error(error_message)

        # If the error is explicitly marked as not fixable, treat as permanent
        if not analysis.is_fixable:
            return None

        return analysis.primary_category

    def _get_strategy(self, category: ErrorCategory | None) -> RetryStrategy | None:
        """
        Get the retry strategy for a given error category.

        Returns None for permanent errors (should not retry): a ``None``
        category, a category in PERMANENT_ERRORS, or a category with no
        configured strategy.
        """
        if category is None:
            return None

        if category in PERMANENT_ERRORS:
            return None

        return self.strategies.get(category)
1 change: 1 addition & 0 deletions docs/COMMANDS.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ cortex install "python3 with pip and virtualenv" --execute
- Without `--execute`, Cortex only shows the commands it would run
- The `--dry-run` flag is recommended for first-time use to verify commands
- Installation is recorded in history for potential rollback
- **Smart Retry Logic**: Cortex automatically detects transient failures (like network timeouts) and retries commands with exponential backoff. By default, network errors are retried up to 5 times, lock contention up to 3 times, and unclassified errors up to 2 times. Permanent errors (like permission denied) fail immediately.

---

Expand Down
Loading