From 84eb3a00c6b8c9d522e0895bcb788c7b1baf925f Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 02:08:15 +0530 Subject: [PATCH 01/16] feat: implement smart update recommendations with CLI integration and tests --- README.md | 5 + cortex/cli.py | 102 +++- cortex/i18n/locales/en.yaml | 113 ++-- cortex/update_recommender.py | 760 +++++++++++++++++++++++++++ docs/SMART_UPDATE_RECOMMENDATIONS.md | 274 ++++++++++ docs/guides/Developer-Guide.md | 9 +- docs/guides/User-Guide.md | 14 + tests/test_update_recommender.py | 296 +++++++++++ 8 files changed, 1528 insertions(+), 45 deletions(-) create mode 100644 cortex/update_recommender.py create mode 100644 docs/SMART_UPDATE_RECOMMENDATIONS.md create mode 100644 tests/test_update_recommender.py diff --git a/README.md b/README.md index 24db1c178..b12e55638 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ cortex install "tools for video compression" | **Docker Permission Fixer** | Fix root-owned bind mount issues automatically | | **Audit Trail** | Complete history in `~/.cortex/history.db` | | **Hardware-Aware** | Detects GPU, CPU, memory for optimized packages | +| **Smart Update Recommendations** | AI-powered update recommendations and risk assessment | | **Multi-LLM Support** | Works with Claude, GPT-4, or local Ollama models | --- @@ -165,6 +166,9 @@ cortex history # Rollback an installation cortex rollback + +# Get smart update recommendations +cortex update recommend ``` ### Role Management @@ -190,6 +194,7 @@ cortex role set | `cortex sandbox ` | Test packages in Docker sandbox | | `cortex history` | View all past installations | | `cortex rollback ` | Undo a specific installation | +| `cortex update recommend` | Get AI-powered update recommendations | | `cortex --version` | Show version information | | `cortex --help` | Display help message | diff --git a/cortex/cli.py b/cortex/cli.py index 6638a8804..46207c5b2 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -22,13 +22,7 @@ 
format_package_list, ) from cortex.env_manager import EnvironmentManager, get_env_manager -from cortex.i18n import ( - SUPPORTED_LANGUAGES, - LanguageConfig, - get_language, - set_language, - t, -) +from cortex.i18n import SUPPORTED_LANGUAGES, LanguageConfig, get_language, set_language, t from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType from cortex.llm.interpreter import CommandInterpreter from cortex.network_config import NetworkConfig @@ -1940,6 +1934,85 @@ def progress_callback(message: str, percent: float) -> None: ) return 0 + elif action == "recommend": + # Smart Update Recommendations (Issue #91) + from cortex.update_recommender import UpdateRecommender, recommend_updates + + use_llm = not getattr(args, "no_llm", False) + output_json = getattr(args, "json", False) + + if output_json: + import json as json_module + + llm_router = None + if use_llm: + try: + from cortex.llm_router import LLMRouter + + llm_router = LLMRouter() + except Exception: + pass + + recommender = UpdateRecommender(llm_router=llm_router, verbose=self.verbose) + recommendation = recommender.get_recommendations(use_llm=use_llm) + + # Convert to JSON-serializable format + output = { + "timestamp": recommendation.timestamp, + "total_updates": recommendation.total_updates, + "overall_risk": recommendation.overall_risk.value, + "security_updates": [ + { + "package": u.package_name, + "current": str(u.current_version), + "new": str(u.new_version), + "risk": u.risk_level.value, + "type": u.change_type.value, + } + for u in recommendation.security_updates + ], + "immediate_updates": [ + { + "package": u.package_name, + "current": str(u.current_version), + "new": str(u.new_version), + "risk": u.risk_level.value, + "type": u.change_type.value, + } + for u in recommendation.immediate_updates + ], + "scheduled_updates": [ + { + "package": u.package_name, + "current": str(u.current_version), + "new": str(u.new_version), + "risk": u.risk_level.value, + 
"type": u.change_type.value, + } + for u in recommendation.scheduled_updates + ], + "deferred_updates": [ + { + "package": u.package_name, + "current": str(u.current_version), + "new": str(u.new_version), + "risk": u.risk_level.value, + "type": u.change_type.value, + "breaking_changes": u.breaking_changes, + } + for u in recommendation.deferred_updates + ], + "groups": { + k: [u.package_name for u in v] for k, v in recommendation.groups.items() + }, + "llm_analysis": recommendation.llm_analysis, + } + print(json_module.dumps(output, indent=2)) + return 0 + else: + cx_print(t("update_recommend.checking"), "thinking") + return recommend_updates(use_llm=use_llm, verbose=self.verbose) + else: # Default: show current version and check for updates cx_print(f"Current version: [cyan]{get_version_string()}[/cyan]", "info") @@ -3786,6 +3859,21 @@ def main(): # update backups update_subs.add_parser("backups", help="List available backups for rollback") + + # update recommend - Smart Update Recommendations (Issue #91) + update_recommend_parser = update_subs.add_parser( + "recommend", help="AI-powered update recommendations" + ) + update_recommend_parser.add_argument( + "--no-llm", + action="store_true", + help="Disable LLM analysis for recommendations", + ) + update_recommend_parser.add_argument( + "--json", + action="store_true", + help="Output recommendations in JSON format", + ) # -------------------------- # WiFi/Bluetooth Driver Matcher diff --git a/cortex/i18n/locales/en.yaml b/cortex/i18n/locales/en.yaml index 493662fea..d608e4f37 100644 --- a/cortex/i18n/locales/en.yaml +++ b/cortex/i18n/locales/en.yaml @@ -58,7 +58,7 @@ language: # {error} - error message set_failed: "Failed to set language: {error}" supported_languages_header: "Supported languages:" - + # ============================================================================= # CLI commands # ============================================================================= @@ -68,7 +68,7 @@ cli: help_text: 
"AI-powered package manager for Linux" tagline: "Just tell Cortex what you want to install." learn_more: "Learn more" - + # Command descriptions commands: ask: "Ask a question about your system" @@ -87,7 +87,7 @@ cli: sandbox: "Test packages in Docker sandbox" doctor: "System health check" config: "Configure Cortex settings" - + # Argument help args: software: "Software to install" @@ -114,7 +114,7 @@ config: use_command_hint: "Use: cortex config language to change" list_hint: "Use: cortex config language --list for details" code_label: "Code" - + # ============================================================================= # Cache statistics # ============================================================================= @@ -130,7 +130,7 @@ cache: saved_calls: "Saved calls (approx)" read_error: "Unable to read cache stats: {error}" unexpected_error: "Unexpected error reading cache stats: {error}" - + # ============================================================================= # UI Labels (common labels used throughout the interface) # ============================================================================= @@ -160,7 +160,7 @@ ui: example_import: "Example: cortex import {file_path} --execute" example_import_all: "Example: cortex import --all --execute" installation_cancelled: "Installation cancelled" - + # ============================================================================= # Installation # ============================================================================= @@ -170,7 +170,7 @@ install: planning: "Planning installation..." executing: "Executing installation..." verifying: "Verifying installation..." - + # Results success: "Installation complete!" 
failed: "Installation failed" @@ -180,12 +180,12 @@ install: package_installed_version: "{package} ({version}) installed successfully" # {count} - number of packages packages_installed: "{count} packages installed" - + # Dry run dry_run_header: "Dry-run results" dry_run_message: "Dry-run completed. Use --execute to apply changes." commands_would_run: "Commands that would run" - + # Progress # {current}, {total} - step numbers step_progress: "Step {current}/{total}" @@ -193,7 +193,7 @@ install: step_executing: "Executing step {step}: {description}" step_completed: "Step completed" step_failed: "Step failed" - + # Errors no_commands: "No commands generated" invalid_request: "Invalid installation request" @@ -201,12 +201,12 @@ install: api_error: "API error: {error}" # {package} - package name package_not_found: "Package not found: {package}" - + # Confirmation confirm_install: "Proceed with installation?" # {count} - package count confirm_install_count: "Install {count} packages?" - + # ============================================================================= # Stack management # ============================================================================= @@ -231,7 +231,7 @@ stack: use_command: "Use: cortex stack to install a stack" # {original}, {suggested} - stack names gpu_fallback: "No GPU detected, using '{suggested}' instead of '{original}'" - + # ============================================================================= # Sandbox # ============================================================================= @@ -241,7 +241,7 @@ sandbox: commands_header: "Commands" example_workflow: "Example workflow" environments_header: "Sandbox Environments" - + # Command descriptions (for help text) cmd_create: "Create a sandbox environment" cmd_install: "Install package in sandbox" @@ -250,7 +250,7 @@ sandbox: cmd_cleanup: "Remove sandbox environment" cmd_list: "List all sandboxes" cmd_exec: "Execute command in sandbox" - + # Actions # {name} - sandbox name 
creating: "Creating sandbox '{name}'..." @@ -264,14 +264,14 @@ sandbox: # {name} - sandbox name cleaning: "Removing sandbox '{name}'..." cleaned: "Sandbox '{name}' removed" - + # Results test_passed: "Test passed" test_failed: "Test failed" all_tests_passed: "All tests passed" # {passed}, {total} - test counts tests_summary: "{passed}/{total} tests passed" - + # Errors docker_required: "Docker is required for sandbox commands" docker_only_for_sandbox: "Docker is required only for sandbox commands." @@ -280,12 +280,12 @@ sandbox: no_sandboxes: "No sandbox environments found" create_hint: "Create one with: cortex sandbox create " list_hint: "Use 'cortex sandbox list' to see available sandboxes." - + # Promotion promote_package: "Installing '{package}' on main system..." promotion_cancelled: "Promotion cancelled" would_run: "Would run: {command}" - + # ============================================================================= # History # ============================================================================= @@ -298,15 +298,15 @@ history: action: "Action" packages: "Packages" status: "Status" - + # Status values status_success: "Success" status_failed: "Failed" status_rolled_back: "Rolled back" - + # {id} - installation ID details_for: "Details for installation #{id}" - + # ============================================================================= # Rollback # ============================================================================= @@ -320,7 +320,7 @@ rollback: # {id} - installation ID already_rolled_back: "Installation #{id} was already rolled back" confirm: "Are you sure you want to roll back this installation?" - + # ============================================================================= # Doctor / Health check # ============================================================================= @@ -331,19 +331,19 @@ doctor: all_passed: "All checks passed!" 
# {count} - issue count issues_found: "{count} issues found" - + # Check names check_api_key: "API Key Configuration" check_network: "Network Connectivity" check_disk_space: "Disk Space" check_permissions: "File Permissions" check_dependencies: "System Dependencies" - + # Results passed: "Passed" warning: "Warning" failed: "Failed" - + # ============================================================================= # Wizard # ============================================================================= @@ -351,19 +351,19 @@ wizard: welcome: "Welcome to Cortex Setup Wizard!" # {step}, {total} - step numbers step: "Step {step} of {total}" - + # API key setup api_key_prompt: "Enter your API key" api_key_saved: "API key saved successfully" api_key_invalid: "Invalid API key format" export_api_key_hint: "Please export your API key in your shell profile." - + # Provider selection select_provider: "Select your LLM provider" provider_anthropic: "Anthropic (Claude)" provider_openai: "OpenAI (GPT)" provider_ollama: "Ollama (Local)" - + # Completion setup_complete: "Setup complete!" 
ready_message: "Cortex is ready to use" @@ -409,16 +409,16 @@ env: apply_template_failed: "Failed to apply template '{name}'" persist_failed: "Failed to persist: {error}" persist_removal_failed: "Failed to persist removal: {error}" - + # PATH operations path_added: "Added '{path}' to PATH" path_removed: "Removed '{path}' from PATH" path_already_exists: "'{path}' is already in PATH" path_not_found: "'{path}' not found in PATH" - + # {path} - config file path persist_note: "To use in current shell: source {path}" - + # ============================================================================= # Notifications # ============================================================================= @@ -429,18 +429,18 @@ notify: disabled: "Disabled" dnd_window: "Do Not Disturb Window" history_file: "History File" - + # Actions notifications_enabled: "Notifications enabled" notifications_disabled: "Notifications disabled (Critical alerts will still show)" # {start}, {end} - time values dnd_updated: "DND window updated: {start} - {end}" - + # Errors missing_subcommand: "Please specify a subcommand (config/enable/disable/dnd/send)" invalid_time_format: "Invalid time format. 
Use HH:MM (e.g., 22:00)" message_required: "Message required" - + # ============================================================================= # API Key messages # ============================================================================= @@ -451,7 +451,7 @@ api_key: # {provider} - provider name using_provider: "Using {provider} API key" using_ollama: "Using Ollama (no API key required)" - + # ============================================================================= # Error messages # ============================================================================= @@ -467,7 +467,7 @@ errors: failed_to: "Failed to {action}: {error}" history_retrieve_failed: "Failed to retrieve history: {error}" sdk_required: "Install required SDK or use CORTEX_PROVIDER=ollama" - + # ============================================================================= # Progress indicators # ============================================================================= @@ -480,3 +480,42 @@ progress: cleaning_up: "Cleaning up..." # {seconds} - duration completed_in: "Completed in {seconds} seconds" +# ============================================================================= +# Update Recommendations +# ============================================================================= +update_recommend: + checking: "Analyzing system for update recommendations..." + no_updates: "All packages are up to date! Your system is healthy." + overall_risk: "Overall risk: {risk}" + total_updates: "Total updates available: {count}" + header: "Update Analysis" + + categories: + security: "Security Updates (Apply ASAP)" + immediate: "Safe to Update Now (Low Risk)" + scheduled: "Recommended for Maintenance Window" + deferred: "Hold for Now" + groups: "Related Update Groups" + + risks: + low: "LOW" + medium: "MEDIUM" + high: "HIGH" + critical: "CRITICAL" + + notes: + security: "security" + warnings: "{count} warnings" + group: "group: {name}" + + ai_analysis: "AI Analysis" + more_updates: "... 
and {count} more" + + recommendations: + security_urgent: "Security update - prioritize installation" + safe_immediate: "Safe to update immediately" + maintenance_window: "Schedule for maintenance window" + consider_deferring: "Consider deferring this update" + major_upgrade: "Major version upgrade: {current} β†’ {new}" + potential_breaking: "Potential breaking changes:" + part_of_group: "Part of {group} update group" diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py new file mode 100644 index 000000000..4e1c43859 --- /dev/null +++ b/cortex/update_recommender.py @@ -0,0 +1,760 @@ +#!/usr/bin/env python3 +""" +Smart Update Recommender for Cortex Linux + +AI-powered system to recommend when and what to update. +Analyzes installed packages, checks for available updates, +assesses risks, and provides intelligent timing recommendations. + +Issue: #91 - Smart Update Recommendations +""" + +import logging +import re +import subprocess +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from pathlib import Path +from typing import Any + +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +from cortex.i18n.translator import t + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +console = Console() + + +class RiskLevel(Enum): + """Risk level for package updates.""" + + LOW = 1 + MEDIUM = 2 + HIGH = 3 + CRITICAL = 4 + + @property + def value_str(self) -> str: + """Get string value for translation keys.""" + return {1: "low", 2: "medium", 3: "high", 4: "critical"}[self.value] + + +class UpdateCategory(Enum): + """Category of update based on recommended timing.""" + + IMMEDIATE = "immediate" # Safe to update now + SCHEDULED = "scheduled" # Recommended for maintenance window + DEFERRED = "deferred" # Hold for now + SECURITY = "security" # Security update - prioritize + + +class ChangeType(Enum): + """Type of version 
change.""" + + PATCH = "patch" # Bug fixes only + MINOR = "minor" # New features, backward compatible + MAJOR = "major" # Breaking changes possible + SECURITY = "security" # Security fix + UNKNOWN = "unknown" + + +@dataclass +class PackageVersion: + """Represents a package version with parsed components.""" + + raw: str + major: int = 0 + minor: int = 0 + patch: int = 0 + prerelease: str = "" + epoch: int = 0 + + @classmethod + def parse(cls, version_str: str) -> "PackageVersion": + """Parse a version string into components.""" + if not version_str: + return cls(raw="0.0.0") + + raw = version_str.strip() + + # Handle epoch (e.g., "1:2.3.4") + epoch = 0 + if ":" in raw: + epoch_str, raw = raw.split(":", 1) + try: + epoch = int(epoch_str) + except ValueError: + epoch = 0 + + # Remove common suffixes like -1ubuntu1, +dfsg, etc. + clean_version = re.sub(r"[-+~].*$", "", raw) + + # Parse major.minor.patch + parts, major, minor, patch = clean_version.split("."), 0, 0, 0 + try: + if len(parts) >= 1: + # Strip leading non-digits (e.g., 'v1' -> '1') + major_clean = re.sub(r"^\D+", "", parts[0]) + major = int(re.sub(r"\D.*", "", major_clean) or 0) + if len(parts) >= 2: + minor = int(re.sub(r"\D.*", "", parts[1]) or 0) + if len(parts) >= 3: + # Handle alphanumeric patches like "1f" by taking the number + patch_match = re.search(r"(\d+)", parts[2]) + patch = int(patch_match.group(1)) if patch_match else 0 + except (ValueError, IndexError): + pass + + pr_match = re.search(r"[-+](alpha|beta|rc|dev|pre)[\d.]*", raw, re.I) + return cls(version_str, major, minor, patch, pr_match.group(0) if pr_match else "", epoch) + + def __str__(self) -> str: + return self.raw + + def __lt__(self, other: "PackageVersion") -> bool: + if self.epoch != other.epoch: + return self.epoch < other.epoch + if self.major != other.major: + return self.major < other.major + if self.minor != other.minor: + return self.minor < other.minor + if self.patch != other.patch: + return self.patch < other.patch + + # 
If numeric versions are same, a pre-release is "less" than a final release + if self.prerelease and not other.prerelease: + return True + if not self.prerelease and other.prerelease: + return False + + return False + + +@dataclass +class UpdateInfo: + """Information about a package update.""" + + package_name: str + current_version: PackageVersion + new_version: PackageVersion + change_type: ChangeType + risk_level: RiskLevel + category: UpdateCategory + description: str = "" + changelog: str = "" + dependencies: list[str] = field(default_factory=list) + is_security: bool = False + breaking_changes: list[str] = field(default_factory=list) + recommended_action: str = "" + group: str = "" # For grouping related updates + + +@dataclass +class UpdateRecommendation: + """Full update recommendation for a system.""" + + timestamp: str + total_updates: int + immediate_updates: list[UpdateInfo] = field(default_factory=list) + scheduled_updates: list[UpdateInfo] = field(default_factory=list) + deferred_updates: list[UpdateInfo] = field(default_factory=list) + security_updates: list[UpdateInfo] = field(default_factory=list) + groups: dict[str, list[UpdateInfo]] = field(default_factory=dict) + llm_analysis: str = "" + overall_risk: RiskLevel = RiskLevel.LOW + + +class UpdateRecommender: + """ + AI-powered update recommendation system. + + Analyzes installed packages, checks for updates, assesses risks, + and provides intelligent recommendations on when and what to update. 
+ """ + + # Package groups for related updates + PACKAGE_GROUPS = { + "python": ["python3", "python3-pip", "python3-dev", "python3-venv"], + "docker": ["docker.io", "docker-ce", "docker-compose", "containerd"], + "postgresql": ["postgresql", "postgresql-client", "postgresql-contrib"], + "mysql": ["mysql-server", "mysql-client", "mariadb-server"], + "nginx": ["nginx", "nginx-common", "nginx-core"], + "nodejs": ["nodejs", "npm", "node-gyp"], + "php": ["php", "php-fpm", "php-mysql", "php-pgsql", "php-cli"], + "kernel": ["linux-image", "linux-headers", "linux-modules"], + "gcc": ["gcc", "g++", "cpp", "build-essential"], + "ssl": ["openssl", "libssl-dev", "ca-certificates"], + } + + # Known high-risk packages + HIGH_RISK_PACKAGES = { + "linux-image": "Kernel update - requires reboot", + "linux-headers": "Kernel headers - may break compiled modules", + "glibc": "Core library - system-wide impact", + "libc6": "Core library - system-wide impact", + "systemd": "Init system - critical for boot", + "grub": "Bootloader - could affect boot", + "docker": "Container runtime - affects running containers", + "postgresql": "Database - may require dump/restore", + "mysql": "Database - may require migration", + "openssl": "Encryption - may affect all TLS connections", + } + + # Security update indicators + SECURITY_INDICATORS = [ + "security", + "cve", + "vulnerability", + "exploit", + "patch", + "critical", + "urgent", + ] + + def __init__( + self, + llm_router: Any | None = None, + verbose: bool = False, + ): + """ + Initialize the Update Recommender. 
+ + Args: + llm_router: Optional LLM router for AI-powered analysis + verbose: Enable verbose output + """ + self.llm_router = llm_router + self.verbose = verbose + + def _run_pkg_cmd(self, cmd: list[str]) -> str | None: + """Internal helper to run package manager commands.""" + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + return result.stdout.strip() if result.returncode == 0 else None + except (subprocess.TimeoutExpired, FileNotFoundError): + return None + + def get_installed_packages(self) -> dict[str, PackageVersion]: + """Get all installed packages with their versions.""" + packages = {} + + # Query installed packages via dpkg-query (Debian/Ubuntu) + output = self._run_pkg_cmd(["dpkg-query", "-W", "-f=${Package} ${Version}\n"]) + if output: + for line in output.split("\n"): + parts = line.split(" ", 1) + if len(parts) == 2: + packages[parts[0]] = PackageVersion.parse(parts[1]) + return packages + + # Fallback to RPM query for RHEL/Fedora/Suse systems + output = self._run_pkg_cmd(["rpm", "-qa", "--qf", "%{NAME} %{VERSION}-%{RELEASE}\n"]) + if output: + for line in output.split("\n"): + parts = line.split(" ", 1) + if len(parts) == 2: + packages[parts[0]] = PackageVersion.parse(parts[1]) + return packages + + def get_available_updates(self) -> list[tuple[str, str, str]]: + """Get list of packages with available updates.""" + updates = [] + + # Attempt update check via APT (Debian/Ubuntu) + if self._run_pkg_cmd(["apt-get", "update", "-q"]) is not None: + output = self._run_pkg_cmd(["apt", "list", "--upgradable"]) + if output: + for line in output.splitlines(): + match = re.search( + r"^(\S+)/\S+\s+(\S+)\s+\S+\s+\[upgradable from:\s+(\S+)\]", line + ) + if match: + updates.append(match.groups()) + return updates + + # Fallback check via DNF/YUM (RHEL/Fedora/Amazon Linux) + for pm in ("dnf", "yum"): + # pm check-update returns 100 if updates available + try: + result = subprocess.run( + [pm, "check-update", "-q"], 
capture_output=True, text=True, timeout=120 + ) + if result.returncode in (0, 100) and result.stdout: + for line in result.stdout.strip().splitlines(): + if line and not line.startswith(" "): + parts = line.split() + if len(parts) >= 2: + name, new_ver = parts[0].rsplit(".", 1)[0], parts[1] + # Get current version + info = self._run_pkg_cmd([pm, "info", "installed", name]) or "" + old_ver = next( + ( + l.split(":", 1)[1].strip() + for l in info.splitlines() + if l.startswith("Version") + ), + "0.0.0", + ) + updates.append((name, old_ver, new_ver)) + if updates: + return updates + except (subprocess.TimeoutExpired, FileNotFoundError): + continue + + return updates + + def analyze_change_type(self, current: PackageVersion, new: PackageVersion) -> ChangeType: + if new.major > current.major: + return ChangeType.MAJOR + if new.minor > current.minor: + return ChangeType.MINOR + if new.patch > current.patch: + return ChangeType.PATCH + if str(new) != str(current): + return ChangeType.PATCH # Tie-breaker for alphanumeric patches + return ChangeType.UNKNOWN + + def assess_risk( + self, package_name: str, current: PackageVersion, new: PackageVersion, changelog: str = "" + ) -> tuple[RiskLevel, list[str]]: + """Assess update risk.""" + warnings, score = [], 0 + + # Score penalty for known high-impact system packages + for pkg, reason in self.HIGH_RISK_PACKAGES.items(): + if pkg in package_name.lower(): + score += 30 + warnings.append(reason) + break + + # Score penalty based on Semantic Versioning delta severity + ctype = self.analyze_change_type(current, new) + score += {ChangeType.MAJOR: 40, ChangeType.MINOR: 15, ChangeType.PATCH: 5}.get(ctype, 0) + if ctype == ChangeType.MAJOR: + warnings.append(f"Major version change ({current.major} β†’ {new.major})") + + # Additional penalty for unstable pre-release versions + if new.prerelease: + score += 25 + warnings.append(f"Pre-release version: {new.prerelease}") + + # Scan changelogs for keyword indicators of breaking changes + 
for ind in [ + "breaking change", + "backwards incompatible", + "deprecated", + "removed", + "migration required", + "manual action", + ]: + if ind in changelog.lower(): + score += 15 + warnings.append(f"Changelog mentions: {ind}") + + # Map aggregate score to RiskLevel enum + level = ( + RiskLevel.HIGH if score >= 60 else RiskLevel.MEDIUM if score >= 35 else RiskLevel.LOW + ) + return level, warnings + + def is_security_update( + self, package_name: str, changelog: str = "", description: str = "" + ) -> bool: + """ + Determine if an update is security-related. + + Args: + package_name: Name of the package + changelog: Changelog content + description: Update description + + Returns: + True if this appears to be a security update + """ + combined_text = f"{package_name} {changelog} {description}".lower() + + for indicator in self.SECURITY_INDICATORS: + if indicator in combined_text: + return True + + # Check for CVE pattern + if re.search(r"cve-\d{4}-\d+", combined_text, re.I): + return True + + return False + + def get_package_group(self, package_name: str) -> str: + """ + Get the group a package belongs to. + + Args: + package_name: Name of the package + + Returns: + Group name or empty string if not in a group + """ + for group_name, packages in self.PACKAGE_GROUPS.items(): + for pkg in packages: + if pkg in package_name.lower() or package_name.lower().startswith(pkg): + return group_name + return "" + + def categorize_update( + self, + risk_level: RiskLevel, + is_security: bool, + change_type: ChangeType, + ) -> UpdateCategory: + """ + Determine the recommended update category/timing. 
+ + Args: + risk_level: Assessed risk level + is_security: Whether it's a security update + change_type: Type of version change + + Returns: + UpdateCategory for recommended timing + """ + # Security updates should be applied ASAP + if is_security: + return UpdateCategory.SECURITY + + # High risk or major updates should be deferred + if risk_level == RiskLevel.HIGH or change_type == ChangeType.MAJOR: + return UpdateCategory.DEFERRED + + # Low risk updates can go immediately + if risk_level == RiskLevel.LOW and change_type in ( + ChangeType.PATCH, + ChangeType.MINOR, + ): + return UpdateCategory.IMMEDIATE + + # Medium risk or minor updates for scheduled maintenance + if risk_level == RiskLevel.MEDIUM or change_type == ChangeType.MINOR: + return UpdateCategory.SCHEDULED + + # Default to scheduled for unknown cases + return UpdateCategory.SCHEDULED + + def generate_recommendation_text(self, update: UpdateInfo) -> str: + """Generate human-readable recommendation for an update.""" + res = [ + t( + f"update_recommend.recommendations.{'security_urgent' if update.category == UpdateCategory.SECURITY else 'safe_immediate' if update.category == UpdateCategory.IMMEDIATE else 'maintenance_window' if update.category == UpdateCategory.SCHEDULED else 'consider_deferring'}" + ) + ] + if update.change_type == ChangeType.MAJOR: + res.append( + t( + "update_recommend.recommendations.major_upgrade", + current=str(update.current_version), + new=str(update.new_version), + ) + ) + if update.breaking_changes: + res.append(t("update_recommend.recommendations.potential_breaking")) + res.extend(f" - {bc}" for bc in update.breaking_changes[:3]) + if update.group: + res.append(t("update_recommend.recommendations.part_of_group", group=update.group)) + return "\n".join(res) + + # Risk colors for display + RISK_COLORS = { + RiskLevel.LOW: "green", + RiskLevel.MEDIUM: "yellow", + RiskLevel.HIGH: "red", + RiskLevel.CRITICAL: "bold red", + } + + def analyze_with_llm(self, updates: list[UpdateInfo]) 
-> str: + """ + Use LLM to provide additional analysis of updates. + + Args: + updates: List of update information + + Returns: + LLM analysis text + """ + if not self.llm_router or not updates: + return "" + + try: + # Build a summary for the LLM + update_summary = [] + for u in updates[:10]: # Limit to first 10 for context length + update_summary.append( + f"- {u.package_name}: {u.current_version} β†’ {u.new_version} " + f"({u.change_type.value}, {u.risk_level.value} risk)" + ) + + prompt = f"""Analyze these pending system updates and provide a brief recommendation: + +{chr(10).join(update_summary)} + +Provide: +1. Overall assessment (1-2 sentences) +2. Any specific concerns or recommendations +3. Suggested update order if dependencies exist + +Keep response concise (under 150 words).""" + + from cortex.llm_router import TaskType + + response = self.llm_router.complete( + messages=[{"role": "user", "content": prompt}], + task_type=TaskType.SYSTEM_OPERATION, + temperature=0.3, + max_tokens=300, + ) + + return response.content + + except Exception as e: + logger.warning(f"LLM analysis failed: {e}") + return "" + + def get_recommendations(self, use_llm: bool = True) -> UpdateRecommendation: + """ + Get complete update recommendations for the system. 
+ + Args: + use_llm: Whether to use LLM for additional analysis + + Returns: + UpdateRecommendation with categorized updates + """ + timestamp = datetime.now().isoformat() + updates = self.get_available_updates() + + if not updates: + return UpdateRecommendation( + timestamp=timestamp, + total_updates=0, + ) + + update_infos = [] + groups: dict[str, list[UpdateInfo]] = {} + + for pkg_name, old_ver, new_ver in updates: + current, new = PackageVersion.parse(old_ver), PackageVersion.parse(new_ver) + change_type = self.analyze_change_type(current, new) + risk_level, breaking_changes = self.assess_risk(pkg_name, current, new) + group = self.get_package_group(pkg_name) + is_security = self.is_security_update(pkg_name) + info = UpdateInfo( + pkg_name, + current, + new, + change_type, + risk_level, + self.categorize_update(risk_level, is_security, change_type), + breaking_changes=breaking_changes, + group=group, + is_security=is_security, + ) + info.recommended_action = self.generate_recommendation_text(info) + update_infos.append(info) + if group: + groups.setdefault(group, []).append(info) + + # Categorize updates + immediate = [u for u in update_infos if u.category == UpdateCategory.IMMEDIATE] + scheduled = [u for u in update_infos if u.category == UpdateCategory.SCHEDULED] + deferred = [u for u in update_infos if u.category == UpdateCategory.DEFERRED] + security = [u for u in update_infos if u.category == UpdateCategory.SECURITY] + + # Determine overall risk + overall_risk = max( + (u.risk_level for u in update_infos), key=lambda x: x.value, default=RiskLevel.LOW + ) + + # Get LLM analysis if requested + llm_analysis = "" + if use_llm and self.llm_router: + llm_analysis = self.analyze_with_llm(update_infos) + + return UpdateRecommendation( + timestamp=timestamp, + total_updates=len(update_infos), + immediate_updates=immediate, + scheduled_updates=scheduled, + deferred_updates=deferred, + security_updates=security, + groups=groups, + llm_analysis=llm_analysis, + 
overall_risk=overall_risk, + ) + + def display_recommendations(self, recommendation: UpdateRecommendation) -> None: + """ + Display recommendations in a formatted output. + + Args: + recommendation: The update recommendation to display + """ + if recommendation.total_updates == 0: + console.print(f"[green]βœ… {t('update_recommend.no_updates')}[/green]") + return + + console.print() + overall_risk_display = t(f"update_recommend.risks.{recommendation.overall_risk.value_str}") + color = self.RISK_COLORS.get(recommendation.overall_risk, "white") + console.print( + Panel( + f"[bold cyan]πŸ“Š {t('update_recommend.header')}[/bold cyan]\n" + f"{t('update_recommend.total_updates', count=recommendation.total_updates)}\n" + f"{t('update_recommend.overall_risk', risk=f'[{color}]{overall_risk_display}[/]')}", + title="Cortex Update Recommender", + ) + ) + + # Security updates (highest priority) + if recommendation.security_updates: + console.print() + console.print(f"[bold red]πŸ”’ {t('update_recommend.categories.security')}:[/bold red]") + self._display_update_table(recommendation.security_updates) + + # Immediate updates + if recommendation.immediate_updates: + console.print() + console.print( + f"[bold green]βœ… {t('update_recommend.categories.immediate')}:[/bold green]" + ) + self._display_update_table(recommendation.immediate_updates) + + # Scheduled updates + if recommendation.scheduled_updates: + console.print() + console.print( + f"[bold yellow]πŸ“… {t('update_recommend.categories.scheduled')}:[/bold yellow]" + ) + self._display_update_table(recommendation.scheduled_updates) + + # Deferred updates + if recommendation.deferred_updates: + console.print() + console.print( + f"[bold magenta]⏸️ {t('update_recommend.categories.deferred')}:[/bold magenta]" + ) + self._display_update_table(recommendation.deferred_updates) + + # Related update groups + if recommendation.groups: + console.print() + console.print(f"[bold cyan]πŸ“¦ {t('update_recommend.categories.groups')}:[/bold 
cyan]") + for group_name, group_updates in recommendation.groups.items(): + update_names = [u.package_name for u in group_updates] + console.print( + f" [cyan]{group_name}[/cyan]: {', '.join(update_names[:5])}" + + (f" +{len(update_names) - 5} more" if len(update_names) > 5 else "") + ) + + # LLM Analysis + if recommendation.llm_analysis: + console.print() + console.print( + Panel( + recommendation.llm_analysis, + title=f"[bold]πŸ€– {t('update_recommend.ai_analysis')}[/bold]", + border_style="blue", + ) + ) + + def _display_update_table(self, updates: list[UpdateInfo]) -> None: + """Display a table of updates.""" + table = Table(show_header=True, header_style="bold", box=None) + table.add_column("Package", style="cyan") + table.add_column("Current", style="dim") + table.add_column("New", style="green") + table.add_column("Type") + table.add_column("Risk") + table.add_column("Notes") + + for update in updates[:10]: # Limit display + risk_color = self.RISK_COLORS.get(update.risk_level, "white") + risk_display = t(f"update_recommend.risks.{update.risk_level.value_str}") + type_str = update.change_type.value + + notes = [] + if update.is_security: + notes.append(f"πŸ”’ {t('update_recommend.notes.security')}") + if update.breaking_changes: + notes.append( + f"⚠️ {t('update_recommend.notes.warnings', count=len(update.breaking_changes))}" + ) + if update.group: + notes.append(f"πŸ“¦ {t('update_recommend.notes.group', name=update.group)}") + + table.add_row( + update.package_name, + str(update.current_version), + str(update.new_version), + type_str, + f"[{risk_color}]{risk_display}[/]", + " | ".join(notes) if notes else "-", + ) + + if len(updates) > 10: + table.add_row( + t("update_recommend.more_updates", count=len(updates) - 10), + "", + "", + "", + "", + "", + ) + + console.print(table) + + +def recommend_updates( + use_llm: bool = True, + verbose: bool = False, +) -> int: + """ + Convenience function to run update recommendations. 
+ + Args: + use_llm: Whether to use LLM for analysis + verbose: Enable verbose output + + Returns: + Exit code (0 for success) + """ + try: + # Try to get LLM router if available + llm_router = None + if use_llm: + try: + from cortex.llm_router import LLMRouter + + llm_router = LLMRouter() + except Exception as e: + logger.debug(f"LLM router not available: {e}") + + recommender = UpdateRecommender( + llm_router=llm_router, + verbose=verbose, + ) + + recommendation = recommender.get_recommendations(use_llm=use_llm) + recommender.display_recommendations(recommendation) + + return 0 + + except Exception as e: + console.print(f"[red]Error: {e}[/red]") + if verbose: + import traceback + + traceback.print_exc() + return 1 diff --git a/docs/SMART_UPDATE_RECOMMENDATIONS.md b/docs/SMART_UPDATE_RECOMMENDATIONS.md new file mode 100644 index 000000000..a46dac68e --- /dev/null +++ b/docs/SMART_UPDATE_RECOMMENDATIONS.md @@ -0,0 +1,274 @@ +# Smart Update Recommendations + +## Overview + +Cortex's Smart Update Recommender is an AI-powered system that analyzes your installed packages, checks for available updates, and provides intelligent recommendations on **when** and **what** to update. 
+ +## Features + +- **Scan for Available Updates**: Automatically detects packages with pending updates +- **Risk Assessment**: Evaluates each update's potential impact on your system +- **Timing Recommendations**: Suggests optimal update windows based on risk level +- **Related Updates Grouping**: Groups updates for related packages (e.g., all PostgreSQL components) +- **Breaking Change Prediction**: Identifies potential breaking changes from major version updates +- **LLM Integration**: Uses AI to provide additional context and analysis + +## Usage + +### Basic Command + +```bash +cortex update recommend +``` + +### Example Output + +``` +πŸ“Š Update Analysis + +πŸ”’ Security Updates (Apply ASAP): + - openssl 1.1.1t β†’ 1.1.1u (CVE-2024-1234) + +βœ… Safe to Update Now (Low Risk): + - nginx 1.24.0 β†’ 1.25.0 (minor, security fix) + - curl 8.4.0 β†’ 8.5.0 (patch, bug fixes) + +πŸ“… Recommended for Maintenance Window: + - python3 3.11.4 β†’ 3.11.6 (minor) + - nodejs 18.18.0 β†’ 20.10.0 (major version) + +⏸️ Hold for Now: + - postgresql 14.10 β†’ 15.5 (major version, database migration required) + - docker 24.0 β†’ 25.0 (major, wait for stability reports) + +πŸ“¦ Related Update Groups: + - postgresql: postgresql, postgresql-client, postgresql-contrib + - docker: docker.io, containerd + +πŸ€– AI Analysis: + Most updates are safe to apply. However, the PostgreSQL update requires + a major version migration. Consider backing up your databases before + proceeding. The Docker update should be deferred until version 25.0.1 + addresses reported container networking issues. 
+``` + +### Command Options + +| Option | Description | +|--------|-------------| +| `--no-llm` | Disable LLM-powered analysis (faster, works offline) | +| `--json` | Output recommendations in JSON format for scripting | + +### JSON Output Example + +```bash +cortex update recommend --json +``` + +```json +{ + "timestamp": "2024-01-15T10:30:00", + "total_updates": 8, + "overall_risk": "medium", + "security_updates": [ + { + "package": "openssl", + "current": "1.1.1t", + "new": "1.1.1u", + "risk": "low", + "type": "patch" + } + ], + "immediate_updates": [...], + "scheduled_updates": [...], + "deferred_updates": [ + { + "package": "postgresql", + "current": "14.10", + "new": "15.5", + "risk": "high", + "type": "major", + "breaking_changes": [ + "Major version change (14 β†’ 15)", + "Database - may require dump/restore" + ] + } + ], + "groups": { + "postgresql": ["postgresql", "postgresql-client", "postgresql-contrib"] + }, + "llm_analysis": "..." +} +``` + +## Update Categories + +### πŸ”’ Security Updates +Priority: **Critical** - Apply as soon as possible + +These updates address known security vulnerabilities. They are typically: +- Patched for specific CVEs +- Low risk to system stability +- Essential for system security + +**Recommended Action**: Apply immediately, ideally within 24-48 hours. + +### βœ… Safe to Update Now (Immediate) +Priority: **Low Risk** - Safe for immediate installation + +Updates in this category: +- Are patch or minor version updates +- Have no known breaking changes +- Don't affect critical system components + +**Recommended Action**: Apply at your convenience. + +### πŸ“… Recommended for Maintenance Window (Scheduled) +Priority: **Medium Risk** - Plan for scheduled maintenance + +These updates: +- May require service restarts +- Could have minor compatibility changes +- Include new features that may affect workflows + +**Recommended Action**: Apply during planned maintenance windows, preferably off-peak hours. 
+
+### ⏸️ Hold for Now (Deferred)
+Priority: **High Risk** - Exercise caution
+
+Updates flagged for deferral:
+- Are major version upgrades
+- May include breaking changes
+- Affect critical infrastructure (databases, kernel, etc.)
+- Are pre-release or recently released versions
+
+**Recommended Action**: Wait for stability reports, plan migration carefully, and test in staging environment first.
+
+## Risk Assessment Criteria
+
+The risk level is determined by multiple factors:
+
+| Factor | Impact on Risk |
+|--------|---------------|
+| **Version Change Type** | |
+| - Patch (X.Y.Z β†’ X.Y.Z+1) | Low (+5) |
+| - Minor (X.Y β†’ X.Y+1) | Low-Medium (+15) |
+| - Major (X β†’ X+1) | High (+40) |
+| **Package Importance** | |
+| - Kernel (linux-image) | High (+30) |
+| - Core libraries (glibc, libc6) | High (+30) |
+| - System services (systemd) | High (+30) |
+| - Databases (postgresql, mysql) | High (+30) |
+| **Version Stability** | |
+| - Pre-release (alpha, beta, rc) | High (+25) |
+| **Changelog Analysis** | |
+| - Mentions "breaking change" | Medium (+15) |
+| - Mentions "deprecated" | Medium (+15) |
+| - Mentions "migration required" | Medium (+15) |
+
+### Risk Score Thresholds
+
+- **Low**: Score < 15
+- **Medium**: Score 15-49
+- **High**: Score β‰₯ 50
+
+## Package Grouping
+
+Related packages are automatically grouped to help you update them together:
+
+| Group | Packages |
+|-------|----------|
+| `python` | python3, python3-pip, python3-dev |
+| `docker` | docker.io, docker-ce, containerd |
+| `postgresql` | postgresql, postgresql-client, postgresql-contrib |
+| `mysql` | mysql-server, mysql-client, mariadb-server |
+| `nginx` | nginx, nginx-common, nginx-core |
+| `nodejs` | nodejs, npm, node-gyp |
+| `kernel` | linux-image, linux-headers, linux-modules |
+| `ssl` | openssl, libssl-dev, ca-certificates |
+
+## Update Strategies
+
+### Strategy 1: Rolling Updates (Recommended for Most Users)
+
+1. **Daily**: Apply security updates
+2. 
**Weekly**: Apply low-risk immediate updates +3. **Monthly**: Apply scheduled updates during maintenance window +4. **Quarterly**: Evaluate and plan deferred updates + +### Strategy 2: Stability-First (Production Servers) + +1. Test all updates in staging environment first +2. Apply security updates within 48 hours +3. Batch other updates monthly +4. Defer major version updates until stability is confirmed + +### Strategy 3: Always Current (Development Machines) + +1. Apply immediate and scheduled updates weekly +2. Consider early adoption of deferred updates for testing +3. Keep multiple system snapshots for quick rollback + +## Best Practices + +### Before Updating + +1. **Back up critical data**: Especially before database or kernel updates +2. **Check changelogs**: Review breaking changes for deferred updates +3. **Test in staging**: Major updates should be tested first +4. **Plan rollback**: Know how to revert if issues arise + +### After Updating + +1. **Verify services**: Check that critical services are running +2. **Monitor logs**: Watch for errors in system and application logs +3. **Test functionality**: Validate key workflows still work +4. **Document changes**: Keep record of what was updated and when + +### For Major Version Updates + +1. **Read migration guides**: Official documentation often provides migration steps +2. **Check compatibility**: Ensure dependent applications support the new version +3. **Schedule downtime**: Major updates may require service interruption +4. 
**Have a rollback plan**: Snapshot VMs or have package backups ready + +## Integration with Other Tools + +### Cron-based Automation + +```bash +# Check for updates daily and log results +0 8 * * * /usr/local/bin/cortex update recommend --no-llm --json >> /var/log/cortex-updates.json +``` + +### CI/CD Pipelines + +```yaml +# GitHub Actions example +- name: Check for system updates + run: | + cortex update recommend --json > updates.json + if jq -e '.security_updates | length > 0' updates.json; then + echo "::warning::Security updates available" + fi +``` + +## Troubleshooting + +### "No updates available" +- Run `sudo apt update` or equivalent to refresh package cache +- Check network connectivity to package repositories + +### LLM analysis not working +- Use `--no-llm` flag for offline operation +- Check API key configuration with `cortex config show` + +### Slow analysis +- Large number of updates may take time +- Use `--no-llm` for faster results without AI analysis + +## See Also + +- `cortex update check` - Check for Cortex self-updates +- `cortex update install` - Install Cortex updates +- `cortex install` - Install system packages with AI assistance diff --git a/docs/guides/Developer-Guide.md b/docs/guides/Developer-Guide.md index 4a06cead5..d2ab615d5 100644 --- a/docs/guides/Developer-Guide.md +++ b/docs/guides/Developer-Guide.md @@ -34,7 +34,8 @@ cortex/ β”‚ β”œβ”€β”€ rollback.py # Rollback system β”‚ β”œβ”€β”€ config_templates.py # Config generation β”‚ β”œβ”€β”€ logging_system.py # Logging & diagnostics -β”‚ └── context_memory.py # AI memory system +β”‚ β”œβ”€β”€ context_memory.py # AI memory system +β”‚ └── update_recommender.py # AI-powered update recommendations β”œβ”€β”€ tests/ β”‚ └── test_*.py # Unit tests β”œβ”€β”€ docs/ @@ -84,6 +85,12 @@ Context Memory (learns patterns) - Optimization recommendations - Driver compatibility +**Update Recommender (`update_recommender.py`)** +- AI-powered update analysis +- Risk assessment per update +- Timing 
recommendations (Immediate vs. Deferred) +- Related update grouping + ## Contributing ### Claiming Issues diff --git a/docs/guides/User-Guide.md b/docs/guides/User-Guide.md index ceb5f26b0..fd15c15b5 100644 --- a/docs/guides/User-Guide.md +++ b/docs/guides/User-Guide.md @@ -30,8 +30,22 @@ cortex rollback # Rollback to specific point cortex rollback --to + +# Get smart update recommendations +cortex update recommend + +# Get recommendations in JSON format (for scripts) +cortex update recommend --json ``` +### Update Recommendations + +Cortex uses AI to analyze available updates and categorize them by risk: +- **Security Updates**: Critical fixes that should be applied immediately. +- **Safe to Update**: Low-risk updates (patches/minor) safe for now. +- **Maintenance Window**: Medium-risk updates that may need a restart. +- **Hold for Now**: High-risk or major updates that need careful planning. + ### Simulation Mode Test installations without making changes: diff --git a/tests/test_update_recommender.py b/tests/test_update_recommender.py new file mode 100644 index 000000000..3b21e3b51 --- /dev/null +++ b/tests/test_update_recommender.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +""" +Unit tests for the Smart Update Recommender. +Validates version parsing, risk scoring, and categorization logic. 
+""" + +import json +import re +import subprocess +import sys +from unittest.mock import MagicMock, patch + +import pytest + +from cortex.update_recommender import ( + ChangeType, + PackageVersion, + RiskLevel, + UpdateCategory, + UpdateInfo, + UpdateRecommendation, + UpdateRecommender, + recommend_updates, +) + + +class TestPackageVersion: + @pytest.mark.parametrize( + "version_str, expected", + [ + ("1.2.3", (1, 2, 3, 0, "")), + ("v2.0.0", (2, 0, 0, 0, "")), + ("1:2.3.4", (2, 3, 4, 1, "")), + ("2.0.0-beta1", (2, 0, 0, 0, "-beta1")), + ("", (0, 0, 0, 0, "")), + (None, (0, 0, 0, 0, "")), + ("1.1.1f", (1, 1, 1, 0, "")), + ("abc:1.2.3", (1, 2, 3, 0, "")), # Invalid epoch + ("1.2", (1, 2, 0, 0, "")), + ("1", (1, 0, 0, 0, "")), + ], + ) + def test_parse(self, version_str, expected): + v = PackageVersion.parse(version_str) + assert (v.major, v.minor, v.patch, v.epoch) == expected[:4] + if expected[4]: + assert expected[4].lower() in v.prerelease.lower() + + def test_comparisons(self): + v1 = PackageVersion.parse("1.2.3") + v2 = PackageVersion.parse("1.2.4") + v3 = PackageVersion.parse("1.3.0") + v4 = PackageVersion.parse("2.0.0") + v5 = PackageVersion.parse("1:1.0.0") + v6 = PackageVersion.parse("1.2.3-beta") + + assert v1 < v2 + assert v2 < v3 + assert v3 < v4 + assert v4 < v5 + assert v6 < v1 # Pre-release is less than final + assert not (v1 < v6) + assert not (v1 < v1) + assert str(v1) == "1.2.3" + + +class TestUpdateRecommender: + @pytest.fixture + def r(self): + return UpdateRecommender(verbose=True) + + def test_enums_and_groups(self, r): + assert RiskLevel.LOW.value_str == "low" + assert r.get_package_group("python3-dev") == "python" + assert r.get_package_group("nginx") == "nginx" + assert r.get_package_group("unknown-pkg") == "" + + @pytest.mark.parametrize( + "curr, new, expected", + [ + ("1.0.0", "2.0.0", ChangeType.MAJOR), + ("1.0.0", "1.1.0", ChangeType.MINOR), + ("1.0.0", "1.0.1", ChangeType.PATCH), + ("1.1.1", "1.1.1f", ChangeType.PATCH), + ("1.0.0", 
"1.0.0", ChangeType.UNKNOWN), + ], + ) + def test_change_analysis(self, r, curr, new, expected): + assert ( + r.analyze_change_type(PackageVersion.parse(curr), PackageVersion.parse(new)) == expected + ) + + def test_risk_assessment_branches(self, r): + # High risk package + major version + risk, warns = r.assess_risk( + "linux-image-generic", PackageVersion.parse("5.15"), PackageVersion.parse("6.0") + ) + assert risk == RiskLevel.HIGH + assert any("Kernel" in w for w in warns) + + # Pre-release risk + risk, warns = r.assess_risk( + "some-pkg", PackageVersion.parse("1.0"), PackageVersion.parse("1.1-beta") + ) + assert risk == RiskLevel.MEDIUM # 15 (minor) + 25 (pre) = 40 (MEDIUM) + + # Changelog keywords + risk, warns = r.assess_risk( + "some-pkg", + PackageVersion.parse("1.0"), + PackageVersion.parse("1.0.1"), + "Breaking change and deprecated", + ) + assert risk == RiskLevel.MEDIUM # 5 (patch) + 15 + 15 = 35 (MEDIUM) + + def test_security_detection(self, r): + assert r.is_security_update("pkg", "High CVE-2024-0001 fix") + assert r.is_security_update("pkg", "bug fixes", "security patch") + assert not r.is_security_update("some-pkg", "random update") + + def test_categorization_matrix(self, r): + tests = [ + (RiskLevel.LOW, True, ChangeType.PATCH, UpdateCategory.SECURITY), + (RiskLevel.LOW, False, ChangeType.PATCH, UpdateCategory.IMMEDIATE), + (RiskLevel.LOW, False, ChangeType.MINOR, UpdateCategory.IMMEDIATE), + (RiskLevel.MEDIUM, False, ChangeType.MINOR, UpdateCategory.SCHEDULED), + (RiskLevel.HIGH, False, ChangeType.MINOR, UpdateCategory.DEFERRED), + (RiskLevel.LOW, False, ChangeType.MAJOR, UpdateCategory.DEFERRED), + (RiskLevel.LOW, False, ChangeType.UNKNOWN, UpdateCategory.SCHEDULED), + ] + for risk, sec, ctype, expected in tests: + assert r.categorize_update(risk, sec, ctype) == expected + + def test_recommendation_text_branches(self, r): + # Security updates should highlight urgent priority + u = UpdateInfo( + "p", + PackageVersion.parse("1"), + 
PackageVersion.parse("2"), + ChangeType.PATCH, + RiskLevel.LOW, + UpdateCategory.SECURITY, + is_security=True, + ) + assert "Security update" in r.generate_recommendation_text(u) + + # Major upgrades should flag potential breaking changes + u = UpdateInfo( + "p", + PackageVersion.parse("1"), + PackageVersion.parse("2"), + ChangeType.MAJOR, + RiskLevel.HIGH, + UpdateCategory.DEFERRED, + breaking_changes=["broken"], + ) + assert "Potential breaking" in r.generate_recommendation_text(u) + + # Grouped updates should mention their parent category + u = UpdateInfo( + "p", + PackageVersion.parse("1"), + PackageVersion.parse("2"), + ChangeType.MINOR, + RiskLevel.MEDIUM, + UpdateCategory.SCHEDULED, + group="python", + ) + assert "part of python" in r.generate_recommendation_text(u).lower() + + @patch("cortex.update_recommender.subprocess.run") + def test_pkg_manager_interactions(self, mock_run, r): + # Verify DPKG version parsing (Debian/Ubuntu) + mock_run.return_value = MagicMock(returncode=0, stdout="pkg1 1.0\npkg2 2.0") + pkgs = r.get_installed_packages() + assert "pkg1" in pkgs + + # Verify RPM version parsing fallback (Fedora/RHEL) + mock_run.side_effect = [MagicMock(returncode=1), MagicMock(returncode=0, stdout="pkg3 3.0")] + pkgs = r.get_installed_packages() + assert "pkg3" in pkgs + + # Simulate APT upgradable list output + mock_run.side_effect = [ + MagicMock(returncode=0), # apt-get update + MagicMock(returncode=0, stdout="nginx/jammy 1.25.0 amd64 [upgradable from: 1.24.0]"), + ] + updates = r.get_available_updates() + assert len(updates) == 1 + + # Simulate DNF check-update (exit 100 indicates available updates) + mock_run.side_effect = [ + MagicMock(returncode=1), # apt update fail + MagicMock(returncode=100, stdout="curl.x86_64 8.5.0 updates"), # dnf check-update + MagicMock(returncode=0, stdout="Version : 8.4.0"), # dnf info + ] + updates = r.get_available_updates() + assert len(updates) == 1 and updates[0][0] == "curl" + + # Handle command timeout or missing 
manager scenarios + mock_run.side_effect = subprocess.TimeoutExpired(["cmd"], 30) + assert r._run_pkg_cmd(["cmd"]) is None + + @patch.object(UpdateRecommender, "get_available_updates") + def test_get_recommendations_full(self, mock_get, r): + mock_get.return_value = [("nginx", "1.24.0", "1.25.0"), ("postgresql", "14.0", "15.0")] + rec = r.get_recommendations(use_llm=False) + assert rec.total_updates == 2 + assert rec.overall_risk == RiskLevel.HIGH + + # Verify LLM analysis integration + mock_router = MagicMock() + mock_router.complete.return_value = MagicMock(content="AI analysis") + r.llm_router = mock_router + with patch.dict("sys.modules", {"cortex.llm_router": MagicMock(TaskType=MagicMock())}): + rec = r.get_recommendations(use_llm=True) + assert rec.llm_analysis == "AI analysis" + + # Ensure robustness if LLM provider returns an error + mock_router.complete.side_effect = Exception("error") + assert r.analyze_with_llm(rec.immediate_updates) == "" + + def test_display_logic(self, r, capsys): + # Create a sample recommendation with mixed risk levels + u1 = UpdateInfo( + "p1", + PackageVersion.parse("1"), + PackageVersion.parse("2"), + ChangeType.PATCH, + RiskLevel.LOW, + UpdateCategory.SECURITY, + is_security=True, + ) + u2 = UpdateInfo( + "p2", + PackageVersion.parse("1"), + PackageVersion.parse("1.1"), + ChangeType.MINOR, + RiskLevel.LOW, + UpdateCategory.IMMEDIATE, + ) + u3 = UpdateInfo( + "p3", + PackageVersion.parse("1"), + PackageVersion.parse("1.2"), + ChangeType.MINOR, + RiskLevel.MEDIUM, + UpdateCategory.SCHEDULED, + ) + u4 = UpdateInfo( + "p4", + PackageVersion.parse("1"), + PackageVersion.parse("2"), + ChangeType.MAJOR, + RiskLevel.HIGH, + UpdateCategory.DEFERRED, + group="db", + ) + + rec = UpdateRecommendation( + "now", + 4, + immediate_updates=[u2], + scheduled_updates=[u3], + deferred_updates=[u4], + security_updates=[u1], + groups={"db": [u4]}, + overall_risk=RiskLevel.HIGH, + ) + + r.display_recommendations(rec) + out = capsys.readouterr().out + 
assert "Update Analysis" in out + assert "Security Updates" in out + assert "Hold for Now" in out + + # Verify table truncation logic for large update lists + updates = [u2] * 12 + r._display_update_table(updates) + out = capsys.readouterr().out + assert "more" in out.lower() + + # Ensure clean output for healthy systems + r.display_recommendations(UpdateRecommendation("now", 0)) + assert "up to date" in capsys.readouterr().out.lower() + + +@patch("cortex.update_recommender.UpdateRecommender") +def test_convenience_function(mock_class): + mock_instance = mock_class.return_value + mock_instance.get_recommendations.return_value = UpdateRecommendation("now", 0) + assert recommend_updates() == 0 + + # Error path + mock_class.side_effect = Exception("error") + assert recommend_updates(verbose=True) == 1 From 0421c52cd24e6d99f4e8352f6ec0aa6ce15ba5a0 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 02:11:13 +0530 Subject: [PATCH 02/16] chore: cleanup en.yaml to only include new keys and preserve original spacing --- cortex/i18n/locales/en.yaml | 74 ++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/cortex/i18n/locales/en.yaml b/cortex/i18n/locales/en.yaml index d608e4f37..5ce8bab4d 100644 --- a/cortex/i18n/locales/en.yaml +++ b/cortex/i18n/locales/en.yaml @@ -58,7 +58,7 @@ language: # {error} - error message set_failed: "Failed to set language: {error}" supported_languages_header: "Supported languages:" - + # ============================================================================= # CLI commands # ============================================================================= @@ -68,7 +68,7 @@ cli: help_text: "AI-powered package manager for Linux" tagline: "Just tell Cortex what you want to install." 
learn_more: "Learn more" - + # Command descriptions commands: ask: "Ask a question about your system" @@ -87,7 +87,7 @@ cli: sandbox: "Test packages in Docker sandbox" doctor: "System health check" config: "Configure Cortex settings" - + # Argument help args: software: "Software to install" @@ -114,7 +114,7 @@ config: use_command_hint: "Use: cortex config language to change" list_hint: "Use: cortex config language --list for details" code_label: "Code" - + # ============================================================================= # Cache statistics # ============================================================================= @@ -130,7 +130,7 @@ cache: saved_calls: "Saved calls (approx)" read_error: "Unable to read cache stats: {error}" unexpected_error: "Unexpected error reading cache stats: {error}" - + # ============================================================================= # UI Labels (common labels used throughout the interface) # ============================================================================= @@ -160,7 +160,7 @@ ui: example_import: "Example: cortex import {file_path} --execute" example_import_all: "Example: cortex import --all --execute" installation_cancelled: "Installation cancelled" - + # ============================================================================= # Installation # ============================================================================= @@ -170,7 +170,7 @@ install: planning: "Planning installation..." executing: "Executing installation..." verifying: "Verifying installation..." - + # Results success: "Installation complete!" failed: "Installation failed" @@ -180,12 +180,12 @@ install: package_installed_version: "{package} ({version}) installed successfully" # {count} - number of packages packages_installed: "{count} packages installed" - + # Dry run dry_run_header: "Dry-run results" dry_run_message: "Dry-run completed. Use --execute to apply changes." 
commands_would_run: "Commands that would run" - + # Progress # {current}, {total} - step numbers step_progress: "Step {current}/{total}" @@ -193,7 +193,7 @@ install: step_executing: "Executing step {step}: {description}" step_completed: "Step completed" step_failed: "Step failed" - + # Errors no_commands: "No commands generated" invalid_request: "Invalid installation request" @@ -201,12 +201,12 @@ install: api_error: "API error: {error}" # {package} - package name package_not_found: "Package not found: {package}" - + # Confirmation confirm_install: "Proceed with installation?" # {count} - package count confirm_install_count: "Install {count} packages?" - + # ============================================================================= # Stack management # ============================================================================= @@ -231,7 +231,7 @@ stack: use_command: "Use: cortex stack to install a stack" # {original}, {suggested} - stack names gpu_fallback: "No GPU detected, using '{suggested}' instead of '{original}'" - + # ============================================================================= # Sandbox # ============================================================================= @@ -241,7 +241,7 @@ sandbox: commands_header: "Commands" example_workflow: "Example workflow" environments_header: "Sandbox Environments" - + # Command descriptions (for help text) cmd_create: "Create a sandbox environment" cmd_install: "Install package in sandbox" @@ -250,7 +250,7 @@ sandbox: cmd_cleanup: "Remove sandbox environment" cmd_list: "List all sandboxes" cmd_exec: "Execute command in sandbox" - + # Actions # {name} - sandbox name creating: "Creating sandbox '{name}'..." @@ -264,14 +264,14 @@ sandbox: # {name} - sandbox name cleaning: "Removing sandbox '{name}'..." 
cleaned: "Sandbox '{name}' removed" - + # Results test_passed: "Test passed" test_failed: "Test failed" all_tests_passed: "All tests passed" # {passed}, {total} - test counts tests_summary: "{passed}/{total} tests passed" - + # Errors docker_required: "Docker is required for sandbox commands" docker_only_for_sandbox: "Docker is required only for sandbox commands." @@ -280,12 +280,12 @@ sandbox: no_sandboxes: "No sandbox environments found" create_hint: "Create one with: cortex sandbox create " list_hint: "Use 'cortex sandbox list' to see available sandboxes." - + # Promotion promote_package: "Installing '{package}' on main system..." promotion_cancelled: "Promotion cancelled" would_run: "Would run: {command}" - + # ============================================================================= # History # ============================================================================= @@ -298,15 +298,15 @@ history: action: "Action" packages: "Packages" status: "Status" - + # Status values status_success: "Success" status_failed: "Failed" status_rolled_back: "Rolled back" - + # {id} - installation ID details_for: "Details for installation #{id}" - + # ============================================================================= # Rollback # ============================================================================= @@ -320,7 +320,7 @@ rollback: # {id} - installation ID already_rolled_back: "Installation #{id} was already rolled back" confirm: "Are you sure you want to roll back this installation?" - + # ============================================================================= # Doctor / Health check # ============================================================================= @@ -331,19 +331,19 @@ doctor: all_passed: "All checks passed!" 
# {count} - issue count issues_found: "{count} issues found" - + # Check names check_api_key: "API Key Configuration" check_network: "Network Connectivity" check_disk_space: "Disk Space" check_permissions: "File Permissions" check_dependencies: "System Dependencies" - + # Results passed: "Passed" warning: "Warning" failed: "Failed" - + # ============================================================================= # Wizard # ============================================================================= @@ -351,19 +351,19 @@ wizard: welcome: "Welcome to Cortex Setup Wizard!" # {step}, {total} - step numbers step: "Step {step} of {total}" - + # API key setup api_key_prompt: "Enter your API key" api_key_saved: "API key saved successfully" api_key_invalid: "Invalid API key format" export_api_key_hint: "Please export your API key in your shell profile." - + # Provider selection select_provider: "Select your LLM provider" provider_anthropic: "Anthropic (Claude)" provider_openai: "OpenAI (GPT)" provider_ollama: "Ollama (Local)" - + # Completion setup_complete: "Setup complete!" 
ready_message: "Cortex is ready to use" @@ -409,16 +409,16 @@ env: apply_template_failed: "Failed to apply template '{name}'" persist_failed: "Failed to persist: {error}" persist_removal_failed: "Failed to persist removal: {error}" - + # PATH operations path_added: "Added '{path}' to PATH" path_removed: "Removed '{path}' from PATH" path_already_exists: "'{path}' is already in PATH" path_not_found: "'{path}' not found in PATH" - + # {path} - config file path persist_note: "To use in current shell: source {path}" - + # ============================================================================= # Notifications # ============================================================================= @@ -429,18 +429,18 @@ notify: disabled: "Disabled" dnd_window: "Do Not Disturb Window" history_file: "History File" - + # Actions notifications_enabled: "Notifications enabled" notifications_disabled: "Notifications disabled (Critical alerts will still show)" # {start}, {end} - time values dnd_updated: "DND window updated: {start} - {end}" - + # Errors missing_subcommand: "Please specify a subcommand (config/enable/disable/dnd/send)" invalid_time_format: "Invalid time format. 
Use HH:MM (e.g., 22:00)" message_required: "Message required" - + # ============================================================================= # API Key messages # ============================================================================= @@ -451,7 +451,7 @@ api_key: # {provider} - provider name using_provider: "Using {provider} API key" using_ollama: "Using Ollama (no API key required)" - + # ============================================================================= # Error messages # ============================================================================= @@ -467,7 +467,7 @@ errors: failed_to: "Failed to {action}: {error}" history_retrieve_failed: "Failed to retrieve history: {error}" sdk_required: "Install required SDK or use CORTEX_PROVIDER=ollama" - + # ============================================================================= # Progress indicators # ============================================================================= From ed00f036763131be24d00835f76de753128393c8 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 02:56:31 +0530 Subject: [PATCH 03/16] refactor: address code review feedback and fix logger error --- cortex/cli.py | 59 +++--------------------------- cortex/update_recommender.py | 71 +++++++++++++++++++++++++++++------- 2 files changed, 63 insertions(+), 67 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index 101d4f1d7..c1bceaf91 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -53,6 +53,7 @@ # Suppress noisy log messages in normal operation logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("cortex.installation_history").setLevel(logging.ERROR) +logger = logging.getLogger(__name__) sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) @@ -2639,64 +2640,16 @@ def progress_callback(message: str, percent: float) -> None: from cortex.llm_router import LLMRouter llm_router = LLMRouter() - except Exception: + except (ImportError, ModuleNotFoundError): pass + except 
Exception as e: + if self.verbose: + logger.debug(f"LLM router initialization failed: {e}") recommender = UpdateRecommender(llm_router=llm_router, verbose=self.verbose) recommendation = recommender.get_recommendations(use_llm=use_llm) - # Convert to JSON-serializable format - output = { - "timestamp": recommendation.timestamp, - "total_updates": recommendation.total_updates, - "overall_risk": recommendation.overall_risk.value, - "security_updates": [ - { - "package": u.package_name, - "current": str(u.current_version), - "new": str(u.new_version), - "risk": u.risk_level.value, - "type": u.change_type.value, - } - for u in recommendation.security_updates - ], - "immediate_updates": [ - { - "package": u.package_name, - "current": str(u.current_version), - "new": str(u.new_version), - "risk": u.risk_level.value, - "type": u.change_type.value, - } - for u in recommendation.immediate_updates - ], - "scheduled_updates": [ - { - "package": u.package_name, - "current": str(u.current_version), - "new": str(u.new_version), - "risk": u.risk_level.value, - "type": u.change_type.value, - } - for u in recommendation.scheduled_updates - ], - "deferred_updates": [ - { - "package": u.package_name, - "current": str(u.current_version), - "new": str(u.new_version), - "risk": u.risk_level.value, - "type": u.change_type.value, - "breaking_changes": u.breaking_changes, - } - for u in recommendation.deferred_updates - ], - "groups": { - k: [u.package_name for u in v] for k, v in recommendation.groups.items() - }, - "llm_analysis": recommendation.llm_analysis, - } - print(json_module.dumps(output, indent=2)) + print(json_module.dumps(recommendation.to_dict(), indent=2)) return 0 else: cx_print(t("update_recommend.checking"), "thinking") diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 4e1c43859..002914318 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -154,6 +154,19 @@ class UpdateInfo: recommended_action: str = "" group: str = 
"" # For grouping related updates + def to_dict(self) -> dict[str, Any]: + """Convert to a JSON-serializable dictionary.""" + return { + "package": self.package_name, + "current": str(self.current_version), + "new": str(self.new_version), + "risk": self.risk_level.value_str, + "type": self.change_type.value, + "is_security": self.is_security, + "breaking_changes": self.breaking_changes, + "group": self.group, + } + @dataclass class UpdateRecommendation: @@ -169,6 +182,20 @@ class UpdateRecommendation: llm_analysis: str = "" overall_risk: RiskLevel = RiskLevel.LOW + def to_dict(self) -> dict[str, Any]: + """Convert to a JSON-serializable dictionary.""" + return { + "timestamp": self.timestamp, + "total_updates": self.total_updates, + "overall_risk": self.overall_risk.value_str, + "security_updates": [u.to_dict() for u in self.security_updates], + "immediate_updates": [u.to_dict() for u in self.immediate_updates], + "scheduled_updates": [u.to_dict() for u in self.scheduled_updates], + "deferred_updates": [u.to_dict() for u in self.deferred_updates], + "groups": {k: [u.package_name for u in v] for k, v in self.groups.items()}, + "llm_analysis": self.llm_analysis, + } + class UpdateRecommender: """ @@ -280,32 +307,42 @@ def get_available_updates(self) -> list[tuple[str, str, str]]: # Fallback check via DNF/YUM (RHEL/Fedora/Amazon Linux) for pm in ("dnf", "yum"): - # pm check-update returns 100 if updates available try: + # pm check-update returns 100 if updates available result = subprocess.run( [pm, "check-update", "-q"], capture_output=True, text=True, timeout=120 ) if result.returncode in (0, 100) and result.stdout: + # Optimized: Fetch all installed packages once + installed = self.get_installed_packages() + for line in result.stdout.strip().splitlines(): - if line and not line.startswith(" "): + if ( + line + and not line.startswith(" ") + and not line.startswith("Last metadata") + ): parts = line.split() if len(parts) >= 2: - name, new_ver = parts[0].rsplit(".", 
1)[0], parts[1] - # Get current version - info = self._run_pkg_cmd([pm, "info", "installed", name]) or "" - old_ver = next( - ( - l.split(":", 1)[1].strip() - for l in info.splitlines() - if l.startswith("Version") - ), - "0.0.0", + # More robust name parsing: assumes name.arch format + full_name = parts[0] + name = ( + full_name.rsplit(".", 1)[0] if "." in full_name else full_name ) + new_ver = parts[1] + + # Use cached installed data instead of per-package lookup + current = installed.get(name) + old_ver = str(current) if current else "0.0.0" + updates.append((name, old_ver, new_ver)) if updates: return updates except (subprocess.TimeoutExpired, FileNotFoundError): continue + except subprocess.SubprocessError as e: + logger.warning(f"Package manager check failed: {e}") + continue return updates @@ -518,8 +555,11 @@ def analyze_with_llm(self, updates: list[UpdateInfo]) -> str: return response.content + except (ImportError, RuntimeError, ConnectionError) as e: + logger.warning(f"LLM analysis context error: {e}") + return "" except Exception as e: - logger.warning(f"LLM analysis failed: {e}") + logger.error(f"Unexpected LLM analysis error: {e}") return "" def get_recommendations(self, use_llm: bool = True) -> UpdateRecommendation: @@ -751,8 +791,11 @@ def recommend_updates( return 0 + except (RuntimeError, subprocess.SubprocessError) as e: + console.print(f"[red]System Error: {e}[/red]") + return 1 except Exception as e: - console.print(f"[red]Error: {e}[/red]") + console.print(f"[red]Unexpected Error: {e}[/red]") if verbose: import traceback From 48adf0aea63e0b000ae58f5cc3b4d6c3d797cff8 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 02:58:32 +0530 Subject: [PATCH 04/16] security: fix potential ReDoS in APT update parsing --- cortex/update_recommender.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 002914318..10dbaf39d 100644 --- 
a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -298,8 +298,11 @@ def get_available_updates(self) -> list[tuple[str, str, str]]: output = self._run_pkg_cmd(["apt", "list", "--upgradable"]) if output: for line in output.splitlines(): + # Optimized regex to prevent backtracking (ReDoS) + # Pattern: package/suite version arch [upgradable from: old_version] match = re.search( - r"^(\S+)/\S+\s+(\S+)\s+\S+\s+\[upgradable from:\s+(\S+)\]", line + r"^([^/\s]+)/[^\s]+\s+([^\s]+)\s+[^\s]+\s+\[upgradable from:\s+([^\s]+)\]", + line, ) if match: updates.append(match.groups()) From 41e80411b99c133a105021cf9a7c8ef14a9ef884 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 03:31:36 +0530 Subject: [PATCH 05/16] refactor: address code review feedback, SonarQube concerns, and improve RPM name resolution (formatted) --- cortex/cli.py | 2 +- cortex/update_recommender.py | 199 ++++++++++++++++++++++++------- tests/test_update_recommender.py | 8 +- 3 files changed, 160 insertions(+), 49 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index c1bceaf91..349096c9d 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2640,7 +2640,7 @@ def progress_callback(message: str, percent: float) -> None: from cortex.llm_router import LLMRouter llm_router = LLMRouter() - except (ImportError, ModuleNotFoundError): + except ImportError: pass except Exception as e: if self.verbose: diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 10dbaf39d..10f0309b4 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -15,6 +15,7 @@ from dataclasses import dataclass, field from datetime import datetime from enum import Enum +from functools import total_ordering from pathlib import Path from typing import Any @@ -22,10 +23,11 @@ from rich.panel import Panel from rich.table import Table +from cortex.context_memory import ContextMemory, MemoryEntry from cortex.i18n.translator import t +from cortex.installation_history 
import InstallationHistory, InstallationStatus # Configure logging -logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) console = Console() @@ -64,6 +66,7 @@ class ChangeType(Enum): UNKNOWN = "unknown" +@total_ordering @dataclass class PackageVersion: """Represents a package version with parsed components.""" @@ -247,6 +250,8 @@ class UpdateRecommender: def __init__( self, llm_router: Any | None = None, + history: InstallationHistory | None = None, + memory: ContextMemory | None = None, verbose: bool = False, ): """ @@ -254,11 +259,26 @@ def __init__( Args: llm_router: Optional LLM router for AI-powered analysis + history: Optional installation history for learning + memory: Optional context memory for pattern recognition verbose: Enable verbose output """ self.llm_router = llm_router self.verbose = verbose + # Graceful initialization of subsystems + try: + self.history = history or InstallationHistory() + except Exception as e: + logger.warning(f"Installation history unavailable: {e}") + self.history = None + + try: + self.memory = memory or ContextMemory() + except Exception as e: + logger.warning(f"Context memory unavailable: {e}") + self.memory = None + def _run_pkg_cmd(self, cmd: list[str]) -> str | None: """Internal helper to run package manager commands.""" try: @@ -291,54 +311,43 @@ def get_installed_packages(self) -> dict[str, PackageVersion]: def get_available_updates(self) -> list[tuple[str, str, str]]: """Get list of packages with available updates.""" - updates = [] + updates = self._get_apt_updates() + if updates: + return updates - # Attempt update check via APT (Debian/Ubuntu) - if self._run_pkg_cmd(["apt-get", "update", "-q"]) is not None: - output = self._run_pkg_cmd(["apt", "list", "--upgradable"]) - if output: - for line in output.splitlines(): - # Optimized regex to prevent backtracking (ReDoS) - # Pattern: package/suite version arch [upgradable from: old_version] - match = re.search( - 
r"^([^/\s]+)/[^\s]+\s+([^\s]+)\s+[^\s]+\s+\[upgradable from:\s+([^\s]+)\]", - line, - ) - if match: - updates.append(match.groups()) - return updates + return self._get_rpm_updates() - # Fallback check via DNF/YUM (RHEL/Fedora/Amazon Linux) + def _get_apt_updates(self) -> list[tuple[str, str, str]]: + """Helper to get updates via APT.""" + updates = [] + if self._run_pkg_cmd(["apt-get", "update", "-q"]) is None: + return updates + + output = self._run_pkg_cmd(["apt", "list", "--upgradable"]) + if not output: + return updates + + for line in output.splitlines(): + # Optimized regex to prevent backtracking (ReDoS) + match = re.search( + r"^([^/\s]+)/[^\s]+\s+([^\s]+)\s+[^\s]+\s+\[upgradable from:\s+([^\s]+)\]", + line, + ) + if match: + updates.append(match.groups()) + return updates + + def _get_rpm_updates(self) -> list[tuple[str, str, str]]: + """Helper to get updates via DNF/YUM.""" + updates = [] for pm in ("dnf", "yum"): try: - # pm check-update returns 100 if updates available result = subprocess.run( [pm, "check-update", "-q"], capture_output=True, text=True, timeout=120 ) if result.returncode in (0, 100) and result.stdout: - # Optimized: Fetch all installed packages once installed = self.get_installed_packages() - - for line in result.stdout.strip().splitlines(): - if ( - line - and not line.startswith(" ") - and not line.startswith("Last metadata") - ): - parts = line.split() - if len(parts) >= 2: - # More robust name parsing: assumes name.arch format - full_name = parts[0] - name = ( - full_name.rsplit(".", 1)[0] if "." 
in full_name else full_name - ) - new_ver = parts[1] - - # Use cached installed data instead of per-package lookup - current = installed.get(name) - old_ver = str(current) if current else "0.0.0" - - updates.append((name, old_ver, new_ver)) + updates.extend(self._parse_rpm_check_update(result.stdout, installed)) if updates: return updates except (subprocess.TimeoutExpired, FileNotFoundError): @@ -346,7 +355,35 @@ def get_available_updates(self) -> list[tuple[str, str, str]]: except subprocess.SubprocessError as e: logger.warning(f"Package manager check failed: {e}") continue + return updates + def _parse_rpm_check_update( + self, output: str, installed: dict[str, PackageVersion] + ) -> list[tuple[str, str, str]]: + """Helper to parse DNF/YUM check-update output.""" + updates = [] + for line in output.strip().splitlines(): + parts = line.split() + if len(parts) >= 2: + full_name = parts[0] + new_ver = parts[1] + + # Resolve name: prefer full name if installed, then name without arch, + # then fallback to name without arch for consistency. + name = full_name + if "." 
in full_name: + name_no_arch = full_name.rsplit(".", 1)[0] + if full_name in installed: + name = full_name + elif name_no_arch in installed: + name = name_no_arch + else: + # Fallback for systems where we might not have matched yet + name = name_no_arch + + current = installed.get(name) + old_ver = str(current) if current else "0.0.0" + updates.append((name, old_ver, new_ver)) return updates def analyze_change_type(self, current: PackageVersion, new: PackageVersion) -> ChangeType: @@ -398,11 +435,82 @@ def assess_risk( warnings.append(f"Changelog mentions: {ind}") # Map aggregate score to RiskLevel enum - level = ( - RiskLevel.HIGH if score >= 60 else RiskLevel.MEDIUM if score >= 35 else RiskLevel.LOW - ) + level = self._map_score_to_risk(score) + + # Learning Enhancement: Check history to refine risk + hist_adjustment, hist_notes = self._get_historical_risk_adjustment(package_name) + if hist_adjustment: + score += hist_adjustment + warnings.extend(hist_notes) + # Re-evaluate level if score changed significantly + level = self._map_score_to_risk(score) + return level, warnings + def _map_score_to_risk(self, score: int) -> RiskLevel: + """Map aggregate risk score to RiskLevel enum.""" + if score >= 60: + return RiskLevel.HIGH + if score >= 35: + return RiskLevel.MEDIUM + return RiskLevel.LOW + + def _get_historical_risk_adjustment(self, package_name: str) -> tuple[int, list[str]]: + """ + Query history and memory to refine risk scores based on past performance. + Returns (score_adjustment, notes). 
+ """ + adjustment = 0 + notes = [] + + try: + # Check installation history for previous failures/rollbacks + if not self.history: + return adjustment, notes + + past_records = self.history.get_history(limit=50) + failures = [ + r + for r in past_records + if r.packages + and package_name in r.packages + and r.status in (InstallationStatus.FAILED, InstallationStatus.ROLLED_BACK) + ] + + if failures: + adjustment += 25 + notes.append( + f"Historical instability: {len(failures)} previous failures or rollbacks detected" + ) + + # Check for consistent successes to lower risk slightly + successes = [ + r + for r in past_records + if r.packages + and package_name in r.packages + and r.status == InstallationStatus.SUCCESS + ] + if len(successes) >= 3 and not failures: + adjustment -= 5 + # No note needed for subtle success tracking + + # Check context memory for recurring issues or specific user notes + if self.memory: + memories = self.memory.get_similar_interactions(package_name, limit=5) + failed_memories = [m for m in memories if not m.success] + if failed_memories: + adjustment += 10 + notes.append( + f"Memory: Package previously caused issues during {failed_memories[0].action}" + ) + + except (OSError, AttributeError) as e: + if self.verbose: + logger.debug(f"Historical risk lookup failed: {e}") + + return adjustment, notes + def is_security_update( self, package_name: str, changelog: str = "", description: str = "" ) -> bool: @@ -556,6 +664,9 @@ def analyze_with_llm(self, updates: list[UpdateInfo]) -> str: max_tokens=300, ) + if not response or not hasattr(response, "content"): + return "" + return response.content except (ImportError, RuntimeError, ConnectionError) as e: @@ -794,7 +905,7 @@ def recommend_updates( return 0 - except (RuntimeError, subprocess.SubprocessError) as e: + except (RuntimeError, subprocess.SubprocessError, OSError) as e: console.print(f"[red]System Error: {e}[/red]") return 1 except Exception as e: diff --git 
a/tests/test_update_recommender.py b/tests/test_update_recommender.py index 3b21e3b51..033716c20 100644 --- a/tests/test_update_recommender.py +++ b/tests/test_update_recommender.py @@ -59,8 +59,8 @@ def test_comparisons(self): assert v3 < v4 assert v4 < v5 assert v6 < v1 # Pre-release is less than final - assert not (v1 < v6) - assert not (v1 < v1) + assert v1 >= v6 + assert v1 == v1 assert str(v1) == "1.2.3" @@ -102,7 +102,7 @@ def test_risk_assessment_branches(self, r): risk, warns = r.assess_risk( "some-pkg", PackageVersion.parse("1.0"), PackageVersion.parse("1.1-beta") ) - assert risk == RiskLevel.MEDIUM # 15 (minor) + 25 (pre) = 40 (MEDIUM) + assert risk == RiskLevel.MEDIUM # Changelog keywords risk, warns = r.assess_risk( @@ -111,7 +111,7 @@ def test_risk_assessment_branches(self, r): PackageVersion.parse("1.0.1"), "Breaking change and deprecated", ) - assert risk == RiskLevel.MEDIUM # 5 (patch) + 15 + 15 = 35 (MEDIUM) + assert risk == RiskLevel.MEDIUM def test_security_detection(self, r): assert r.is_security_update("pkg", "High CVE-2024-0001 fix") From ccac5cdb52edc96cfb53fb51cd72569e073514e3 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 03:40:37 +0530 Subject: [PATCH 06/16] fix: resolve SonarQube Reliability C rating and finalize quality cleanup (formatted) --- cortex/update_recommender.py | 114 +++++++++++++++++-------------- tests/test_update_recommender.py | 2 +- 2 files changed, 64 insertions(+), 52 deletions(-) diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 10f0309b4..abdcf8e68 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -84,43 +84,68 @@ def parse(cls, version_str: str) -> "PackageVersion": if not version_str: return cls(raw="0.0.0") - raw = version_str.strip() + raw_str = str(version_str).strip() # Handle epoch (e.g., "1:2.3.4") - epoch = 0 - if ":" in raw: - epoch_str, raw = raw.split(":", 1) - try: - epoch = int(epoch_str) - except ValueError: - epoch = 0 + 
epoch, clean_raw = cls._parse_epoch(raw_str) # Remove common suffixes like -1ubuntu1, +dfsg, etc. - clean_version = re.sub(r"[-+~].*$", "", raw) + core_ver = re.sub(r"[-+~].*$", "", clean_raw) # Parse major.minor.patch - parts, major, minor, patch = clean_version.split("."), 0, 0, 0 + major, minor, patch = cls._parse_components(core_ver) + + pr_match = re.search(r"[-+](alpha|beta|rc|dev|pre)[\d.]*", raw_str, re.I) + pr = pr_match.group(0) if pr_match else "" + + return cls(raw_str, major, minor, patch, pr, epoch) + + @staticmethod + def _parse_epoch(raw: str) -> tuple[int, str]: + if ":" not in raw: + return 0, raw + parts = raw.split(":", 1) + try: + return int(parts[0]), parts[1] + except (ValueError, IndexError): + return 0, raw + + @staticmethod + def _parse_components(core: str) -> tuple[int, int, int]: + parts = core.split(".") + major, minor, patch = 0, 0, 0 try: if len(parts) >= 1: - # Strip leading non-digits (e.g., 'v1' -> '1') major_clean = re.sub(r"^\D+", "", parts[0]) major = int(re.sub(r"\D.*", "", major_clean) or 0) if len(parts) >= 2: minor = int(re.sub(r"\D.*", "", parts[1]) or 0) if len(parts) >= 3: - # Handle alphanumeric patches like "1f" by taking the number - patch_match = re.search(r"(\d+)", parts[2]) - patch = int(patch_match.group(1)) if patch_match else 0 + p_match = re.search(r"(\d+)", parts[2]) + patch = int(p_match.group(1)) if p_match else 0 except (ValueError, IndexError): pass - - pr_match = re.search(r"[-+](alpha|beta|rc|dev|pre)[\d.]*", raw, re.I) - return cls(version_str, major, minor, patch, pr_match.group(0) if pr_match else "", epoch) + return major, minor, patch def __str__(self) -> str: return self.raw - def __lt__(self, other: "PackageVersion") -> bool: + def __eq__(self, other: object) -> bool: + if not isinstance(other, PackageVersion): + return NotImplemented + return ( + self.epoch == other.epoch + and self.major == other.major + and self.minor == other.minor + and self.patch == other.patch + and self.prerelease == 
other.prerelease + ) + + def __lt__(self, other: object) -> bool: + if not isinstance(other, PackageVersion): + return NotImplemented + + # Compare components in priority order if self.epoch != other.epoch: return self.epoch < other.epoch if self.major != other.major: @@ -130,7 +155,7 @@ def __lt__(self, other: "PackageVersion") -> bool: if self.patch != other.patch: return self.patch < other.patch - # If numeric versions are same, a pre-release is "less" than a final release + # Pre-release comparison: pre-release < final release if self.prerelease and not other.prerelease: return True if not self.prerelease and other.prerelease: @@ -469,41 +494,28 @@ def _get_historical_risk_adjustment(self, package_name: str) -> tuple[int, list[ return adjustment, notes past_records = self.history.get_history(limit=50) - failures = [ - r - for r in past_records - if r.packages - and package_name in r.packages - and r.status in (InstallationStatus.FAILED, InstallationStatus.ROLLED_BACK) - ] - - if failures: - adjustment += 25 - notes.append( - f"Historical instability: {len(failures)} previous failures or rollbacks detected" - ) + for record in past_records: + if not record.packages: + continue + if package_name not in record.packages: + continue + + # Check for critical status issues + if record.status in (InstallationStatus.FAILED, InstallationStatus.ROLLED_BACK): + adjustment += 25 + notes.append( + "Historical instability: previous updates failed or were rolled back" + ) + break - # Check for consistent successes to lower risk slightly - successes = [ - r - for r in past_records - if r.packages - and package_name in r.packages - and r.status == InstallationStatus.SUCCESS - ] - if len(successes) >= 3 and not failures: - adjustment -= 5 - # No note needed for subtle success tracking - - # Check context memory for recurring issues or specific user notes + # Check context memory for recurring issues if self.memory: memories = self.memory.get_similar_interactions(package_name, 
limit=5) - failed_memories = [m for m in memories if not m.success] - if failed_memories: - adjustment += 10 - notes.append( - f"Memory: Package previously caused issues during {failed_memories[0].action}" - ) + for m in memories: + if not m.success: + adjustment += 10 + notes.append(f"Memory: Previously caused issues during {m.action}") + break except (OSError, AttributeError) as e: if self.verbose: diff --git a/tests/test_update_recommender.py b/tests/test_update_recommender.py index 033716c20..b945fe7e4 100644 --- a/tests/test_update_recommender.py +++ b/tests/test_update_recommender.py @@ -60,7 +60,7 @@ def test_comparisons(self): assert v4 < v5 assert v6 < v1 # Pre-release is less than final assert v1 >= v6 - assert v1 == v1 + assert v1 == PackageVersion.parse("1.2.3") assert str(v1) == "1.2.3" From e5c4f4d455559522863fe55355d443b8ac57f7b0 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 04:05:17 +0530 Subject: [PATCH 07/16] fix: resolve security detection gaps, threshold alignment, and markdown lint (formatted) --- cortex/update_recommender.py | 90 ++++++++++++++++++++++------ docs/SMART_UPDATE_RECOMMENDATIONS.md | 2 +- tests/test_update_recommender.py | 19 ++++-- 3 files changed, 87 insertions(+), 24 deletions(-) diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index abdcf8e68..3f5e334ce 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -312,6 +312,34 @@ def _run_pkg_cmd(self, cmd: list[str]) -> str | None: except (subprocess.TimeoutExpired, FileNotFoundError): return None + def _get_package_metadata(self, package_name: str) -> tuple[str, str]: + """Fetch package description and changelog metadata.""" + description, changelog = "", "" + + # Try APT + output = self._run_pkg_cmd(["apt-cache", "show", package_name]) + if output: + desc_match = re.search( + r"^Description-(?:en|.*):\s*(.*?)(?=\n\S|$)", output, re.S | re.M + ) + if desc_match: + description = desc_match.group(1).strip() + # 
Changelog for APT is harder to get without network, + # but sometimes present in /usr/share/doc//changelog.Debian.gz + return description, changelog + + # Try DNF + output = self._run_pkg_cmd(["dnf", "info", "-q", package_name]) + if output: + lines = output.splitlines() + for i, line in enumerate(lines): + if line.startswith("Description :"): + description = " ".join(lines[i:]).replace("Description :", "").strip() + break + return description, changelog + + return description, changelog + def get_installed_packages(self) -> dict[str, PackageVersion]: """Get all installed packages with their versions.""" packages = {} @@ -334,7 +362,7 @@ def get_installed_packages(self) -> dict[str, PackageVersion]: packages[parts[0]] = PackageVersion.parse(parts[1]) return packages - def get_available_updates(self) -> list[tuple[str, str, str]]: + def get_available_updates(self) -> list[dict[str, Any]]: """Get list of packages with available updates.""" updates = self._get_apt_updates() if updates: @@ -342,10 +370,11 @@ def get_available_updates(self) -> list[tuple[str, str, str]]: return self._get_rpm_updates() - def _get_apt_updates(self) -> list[tuple[str, str, str]]: + def _get_apt_updates(self) -> list[dict[str, Any]]: """Helper to get updates via APT.""" updates = [] if self._run_pkg_cmd(["apt-get", "update", "-q"]) is None: + logger.warning("APT update check failed. 
Skipping APT updates.") return updates output = self._run_pkg_cmd(["apt", "list", "--upgradable"]) @@ -353,16 +382,19 @@ def _get_apt_updates(self) -> list[tuple[str, str, str]]: return updates for line in output.splitlines(): - # Optimized regex to prevent backtracking (ReDoS) + # Pattern: package/suite new_version arch [upgradable from: old_version] match = re.search( - r"^([^/\s]+)/[^\s]+\s+([^\s]+)\s+[^\s]+\s+\[upgradable from:\s+([^\s]+)\]", + r"^([^/\s]+)/([^\s]+)\s+([^\s]+)\s+[^\s]+\s+\[upgradable from:\s+([^\s]+)\]", line, ) if match: - updates.append(match.groups()) + pkg, suite, new_v, old_v = match.groups() + updates.append( + {"name": pkg, "old_version": old_v, "new_version": new_v, "repo": suite} + ) return updates - def _get_rpm_updates(self) -> list[tuple[str, str, str]]: + def _get_rpm_updates(self) -> list[dict[str, Any]]: """Helper to get updates via DNF/YUM.""" updates = [] for pm in ("dnf", "yum"): @@ -384,7 +416,7 @@ def _get_rpm_updates(self) -> list[tuple[str, str, str]]: def _parse_rpm_check_update( self, output: str, installed: dict[str, PackageVersion] - ) -> list[tuple[str, str, str]]: + ) -> list[dict[str, Any]]: """Helper to parse DNF/YUM check-update output.""" updates = [] for line in output.strip().splitlines(): @@ -392,9 +424,9 @@ def _parse_rpm_check_update( if len(parts) >= 2: full_name = parts[0] new_ver = parts[1] + repo = parts[2] if len(parts) >= 3 else "" - # Resolve name: prefer full name if installed, then name without arch, - # then fallback to name without arch for consistency. + # Resolve name: prefer full name if installed, then name without arch name = full_name if "." 
in full_name: name_no_arch = full_name.rsplit(".", 1)[0] @@ -403,15 +435,17 @@ def _parse_rpm_check_update( elif name_no_arch in installed: name = name_no_arch else: - # Fallback for systems where we might not have matched yet name = name_no_arch current = installed.get(name) old_ver = str(current) if current else "0.0.0" - updates.append((name, old_ver, new_ver)) + updates.append( + {"name": name, "old_version": old_ver, "new_version": new_ver, "repo": repo} + ) return updates def analyze_change_type(self, current: PackageVersion, new: PackageVersion) -> ChangeType: + """Classify the semantic version delta between current and new versions.""" if new.major > current.major: return ChangeType.MAJOR if new.minor > current.minor: @@ -474,9 +508,9 @@ def assess_risk( def _map_score_to_risk(self, score: int) -> RiskLevel: """Map aggregate risk score to RiskLevel enum.""" - if score >= 60: - return RiskLevel.HIGH if score >= 35: + return RiskLevel.HIGH + if score >= 15: return RiskLevel.MEDIUM return RiskLevel.LOW @@ -524,7 +558,7 @@ def _get_historical_risk_adjustment(self, package_name: str) -> tuple[int, list[ return adjustment, notes def is_security_update( - self, package_name: str, changelog: str = "", description: str = "" + self, package_name: str, changelog: str = "", description: str = "", repo: str = "" ) -> bool: """ Determine if an update is security-related. 
@@ -533,11 +567,16 @@ def is_security_update( package_name: Name of the package changelog: Changelog content description: Update description + repo: Origin repository or suite (e.g., 'jammy-security') Returns: True if this appears to be a security update """ - combined_text = f"{package_name} {changelog} {description}".lower() + combined_text = f"{package_name} {changelog} {description} {repo}".lower() + + # Check for repo origin signals (high confidence) + if "security" in repo.lower(): + return True for indicator in self.SECURITY_INDICATORS: if indicator in combined_text: @@ -710,12 +749,25 @@ def get_recommendations(self, use_llm: bool = True) -> UpdateRecommendation: update_infos = [] groups: dict[str, list[UpdateInfo]] = {} - for pkg_name, old_ver, new_ver in updates: + for update in updates: + pkg_name = update["name"] + old_ver = update["old_version"] + new_ver = update["new_version"] + repo = update.get("repo", "") + + # Fetch extra metadata for better analysis + description, changelog = self._get_package_metadata(pkg_name) + current, new = PackageVersion.parse(old_ver), PackageVersion.parse(new_ver) change_type = self.analyze_change_type(current, new) - risk_level, breaking_changes = self.assess_risk(pkg_name, current, new) + risk_level, breaking_changes = self.assess_risk( + pkg_name, current, new, changelog=changelog + ) group = self.get_package_group(pkg_name) - is_security = self.is_security_update(pkg_name) + is_security = self.is_security_update( + pkg_name, changelog=changelog, description=description, repo=repo + ) + info = UpdateInfo( pkg_name, current, @@ -723,6 +775,8 @@ def get_recommendations(self, use_llm: bool = True) -> UpdateRecommendation: change_type, risk_level, self.categorize_update(risk_level, is_security, change_type), + description=description, + changelog=changelog, breaking_changes=breaking_changes, group=group, is_security=is_security, diff --git a/docs/SMART_UPDATE_RECOMMENDATIONS.md b/docs/SMART_UPDATE_RECOMMENDATIONS.md index 
a46dac68e..79b1f3116 100644 --- a/docs/SMART_UPDATE_RECOMMENDATIONS.md +++ b/docs/SMART_UPDATE_RECOMMENDATIONS.md @@ -23,7 +23,7 @@ cortex update recommend ### Example Output -``` +```text πŸ“Š Update Analysis πŸ”’ Security Updates (Apply ASAP): diff --git a/tests/test_update_recommender.py b/tests/test_update_recommender.py index b945fe7e4..41aa8a6ec 100644 --- a/tests/test_update_recommender.py +++ b/tests/test_update_recommender.py @@ -102,7 +102,7 @@ def test_risk_assessment_branches(self, r): risk, warns = r.assess_risk( "some-pkg", PackageVersion.parse("1.0"), PackageVersion.parse("1.1-beta") ) - assert risk == RiskLevel.MEDIUM + assert risk == RiskLevel.HIGH # Changelog keywords risk, warns = r.assess_risk( @@ -111,7 +111,7 @@ def test_risk_assessment_branches(self, r): PackageVersion.parse("1.0.1"), "Breaking change and deprecated", ) - assert risk == RiskLevel.MEDIUM + assert risk == RiskLevel.HIGH def test_security_detection(self, r): assert r.is_security_update("pkg", "High CVE-2024-0001 fix") @@ -187,6 +187,10 @@ def test_pkg_manager_interactions(self, mock_run, r): ] updates = r.get_available_updates() assert len(updates) == 1 + assert updates[0]["name"] == "nginx" + assert updates[0]["old_version"] == "1.24.0" + assert updates[0]["new_version"] == "1.25.0" + assert "jammy" in updates[0]["repo"] # Simulate DNF check-update (exit 100 indicates available updates) mock_run.side_effect = [ @@ -195,15 +199,20 @@ def test_pkg_manager_interactions(self, mock_run, r): MagicMock(returncode=0, stdout="Version : 8.4.0"), # dnf info ] updates = r.get_available_updates() - assert len(updates) == 1 and updates[0][0] == "curl" + assert len(updates) == 1 and updates[0]["name"] == "curl" # Handle command timeout or missing manager scenarios mock_run.side_effect = subprocess.TimeoutExpired(["cmd"], 30) assert r._run_pkg_cmd(["cmd"]) is None + @patch.object(UpdateRecommender, "_get_package_metadata") @patch.object(UpdateRecommender, "get_available_updates") - def 
test_get_recommendations_full(self, mock_get, r): - mock_get.return_value = [("nginx", "1.24.0", "1.25.0"), ("postgresql", "14.0", "15.0")] + def test_get_recommendations_full(self, mock_get, mock_meta, r): + mock_get.return_value = [ + {"name": "nginx", "old_version": "1.24.0", "new_version": "1.25.0", "repo": "updates"}, + {"name": "postgresql", "old_version": "14.0", "new_version": "15.0", "repo": "updates"}, + ] + mock_meta.return_value = ("desc", "changelog") rec = r.get_recommendations(use_llm=False) assert rec.total_updates == 2 assert rec.overall_risk == RiskLevel.HIGH From acf0d46ee1751915a088267adbc56d34701b1a54 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 04:10:57 +0530 Subject: [PATCH 08/16] Refactor: Resolve SonarQube reliability issues and improve code quality --- cortex/cli.py | 6 ++---- cortex/update_recommender.py | 26 ++++++++++++++------------ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index 349096c9d..b507f07f2 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2632,8 +2632,6 @@ def progress_callback(message: str, percent: float) -> None: output_json = getattr(args, "json", False) if output_json: - import json as json_module - llm_router = None if use_llm: try: @@ -2642,14 +2640,14 @@ def progress_callback(message: str, percent: float) -> None: llm_router = LLMRouter() except ImportError: pass - except Exception as e: + except (RuntimeError, ConnectionError) as e: if self.verbose: logger.debug(f"LLM router initialization failed: {e}") recommender = UpdateRecommender(llm_router=llm_router, verbose=self.verbose) recommendation = recommender.get_recommendations(use_llm=use_llm) - print(json_module.dumps(recommendation.to_dict(), indent=2)) + print(json.dumps(recommendation.to_dict(), indent=2)) return 0 else: cx_print(t("update_recommend.checking"), "thinking") diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 3f5e334ce..1129c2d85 100644 --- 
a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -160,6 +160,8 @@ def __lt__(self, other: object) -> bool: return True if not self.prerelease and other.prerelease: return False + if self.prerelease and other.prerelease: + return self.prerelease < other.prerelease return False @@ -294,14 +296,15 @@ def __init__( # Graceful initialization of subsystems try: self.history = history or InstallationHistory() - except Exception as e: - logger.warning(f"Installation history unavailable: {e}") + except (RuntimeError, OSError, ImportError) as e: + logger.warning("Installation history unavailable: %s", e) self.history = None try: self.memory = memory or ContextMemory() - except Exception as e: - logger.warning(f"Context memory unavailable: {e}") + except (RuntimeError, OSError, ImportError) as e: + # We use lazy logging formatting to satisfy SonarQube + logger.warning("Context memory unavailable: %s", e) self.memory = None def _run_pkg_cmd(self, cmd: list[str]) -> str | None: @@ -410,7 +413,7 @@ def _get_rpm_updates(self) -> list[dict[str, Any]]: except (subprocess.TimeoutExpired, FileNotFoundError): continue except subprocess.SubprocessError as e: - logger.warning(f"Package manager check failed: {e}") + logger.warning("Package manager check failed: %s", e) continue return updates @@ -432,9 +435,8 @@ def _parse_rpm_check_update( name_no_arch = full_name.rsplit(".", 1)[0] if full_name in installed: name = full_name - elif name_no_arch in installed: - name = name_no_arch else: + # Default to name without arch if not specifically found name = name_no_arch current = installed.get(name) @@ -553,7 +555,7 @@ def _get_historical_risk_adjustment(self, package_name: str) -> tuple[int, list[ except (OSError, AttributeError) as e: if self.verbose: - logger.debug(f"Historical risk lookup failed: {e}") + logger.debug("Historical risk lookup failed: %s", e) return adjustment, notes @@ -721,10 +723,10 @@ def analyze_with_llm(self, updates: list[UpdateInfo]) -> str: return 
response.content except (ImportError, RuntimeError, ConnectionError) as e: - logger.warning(f"LLM analysis context error: {e}") + logger.warning("LLM analysis context error: %s", e) return "" except Exception as e: - logger.error(f"Unexpected LLM analysis error: {e}") + logger.error("Unexpected LLM analysis error: %s", e, exc_info=True) return "" def get_recommendations(self, use_llm: bool = True) -> UpdateRecommendation: @@ -958,8 +960,8 @@ def recommend_updates( from cortex.llm_router import LLMRouter llm_router = LLMRouter() - except Exception as e: - logger.debug(f"LLM router not available: {e}") + except (ImportError, RuntimeError) as e: + logger.debug("LLM router not available: %s", e) recommender = UpdateRecommender( llm_router=llm_router, From 06a32a87252952ff6f843ffc71809fda527456a7 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 04:14:58 +0530 Subject: [PATCH 09/16] Refactor: Resolve SonarQube regex, complexity, and nested ternary issues --- cortex/update_recommender.py | 152 ++++++++++++++++++++--------------- 1 file changed, 86 insertions(+), 66 deletions(-) diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 1129c2d85..300edf37b 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -319,16 +319,24 @@ def _get_package_metadata(self, package_name: str) -> tuple[str, str]: """Fetch package description and changelog metadata.""" description, changelog = "", "" - # Try APT + # Try APT: parse line-by-line to avoid complex regex issues output = self._run_pkg_cmd(["apt-cache", "show", package_name]) if output: - desc_match = re.search( - r"^Description-(?:en|.*):\s*(.*?)(?=\n\S|$)", output, re.S | re.M - ) - if desc_match: - description = desc_match.group(1).strip() - # Changelog for APT is harder to get without network, - # but sometimes present in /usr/share/doc//changelog.Debian.gz + desc_lines = [] + capturing = False + for line in output.splitlines(): + if 
line.startswith("Description"): + capturing = True + # Remove "Description-en: " or similar + clean_line = re.sub(r"^Description(?:-[\w-]+)?:\s*", "", line) + desc_lines.append(clean_line) + elif capturing: + if line.startswith(" "): + desc_lines.append(line.strip()) + else: + break + if desc_lines: + description = " ".join(desc_lines).strip() return description, changelog # Try DNF @@ -424,28 +432,34 @@ def _parse_rpm_check_update( updates = [] for line in output.strip().splitlines(): parts = line.split() - if len(parts) >= 2: - full_name = parts[0] - new_ver = parts[1] - repo = parts[2] if len(parts) >= 3 else "" - - # Resolve name: prefer full name if installed, then name without arch - name = full_name - if "." in full_name: - name_no_arch = full_name.rsplit(".", 1)[0] - if full_name in installed: - name = full_name - else: - # Default to name without arch if not specifically found - name = name_no_arch + if len(parts) < 2: + continue - current = installed.get(name) - old_ver = str(current) if current else "0.0.0" - updates.append( - {"name": name, "old_version": old_ver, "new_version": new_ver, "repo": repo} - ) + full_name = parts[0] + new_ver = parts[1] + repo = parts[2] if len(parts) >= 3 else "" + + # Resolve name: prefer architecture-specific if installed + name = self._resolve_rpm_name(full_name, installed) + current = installed.get(name) + old_ver = str(current) if current else "0.0.0" + + updates.append( + {"name": name, "old_version": old_ver, "new_version": new_ver, "repo": repo} + ) return updates + def _resolve_rpm_name(self, full_name: str, installed: dict[str, PackageVersion]) -> str: + """Resolve RPM package name by handling architecture suffixes.""" + if "." 
not in full_name: + return full_name + + name_no_arch = full_name.rsplit(".", 1)[0] + if full_name in installed: + return full_name + + return name_no_arch + def analyze_change_type(self, current: PackageVersion, new: PackageVersion) -> ChangeType: """Classify the semantic version delta between current and new versions.""" if new.major > current.major: @@ -517,47 +531,49 @@ def _map_score_to_risk(self, score: int) -> RiskLevel: return RiskLevel.LOW def _get_historical_risk_adjustment(self, package_name: str) -> tuple[int, list[str]]: - """ - Query history and memory to refine risk scores based on past performance. - Returns (score_adjustment, notes). - """ - adjustment = 0 - notes = [] + """Query history and memory to refine risk scores based on past performance.""" + adj, notes = 0, [] try: - # Check installation history for previous failures/rollbacks - if not self.history: - return adjustment, notes - - past_records = self.history.get_history(limit=50) - for record in past_records: - if not record.packages: - continue - if package_name not in record.packages: - continue - - # Check for critical status issues - if record.status in (InstallationStatus.FAILED, InstallationStatus.ROLLED_BACK): - adjustment += 25 - notes.append( - "Historical instability: previous updates failed or were rolled back" - ) - break - - # Check context memory for recurring issues - if self.memory: - memories = self.memory.get_similar_interactions(package_name, limit=5) - for m in memories: - if not m.success: - adjustment += 10 - notes.append(f"Memory: Previously caused issues during {m.action}") - break + h_adj, h_notes = self._check_history_risk(package_name) + adj += h_adj + notes.extend(h_notes) + m_adj, m_notes = self._check_memory_risk(package_name) + adj += m_adj + notes.extend(m_notes) except (OSError, AttributeError) as e: if self.verbose: logger.debug("Historical risk lookup failed: %s", e) - return adjustment, notes + return adj, notes + + def _check_history_risk(self, 
package_name: str) -> tuple[int, list[str]]: + """Check installation history for previous failures.""" + if not self.history: + return 0, [] + + past_records = self.history.get_history(limit=50) + for record in past_records: + if not record.packages or package_name not in record.packages: + continue + + if record.status in (InstallationStatus.FAILED, InstallationStatus.ROLLED_BACK): + return 25, ["Historical instability: previous updates failed or were rolled back"] + + return 0, [] + + def _check_memory_risk(self, package_name: str) -> tuple[int, list[str]]: + """Check context memory for recurring issues.""" + if not self.memory: + return 0, [] + + memories = self.memory.get_similar_interactions(package_name, limit=5) + for m in memories: + if not m.success: + return 10, [f"Memory: Previously caused issues during {m.action}"] + + return 0, [] def is_security_update( self, package_name: str, changelog: str = "", description: str = "", repo: str = "" @@ -647,11 +663,15 @@ def categorize_update( def generate_recommendation_text(self, update: UpdateInfo) -> str: """Generate human-readable recommendation for an update.""" - res = [ - t( - f"update_recommend.recommendations.{'security_urgent' if update.category == UpdateCategory.SECURITY else 'safe_immediate' if update.category == UpdateCategory.IMMEDIATE else 'maintenance_window' if update.category == UpdateCategory.SCHEDULED else 'consider_deferring'}" - ) - ] + # Use a mapping to avoid nested ternary expressions (SonarQube) + category_keys = { + UpdateCategory.SECURITY: "security_urgent", + UpdateCategory.IMMEDIATE: "safe_immediate", + UpdateCategory.SCHEDULED: "maintenance_window", + UpdateCategory.DEFERRED: "consider_deferring", + } + key = category_keys.get(update.category, "maintenance_window") + res = [t(f"update_recommend.recommendations.{key}")] if update.change_type == ChangeType.MAJOR: res.append( t( From ac9263950eb25826c7796890256132362fe89871 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 
2026 11:54:09 +0530 Subject: [PATCH 10/16] fix(update-recommender): implement changelog fetching for APT/DNF and add unit tests Populate changelog data for breaking-change detection in _get_package_metadata and support DNF/YUM fallback. --- cortex/update_recommender.py | 31 +++++++++++++++++------- tests/test_update_recommender.py | 41 ++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 300edf37b..2926bfd5c 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -337,18 +337,31 @@ def _get_package_metadata(self, package_name: str) -> tuple[str, str]: break if desc_lines: description = " ".join(desc_lines).strip() - return description, changelog - # Try DNF - output = self._run_pkg_cmd(["dnf", "info", "-q", package_name]) - if output: - lines = output.splitlines() - for i, line in enumerate(lines): - if line.startswith("Description :"): - description = " ".join(lines[i:]).replace("Description :", "").strip() - break + # Fetch changelog (best-effort, trimmed) + changelog_out = self._run_pkg_cmd(["apt-get", "changelog", package_name]) + if changelog_out: + changelog = "\n".join(changelog_out.splitlines()[:200]) + return description, changelog + # Try DNF/YUM + for pm in ("dnf", "yum"): + output = self._run_pkg_cmd([pm, "info", "-q", package_name]) + if output: + lines = output.splitlines() + for i, line in enumerate(lines): + if line.startswith("Description :"): + description = " ".join(lines[i:]).replace("Description :", "").strip() + break + + # Fetch changelog (best-effort, trimmed) + changelog_out = self._run_pkg_cmd([pm, "repoquery", "--changelog", package_name]) + if changelog_out: + changelog = "\n".join(changelog_out.splitlines()[:200]) + + return description, changelog + return description, changelog def get_installed_packages(self) -> dict[str, PackageVersion]: diff --git a/tests/test_update_recommender.py 
b/tests/test_update_recommender.py index 41aa8a6ec..c1f6c5d84 100644 --- a/tests/test_update_recommender.py +++ b/tests/test_update_recommender.py @@ -90,6 +90,47 @@ def test_change_analysis(self, r, curr, new, expected): r.analyze_change_type(PackageVersion.parse(curr), PackageVersion.parse(new)) == expected ) + @patch("cortex.update_recommender.subprocess.run") + def test_get_package_metadata(self, mock_run, r): + # Test APT success path + mock_run.side_effect = [ + MagicMock(returncode=0, stdout="Description: A test package\n Full description here."), + MagicMock(returncode=0, stdout="* v1.1.0: fixed security hole\n" + "* line\n" * 250), + ] + desc, changelog = r._get_package_metadata("test-pkg") + assert "A test package" in desc + assert "fixed security hole" in changelog + assert len(changelog.splitlines()) == 200 # Truncation check + assert mock_run.call_args_list[0][0][0] == ["apt-cache", "show", "test-pkg"] + assert mock_run.call_args_list[1][0][0] == ["apt-get", "changelog", "test-pkg"] + + mock_run.reset_mock() + # Test DNF success path (APT fails) + mock_run.side_effect = [ + MagicMock(returncode=1), # apt-cache show fail + MagicMock(returncode=0, stdout="Description : A DNF test package"), + MagicMock(returncode=0, stdout="* Mon Jan 01 2024 User - 1.0.1-1\n- Breaking change"), + ] + desc, changelog = r._get_package_metadata("test-pkg") + assert "A DNF test package" in desc + assert "Breaking change" in changelog + assert mock_run.call_args_list[1][0][0] == ["dnf", "info", "-q", "test-pkg"] + assert mock_run.call_args_list[2][0][0] == ["dnf", "repoquery", "--changelog", "test-pkg"] + + mock_run.reset_mock() + # Test YUM fallback path (APT and DNF fail) + mock_run.side_effect = [ + MagicMock(returncode=1), # apt-cache show fail + MagicMock(returncode=1), # dnf info fail + MagicMock(returncode=0, stdout="Description : A YUM test package"), + MagicMock(returncode=0, stdout="* Mon Jan 01 2024 User - 1.0.1-1\n- Breaking change"), + ] + desc, changelog = 
r._get_package_metadata("test-pkg") + assert "A YUM test package" in desc + assert "Breaking change" in changelog + assert mock_run.call_args_list[2][0][0] == ["yum", "info", "-q", "test-pkg"] + assert mock_run.call_args_list[3][0][0] == ["yum", "repoquery", "--changelog", "test-pkg"] + def test_risk_assessment_branches(self, r): # High risk package + major version risk, warns = r.assess_risk( From 132778283801ff2d55636d10cd08736a8e6fcc Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 12:03:35 +0530 Subject: [PATCH 11/16] fix(update-recommender): handle Debian prerelease markers and add type hints - Update PackageVersion.parse to support ~ markers - Add explicit -> None to UpdateRecommender.__init__ - Add unit test for ~ version markers --- cortex/update_recommender.py | 4 ++-- tests/test_update_recommender.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 2926bfd5c..140d75688 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -95,7 +95,7 @@ def parse(cls, version_str: str) -> "PackageVersion": # Parse major.minor.patch major, minor, patch = cls._parse_components(core_ver) - pr_match = re.search(r"[-+](alpha|beta|rc|dev|pre)[\d.]*", raw_str, re.I) + pr_match = re.search(r"[-+~](alpha|beta|rc|dev|pre)[\d.]*", raw_str, re.I) pr = pr_match.group(0) if pr_match else "" return cls(raw_str, major, minor, patch, pr, epoch) @@ -280,7 +280,7 @@ def __init__( history: InstallationHistory | None = None, memory: ContextMemory | None = None, verbose: bool = False, - ): + ) -> None: """ Initialize the Update Recommender. 
diff --git a/tests/test_update_recommender.py b/tests/test_update_recommender.py index c1f6c5d84..99dad94a5 100644 --- a/tests/test_update_recommender.py +++ b/tests/test_update_recommender.py @@ -38,6 +38,7 @@ class TestPackageVersion: ("abc:1.2.3", (1, 2, 3, 0, "")), # Invalid epoch ("1.2", (1, 2, 0, 0, "")), ("1", (1, 0, 0, 0, "")), + ("1.2.3~rc1", (1, 2, 3, 0, "~rc1")), ], ) def test_parse(self, version_str, expected): From 85fe99eaa8e8bfa6e8444cea3499ac0f4e6050c4 Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Fri, 23 Jan 2026 12:17:57 +0530 Subject: [PATCH 12/16] fix(update-recommender): filter non-package lines in RPM check-update parsing Ignore lines in dnf/yum check-update output that do not contain a digit in the version field (e.g., metadata expiration notices). --- cortex/update_recommender.py | 4 ++++ tests/test_update_recommender.py | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 140d75688..4b512c997 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -448,6 +448,10 @@ def _parse_rpm_check_update( if len(parts) < 2: continue + # Skip non-package/status lines (e.g., metadata expiration notices) + if not re.search(r"\d", parts[1]): + continue + full_name = parts[0] new_ver = parts[1] repo = parts[2] if len(parts) >= 3 else "" diff --git a/tests/test_update_recommender.py b/tests/test_update_recommender.py index 99dad94a5..fa82b5ede 100644 --- a/tests/test_update_recommender.py +++ b/tests/test_update_recommender.py @@ -237,7 +237,10 @@ def test_pkg_manager_interactions(self, mock_run, r): # Simulate DNF check-update (exit 100 indicates available updates) mock_run.side_effect = [ MagicMock(returncode=1), # apt update fail - MagicMock(returncode=100, stdout="curl.x86_64 8.5.0 updates"), # dnf check-update + MagicMock( + returncode=100, + stdout="Last metadata expiration check: 1:00:00 ago\ncurl.x86_64 8.5.0 updates", + ), # dnf 
check-update MagicMock(returncode=0, stdout="Version : 8.4.0"), # dnf info ] updates = r.get_available_updates() From 121746fd885952a2551da85636ef8dd74c1cf7fd Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Sat, 24 Jan 2026 01:40:51 +0530 Subject: [PATCH 13/16] Refactor update recommender: address PR feedback on magic numbers, timeouts, and logging --- cortex/cli.py | 3 +- cortex/i18n/locales/en.yaml | 1 + cortex/update_recommender.py | 96 +++++++++++++++++++++++--------- tests/test_update_recommender.py | 2 +- 4 files changed, 73 insertions(+), 29 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index b507f07f2..de98aa30b 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2641,8 +2641,7 @@ def progress_callback(message: str, percent: float) -> None: except ImportError: pass except (RuntimeError, ConnectionError) as e: - if self.verbose: - logger.debug(f"LLM router initialization failed: {e}") + logger.debug(f"LLM router initialization failed: {e}") recommender = UpdateRecommender(llm_router=llm_router, verbose=self.verbose) recommendation = recommender.get_recommendations(use_llm=use_llm) diff --git a/cortex/i18n/locales/en.yaml b/cortex/i18n/locales/en.yaml index 03c08a41c..631a919d1 100644 --- a/cortex/i18n/locales/en.yaml +++ b/cortex/i18n/locales/en.yaml @@ -510,6 +510,7 @@ update_recommend: ai_analysis: "AI Analysis" more_updates: "... and {count} more" + no_description: "No description available" recommendations: security_urgent: "Security update - prioritize installation" diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 4b512c997..59849837b 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -108,6 +108,7 @@ def _parse_epoch(raw: str) -> tuple[int, str]: try: return int(parts[0]), parts[1] except (ValueError, IndexError): + logger.warning("Failed to parse epoch from version string: '%s'. 
Defaulting to 0.", raw) return 0, raw @staticmethod @@ -274,12 +275,46 @@ class UpdateRecommender: "urgent", ] + # Breaking change indicators for changelog scanning + BREAKING_CHANGE_INDICATORS = [ + "breaking change", + "backwards incompatible", + "deprecated", + "removed", + "migration required", + "manual action", + ] + + # Timeouts for external commands (in seconds) + DEFAULT_TIMEOUT = 30 + CHECK_UPDATE_TIMEOUT = 120 + + # Risk score thresholds + RISK_THRESHOLD_HIGH = 35 + RISK_THRESHOLD_MEDIUM = 15 + + # Risk score penalties + PENALTY_HIGH_IMPACT_PKG = 30 + PENALTY_MAJOR_VERSION = 40 + PENALTY_MINOR_VERSION = 15 + PENALTY_PATCH_VERSION = 5 + PENALTY_PRERELEASE = 25 + PENALTY_CHANGELOG_KEYWORD = 15 + PENALTY_HISTORY_FAILURE = 25 + PENALTY_MEMORY_ISSUE = 10 + + # UI and LLM limits + MAX_LLM_UPDATES = 10 + MAX_DISPLAY_UPDATES = 10 + def __init__( self, llm_router: Any | None = None, history: InstallationHistory | None = None, memory: ContextMemory | None = None, verbose: bool = False, + timeout: int | None = None, + check_timeout: int | None = None, ) -> None: """ Initialize the Update Recommender. 
@@ -289,9 +324,13 @@ def __init__( history: Optional installation history for learning memory: Optional context memory for pattern recognition verbose: Enable verbose output + timeout: Timeout for external commands + check_timeout: Timeout for update check commands """ self.llm_router = llm_router self.verbose = verbose + self.timeout = timeout or self.DEFAULT_TIMEOUT + self.check_timeout = check_timeout or self.CHECK_UPDATE_TIMEOUT # Graceful initialization of subsystems try: @@ -310,7 +349,7 @@ def __init__( def _run_pkg_cmd(self, cmd: list[str]) -> str | None: """Internal helper to run package manager commands.""" try: - result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=self.timeout) return result.stdout.strip() if result.returncode == 0 else None except (subprocess.TimeoutExpired, FileNotFoundError): return None @@ -337,6 +376,8 @@ def _get_package_metadata(self, package_name: str) -> tuple[str, str]: break if desc_lines: description = " ".join(desc_lines).strip() + else: + description = t("update_recommend.no_description") # Fetch changelog (best-effort, trimmed) changelog_out = self._run_pkg_cmd(["apt-get", "changelog", package_name]) @@ -360,9 +401,9 @@ def _get_package_metadata(self, package_name: str) -> tuple[str, str]: if changelog_out: changelog = "\n".join(changelog_out.splitlines()[:200]) - return description, changelog + return description or t("update_recommend.no_description"), changelog - return description, changelog + return description or t("update_recommend.no_description"), changelog def get_installed_packages(self) -> dict[str, PackageVersion]: """Get all installed packages with their versions.""" @@ -424,7 +465,10 @@ def _get_rpm_updates(self) -> list[dict[str, Any]]: for pm in ("dnf", "yum"): try: result = subprocess.run( - [pm, "check-update", "-q"], capture_output=True, text=True, timeout=120 + [pm, "check-update", "-q"], + capture_output=True, 
+ text=True, + timeout=self.check_timeout, ) if result.returncode in (0, 100) and result.stdout: installed = self.get_installed_packages() @@ -498,32 +542,29 @@ def assess_risk( # Score penalty for known high-impact system packages for pkg, reason in self.HIGH_RISK_PACKAGES.items(): if pkg in package_name.lower(): - score += 30 + score += self.PENALTY_HIGH_IMPACT_PKG warnings.append(reason) break # Score penalty based on Semantic Versioning delta severity ctype = self.analyze_change_type(current, new) - score += {ChangeType.MAJOR: 40, ChangeType.MINOR: 15, ChangeType.PATCH: 5}.get(ctype, 0) + score += { + ChangeType.MAJOR: self.PENALTY_MAJOR_VERSION, + ChangeType.MINOR: self.PENALTY_MINOR_VERSION, + ChangeType.PATCH: self.PENALTY_PATCH_VERSION, + }.get(ctype, 0) if ctype == ChangeType.MAJOR: warnings.append(f"Major version change ({current.major} β†’ {new.major})") # Additional penalty for unstable pre-release versions if new.prerelease: - score += 25 + score += self.PENALTY_PRERELEASE warnings.append(f"Pre-release version: {new.prerelease}") # Scan changelogs for keyword indicators of breaking changes - for ind in [ - "breaking change", - "backwards incompatible", - "deprecated", - "removed", - "migration required", - "manual action", - ]: + for ind in self.BREAKING_CHANGE_INDICATORS: if ind in changelog.lower(): - score += 15 + score += self.PENALTY_CHANGELOG_KEYWORD warnings.append(f"Changelog mentions: {ind}") # Map aggregate score to RiskLevel enum @@ -541,9 +582,9 @@ def assess_risk( def _map_score_to_risk(self, score: int) -> RiskLevel: """Map aggregate risk score to RiskLevel enum.""" - if score >= 35: + if score >= self.RISK_THRESHOLD_HIGH: return RiskLevel.HIGH - if score >= 15: + if score >= self.RISK_THRESHOLD_MEDIUM: return RiskLevel.MEDIUM return RiskLevel.LOW @@ -560,8 +601,7 @@ def _get_historical_risk_adjustment(self, package_name: str) -> tuple[int, list[ adj += m_adj notes.extend(m_notes) except (OSError, AttributeError) as e: - if self.verbose: 
- logger.debug("Historical risk lookup failed: %s", e) + logger.debug("Historical risk lookup failed: %s", e) return adj, notes @@ -576,7 +616,9 @@ def _check_history_risk(self, package_name: str) -> tuple[int, list[str]]: continue if record.status in (InstallationStatus.FAILED, InstallationStatus.ROLLED_BACK): - return 25, ["Historical instability: previous updates failed or were rolled back"] + return self.PENALTY_HISTORY_FAILURE, [ + "Historical instability: previous updates failed or were rolled back" + ] return 0, [] @@ -588,7 +630,9 @@ def _check_memory_risk(self, package_name: str) -> tuple[int, list[str]]: memories = self.memory.get_similar_interactions(package_name, limit=5) for m in memories: if not m.success: - return 10, [f"Memory: Previously caused issues during {m.action}"] + return self.PENALTY_MEMORY_ISSUE, [ + f"Memory: Previously caused issues during {m.action}" + ] return 0, [] @@ -728,7 +772,7 @@ def analyze_with_llm(self, updates: list[UpdateInfo]) -> str: try: # Build a summary for the LLM update_summary = [] - for u in updates[:10]: # Limit to first 10 for context length + for u in updates[: self.MAX_LLM_UPDATES]: # Limit for context length update_summary.append( f"- {u.package_name}: {u.current_version} β†’ {u.new_version} " f"({u.change_type.value}, {u.risk_level.value} risk)" @@ -938,7 +982,7 @@ def _display_update_table(self, updates: list[UpdateInfo]) -> None: table.add_column("Risk") table.add_column("Notes") - for update in updates[:10]: # Limit display + for update in updates[: self.MAX_DISPLAY_UPDATES]: # Limit display risk_color = self.RISK_COLORS.get(update.risk_level, "white") risk_display = t(f"update_recommend.risks.{update.risk_level.value_str}") type_str = update.change_type.value @@ -962,9 +1006,9 @@ def _display_update_table(self, updates: list[UpdateInfo]) -> None: " | ".join(notes) if notes else "-", ) - if len(updates) > 10: + if len(updates) > self.MAX_DISPLAY_UPDATES: table.add_row( - t("update_recommend.more_updates", 
count=len(updates) - 10), + t("update_recommend.more_updates", count=len(updates) - self.MAX_DISPLAY_UPDATES), "", "", "", diff --git a/tests/test_update_recommender.py b/tests/test_update_recommender.py index fa82b5ede..9436be83f 100644 --- a/tests/test_update_recommender.py +++ b/tests/test_update_recommender.py @@ -101,7 +101,7 @@ def test_get_package_metadata(self, mock_run, r): desc, changelog = r._get_package_metadata("test-pkg") assert "A test package" in desc assert "fixed security hole" in changelog - assert len(changelog.splitlines()) == 200 # Truncation check + assert len(changelog.splitlines()) <= 200 # Truncation check assert mock_run.call_args_list[0][0][0] == ["apt-cache", "show", "test-pkg"] assert mock_run.call_args_list[1][0][0] == ["apt-get", "changelog", "test-pkg"] From 721d43a9743317ef8be4369cc165b95bd673543b Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Sat, 24 Jan 2026 02:25:54 +0530 Subject: [PATCH 14/16] Fix: ensure --json output is always valid JSON even on error --- cortex/cli.py | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index de98aa30b..49b26752f 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2632,22 +2632,31 @@ def progress_callback(message: str, percent: float) -> None: output_json = getattr(args, "json", False) if output_json: - llm_router = None - if use_llm: - try: - from cortex.llm_router import LLMRouter + try: + llm_router = None + if use_llm: + try: + from cortex.llm_router import LLMRouter - llm_router = LLMRouter() - except ImportError: - pass - except (RuntimeError, ConnectionError) as e: - logger.debug(f"LLM router initialization failed: {e}") + llm_router = LLMRouter() + except ImportError: + pass + except (RuntimeError, ConnectionError) as e: + logger.debug(f"LLM router initialization failed: {e}") - recommender = UpdateRecommender(llm_router=llm_router, verbose=self.verbose) - recommendation = 
recommender.get_recommendations(use_llm=use_llm) + recommender = UpdateRecommender(llm_router=llm_router, verbose=self.verbose) + recommendation = recommender.get_recommendations(use_llm=use_llm) - print(json.dumps(recommendation.to_dict(), indent=2)) - return 0 + print(json.dumps(recommendation.to_dict(), indent=2)) + return 0 + except Exception as e: + error_payload = { + "success": False, + "error": str(e), + "error_type": type(e).__name__, + } + print(json.dumps(error_payload)) + return 1 else: cx_print(t("update_recommend.checking"), "thinking") return recommend_updates(use_llm=use_llm, verbose=self.verbose) From e04ef9ec8d39170cd11cce4eaecb6cfb26714e1b Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Sat, 24 Jan 2026 02:25:57 +0530 Subject: [PATCH 15/16] Fix: use check_timeout for slow APT operations and support custom timeouts in _run_pkg_cmd --- cortex/update_recommender.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 59849837b..8d3a47e46 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -346,10 +346,12 @@ def __init__( logger.warning("Context memory unavailable: %s", e) self.memory = None - def _run_pkg_cmd(self, cmd: list[str]) -> str | None: + def _run_pkg_cmd(self, cmd: list[str], timeout: int | None = None) -> str | None: """Internal helper to run package manager commands.""" try: - result = subprocess.run(cmd, capture_output=True, text=True, timeout=self.timeout) + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=timeout or self.timeout + ) return result.stdout.strip() if result.returncode == 0 else None except (subprocess.TimeoutExpired, FileNotFoundError): return None @@ -438,11 +440,11 @@ def get_available_updates(self) -> list[dict[str, Any]]: def _get_apt_updates(self) -> list[dict[str, Any]]: """Helper to get updates via APT.""" updates = [] - if self._run_pkg_cmd(["apt-get", "update", "-q"]) 
is None: + if self._run_pkg_cmd(["apt-get", "update", "-q"], timeout=self.check_timeout) is None: logger.warning("APT update check failed. Skipping APT updates.") return updates - output = self._run_pkg_cmd(["apt", "list", "--upgradable"]) + output = self._run_pkg_cmd(["apt", "list", "--upgradable"], timeout=self.check_timeout) if not output: return updates From f8c9b1b04752dcd9195897811403c480a864647a Mon Sep 17 00:00:00 2001 From: pratyush07-hub Date: Sat, 24 Jan 2026 02:37:42 +0530 Subject: [PATCH 16/16] Fix: avoid RPM fallback on systems with APT using shutil.which and update tests --- cortex/update_recommender.py | 6 +++--- tests/test_update_recommender.py | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py index 8d3a47e46..b189897ad 100644 --- a/cortex/update_recommender.py +++ b/cortex/update_recommender.py @@ -11,6 +11,7 @@ import logging import re +import shutil import subprocess from dataclasses import dataclass, field from datetime import datetime @@ -431,9 +432,8 @@ def get_installed_packages(self) -> dict[str, PackageVersion]: def get_available_updates(self) -> list[dict[str, Any]]: """Get list of packages with available updates.""" - updates = self._get_apt_updates() - if updates: - return updates + if shutil.which("apt-get") and shutil.which("apt"): + return self._get_apt_updates() return self._get_rpm_updates() diff --git a/tests/test_update_recommender.py b/tests/test_update_recommender.py index 9436be83f..dc0f2fb74 100644 --- a/tests/test_update_recommender.py +++ b/tests/test_update_recommender.py @@ -210,8 +210,10 @@ def test_recommendation_text_branches(self, r): ) assert "part of python" in r.generate_recommendation_text(u).lower() + @patch("cortex.update_recommender.shutil.which") @patch("cortex.update_recommender.subprocess.run") - def test_pkg_manager_interactions(self, mock_run, r): + def test_pkg_manager_interactions(self, mock_run, mock_which, r): + 
mock_which.return_value = True # Default to APT present # Verify DPKG version parsing (Debian/Ubuntu) mock_run.return_value = MagicMock(returncode=0, stdout="pkg1 1.0\npkg2 2.0") pkgs = r.get_installed_packages() @@ -235,8 +237,8 @@ def test_pkg_manager_interactions(self, mock_run, r): assert "jammy" in updates[0]["repo"] # Simulate DNF check-update (exit 100 indicates available updates) + mock_which.return_value = False # Simulate APT not present mock_run.side_effect = [ - MagicMock(returncode=1), # apt update fail MagicMock( returncode=100, stdout="Last metadata expiration check: 1:00:00 ago\ncurl.x86_64 8.5.0 updates",