diff --git a/README.md b/README.md index 1c1c00e5..f444eec1 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ cortex install "tools for video compression" | **Docker Permission Fixer** | Fix root-owned bind mount issues automatically | | **Audit Trail** | Complete history in `~/.cortex/history.db` | | **Hardware-Aware** | Detects GPU, CPU, memory for optimized packages | +| **Smart Update Recommendations** | AI-powered update recommendations and risk assessment | | **Predictive Error Prevention** | AI-driven checks for potential installation failures | | **Multi-LLM Support** | Works with Claude, GPT-4, or local Ollama models | @@ -167,6 +168,9 @@ cortex history # Rollback an installation cortex rollback + +# Get smart update recommendations +cortex update recommend ``` ### Role Management @@ -192,6 +196,7 @@ cortex role set | `cortex sandbox ` | Test packages in Docker sandbox | | `cortex history` | View all past installations | | `cortex rollback ` | Undo a specific installation | +| `cortex update recommend` | Get AI-powered update recommendations | | `cortex --version` | Show version information | | `cortex --help` | Display help message | diff --git a/cortex/cli.py b/cortex/cli.py index 267228b0..49b26752 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -53,6 +53,7 @@ # Suppress noisy log messages in normal operation logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("cortex.installation_history").setLevel(logging.ERROR) +logger = logging.getLogger(__name__) sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) @@ -2623,6 +2624,43 @@ def progress_callback(message: str, percent: float) -> None: ) return 0 + elif action == "recommend": + # Smart Update Recommendations (Issue #91) + from cortex.update_recommender import UpdateRecommender, recommend_updates + + use_llm = not getattr(args, "no_llm", False) + output_json = getattr(args, "json", False) + + if output_json: + try: + llm_router = None + if use_llm: + try: + from 
cortex.llm_router import LLMRouter + + llm_router = LLMRouter() + except ImportError: + pass + except (RuntimeError, ConnectionError) as e: + logger.debug(f"LLM router initialization failed: {e}") + + recommender = UpdateRecommender(llm_router=llm_router, verbose=self.verbose) + recommendation = recommender.get_recommendations(use_llm=use_llm) + + print(json.dumps(recommendation.to_dict(), indent=2)) + return 0 + except Exception as e: + error_payload = { + "success": False, + "error": str(e), + "error_type": type(e).__name__, + } + print(json.dumps(error_payload)) + return 1 + else: + cx_print(t("update_recommend.checking"), "thinking") + return recommend_updates(use_llm=use_llm, verbose=self.verbose) + else: # Default: show current version and check for updates cx_print(f"Current version: [cyan]{get_version_string()}[/cyan]", "info") @@ -5345,6 +5383,21 @@ def main(): # update backups update_subs.add_parser("backups", help="List available backups for rollback") + + # update recommend - Smart Update Recommendations (Issue #91) + update_recommend_parser = update_subs.add_parser( + "recommend", help="AI-powered update recommendations" + ) + update_recommend_parser.add_argument( + "--no-llm", + action="store_true", + help="Disable LLM analysis for recommendations", + ) + update_recommend_parser.add_argument( + "--json", + action="store_true", + help="Output recommendations in JSON format", + ) # -------------------------- # WiFi/Bluetooth Driver Matcher diff --git a/cortex/i18n/locales/en.yaml b/cortex/i18n/locales/en.yaml index eb212316..631a919d 100644 --- a/cortex/i18n/locales/en.yaml +++ b/cortex/i18n/locales/en.yaml @@ -480,6 +480,46 @@ progress: cleaning_up: "Cleaning up..." 
# {seconds} - duration completed_in: "Completed in {seconds} seconds" +# ============================================================================= +# Update Recommendations +# ============================================================================= +update_recommend: + checking: "Analyzing system for update recommendations..." + no_updates: "All packages are up to date! Your system is healthy." + overall_risk: "Overall risk: {risk}" + total_updates: "Total updates available: {count}" + header: "Update Analysis" + + categories: + security: "Security Updates (Apply ASAP)" + immediate: "Safe to Update Now (Low Risk)" + scheduled: "Recommended for Maintenance Window" + deferred: "Hold for Now" + groups: "Related Update Groups" + + risks: + low: "LOW" + medium: "MEDIUM" + high: "HIGH" + critical: "CRITICAL" + + notes: + security: "security" + warnings: "{count} warnings" + group: "group: {name}" + + ai_analysis: "AI Analysis" + more_updates: "... and {count} more" + no_description: "No description available" + + recommendations: + security_urgent: "Security update - prioritize installation" + safe_immediate: "Safe to update immediately" + maintenance_window: "Schedule for maintenance window" + consider_deferring: "Consider deferring this update" + major_upgrade: "Major version upgrade: {current} β†’ {new}" + potential_breaking: "Potential breaking changes:" + part_of_group: "Part of {group} update group" # ============================================================================= # Predictive Error Prevention diff --git a/cortex/update_recommender.py b/cortex/update_recommender.py new file mode 100644 index 00000000..b189897a --- /dev/null +++ b/cortex/update_recommender.py @@ -0,0 +1,1068 @@ +#!/usr/bin/env python3 +""" +Smart Update Recommender for Cortex Linux + +AI-powered system to recommend when and what to update. +Analyzes installed packages, checks for available updates, +assesses risks, and provides intelligent timing recommendations. 
+ +Issue: #91 - Smart Update Recommendations +""" + +import logging +import re +import shutil +import subprocess +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from functools import total_ordering +from pathlib import Path +from typing import Any + +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +from cortex.context_memory import ContextMemory, MemoryEntry +from cortex.i18n.translator import t +from cortex.installation_history import InstallationHistory, InstallationStatus + +# Configure logging +logger = logging.getLogger(__name__) + +console = Console() + + +class RiskLevel(Enum): + """Risk level for package updates.""" + + LOW = 1 + MEDIUM = 2 + HIGH = 3 + CRITICAL = 4 + + @property + def value_str(self) -> str: + """Get string value for translation keys.""" + return {1: "low", 2: "medium", 3: "high", 4: "critical"}[self.value] + + +class UpdateCategory(Enum): + """Category of update based on recommended timing.""" + + IMMEDIATE = "immediate" # Safe to update now + SCHEDULED = "scheduled" # Recommended for maintenance window + DEFERRED = "deferred" # Hold for now + SECURITY = "security" # Security update - prioritize + + +class ChangeType(Enum): + """Type of version change.""" + + PATCH = "patch" # Bug fixes only + MINOR = "minor" # New features, backward compatible + MAJOR = "major" # Breaking changes possible + SECURITY = "security" # Security fix + UNKNOWN = "unknown" + + +@total_ordering +@dataclass +class PackageVersion: + """Represents a package version with parsed components.""" + + raw: str + major: int = 0 + minor: int = 0 + patch: int = 0 + prerelease: str = "" + epoch: int = 0 + + @classmethod + def parse(cls, version_str: str) -> "PackageVersion": + """Parse a version string into components.""" + if not version_str: + return cls(raw="0.0.0") + + raw_str = str(version_str).strip() + + # Handle epoch (e.g., "1:2.3.4") + epoch, clean_raw = 
cls._parse_epoch(raw_str) + + # Remove common suffixes like -1ubuntu1, +dfsg, etc. + core_ver = re.sub(r"[-+~].*$", "", clean_raw) + + # Parse major.minor.patch + major, minor, patch = cls._parse_components(core_ver) + + pr_match = re.search(r"[-+~](alpha|beta|rc|dev|pre)[\d.]*", raw_str, re.I) + pr = pr_match.group(0) if pr_match else "" + + return cls(raw_str, major, minor, patch, pr, epoch) + + @staticmethod + def _parse_epoch(raw: str) -> tuple[int, str]: + if ":" not in raw: + return 0, raw + parts = raw.split(":", 1) + try: + return int(parts[0]), parts[1] + except (ValueError, IndexError): + logger.warning("Failed to parse epoch from version string: '%s'. Defaulting to 0.", raw) + return 0, raw + + @staticmethod + def _parse_components(core: str) -> tuple[int, int, int]: + parts = core.split(".") + major, minor, patch = 0, 0, 0 + try: + if len(parts) >= 1: + major_clean = re.sub(r"^\D+", "", parts[0]) + major = int(re.sub(r"\D.*", "", major_clean) or 0) + if len(parts) >= 2: + minor = int(re.sub(r"\D.*", "", parts[1]) or 0) + if len(parts) >= 3: + p_match = re.search(r"(\d+)", parts[2]) + patch = int(p_match.group(1)) if p_match else 0 + except (ValueError, IndexError): + pass + return major, minor, patch + + def __str__(self) -> str: + return self.raw + + def __eq__(self, other: object) -> bool: + if not isinstance(other, PackageVersion): + return NotImplemented + return ( + self.epoch == other.epoch + and self.major == other.major + and self.minor == other.minor + and self.patch == other.patch + and self.prerelease == other.prerelease + ) + + def __lt__(self, other: object) -> bool: + if not isinstance(other, PackageVersion): + return NotImplemented + + # Compare components in priority order + if self.epoch != other.epoch: + return self.epoch < other.epoch + if self.major != other.major: + return self.major < other.major + if self.minor != other.minor: + return self.minor < other.minor + if self.patch != other.patch: + return self.patch < other.patch + + # 
Pre-release comparison: pre-release < final release + if self.prerelease and not other.prerelease: + return True + if not self.prerelease and other.prerelease: + return False + if self.prerelease and other.prerelease: + return self.prerelease < other.prerelease + + return False + + +@dataclass +class UpdateInfo: + """Information about a package update.""" + + package_name: str + current_version: PackageVersion + new_version: PackageVersion + change_type: ChangeType + risk_level: RiskLevel + category: UpdateCategory + description: str = "" + changelog: str = "" + dependencies: list[str] = field(default_factory=list) + is_security: bool = False + breaking_changes: list[str] = field(default_factory=list) + recommended_action: str = "" + group: str = "" # For grouping related updates + + def to_dict(self) -> dict[str, Any]: + """Convert to a JSON-serializable dictionary.""" + return { + "package": self.package_name, + "current": str(self.current_version), + "new": str(self.new_version), + "risk": self.risk_level.value_str, + "type": self.change_type.value, + "is_security": self.is_security, + "breaking_changes": self.breaking_changes, + "group": self.group, + } + + +@dataclass +class UpdateRecommendation: + """Full update recommendation for a system.""" + + timestamp: str + total_updates: int + immediate_updates: list[UpdateInfo] = field(default_factory=list) + scheduled_updates: list[UpdateInfo] = field(default_factory=list) + deferred_updates: list[UpdateInfo] = field(default_factory=list) + security_updates: list[UpdateInfo] = field(default_factory=list) + groups: dict[str, list[UpdateInfo]] = field(default_factory=dict) + llm_analysis: str = "" + overall_risk: RiskLevel = RiskLevel.LOW + + def to_dict(self) -> dict[str, Any]: + """Convert to a JSON-serializable dictionary.""" + return { + "timestamp": self.timestamp, + "total_updates": self.total_updates, + "overall_risk": self.overall_risk.value_str, + "security_updates": [u.to_dict() for u in 
self.security_updates], + "immediate_updates": [u.to_dict() for u in self.immediate_updates], + "scheduled_updates": [u.to_dict() for u in self.scheduled_updates], + "deferred_updates": [u.to_dict() for u in self.deferred_updates], + "groups": {k: [u.package_name for u in v] for k, v in self.groups.items()}, + "llm_analysis": self.llm_analysis, + } + + +class UpdateRecommender: + """ + AI-powered update recommendation system. + + Analyzes installed packages, checks for updates, assesses risks, + and provides intelligent recommendations on when and what to update. + """ + + # Package groups for related updates + PACKAGE_GROUPS = { + "python": ["python3", "python3-pip", "python3-dev", "python3-venv"], + "docker": ["docker.io", "docker-ce", "docker-compose", "containerd"], + "postgresql": ["postgresql", "postgresql-client", "postgresql-contrib"], + "mysql": ["mysql-server", "mysql-client", "mariadb-server"], + "nginx": ["nginx", "nginx-common", "nginx-core"], + "nodejs": ["nodejs", "npm", "node-gyp"], + "php": ["php", "php-fpm", "php-mysql", "php-pgsql", "php-cli"], + "kernel": ["linux-image", "linux-headers", "linux-modules"], + "gcc": ["gcc", "g++", "cpp", "build-essential"], + "ssl": ["openssl", "libssl-dev", "ca-certificates"], + } + + # Known high-risk packages + HIGH_RISK_PACKAGES = { + "linux-image": "Kernel update - requires reboot", + "linux-headers": "Kernel headers - may break compiled modules", + "glibc": "Core library - system-wide impact", + "libc6": "Core library - system-wide impact", + "systemd": "Init system - critical for boot", + "grub": "Bootloader - could affect boot", + "docker": "Container runtime - affects running containers", + "postgresql": "Database - may require dump/restore", + "mysql": "Database - may require migration", + "openssl": "Encryption - may affect all TLS connections", + } + + # Security update indicators + SECURITY_INDICATORS = [ + "security", + "cve", + "vulnerability", + "exploit", + "patch", + "critical", + "urgent", + ] + 
+ # Breaking change indicators for changelog scanning + BREAKING_CHANGE_INDICATORS = [ + "breaking change", + "backwards incompatible", + "deprecated", + "removed", + "migration required", + "manual action", + ] + + # Timeouts for external commands (in seconds) + DEFAULT_TIMEOUT = 30 + CHECK_UPDATE_TIMEOUT = 120 + + # Risk score thresholds + RISK_THRESHOLD_HIGH = 35 + RISK_THRESHOLD_MEDIUM = 15 + + # Risk score penalties + PENALTY_HIGH_IMPACT_PKG = 30 + PENALTY_MAJOR_VERSION = 40 + PENALTY_MINOR_VERSION = 15 + PENALTY_PATCH_VERSION = 5 + PENALTY_PRERELEASE = 25 + PENALTY_CHANGELOG_KEYWORD = 15 + PENALTY_HISTORY_FAILURE = 25 + PENALTY_MEMORY_ISSUE = 10 + + # UI and LLM limits + MAX_LLM_UPDATES = 10 + MAX_DISPLAY_UPDATES = 10 + + def __init__( + self, + llm_router: Any | None = None, + history: InstallationHistory | None = None, + memory: ContextMemory | None = None, + verbose: bool = False, + timeout: int | None = None, + check_timeout: int | None = None, + ) -> None: + """ + Initialize the Update Recommender. 
+ + Args: + llm_router: Optional LLM router for AI-powered analysis + history: Optional installation history for learning + memory: Optional context memory for pattern recognition + verbose: Enable verbose output + timeout: Timeout for external commands + check_timeout: Timeout for update check commands + """ + self.llm_router = llm_router + self.verbose = verbose + self.timeout = timeout or self.DEFAULT_TIMEOUT + self.check_timeout = check_timeout or self.CHECK_UPDATE_TIMEOUT + + # Graceful initialization of subsystems + try: + self.history = history or InstallationHistory() + except (RuntimeError, OSError, ImportError) as e: + logger.warning("Installation history unavailable: %s", e) + self.history = None + + try: + self.memory = memory or ContextMemory() + except (RuntimeError, OSError, ImportError) as e: + # We use lazy logging formatting to satisfy SonarQube + logger.warning("Context memory unavailable: %s", e) + self.memory = None + + def _run_pkg_cmd(self, cmd: list[str], timeout: int | None = None) -> str | None: + """Internal helper to run package manager commands.""" + try: + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=timeout or self.timeout + ) + return result.stdout.strip() if result.returncode == 0 else None + except (subprocess.TimeoutExpired, FileNotFoundError): + return None + + def _get_package_metadata(self, package_name: str) -> tuple[str, str]: + """Fetch package description and changelog metadata.""" + description, changelog = "", "" + + # Try APT: parse line-by-line to avoid complex regex issues + output = self._run_pkg_cmd(["apt-cache", "show", package_name]) + if output: + desc_lines = [] + capturing = False + for line in output.splitlines(): + if line.startswith("Description"): + capturing = True + # Remove "Description-en: " or similar + clean_line = re.sub(r"^Description(?:-[\w-]+)?:\s*", "", line) + desc_lines.append(clean_line) + elif capturing: + if line.startswith(" "): + desc_lines.append(line.strip()) + 
else: + break + if desc_lines: + description = " ".join(desc_lines).strip() + else: + description = t("update_recommend.no_description") + + # Fetch changelog (best-effort, trimmed) + changelog_out = self._run_pkg_cmd(["apt-get", "changelog", package_name]) + if changelog_out: + changelog = "\n".join(changelog_out.splitlines()[:200]) + + return description, changelog + + # Try DNF/YUM + for pm in ("dnf", "yum"): + output = self._run_pkg_cmd([pm, "info", "-q", package_name]) + if output: + lines = output.splitlines() + for i, line in enumerate(lines): + if line.startswith("Description :"): + description = " ".join(lines[i:]).replace("Description :", "").strip() + break + + # Fetch changelog (best-effort, trimmed) + changelog_out = self._run_pkg_cmd([pm, "repoquery", "--changelog", package_name]) + if changelog_out: + changelog = "\n".join(changelog_out.splitlines()[:200]) + + return description or t("update_recommend.no_description"), changelog + + return description or t("update_recommend.no_description"), changelog + + def get_installed_packages(self) -> dict[str, PackageVersion]: + """Get all installed packages with their versions.""" + packages = {} + + # Query installed packages via dpkg-query (Debian/Ubuntu) + output = self._run_pkg_cmd(["dpkg-query", "-W", "-f=${Package} ${Version}\n"]) + if output: + for line in output.split("\n"): + parts = line.split(" ", 1) + if len(parts) == 2: + packages[parts[0]] = PackageVersion.parse(parts[1]) + return packages + + # Fallback to RPM query for RHEL/Fedora/Suse systems + output = self._run_pkg_cmd(["rpm", "-qa", "--qf", "%{NAME} %{VERSION}-%{RELEASE}\n"]) + if output: + for line in output.split("\n"): + parts = line.split(" ", 1) + if len(parts) == 2: + packages[parts[0]] = PackageVersion.parse(parts[1]) + return packages + + def get_available_updates(self) -> list[dict[str, Any]]: + """Get list of packages with available updates.""" + if shutil.which("apt-get") and shutil.which("apt"): + return self._get_apt_updates() 
+ + return self._get_rpm_updates() + + def _get_apt_updates(self) -> list[dict[str, Any]]: + """Helper to get updates via APT.""" + updates = [] + if self._run_pkg_cmd(["apt-get", "update", "-q"], timeout=self.check_timeout) is None: + logger.warning("APT update check failed. Skipping APT updates.") + return updates + + output = self._run_pkg_cmd(["apt", "list", "--upgradable"], timeout=self.check_timeout) + if not output: + return updates + + for line in output.splitlines(): + # Pattern: package/suite new_version arch [upgradable from: old_version] + match = re.search( + r"^([^/\s]+)/([^\s]+)\s+([^\s]+)\s+[^\s]+\s+\[upgradable from:\s+([^\s]+)\]", + line, + ) + if match: + pkg, suite, new_v, old_v = match.groups() + updates.append( + {"name": pkg, "old_version": old_v, "new_version": new_v, "repo": suite} + ) + return updates + + def _get_rpm_updates(self) -> list[dict[str, Any]]: + """Helper to get updates via DNF/YUM.""" + updates = [] + for pm in ("dnf", "yum"): + try: + result = subprocess.run( + [pm, "check-update", "-q"], + capture_output=True, + text=True, + timeout=self.check_timeout, + ) + if result.returncode in (0, 100) and result.stdout: + installed = self.get_installed_packages() + updates.extend(self._parse_rpm_check_update(result.stdout, installed)) + if updates: + return updates + except (subprocess.TimeoutExpired, FileNotFoundError): + continue + except subprocess.SubprocessError as e: + logger.warning("Package manager check failed: %s", e) + continue + return updates + + def _parse_rpm_check_update( + self, output: str, installed: dict[str, PackageVersion] + ) -> list[dict[str, Any]]: + """Helper to parse DNF/YUM check-update output.""" + updates = [] + for line in output.strip().splitlines(): + parts = line.split() + if len(parts) < 2: + continue + + # Skip non-package/status lines (e.g., metadata expiration notices) + if not re.search(r"\d", parts[1]): + continue + + full_name = parts[0] + new_ver = parts[1] + repo = parts[2] if len(parts) >= 3 
else "" + + # Resolve name: prefer architecture-specific if installed + name = self._resolve_rpm_name(full_name, installed) + current = installed.get(name) + old_ver = str(current) if current else "0.0.0" + + updates.append( + {"name": name, "old_version": old_ver, "new_version": new_ver, "repo": repo} + ) + return updates + + def _resolve_rpm_name(self, full_name: str, installed: dict[str, PackageVersion]) -> str: + """Resolve RPM package name by handling architecture suffixes.""" + if "." not in full_name: + return full_name + + name_no_arch = full_name.rsplit(".", 1)[0] + if full_name in installed: + return full_name + + return name_no_arch + + def analyze_change_type(self, current: PackageVersion, new: PackageVersion) -> ChangeType: + """Classify the semantic version delta between current and new versions.""" + if new.major > current.major: + return ChangeType.MAJOR + if new.minor > current.minor: + return ChangeType.MINOR + if new.patch > current.patch: + return ChangeType.PATCH + if str(new) != str(current): + return ChangeType.PATCH # Tie-breaker for alphanumeric patches + return ChangeType.UNKNOWN + + def assess_risk( + self, package_name: str, current: PackageVersion, new: PackageVersion, changelog: str = "" + ) -> tuple[RiskLevel, list[str]]: + """Assess update risk.""" + warnings, score = [], 0 + + # Score penalty for known high-impact system packages + for pkg, reason in self.HIGH_RISK_PACKAGES.items(): + if pkg in package_name.lower(): + score += self.PENALTY_HIGH_IMPACT_PKG + warnings.append(reason) + break + + # Score penalty based on Semantic Versioning delta severity + ctype = self.analyze_change_type(current, new) + score += { + ChangeType.MAJOR: self.PENALTY_MAJOR_VERSION, + ChangeType.MINOR: self.PENALTY_MINOR_VERSION, + ChangeType.PATCH: self.PENALTY_PATCH_VERSION, + }.get(ctype, 0) + if ctype == ChangeType.MAJOR: + warnings.append(f"Major version change ({current.major} β†’ {new.major})") + + # Additional penalty for unstable pre-release 
Query history and memory to refine risk scores based on past performance.
self.PENALTY_HISTORY_FAILURE, [ + "Historical instability: previous updates failed or were rolled back" + ] + + return 0, [] + + def _check_memory_risk(self, package_name: str) -> tuple[int, list[str]]: + """Check context memory for recurring issues.""" + if not self.memory: + return 0, [] + + memories = self.memory.get_similar_interactions(package_name, limit=5) + for m in memories: + if not m.success: + return self.PENALTY_MEMORY_ISSUE, [ + f"Memory: Previously caused issues during {m.action}" + ] + + return 0, [] + + def is_security_update( + self, package_name: str, changelog: str = "", description: str = "", repo: str = "" + ) -> bool: + """ + Determine if an update is security-related. + + Args: + package_name: Name of the package + changelog: Changelog content + description: Update description + repo: Origin repository or suite (e.g., 'jammy-security') + + Returns: + True if this appears to be a security update + """ + combined_text = f"{package_name} {changelog} {description} {repo}".lower() + + # Check for repo origin signals (high confidence) + if "security" in repo.lower(): + return True + + for indicator in self.SECURITY_INDICATORS: + if indicator in combined_text: + return True + + # Check for CVE pattern + if re.search(r"cve-\d{4}-\d+", combined_text, re.I): + return True + + return False + + def get_package_group(self, package_name: str) -> str: + """ + Get the group a package belongs to. + + Args: + package_name: Name of the package + + Returns: + Group name or empty string if not in a group + """ + for group_name, packages in self.PACKAGE_GROUPS.items(): + for pkg in packages: + if pkg in package_name.lower() or package_name.lower().startswith(pkg): + return group_name + return "" + + def categorize_update( + self, + risk_level: RiskLevel, + is_security: bool, + change_type: ChangeType, + ) -> UpdateCategory: + """ + Determine the recommended update category/timing. 
+ + Args: + risk_level: Assessed risk level + is_security: Whether it's a security update + change_type: Type of version change + + Returns: + UpdateCategory for recommended timing + """ + # Security updates should be applied ASAP + if is_security: + return UpdateCategory.SECURITY + + # High risk or major updates should be deferred + if risk_level == RiskLevel.HIGH or change_type == ChangeType.MAJOR: + return UpdateCategory.DEFERRED + + # Low risk updates can go immediately + if risk_level == RiskLevel.LOW and change_type in ( + ChangeType.PATCH, + ChangeType.MINOR, + ): + return UpdateCategory.IMMEDIATE + + # Medium risk or minor updates for scheduled maintenance + if risk_level == RiskLevel.MEDIUM or change_type == ChangeType.MINOR: + return UpdateCategory.SCHEDULED + + # Default to scheduled for unknown cases + return UpdateCategory.SCHEDULED + + def generate_recommendation_text(self, update: UpdateInfo) -> str: + """Generate human-readable recommendation for an update.""" + # Use a mapping to avoid nested ternary expressions (SonarQube) + category_keys = { + UpdateCategory.SECURITY: "security_urgent", + UpdateCategory.IMMEDIATE: "safe_immediate", + UpdateCategory.SCHEDULED: "maintenance_window", + UpdateCategory.DEFERRED: "consider_deferring", + } + key = category_keys.get(update.category, "maintenance_window") + res = [t(f"update_recommend.recommendations.{key}")] + if update.change_type == ChangeType.MAJOR: + res.append( + t( + "update_recommend.recommendations.major_upgrade", + current=str(update.current_version), + new=str(update.new_version), + ) + ) + if update.breaking_changes: + res.append(t("update_recommend.recommendations.potential_breaking")) + res.extend(f" - {bc}" for bc in update.breaking_changes[:3]) + if update.group: + res.append(t("update_recommend.recommendations.part_of_group", group=update.group)) + return "\n".join(res) + + # Risk colors for display + RISK_COLORS = { + RiskLevel.LOW: "green", + RiskLevel.MEDIUM: "yellow", + 
f"({u.change_type.value}, {u.risk_level.value_str} risk)"
+ + Args: + use_llm: Whether to use LLM for additional analysis + + Returns: + UpdateRecommendation with categorized updates + """ + timestamp = datetime.now().isoformat() + updates = self.get_available_updates() + + if not updates: + return UpdateRecommendation( + timestamp=timestamp, + total_updates=0, + ) + + update_infos = [] + groups: dict[str, list[UpdateInfo]] = {} + + for update in updates: + pkg_name = update["name"] + old_ver = update["old_version"] + new_ver = update["new_version"] + repo = update.get("repo", "") + + # Fetch extra metadata for better analysis + description, changelog = self._get_package_metadata(pkg_name) + + current, new = PackageVersion.parse(old_ver), PackageVersion.parse(new_ver) + change_type = self.analyze_change_type(current, new) + risk_level, breaking_changes = self.assess_risk( + pkg_name, current, new, changelog=changelog + ) + group = self.get_package_group(pkg_name) + is_security = self.is_security_update( + pkg_name, changelog=changelog, description=description, repo=repo + ) + + info = UpdateInfo( + pkg_name, + current, + new, + change_type, + risk_level, + self.categorize_update(risk_level, is_security, change_type), + description=description, + changelog=changelog, + breaking_changes=breaking_changes, + group=group, + is_security=is_security, + ) + info.recommended_action = self.generate_recommendation_text(info) + update_infos.append(info) + if group: + groups.setdefault(group, []).append(info) + + # Categorize updates + immediate = [u for u in update_infos if u.category == UpdateCategory.IMMEDIATE] + scheduled = [u for u in update_infos if u.category == UpdateCategory.SCHEDULED] + deferred = [u for u in update_infos if u.category == UpdateCategory.DEFERRED] + security = [u for u in update_infos if u.category == UpdateCategory.SECURITY] + + # Determine overall risk + overall_risk = max( + (u.risk_level for u in update_infos), key=lambda x: x.value, default=RiskLevel.LOW + ) + + # Get LLM analysis if requested + 
llm_analysis = "" + if use_llm and self.llm_router: + llm_analysis = self.analyze_with_llm(update_infos) + + return UpdateRecommendation( + timestamp=timestamp, + total_updates=len(update_infos), + immediate_updates=immediate, + scheduled_updates=scheduled, + deferred_updates=deferred, + security_updates=security, + groups=groups, + llm_analysis=llm_analysis, + overall_risk=overall_risk, + ) + + def display_recommendations(self, recommendation: UpdateRecommendation) -> None: + """ + Display recommendations in a formatted output. + + Args: + recommendation: The update recommendation to display + """ + if recommendation.total_updates == 0: + console.print(f"[green]βœ… {t('update_recommend.no_updates')}[/green]") + return + + console.print() + overall_risk_display = t(f"update_recommend.risks.{recommendation.overall_risk.value_str}") + color = self.RISK_COLORS.get(recommendation.overall_risk, "white") + console.print( + Panel( + f"[bold cyan]πŸ“Š {t('update_recommend.header')}[/bold cyan]\n" + f"{t('update_recommend.total_updates', count=recommendation.total_updates)}\n" + f"{t('update_recommend.overall_risk', risk=f'[{color}]{overall_risk_display}[/]')}", + title="Cortex Update Recommender", + ) + ) + + # Security updates (highest priority) + if recommendation.security_updates: + console.print() + console.print(f"[bold red]πŸ”’ {t('update_recommend.categories.security')}:[/bold red]") + self._display_update_table(recommendation.security_updates) + + # Immediate updates + if recommendation.immediate_updates: + console.print() + console.print( + f"[bold green]βœ… {t('update_recommend.categories.immediate')}:[/bold green]" + ) + self._display_update_table(recommendation.immediate_updates) + + # Scheduled updates + if recommendation.scheduled_updates: + console.print() + console.print( + f"[bold yellow]πŸ“… {t('update_recommend.categories.scheduled')}:[/bold yellow]" + ) + self._display_update_table(recommendation.scheduled_updates) + + # Deferred updates + if 
recommendation.deferred_updates: + console.print() + console.print( + f"[bold magenta]⏸️ {t('update_recommend.categories.deferred')}:[/bold magenta]" + ) + self._display_update_table(recommendation.deferred_updates) + + # Related update groups + if recommendation.groups: + console.print() + console.print(f"[bold cyan]πŸ“¦ {t('update_recommend.categories.groups')}:[/bold cyan]") + for group_name, group_updates in recommendation.groups.items(): + update_names = [u.package_name for u in group_updates] + console.print( + f" [cyan]{group_name}[/cyan]: {', '.join(update_names[:5])}" + + (f" +{len(update_names) - 5} more" if len(update_names) > 5 else "") + ) + + # LLM Analysis + if recommendation.llm_analysis: + console.print() + console.print( + Panel( + recommendation.llm_analysis, + title=f"[bold]πŸ€– {t('update_recommend.ai_analysis')}[/bold]", + border_style="blue", + ) + ) + + def _display_update_table(self, updates: list[UpdateInfo]) -> None: + """Display a table of updates.""" + table = Table(show_header=True, header_style="bold", box=None) + table.add_column("Package", style="cyan") + table.add_column("Current", style="dim") + table.add_column("New", style="green") + table.add_column("Type") + table.add_column("Risk") + table.add_column("Notes") + + for update in updates[: self.MAX_DISPLAY_UPDATES]: # Limit display + risk_color = self.RISK_COLORS.get(update.risk_level, "white") + risk_display = t(f"update_recommend.risks.{update.risk_level.value_str}") + type_str = update.change_type.value + + notes = [] + if update.is_security: + notes.append(f"πŸ”’ {t('update_recommend.notes.security')}") + if update.breaking_changes: + notes.append( + f"⚠️ {t('update_recommend.notes.warnings', count=len(update.breaking_changes))}" + ) + if update.group: + notes.append(f"πŸ“¦ {t('update_recommend.notes.group', name=update.group)}") + + table.add_row( + update.package_name, + str(update.current_version), + str(update.new_version), + type_str, + 
f"[{risk_color}]{risk_display}[/]", + " | ".join(notes) if notes else "-", + ) + + if len(updates) > self.MAX_DISPLAY_UPDATES: + table.add_row( + t("update_recommend.more_updates", count=len(updates) - self.MAX_DISPLAY_UPDATES), + "", + "", + "", + "", + "", + ) + + console.print(table) + + +def recommend_updates( + use_llm: bool = True, + verbose: bool = False, +) -> int: + """ + Convenience function to run update recommendations. + + Args: + use_llm: Whether to use LLM for analysis + verbose: Enable verbose output + + Returns: + Exit code (0 for success) + """ + try: + # Try to get LLM router if available + llm_router = None + if use_llm: + try: + from cortex.llm_router import LLMRouter + + llm_router = LLMRouter() + except (ImportError, RuntimeError) as e: + logger.debug("LLM router not available: %s", e) + + recommender = UpdateRecommender( + llm_router=llm_router, + verbose=verbose, + ) + + recommendation = recommender.get_recommendations(use_llm=use_llm) + recommender.display_recommendations(recommendation) + + return 0 + + except (RuntimeError, subprocess.SubprocessError, OSError) as e: + console.print(f"[red]System Error: {e}[/red]") + return 1 + except Exception as e: + console.print(f"[red]Unexpected Error: {e}[/red]") + if verbose: + import traceback + + traceback.print_exc() + return 1 diff --git a/docs/SMART_UPDATE_RECOMMENDATIONS.md b/docs/SMART_UPDATE_RECOMMENDATIONS.md new file mode 100644 index 00000000..79b1f311 --- /dev/null +++ b/docs/SMART_UPDATE_RECOMMENDATIONS.md @@ -0,0 +1,274 @@ +# Smart Update Recommendations + +## Overview + +Cortex's Smart Update Recommender is an AI-powered system that analyzes your installed packages, checks for available updates, and provides intelligent recommendations on **when** and **what** to update. 
+
+## Features
+
+- **Scan for Available Updates**: Automatically detects packages with pending updates
+- **Risk Assessment**: Evaluates each update's potential impact on your system
+- **Timing Recommendations**: Suggests optimal update windows based on risk level
+- **Related Updates Grouping**: Groups updates for related packages (e.g., all PostgreSQL components)
+- **Breaking Change Prediction**: Identifies potential breaking changes from major version updates
+- **LLM Integration**: Uses AI to provide additional context and analysis
+
+## Usage
+
+### Basic Command
+
+```bash
+cortex update recommend
+```
+
+### Example Output
+
+```text
+📊 Update Analysis
+
+🔒 Security Updates (Apply ASAP):
+  - openssl 1.1.1t → 1.1.1u (CVE-2024-1234)
+
+✅ Safe to Update Now (Low Risk):
+  - nginx 1.24.0 → 1.25.0 (minor, security fix)
+  - curl 8.4.0 → 8.5.0 (patch, bug fixes)
+
+📅 Recommended for Maintenance Window:
+  - python3 3.11.4 → 3.12.1 (minor version)
+
+⏸️ Hold for Now:
+  - nodejs 18.18.0 → 20.10.0 (major version)
+  - postgresql 14.10 → 15.5 (major version, database migration required)
+  - docker 24.0 → 25.0 (major, wait for stability reports)
+
+📦 Related Update Groups:
+  - postgresql: postgresql, postgresql-client, postgresql-contrib
+  - docker: docker.io, containerd
+
+🤖 AI Analysis:
+  Most updates are safe to apply. However, the PostgreSQL update requires
+  a major version migration. Consider backing up your databases before
+  proceeding. The Docker update should be deferred until version 25.0.1
+  addresses reported container networking issues.
+``` + +### Command Options + +| Option | Description | +|--------|-------------| +| `--no-llm` | Disable LLM-powered analysis (faster, works offline) | +| `--json` | Output recommendations in JSON format for scripting | + +### JSON Output Example + +```bash +cortex update recommend --json +``` + +```json +{ + "timestamp": "2024-01-15T10:30:00", + "total_updates": 8, + "overall_risk": "medium", + "security_updates": [ + { + "package": "openssl", + "current": "1.1.1t", + "new": "1.1.1u", + "risk": "low", + "type": "patch" + } + ], + "immediate_updates": [...], + "scheduled_updates": [...], + "deferred_updates": [ + { + "package": "postgresql", + "current": "14.10", + "new": "15.5", + "risk": "high", + "type": "major", + "breaking_changes": [ + "Major version change (14 β†’ 15)", + "Database - may require dump/restore" + ] + } + ], + "groups": { + "postgresql": ["postgresql", "postgresql-client", "postgresql-contrib"] + }, + "llm_analysis": "..." +} +``` + +## Update Categories + +### πŸ”’ Security Updates +Priority: **Critical** - Apply as soon as possible + +These updates address known security vulnerabilities. They are typically: +- Patched for specific CVEs +- Low risk to system stability +- Essential for system security + +**Recommended Action**: Apply immediately, ideally within 24-48 hours. + +### βœ… Safe to Update Now (Immediate) +Priority: **Low Risk** - Safe for immediate installation + +Updates in this category: +- Are patch or minor version updates +- Have no known breaking changes +- Don't affect critical system components + +**Recommended Action**: Apply at your convenience. + +### πŸ“… Recommended for Maintenance Window (Scheduled) +Priority: **Medium Risk** - Plan for scheduled maintenance + +These updates: +- May require service restarts +- Could have minor compatibility changes +- Include new features that may affect workflows + +**Recommended Action**: Apply during planned maintenance windows, preferably off-peak hours. 
+ +### ⏸️ Hold for Now (Deferred) +Priority: **High Risk** - Exercise caution + +Updates flagged for deferral: +- Are major version upgrades +- May include breaking changes +- Affect critical infrastructure (databases, kernel, etc.) +- Are pre-release or recently released versions + +**Recommended Action**: Wait for stability reports, plan migration carefully, and test in staging environment first. + +## Risk Assessment Criteria + +The risk level is determined by multiple factors: + +| Factor | Impact on Risk | +|--------|---------------| +| **Version Change Type** | | +| - Patch (X.Y.Z β†’ X.Y.Z+1) | Low (+5) | +| - Minor (X.Y β†’ X.Y+1) | Low-Medium (+15) | +| - Major (X β†’ X+1) | High (+40) | +| **Package Importance** | | +| - Kernel (linux-image) | High (+30) | +| - Core libraries (glibc, libc6) | High (+30) | +| - System services (systemd) | High (+30) | +| - Databases (postgresql, mysql) | High (+30) | +| **Version Stability** | | +| - Pre-release (alpha, beta, rc) | High (+25) | +| **Changelog Analysis** | | +| - Mentions "breaking change" | Medium (+15) | +| - Mentions "deprecated" | Medium (+15) | +| - Mentions "migration required" | Medium (+15) | + +### Risk Score Thresholds + +- **Low**: Score < 15 +- **Medium**: Score 15-34 +- **High**: Score β‰₯ 35 + +## Package Grouping + +Related packages are automatically grouped to help you update them together: + +| Group | Packages | +|-------|----------| +| `python` | python3, python3-pip, python3-dev | +| `docker` | docker.io, docker-ce, containerd | +| `postgresql` | postgresql, postgresql-client, postgresql-contrib | +| `mysql` | mysql-server, mysql-client, mariadb-server | +| `nginx` | nginx, nginx-common, nginx-core | +| `nodejs` | nodejs, npm, node-gyp | +| `kernel` | linux-image, linux-headers, linux-modules | +| `ssl` | openssl, libssl-dev, ca-certificates | + +## Update Strategies + +### Strategy 1: Rolling Updates (Recommended for Most Users) + +1. **Daily**: Apply security updates +2. 
**Weekly**: Apply low-risk immediate updates +3. **Monthly**: Apply scheduled updates during maintenance window +4. **Quarterly**: Evaluate and plan deferred updates + +### Strategy 2: Stability-First (Production Servers) + +1. Test all updates in staging environment first +2. Apply security updates within 48 hours +3. Batch other updates monthly +4. Defer major version updates until stability is confirmed + +### Strategy 3: Always Current (Development Machines) + +1. Apply immediate and scheduled updates weekly +2. Consider early adoption of deferred updates for testing +3. Keep multiple system snapshots for quick rollback + +## Best Practices + +### Before Updating + +1. **Back up critical data**: Especially before database or kernel updates +2. **Check changelogs**: Review breaking changes for deferred updates +3. **Test in staging**: Major updates should be tested first +4. **Plan rollback**: Know how to revert if issues arise + +### After Updating + +1. **Verify services**: Check that critical services are running +2. **Monitor logs**: Watch for errors in system and application logs +3. **Test functionality**: Validate key workflows still work +4. **Document changes**: Keep record of what was updated and when + +### For Major Version Updates + +1. **Read migration guides**: Official documentation often provides migration steps +2. **Check compatibility**: Ensure dependent applications support the new version +3. **Schedule downtime**: Major updates may require service interruption +4. 
**Have a rollback plan**: Snapshot VMs or have package backups ready + +## Integration with Other Tools + +### Cron-based Automation + +```bash +# Check for updates daily and log results +0 8 * * * /usr/local/bin/cortex update recommend --no-llm --json >> /var/log/cortex-updates.json +``` + +### CI/CD Pipelines + +```yaml +# GitHub Actions example +- name: Check for system updates + run: | + cortex update recommend --json > updates.json + if jq -e '.security_updates | length > 0' updates.json; then + echo "::warning::Security updates available" + fi +``` + +## Troubleshooting + +### "No updates available" +- Run `sudo apt update` or equivalent to refresh package cache +- Check network connectivity to package repositories + +### LLM analysis not working +- Use `--no-llm` flag for offline operation +- Check API key configuration with `cortex config show` + +### Slow analysis +- Large number of updates may take time +- Use `--no-llm` for faster results without AI analysis + +## See Also + +- `cortex update check` - Check for Cortex self-updates +- `cortex update install` - Install Cortex updates +- `cortex install` - Install system packages with AI assistance diff --git a/docs/guides/Developer-Guide.md b/docs/guides/Developer-Guide.md index a123da33..451fad4a 100644 --- a/docs/guides/Developer-Guide.md +++ b/docs/guides/Developer-Guide.md @@ -35,6 +35,7 @@ cortex/ β”‚ β”œβ”€β”€ config_templates.py # Config generation β”‚ β”œβ”€β”€ logging_system.py # Logging & diagnostics β”‚ β”œβ”€β”€ context_memory.py # AI memory system +β”‚ └── update_recommender.py # AI-powered update recommendations β”‚ └── predictive_prevention.py # Pre-install risk analysis β”œβ”€β”€ tests/ β”‚ └── test_*.py # Unit tests @@ -87,6 +88,12 @@ Context Memory (learns patterns) - Optimization recommendations - Driver compatibility +**Update Recommender (`update_recommender.py`)** +- AI-powered update analysis +- Risk assessment per update +- Timing recommendations (Immediate vs. 
Deferred) +- Related update grouping + ## Contributing ### Claiming Issues diff --git a/docs/guides/User-Guide.md b/docs/guides/User-Guide.md index fe08ac95..19fbf1a9 100644 --- a/docs/guides/User-Guide.md +++ b/docs/guides/User-Guide.md @@ -30,8 +30,22 @@ cortex rollback # Rollback to specific point cortex rollback --to + +# Get smart update recommendations +cortex update recommend + +# Get recommendations in JSON format (for scripts) +cortex update recommend --json ``` +### Update Recommendations + +Cortex uses AI to analyze available updates and categorize them by risk: +- **Security Updates**: Critical fixes that should be applied immediately. +- **Safe to Update**: Low-risk updates (patches/minor) safe for now. +- **Maintenance Window**: Medium-risk updates that may need a restart. +- **Hold for Now**: High-risk or major updates that need careful planning. + ### Simulation Mode Test installations without making changes: diff --git a/tests/test_update_recommender.py b/tests/test_update_recommender.py new file mode 100644 index 00000000..dc0f2fb7 --- /dev/null +++ b/tests/test_update_recommender.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 +""" +Unit tests for the Smart Update Recommender. +Validates version parsing, risk scoring, and categorization logic. 
+""" + +import json +import re +import subprocess +import sys +from unittest.mock import MagicMock, patch + +import pytest + +from cortex.update_recommender import ( + ChangeType, + PackageVersion, + RiskLevel, + UpdateCategory, + UpdateInfo, + UpdateRecommendation, + UpdateRecommender, + recommend_updates, +) + + +class TestPackageVersion: + @pytest.mark.parametrize( + "version_str, expected", + [ + ("1.2.3", (1, 2, 3, 0, "")), + ("v2.0.0", (2, 0, 0, 0, "")), + ("1:2.3.4", (2, 3, 4, 1, "")), + ("2.0.0-beta1", (2, 0, 0, 0, "-beta1")), + ("", (0, 0, 0, 0, "")), + (None, (0, 0, 0, 0, "")), + ("1.1.1f", (1, 1, 1, 0, "")), + ("abc:1.2.3", (1, 2, 3, 0, "")), # Invalid epoch + ("1.2", (1, 2, 0, 0, "")), + ("1", (1, 0, 0, 0, "")), + ("1.2.3~rc1", (1, 2, 3, 0, "~rc1")), + ], + ) + def test_parse(self, version_str, expected): + v = PackageVersion.parse(version_str) + assert (v.major, v.minor, v.patch, v.epoch) == expected[:4] + if expected[4]: + assert expected[4].lower() in v.prerelease.lower() + + def test_comparisons(self): + v1 = PackageVersion.parse("1.2.3") + v2 = PackageVersion.parse("1.2.4") + v3 = PackageVersion.parse("1.3.0") + v4 = PackageVersion.parse("2.0.0") + v5 = PackageVersion.parse("1:1.0.0") + v6 = PackageVersion.parse("1.2.3-beta") + + assert v1 < v2 + assert v2 < v3 + assert v3 < v4 + assert v4 < v5 + assert v6 < v1 # Pre-release is less than final + assert v1 >= v6 + assert v1 == PackageVersion.parse("1.2.3") + assert str(v1) == "1.2.3" + + +class TestUpdateRecommender: + @pytest.fixture + def r(self): + return UpdateRecommender(verbose=True) + + def test_enums_and_groups(self, r): + assert RiskLevel.LOW.value_str == "low" + assert r.get_package_group("python3-dev") == "python" + assert r.get_package_group("nginx") == "nginx" + assert r.get_package_group("unknown-pkg") == "" + + @pytest.mark.parametrize( + "curr, new, expected", + [ + ("1.0.0", "2.0.0", ChangeType.MAJOR), + ("1.0.0", "1.1.0", ChangeType.MINOR), + ("1.0.0", "1.0.1", ChangeType.PATCH), + 
("1.1.1", "1.1.1f", ChangeType.PATCH), + ("1.0.0", "1.0.0", ChangeType.UNKNOWN), + ], + ) + def test_change_analysis(self, r, curr, new, expected): + assert ( + r.analyze_change_type(PackageVersion.parse(curr), PackageVersion.parse(new)) == expected + ) + + @patch("cortex.update_recommender.subprocess.run") + def test_get_package_metadata(self, mock_run, r): + # Test APT success path + mock_run.side_effect = [ + MagicMock(returncode=0, stdout="Description: A test package\n Full description here."), + MagicMock(returncode=0, stdout="* v1.1.0: fixed security hole\n" + "* line\n" * 250), + ] + desc, changelog = r._get_package_metadata("test-pkg") + assert "A test package" in desc + assert "fixed security hole" in changelog + assert len(changelog.splitlines()) <= 200 # Truncation check + assert mock_run.call_args_list[0][0][0] == ["apt-cache", "show", "test-pkg"] + assert mock_run.call_args_list[1][0][0] == ["apt-get", "changelog", "test-pkg"] + + mock_run.reset_mock() + # Test DNF success path (APT fails) + mock_run.side_effect = [ + MagicMock(returncode=1), # apt-cache show fail + MagicMock(returncode=0, stdout="Description : A DNF test package"), + MagicMock(returncode=0, stdout="* Mon Jan 01 2024 User - 1.0.1-1\n- Breaking change"), + ] + desc, changelog = r._get_package_metadata("test-pkg") + assert "A DNF test package" in desc + assert "Breaking change" in changelog + assert mock_run.call_args_list[1][0][0] == ["dnf", "info", "-q", "test-pkg"] + assert mock_run.call_args_list[2][0][0] == ["dnf", "repoquery", "--changelog", "test-pkg"] + + mock_run.reset_mock() + # Test YUM fallback path (APT and DNF fail) + mock_run.side_effect = [ + MagicMock(returncode=1), # apt-cache show fail + MagicMock(returncode=1), # dnf info fail + MagicMock(returncode=0, stdout="Description : A YUM test package"), + MagicMock(returncode=0, stdout="* Mon Jan 01 2024 User - 1.0.1-1\n- Breaking change"), + ] + desc, changelog = r._get_package_metadata("test-pkg") + assert "A YUM test 
package" in desc + assert "Breaking change" in changelog + assert mock_run.call_args_list[2][0][0] == ["yum", "info", "-q", "test-pkg"] + assert mock_run.call_args_list[3][0][0] == ["yum", "repoquery", "--changelog", "test-pkg"] + + def test_risk_assessment_branches(self, r): + # High risk package + major version + risk, warns = r.assess_risk( + "linux-image-generic", PackageVersion.parse("5.15"), PackageVersion.parse("6.0") + ) + assert risk == RiskLevel.HIGH + assert any("Kernel" in w for w in warns) + + # Pre-release risk + risk, warns = r.assess_risk( + "some-pkg", PackageVersion.parse("1.0"), PackageVersion.parse("1.1-beta") + ) + assert risk == RiskLevel.HIGH + + # Changelog keywords + risk, warns = r.assess_risk( + "some-pkg", + PackageVersion.parse("1.0"), + PackageVersion.parse("1.0.1"), + "Breaking change and deprecated", + ) + assert risk == RiskLevel.HIGH + + def test_security_detection(self, r): + assert r.is_security_update("pkg", "High CVE-2024-0001 fix") + assert r.is_security_update("pkg", "bug fixes", "security patch") + assert not r.is_security_update("some-pkg", "random update") + + def test_categorization_matrix(self, r): + tests = [ + (RiskLevel.LOW, True, ChangeType.PATCH, UpdateCategory.SECURITY), + (RiskLevel.LOW, False, ChangeType.PATCH, UpdateCategory.IMMEDIATE), + (RiskLevel.LOW, False, ChangeType.MINOR, UpdateCategory.IMMEDIATE), + (RiskLevel.MEDIUM, False, ChangeType.MINOR, UpdateCategory.SCHEDULED), + (RiskLevel.HIGH, False, ChangeType.MINOR, UpdateCategory.DEFERRED), + (RiskLevel.LOW, False, ChangeType.MAJOR, UpdateCategory.DEFERRED), + (RiskLevel.LOW, False, ChangeType.UNKNOWN, UpdateCategory.SCHEDULED), + ] + for risk, sec, ctype, expected in tests: + assert r.categorize_update(risk, sec, ctype) == expected + + def test_recommendation_text_branches(self, r): + # Security updates should highlight urgent priority + u = UpdateInfo( + "p", + PackageVersion.parse("1"), + PackageVersion.parse("2"), + ChangeType.PATCH, + RiskLevel.LOW, + 
UpdateCategory.SECURITY, + is_security=True, + ) + assert "Security update" in r.generate_recommendation_text(u) + + # Major upgrades should flag potential breaking changes + u = UpdateInfo( + "p", + PackageVersion.parse("1"), + PackageVersion.parse("2"), + ChangeType.MAJOR, + RiskLevel.HIGH, + UpdateCategory.DEFERRED, + breaking_changes=["broken"], + ) + assert "Potential breaking" in r.generate_recommendation_text(u) + + # Grouped updates should mention their parent category + u = UpdateInfo( + "p", + PackageVersion.parse("1"), + PackageVersion.parse("2"), + ChangeType.MINOR, + RiskLevel.MEDIUM, + UpdateCategory.SCHEDULED, + group="python", + ) + assert "part of python" in r.generate_recommendation_text(u).lower() + + @patch("cortex.update_recommender.shutil.which") + @patch("cortex.update_recommender.subprocess.run") + def test_pkg_manager_interactions(self, mock_run, mock_which, r): + mock_which.return_value = True # Default to APT present + # Verify DPKG version parsing (Debian/Ubuntu) + mock_run.return_value = MagicMock(returncode=0, stdout="pkg1 1.0\npkg2 2.0") + pkgs = r.get_installed_packages() + assert "pkg1" in pkgs + + # Verify RPM version parsing fallback (Fedora/RHEL) + mock_run.side_effect = [MagicMock(returncode=1), MagicMock(returncode=0, stdout="pkg3 3.0")] + pkgs = r.get_installed_packages() + assert "pkg3" in pkgs + + # Simulate APT upgradable list output + mock_run.side_effect = [ + MagicMock(returncode=0), # apt-get update + MagicMock(returncode=0, stdout="nginx/jammy 1.25.0 amd64 [upgradable from: 1.24.0]"), + ] + updates = r.get_available_updates() + assert len(updates) == 1 + assert updates[0]["name"] == "nginx" + assert updates[0]["old_version"] == "1.24.0" + assert updates[0]["new_version"] == "1.25.0" + assert "jammy" in updates[0]["repo"] + + # Simulate DNF check-update (exit 100 indicates available updates) + mock_which.return_value = False # Simulate APT not present + mock_run.side_effect = [ + MagicMock( + returncode=100, + 
stdout="Last metadata expiration check: 1:00:00 ago\ncurl.x86_64 8.5.0 updates", + ), # dnf check-update + MagicMock(returncode=0, stdout="Version : 8.4.0"), # dnf info + ] + updates = r.get_available_updates() + assert len(updates) == 1 and updates[0]["name"] == "curl" + + # Handle command timeout or missing manager scenarios + mock_run.side_effect = subprocess.TimeoutExpired(["cmd"], 30) + assert r._run_pkg_cmd(["cmd"]) is None + + @patch.object(UpdateRecommender, "_get_package_metadata") + @patch.object(UpdateRecommender, "get_available_updates") + def test_get_recommendations_full(self, mock_get, mock_meta, r): + mock_get.return_value = [ + {"name": "nginx", "old_version": "1.24.0", "new_version": "1.25.0", "repo": "updates"}, + {"name": "postgresql", "old_version": "14.0", "new_version": "15.0", "repo": "updates"}, + ] + mock_meta.return_value = ("desc", "changelog") + rec = r.get_recommendations(use_llm=False) + assert rec.total_updates == 2 + assert rec.overall_risk == RiskLevel.HIGH + + # Verify LLM analysis integration + mock_router = MagicMock() + mock_router.complete.return_value = MagicMock(content="AI analysis") + r.llm_router = mock_router + with patch.dict("sys.modules", {"cortex.llm_router": MagicMock(TaskType=MagicMock())}): + rec = r.get_recommendations(use_llm=True) + assert rec.llm_analysis == "AI analysis" + + # Ensure robustness if LLM provider returns an error + mock_router.complete.side_effect = Exception("error") + assert r.analyze_with_llm(rec.immediate_updates) == "" + + def test_display_logic(self, r, capsys): + # Create a sample recommendation with mixed risk levels + u1 = UpdateInfo( + "p1", + PackageVersion.parse("1"), + PackageVersion.parse("2"), + ChangeType.PATCH, + RiskLevel.LOW, + UpdateCategory.SECURITY, + is_security=True, + ) + u2 = UpdateInfo( + "p2", + PackageVersion.parse("1"), + PackageVersion.parse("1.1"), + ChangeType.MINOR, + RiskLevel.LOW, + UpdateCategory.IMMEDIATE, + ) + u3 = UpdateInfo( + "p3", + 
PackageVersion.parse("1"), + PackageVersion.parse("1.2"), + ChangeType.MINOR, + RiskLevel.MEDIUM, + UpdateCategory.SCHEDULED, + ) + u4 = UpdateInfo( + "p4", + PackageVersion.parse("1"), + PackageVersion.parse("2"), + ChangeType.MAJOR, + RiskLevel.HIGH, + UpdateCategory.DEFERRED, + group="db", + ) + + rec = UpdateRecommendation( + "now", + 4, + immediate_updates=[u2], + scheduled_updates=[u3], + deferred_updates=[u4], + security_updates=[u1], + groups={"db": [u4]}, + overall_risk=RiskLevel.HIGH, + ) + + r.display_recommendations(rec) + out = capsys.readouterr().out + assert "Update Analysis" in out + assert "Security Updates" in out + assert "Hold for Now" in out + + # Verify table truncation logic for large update lists + updates = [u2] * 12 + r._display_update_table(updates) + out = capsys.readouterr().out + assert "more" in out.lower() + + # Ensure clean output for healthy systems + r.display_recommendations(UpdateRecommendation("now", 0)) + assert "up to date" in capsys.readouterr().out.lower() + + +@patch("cortex.update_recommender.UpdateRecommender") +def test_convenience_function(mock_class): + mock_instance = mock_class.return_value + mock_instance.get_recommendations.return_value = UpdateRecommendation("now", 0) + assert recommend_updates() == 0 + + # Error path + mock_class.side_effect = Exception("error") + assert recommend_updates(verbose=True) == 1