diff --git a/cortex/cli.py b/cortex/cli.py index 267228b0e..f8d9b07b7 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -5428,6 +5428,29 @@ def main(): help="Enable verbose output", ) + # Netplan Validator + netplan_parser = subparsers.add_parser( + "netplan", help="Validate and manage Netplan network configuration" + ) + netplan_parser.add_argument( + "action", choices=["validate", "diff", "apply", "dry-run"], help="Action to perform" + ) + netplan_parser.add_argument( + "config_file", nargs="?", help="Path to netplan config file (auto-detects if not provided)" + ) + netplan_parser.add_argument( + "--new-config", help="Path to new config file for diff/apply operations" + ) + netplan_parser.add_argument( + "--timeout", + type=int, + default=60, + help="Auto-revert timeout in seconds for dry-run mode (default: 60)", + ) + netplan_parser.add_argument( + "-v", "--verbose", action="store_true", help="Enable verbose output" + ) + args = parser.parse_args() # Configure logging based on parsed arguments @@ -5646,6 +5669,70 @@ def main(): action=getattr(args, "action", "check"), verbose=getattr(args, "verbose", False), ) + elif args.command == "netplan": + from cortex.netplan_validator import NetplanValidator + + try: + validator = NetplanValidator(args.config_file) + + if args.action == "validate": + # Validate configuration + result = validator.validate_file() + validator.print_validation_results(result) + return 0 if result.is_valid else 1 + + elif args.action == "diff": + # Show diff between current and new config + if not args.new_config: + console.print("[red]Error: --new-config required for diff[/red]") + return 1 + validator.show_diff(args.new_config) + return 0 + + elif args.action == "apply": + # Apply new configuration + if not args.new_config: + console.print("[red]Error: --new-config required for apply[/red]") + return 1 + + # Show diff first + validator.show_diff(args.new_config) + console.print() + + # Confirm with user + from rich.prompt import Confirm + + if not Confirm.ask("[yellow]Apply this configuration?[/yellow]"): + console.print("[yellow]Cancelled[/yellow]") + return 0 + + success, message = validator.apply_config(args.new_config) + if success: + console.print(f"[green]✓[/green] {message}") + return 0 + else: + console.print(f"[red]✗[/red] {message}") + return 1 + + elif args.action == "dry-run": + # Dry-run with auto-revert + if not args.new_config: + console.print("[red]Error: --new-config required for dry-run[/red]") + return 1 + + confirmed = validator.dry_run_with_revert(args.new_config, args.timeout) + return 0 if confirmed else 1 + + except FileNotFoundError as e: + console.print(f"[red]Error:[/red] {str(e)}") + return 1 + except Exception as e: + console.print(f"[red]Unexpected error:[/red] {str(e)}") + if args.verbose: + import traceback + + traceback.print_exc() + return 1 else: parser.print_help() return 1 diff --git a/cortex/netplan_validator.py b/cortex/netplan_validator.py new file mode 100644 index 000000000..e07092ea9 --- /dev/null +++ b/cortex/netplan_validator.py @@ -0,0 +1,848 @@ +""" +Netplan/NetworkManager Configuration Validator + +Validates network configuration YAML files before applying them to prevent +network outages from simple typos. Provides semantic validation, diff preview, +dry-run mode with auto-revert, and plain English error messages. + +Addresses GitHub Issue #445 - Network Config Validator +""" + +import difflib +import ipaddress +import logging +import re +import shutil +import subprocess +import threading +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import yaml +from rich.console import Console +from rich.panel import Panel + +logger = logging.getLogger(__name__) +console = Console() + + +@dataclass +class ValidationResult: + """Result of a validation check.""" + + is_valid: bool + errors: list[str] + warnings: list[str] + info: list[str] + + +@dataclass +class NetworkInterface: + """Represents a network interface configuration.""" + + name: str + dhcp4: bool = False + dhcp6: bool = False + addresses: list[str] = field(default_factory=list) + gateway4: str | None = None + gateway6: str | None = None + nameservers: dict[str, Any] = field(default_factory=dict) + routes: list[dict[str, str]] = field(default_factory=list) + + +class NetplanValidator: + """ + Validates Netplan network configuration files. + + Features: + - YAML syntax validation + - Semantic validation (IPs, routes, gateways) + - Configuration diff preview + - Dry-run mode with auto-revert timer + - Plain English error messages + """ + + NETPLAN_DIR = Path("/etc/netplan") + BACKUP_DIR = Path.home() / ".cortex" / "netplan_backups" + DEFAULT_REVERT_TIMEOUT = 60 # seconds + + def __init__(self, config_file: Path | str | None = None): + """ + Initialize the validator. + + Args: + config_file: Path to netplan YAML file to validate. + If None, will find the first .yaml file in /etc/netplan + """ + self.config_file = Path(config_file) if config_file else self._find_netplan_config() + self.backup_dir = self.BACKUP_DIR + self.backup_dir.mkdir(parents=True, exist_ok=True) + + def _find_netplan_config(self) -> Path: + """ + Find the netplan configuration file. + + Returns: + Path to the netplan config file + + Raises: + FileNotFoundError: If no netplan config found + """ + if not self.NETPLAN_DIR.exists(): + raise FileNotFoundError(f"Netplan directory {self.NETPLAN_DIR} not found") + + yaml_files = list(self.NETPLAN_DIR.glob("*.yaml")) + # Also check for .yml extension + yml_files = list(self.NETPLAN_DIR.glob("*.yml")) + all_files = yaml_files + yml_files + + if not all_files: + raise FileNotFoundError(f"No .yaml or .yml files found in {self.NETPLAN_DIR}") + + # Use the first yaml file (typically 00-installer-config.yaml or 01-netcfg.yaml) + return sorted(all_files)[0] + + def validate_yaml_syntax(self, content: str) -> ValidationResult: + """ + Validate YAML syntax. + + Args: + content: YAML content to validate + + Returns: + ValidationResult with syntax validation results + """ + errors = [] + warnings = [] + info = [] + + try: + data = yaml.safe_load(content) + if data is None: + errors.append("YAML file is empty") + return ValidationResult(False, errors, warnings, info) + + info.append("✓ YAML syntax is valid") + return ValidationResult(True, errors, warnings, info) + + except yaml.YAMLError as e: + error_msg = str(e) + # Extract line and column info for better error messages + if hasattr(e, "problem_mark"): + mark = e.problem_mark + errors.append( + f"YAML syntax error at line {mark.line + 1}, column {mark.column + 1}: {e.problem}" + ) + else: + errors.append(f"YAML syntax error: {error_msg}") + + return ValidationResult(False, errors, warnings, info) + + def validate_ip_address(self, ip_str: str, allow_cidr: bool = True) -> tuple[bool, str]: + """ + Validate an IP address. + + Args: + ip_str: IP address string to validate + allow_cidr: Whether to allow CIDR notation (e.g., 192.168.1.1/24) + + Returns: + Tuple of (is_valid, error_message) + """ + try: + if "/" in ip_str: + if not allow_cidr: + return False, f"CIDR notation not allowed for '{ip_str}'" + # Validate CIDR notation + ipaddress.ip_network(ip_str, strict=False) + else: + # Validate plain IP + ipaddress.ip_address(ip_str) + return True, "" + except ValueError as e: + return False, f"Invalid IP address '{ip_str}': {str(e)}" + + def validate_route(self, route: dict[str, str]) -> tuple[bool, list[str]]: + """ + Validate a route configuration. + + Args: + route: Route dictionary with 'to' and 'via' keys + + Returns: + Tuple of (is_valid, list of errors) + """ + errors = [] + + if "to" not in route: + errors.append("Route missing required 'to' field") + return False, errors + + if "via" not in route: + errors.append("Route missing required 'via' field") + return False, errors + + # Validate destination network + valid, error = self.validate_ip_address(route["to"], allow_cidr=True) + if not valid: + errors.append(f"Invalid route destination: {error}") + + # Validate gateway + valid, error = self.validate_ip_address(route["via"], allow_cidr=False) + if not valid: + errors.append(f"Invalid route gateway: {error}") + + return len(errors) == 0, errors + + def validate_semantics(self, config: dict[str, Any]) -> ValidationResult: + """ + Validate semantic correctness of network configuration. + + Args: + config: Parsed YAML configuration + + Returns: + ValidationResult with semantic validation results + """ + errors = [] + warnings = [] + info = [] + + # Check for required 'network' key + if "network" not in config: + errors.append("Configuration must have a 'network' key") + return ValidationResult(False, errors, warnings, info) + + network = config["network"] + + # Validate version + if "version" not in network: + warnings.append("Missing 'version' key (recommended: version: 2)") + elif network["version"] != 2: + warnings.append(f"Using version {network['version']}, version 2 is recommended") + + # Validate ethernet interfaces + if "ethernets" in network: + if not isinstance(network["ethernets"], dict): + errors.append("'ethernets' must be a mapping/dictionary") + else: + for iface_name, iface_config in network["ethernets"].items(): + self._validate_interface(iface_name, iface_config, errors, warnings, info) + + # Validate WiFi interfaces + if "wifis" in network: + if not isinstance(network["wifis"], dict): + errors.append("'wifis' must be a mapping/dictionary") + else: + for iface_name, iface_config in network["wifis"].items(): + self._validate_interface(iface_name, iface_config, errors, warnings, info) + + # Validate bridges + if "bridges" in network: + if not isinstance(network["bridges"], dict): + errors.append("'bridges' must be a mapping/dictionary") + else: + for bridge_name, bridge_config in network["bridges"].items(): + self._validate_interface(bridge_name, bridge_config, errors, warnings, info) + + if not errors: + info.append("✓ Semantic validation passed") + + return ValidationResult(len(errors) == 0, errors, warnings, info) + + def _validate_interface( + self, + iface_name: str, + iface_config: dict[str, Any], + errors: list[str], + warnings: list[str], + info: list[str], + ) -> None: + """ + Validate a single network interface configuration. + + Args: + iface_name: Interface name + iface_config: Interface configuration dictionary + errors: List to append errors to + warnings: List to append warnings to + info: List to append info messages to + """ + # Type check: ensure iface_config is a mapping/dict + if not isinstance(iface_config, dict): + errors.append( + f"Interface '{iface_name}' config must be a mapping/dictionary, got {type(iface_config).__name__}" + ) + return + + # Validate interface name format + if not re.match(r"^[a-zA-Z0-9_-]+$", iface_name): + errors.append( + f"Invalid interface name '{iface_name}': must contain only alphanumeric, dash, or underscore" + ) + + # Check for DHCP vs static IP conflict + has_dhcp4 = iface_config.get("dhcp4", False) + has_dhcp6 = iface_config.get("dhcp6", False) + has_addresses = "addresses" in iface_config and iface_config["addresses"] + + if (has_dhcp4 or has_dhcp6) and has_addresses: + warnings.append( + f"Interface '{iface_name}' has both DHCP and static addresses configured" + ) + + if not has_dhcp4 and not has_dhcp6 and not has_addresses: + warnings.append( + f"Interface '{iface_name}' has neither DHCP nor static addresses configured" + ) + + # Validate IP addresses + if has_addresses: + for addr in iface_config["addresses"]: + valid, error = self.validate_ip_address(addr, allow_cidr=True) + if not valid: + errors.append(f"Interface '{iface_name}': {error}") + elif "/" not in addr: + warnings.append( + f"Interface '{iface_name}': Address '{addr}' missing CIDR notation (e.g., /24)" + ) + + # Validate gateway + if "gateway4" in iface_config: + valid, error = self.validate_ip_address(iface_config["gateway4"], allow_cidr=False) + if not valid: + errors.append(f"Interface '{iface_name}' gateway4: {error}") + + if "gateway6" in iface_config: + valid, error = self.validate_ip_address(iface_config["gateway6"], allow_cidr=False) + if not valid: + errors.append(f"Interface '{iface_name}' gateway6: {error}") + + # Validate nameservers + if "nameservers" in iface_config: + ns_config = iface_config["nameservers"] + if "addresses" in ns_config: + for ns_addr in ns_config["addresses"]: + valid, error = self.validate_ip_address(ns_addr, allow_cidr=False) + if not valid: + errors.append(f"Interface '{iface_name}' nameserver: {error}") + + # Validate routes + if "routes" in iface_config: + for route in iface_config["routes"]: + valid, route_errors = self.validate_route(route) + if not valid: + for err in route_errors: + errors.append(f"Interface '{iface_name}' route: {err}") + + def validate_file(self, config_file: Path | str | None = None) -> ValidationResult: + """ + Validate a netplan configuration file. + + Args: + config_file: Path to config file. If None, uses self.config_file + + Returns: + ValidationResult with all validation results + """ + file_path = Path(config_file) if config_file else self.config_file + + if not file_path.exists(): + return ValidationResult(False, [f"Configuration file not found: {file_path}"], [], []) + + try: + content = file_path.read_text() + except PermissionError: + return ValidationResult( + False, [f"Permission denied reading {file_path}. Try running with sudo."], [], [] + ) + except Exception as e: + return ValidationResult(False, [f"Error reading file {file_path}: {str(e)}"], [], []) + + # Validate YAML syntax + syntax_result = self.validate_yaml_syntax(content) + if not syntax_result.is_valid: + return syntax_result + + # Parse and validate semantics + config = yaml.safe_load(content) + semantic_result = self.validate_semantics(config) + + # Combine results + return ValidationResult( + syntax_result.is_valid and semantic_result.is_valid, + syntax_result.errors + semantic_result.errors, + syntax_result.warnings + semantic_result.warnings, + syntax_result.info + semantic_result.info, + ) + + def generate_diff(self, new_config_file: Path | str) -> str: + """ + Generate a diff between current and new configuration. + + Args: + new_config_file: Path to new configuration file + + Returns: + Unified diff string + """ + new_path = Path(new_config_file) + + if not self.config_file.exists(): + return f"Current config {self.config_file} does not exist" + + if not new_path.exists(): + return f"New config {new_path} does not exist" + + try: + current_lines = self.config_file.read_text().splitlines(keepends=True) + new_lines = new_path.read_text().splitlines(keepends=True) + + diff = difflib.unified_diff( + current_lines, + new_lines, + fromfile=str(self.config_file), + tofile=str(new_path), + lineterm="", + ) + + return "".join(diff) + except Exception as e: + return f"Error generating diff: {str(e)}" + + def show_diff(self, new_config_file: Path | str) -> None: + """ + Display a colored diff in the terminal. + + Args: + new_config_file: Path to new configuration file + """ + diff_text = self.generate_diff(new_config_file) + + if not diff_text: + console.print("[green]No changes detected[/green]") + return + + console.print("\n[bold]Configuration Changes:[/bold]\n") + + # Color diff output + for line in diff_text.split("\n"): + if line.startswith("+") and not line.startswith("+++"): + console.print(f"[green]{line}[/green]") + elif line.startswith("-") and not line.startswith("---"): + console.print(f"[red]{line}[/red]") + elif line.startswith("@@"): + console.print(f"[cyan]{line}[/cyan]") + else: + console.print(line) + + def backup_current_config(self) -> Path: + """ + Create a backup of the current configuration. + + Returns: + Path to backup file + + Raises: + IOError: If backup fails + """ + if not self.config_file.exists(): + raise FileNotFoundError(f"Config file {self.config_file} not found") + + timestamp = time.strftime("%Y%m%d_%H%M%S") + backup_name = f"{self.config_file.stem}_{timestamp}.yaml" + backup_path = self.backup_dir / backup_name + + try: + shutil.copy2(self.config_file, backup_path) + logger.info(f"Created backup: {backup_path}") + return backup_path + except Exception as e: + raise OSError(f"Failed to create backup: {str(e)}") from e + + def apply_config(self, new_config_file: Path | str, backup: bool = True) -> tuple[bool, str]: + """ + Apply a new network configuration. + + Args: + new_config_file: Path to new configuration file + backup: Whether to create a backup first + + Returns: + Tuple of (success, message) + """ + new_path = Path(new_config_file) + + if not new_path.exists(): + return False, f"New config file {new_path} not found" + + # Validate first + result = self.validate_file(new_path) + if not result.is_valid: + error_msg = "\n".join(result.errors) + return False, f"Validation failed:\n{error_msg}" + + # Create backup + if backup: + try: + self.backup_current_config() + except Exception as e: + return False, f"Backup failed: {str(e)}" + + # Apply configuration + backup_path = None + try: + # Get backup path before applying + if backup: + backup_files = sorted(self.backup_dir.glob("*.yaml")) + if backup_files: + backup_path = backup_files[-1] # Most recent backup + + # Copy new config to netplan directory + shutil.copy2(new_path, self.config_file) + + # Run netplan apply + result = subprocess.run( + ["netplan", "apply"], + capture_output=True, + text=True, + timeout=30, + shell=False, + ) + + if result.returncode == 0: + return True, "Configuration applied successfully" + else: + # Revert to backup if apply failed + if backup and backup_path and backup_path.exists(): + shutil.copy2(backup_path, self.config_file) + console.print("[yellow]Reverted to backup after netplan apply failure[/yellow]") + return False, f"netplan apply failed: {result.stderr}" + + except subprocess.TimeoutExpired: + # Revert to backup on timeout + if backup and backup_path and backup_path.exists(): + shutil.copy2(backup_path, self.config_file) + console.print("[yellow]Reverted to backup after timeout[/yellow]") + return False, "netplan apply timed out" + except Exception as e: + # Revert to backup on any error + if backup and backup_path and backup_path.exists(): + shutil.copy2(backup_path, self.config_file) + console.print("[yellow]Reverted to backup after error[/yellow]") + return False, f"Failed to apply config: {str(e)}" + + def dry_run_with_revert( + self, new_config_file: Path | str, timeout: int = DEFAULT_REVERT_TIMEOUT + ) -> bool: + """ + Apply configuration with automatic revert if not confirmed. + + Args: + new_config_file: Path to new configuration file + timeout: Seconds to wait for confirmation before reverting + + Returns: + True if config was confirmed and kept, False if reverted + """ + console.print( + Panel.fit( + f"[bold yellow]DRY-RUN MODE[/bold yellow]\n\n" + f"Configuration will be applied temporarily.\n" + f"You have {timeout} seconds to confirm the changes.\n" + f"If not confirmed, configuration will auto-revert.", + title="⚠️ Safety Mode", + ) + ) + + # Validate first + result = self.validate_file(new_config_file) + if not result.is_valid: + console.print("\n[bold red]Validation Failed:[/bold red]") + for error in result.errors: + console.print(f" [red]✗[/red] {error}") + return False + + # Show diff + self.show_diff(new_config_file) + + # Create backup + try: + backup_path = self.backup_current_config() + except Exception as e: + console.print(f"\n[bold red]Backup failed:[/bold red] {str(e)}") + return False + + # Apply configuration + success, message = self.apply_config(new_config_file, backup=False) + if not success: + console.print(f"\n[bold red]Apply failed:[/bold red] {message}") + # Revert to backup since apply failed + if backup_path.exists(): + console.print("[yellow]Reverting to backup...[/yellow]") + self._revert_config(backup_path) + return False + + console.print("\n[bold green]✓[/bold green] Configuration applied") + console.print("[bold]Testing network connectivity...[/bold]") + + # Test connectivity + if not self._test_connectivity(): + console.print("[bold red]Network connectivity lost![/bold red]") + console.print("[yellow]Auto-reverting in 5 seconds...[/yellow]") + time.sleep(5) + revert_success = self._revert_config(backup_path) + if not revert_success: + console.print( + "[bold red]⚠️ REVERT FAILED - System may be in unstable state![/bold red]" + ) + return False + + console.print("[bold green]✓[/bold green] Network is working\n") + + # Start countdown timer + confirmed = self._countdown_confirmation(timeout) + + if confirmed: + console.print("\n[bold green]✓ Configuration confirmed and saved[/bold green]") + return True + else: + console.print("\n[bold yellow]⟳ Reverting to previous configuration...[/bold yellow]") + revert_success = self._revert_config(backup_path) + if not revert_success: + console.print( + "[bold red]⚠️ REVERT FAILED - System may be in unstable state![/bold red]" + ) + console.print( + "[yellow]Please follow the manual recovery steps shown above.[/yellow]" + ) + return False + + def _test_connectivity(self) -> bool: + """ + Test network connectivity by pinging common DNS servers. + + Returns: + True if network is working + """ + # noinspection PyUnresolvedReference + # Using public DNS servers (Google: 8.8.8.8, Cloudflare: 1.1.1.1) for network testing + # These are standard, public, well-known DNS endpoints for connectivity checks + test_hosts = ["8.8.8.8", "1.1.1.1"] + + for host in test_hosts: + try: + result = subprocess.run( + ["ping", "-c", "1", "-W", "2", host], + capture_output=True, + timeout=3, + shell=False, + ) + if result.returncode == 0: + return True + except (subprocess.TimeoutExpired, Exception): + continue + + return False + + def _countdown_confirmation(self, timeout: int) -> bool: + """ + Display countdown and wait for user confirmation. + + Args: + timeout: Seconds to wait + + Returns: + True if user confirmed, False if timeout + """ + console.print(f"[bold]Press 'y' to keep changes, or wait {timeout}s to auto-revert[/bold]") + + confirmed = threading.Event() + + def wait_for_input(): + try: + import sys + import termios + import tty + + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(sys.stdin.fileno()) + ch = sys.stdin.read(1) + if ch.lower() == "y": + confirmed.set() + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + except (AttributeError, OSError) as e: + # Fallback for non-Unix systems or no TTY + # IOError/OSError: stdin issues, AttributeError: termios missing + response = input().strip().lower() + if response == "y": + confirmed.set() + + # Start input thread + input_thread = threading.Thread(target=wait_for_input, daemon=True) + input_thread.start() + + # Countdown + for remaining in range(timeout, 0, -1): + if confirmed.is_set(): + break + console.print(f"\r⏱ Reverting in [{remaining}] seconds... ", end="") + time.sleep(1) + + console.print() # New line + return confirmed.is_set() + + def _revert_config(self, backup_path: Path) -> bool: + """ + Revert to backup configuration. + + Args: + backup_path: Path to backup file + + Returns: + True if revert was successful, False otherwise + """ + try: + # Copy backup to active config location + shutil.copy2(backup_path, self.config_file) + logger.info(f"Copied backup {backup_path} to {self.config_file}") + + # Apply the reverted configuration + result = subprocess.run( + ["netplan", "apply"], + capture_output=True, + text=True, + timeout=30, + shell=False, + ) + + # Check if netplan apply succeeded + if result.returncode != 0: + console.print( + "[bold red]✗ CRITICAL: Revert failed - netplan apply returned non-zero exit code[/bold red]" + ) + console.print(f"\n[bold]Exit Code:[/bold] {result.returncode}") + + if result.stderr: + console.print("\n[bold red]Error Output:[/bold red]") + console.print(f"[red]{result.stderr}[/red]") + + if result.stdout: + console.print("\n[bold]Standard Output:[/bold]") + console.print(result.stdout) + + console.print("\n[bold yellow]Configuration State:[/bold yellow]") + console.print(f" Current config: {self.config_file}") + console.print(f" Backup location: {backup_path}") + + console.print("\n[bold red]⚠️ MANUAL RECOVERY REQUIRED:[/bold red]") + console.print(" 1. Restore backup manually:") + console.print(f" [cyan]sudo cp {backup_path} {self.config_file}[/cyan]") + console.print(" [cyan]sudo netplan apply[/cyan]") + console.print(" 2. If network is completely broken:") + console.print(" [cyan]sudo systemctl restart systemd-networkd[/cyan]") + console.print(" 3. If still not working, reboot into recovery mode:") + console.print(" - Reboot and select 'Advanced options' > 'Recovery mode'") + console.print(" - Select 'network' to enable networking") + console.print(" - Manually restore the backup file") + console.print(" 4. Last resort - restore default config:") + console.print(f" [cyan]sudo rm {self.config_file}[/cyan]") + console.print(" [cyan]sudo netplan generate[/cyan]") + + logger.error(f"Revert failed: netplan apply returned {result.returncode}") + logger.error(f"stderr: {result.stderr}") + logger.error(f"stdout: {result.stdout}") + + return False + + # Success case + console.print("[bold green]✓ Reverted to previous configuration[/bold green]") + logger.info("Successfully reverted to backup configuration") + return True + + except subprocess.TimeoutExpired as e: + console.print( + "[bold red]✗ CRITICAL: Revert failed - netplan apply timed out[/bold red]" + ) + console.print("\n[bold yellow]Configuration State:[/bold yellow]") + console.print(f" Current config: {self.config_file}") + console.print(f" Backup location: {backup_path}") + console.print( + "\n[bold red]⚠️ MANUAL RECOVERY REQUIRED (timeout after 30s):[/bold red]" + ) + console.print(" 1. Try applying manually:") + console.print(" [cyan]sudo netplan apply[/cyan]") + console.print(" 2. Restart networking:") + console.print(" [cyan]sudo systemctl restart systemd-networkd[/cyan]") + console.print(" 3. Restore backup manually:") + console.print(f" [cyan]sudo cp {backup_path} {self.config_file}[/cyan]") + console.print(" 4. Reboot if necessary") + logger.error(f"Revert timeout: {e}") + return False + + except Exception as e: + console.print("[bold red]✗ CRITICAL: Failed to revert configuration[/bold red]") + console.print(f"[red]Error: {str(e)}[/red]") + console.print("\n[bold yellow]Configuration State:[/bold yellow]") + console.print(f" Current config: {self.config_file}") + console.print(f" Backup location: {backup_path}") + console.print("\n[bold red]⚠️ MANUAL RECOVERY REQUIRED:[/bold red]") + console.print(" 1. Restore backup manually:") + console.print(f" [cyan]sudo cp {backup_path} {self.config_file}[/cyan]") + console.print(" [cyan]sudo netplan apply[/cyan]") + console.print(" 2. If network is broken, reboot into recovery mode") + logger.error(f"Revert exception: {e}", exc_info=True) + return False + + def print_validation_results(self, result: ValidationResult) -> None: + """ + Print validation results in a user-friendly format. + + Args: + result: ValidationResult to display + """ + if result.is_valid: + console.print("\n[bold green]✓ Validation Passed[/bold green]\n") + else: + console.print("\n[bold red]✗ Validation Failed[/bold red]\n") + + if result.errors: + console.print("[bold red]Errors:[/bold red]") + for error in result.errors: + console.print(f" [red]✗[/red] {error}") + console.print() + + if result.warnings: + console.print("[bold yellow]Warnings:[/bold yellow]") + for warning in result.warnings: + console.print(f" [yellow]⚠[/yellow] {warning}") + console.print() + + if result.info: + for info_msg in result.info: + console.print(f" [blue]ℹ[/blue] {info_msg}") + + +def validate_netplan_config(config_file: Path | str | None = None) -> bool: + """ + Convenience function to validate a netplan configuration file. + + Args: + config_file: Path to config file, or None to auto-detect + + Returns: + True if validation passed, False otherwise + """ + try: + validator = NetplanValidator(config_file) + result = validator.validate_file() + validator.print_validation_results(result) + return result.is_valid + except Exception as e: + console.print(f"[bold red]Validation error:[/bold red] {str(e)}") + return False diff --git a/docs/NETPLAN_VALIDATOR.md b/docs/NETPLAN_VALIDATOR.md new file mode 100644 index 000000000..85f6c2d0f --- /dev/null +++ b/docs/NETPLAN_VALIDATOR.md @@ -0,0 +1,459 @@ +# Netplan Configuration Validator + +## Overview + +The Netplan Configuration Validator addresses Issue #445 by providing safe, intelligent network configuration management for Netplan/NetworkManager. It prevents network outages from simple YAML typos by validating configurations before applying them. + +## Features + +✅ **YAML Syntax Validation** - Catches syntax errors before they break networking +✅ **Semantic Validation** - Validates IP addresses, routes, gateways, and DNS servers +✅ **Configuration Diff** - Shows exactly what will change +✅ **Dry-Run Mode** - Apply changes with automatic revert timer for safety +✅ **Plain English Errors** - User-friendly error messages, not technical jargon +✅ **Automatic Backups** - Creates timestamped backups before applying changes + +## Installation + +The validator is included in Cortex Linux 0.1.0+. No additional installation required. + +## Usage + +### Validate a Configuration + +```bash +# Validate the system's current netplan config +sudo cortex netplan validate + +# Validate a specific config file +sudo cortex netplan validate /path/to/config.yaml +``` + +### Show Configuration Diff + +```bash +# Preview changes between current and new configuration +sudo cortex netplan diff --new-config /path/to/new-config.yaml +``` + +### Apply Configuration (Safe Mode) + +```bash +# Apply with confirmation prompt +sudo cortex netplan apply --new-config /path/to/new-config.yaml +``` + +### Dry-Run with Auto-Revert + +```bash +# Apply temporarily with 60-second auto-revert (default) +sudo cortex netplan dry-run --new-config /path/to/new-config.yaml + +# Custom timeout (e.g., 120 seconds) +sudo cortex netplan dry-run --new-config /path/to/new-config.yaml --timeout 120 +``` + +## How It Works + +### Validation Process + +1. **YAML Syntax Check** - Ensures the file is valid YAML +2. **Schema Validation** - Checks for required fields (`network`, `version`) +3. **IP Address Validation** - Validates all IPs and CIDR notation +4. **Route Validation** - Checks route destinations and gateways +5. **Gateway Validation** - Ensures gateways are valid IPs +6. **DNS Validation** - Verifies nameserver addresses + +### Dry-Run Mode Safety + +When using dry-run mode: + +1. Configuration is validated first +2. Backup is created automatically +3. Changes are applied temporarily +4. Network connectivity is tested +5. User has N seconds to confirm (default: 60) +6. If not confirmed, **automatically reverts** to previous config +7. If network fails, **immediately reverts** + +This prevents network lockouts from configuration errors. + +## Examples + +### Example 1: Validate Existing Config + +```bash +$ sudo cortex netplan validate + +✓ Validation Passed + + ℹ ✓ YAML syntax is valid + ℹ ✓ Semantic validation passed +``` + +### Example 2: Detect Invalid IP Address + +```yaml +# bad-config.yaml +network: + version: 2 + ethernets: + eth0: + addresses: + - 999.999.999.999/24 # Invalid IP + gateway4: 192.168.1.1 +``` + +```bash +$ sudo cortex netplan validate bad-config.yaml + +✗ Validation Failed + +Errors: + ✗ Interface 'eth0': Invalid IP address '999.999.999.999/24': ... +``` + +### Example 3: Missing CIDR Notation + +```yaml +# config-without-cidr.yaml +network: + version: 2 + ethernets: + eth0: + addresses: + - 192.168.1.100 # Missing /24 +``` + +```bash +$ sudo cortex netplan validate config-without-cidr.yaml + +✓ Validation Passed + +Warnings: + ⚠ Interface 'eth0': Address '192.168.1.100' missing CIDR notation (e.g., /24) +``` + +### Example 4: Preview Changes + +```bash +$ sudo cortex netplan diff --new-config new-network.yaml + +Configuration Changes: + +--- /etc/netplan/01-netcfg.yaml ++++ new-network.yaml +@@ -5,7 +5,7 @@ + eth0: +- dhcp4: true ++ dhcp4: false ++ addresses: ++ - 192.168.1.100/24 ++ gateway4: 192.168.1.1 +``` + +### Example 5: Safe Apply with Auto-Revert + +```bash +$ sudo cortex netplan dry-run --new-config new-network.yaml + +┌─────────────────────────────────────┐ +│ ⚠️ Safety Mode │ +│ │ +│ DRY-RUN MODE │ +│ │ +│ Configuration will be applied │ +│ temporarily. │ +│ You have 60 seconds to confirm the │ +│ changes. │ +│ If not confirmed, configuration │ +│ will auto-revert. │ +└─────────────────────────────────────┘ + +Configuration Changes: +[... diff output ...] + +✓ Configuration applied +✓ Network is working + +Press 'y' to keep changes, or wait 60s to auto-revert +Reverting in 60 seconds... +``` + +## Configuration File Format + +Netplan uses YAML configuration files located in `/etc/netplan/`. + +### Basic Ethernet with DHCP + +```yaml +network: + version: 2 + renderer: networkd + ethernets: + eth0: + dhcp4: true +``` + +### Static IP Configuration + +```yaml +network: + version: 2 + ethernets: + eth0: + addresses: + - 192.168.1.100/24 + gateway4: 192.168.1.1 + nameservers: + addresses: + - 8.8.8.8 + - 8.8.4.4 +``` + +### Multiple Interfaces + +```yaml +network: + version: 2 + ethernets: + eth0: + dhcp4: true + eth1: + addresses: + - 10.0.0.1/24 + routes: + - to: 0.0.0.0/0 + via: 10.0.0.254 +``` + +### WiFi Configuration + +```yaml +network: + version: 2 + wifis: + wlan0: + dhcp4: true + access-points: + "MyWiFiNetwork": + password: "SecurePassword123" +``` + +## Validation Rules + +### YAML Syntax +- Must be valid YAML (no tabs, proper indentation) +- Must not be empty + +### Network Configuration +- Must have `network` key +- Recommended to have `version: 2` + +### Interface Names +- Must contain only alphanumeric, dash, or underscore characters +- Examples: `eth0`, `wlan0`, `enp3s0`, `br-lan` + +### IP Addresses +- IPv4: `192.168.1.1` or `192.168.1.0/24` +- IPv6: `2001:db8::1` or `2001:db8::/32` +- Static IPs should include CIDR notation + +### Routes +- Must have `to` field (destination network) +- Must have `via` field (gateway IP) +- Both must be valid IP addresses + +### Gateways +- Must be valid IP addresses (no CIDR) +- `gateway4` for IPv4 +- `gateway6` for IPv6 + +### DNS Servers +- Must be valid IP addresses +- Specified in `nameservers.addresses` list + +## Common Errors & Solutions + +### Error: "YAML syntax error" + +**Cause**: Invalid YAML format (often tabs instead of spaces) + +**Solution**: Use spaces for indentation, not tabs + +```yaml +# ❌ Bad (tabs) +network: + version: 2 + +# ✓ Good (spaces) +network: + version: 2 +``` + +### Error: "Invalid IP address" + +**Cause**: IP address format is incorrect + +**Solution**: Check IP format and CIDR notation + +```yaml +# ❌ Bad +addresses: + - 999.999.999.999/24 + - 192.168.1.1 + +# ✓ Good +addresses: + - 192.168.1.100/24 + - 10.0.0.1/24 +``` + +### Warning: "Missing CIDR notation" + +**Cause**: Static IP without subnet mask + +**Solution**: Add CIDR notation + +```yaml +# ⚠ Warning +addresses: + - 192.168.1.100 + +# ✓ Better +addresses: + - 192.168.1.100/24 +``` + +### Error: "Route missing required 'to' field" + +**Cause**: Route configuration incomplete + +**Solution**: Include both `to` and `via` + +```yaml +# ❌ Bad +routes: + - via: 192.168.1.1 + +# ✓ Good +routes: + - to: 0.0.0.0/0 + via: 192.168.1.1 +``` + +## Safety Features + +### Automatic Backups + +All applied configurations are backed up to `~/.cortex/netplan_backups/` with timestamps: + +``` +~/.cortex/netplan_backups/ + └── 01-netcfg_20260117_143022.yaml + └── 01-netcfg_20260117_151530.yaml +``` + +### Connectivity Testing + +Dry-run mode tests connectivity by pinging: +- 8.8.8.8 (Google DNS) +- 1.1.1.1 (Cloudflare DNS) + +If both fail, configuration is **immediately reverted**. + +### Auto-Revert Timer + +Default 60-second countdown allows you to: +1. Test network connectivity +2. Verify services are working +3. Confirm or revert changes + +## Python API + +You can also use the validator programmatically: + +```python +from cortex.netplan_validator import NetplanValidator + +# Create validator instance +validator = NetplanValidator("/etc/netplan/01-netcfg.yaml") + +# Validate configuration +result = validator.validate_file() +if result.is_valid: + print("Configuration is valid!") +else: + print("Errors found:") + for error in result.errors: + print(f" - {error}") + +# Generate diff +validator.show_diff("/path/to/new-config.yaml") + +# Apply with dry-run +confirmed = validator.dry_run_with_revert("/path/to/new-config.yaml", timeout=60) +``` + +## Testing + +Run the test suite: + +```bash +pytest tests/test_netplan_validator.py -v +``` + +With coverage: + +```bash +pytest tests/test_netplan_validator.py --cov=cortex.netplan_validator --cov-report=term-missing +``` + +## Known Limitations + +1. **Requires sudo** - Network configuration changes require root privileges +2. **Netplan only** - Currently supports Netplan, not direct NetworkManager config +3. **Single file** - Validates one config file at a time +4. **Basic validation** - Doesn't validate advanced features like VLANs, bonds, etc. + +## Future Enhancements + +- [ ] Support for NetworkManager native config +- [ ] Advanced network feature validation (VLANs, bridges, bonds) +- [ ] Integration with `networkctl` for systemd-networkd +- [ ] Web UI for configuration management +- [ ] Configuration templates for common setups + +## Troubleshooting + +### "Permission denied" error + +Run with `sudo`: + +```bash +sudo cortex netplan validate +``` + +### "Netplan directory not found" + +Netplan is not installed or not configured. Install with: + +```bash +sudo apt install netplan.io +``` + +### "No .yaml files found" + +No netplan configuration exists. Create one: + +```bash +sudo nano /etc/netplan/01-netcfg.yaml +``` + +### Changes don't apply + +1. Check validation errors with `cortex netplan validate` +2. Verify you used `sudo` +3. Check `/var/log/syslog` for netplan errors + + diff --git a/examples/test_netplan_validator.sh b/examples/test_netplan_validator.sh new file mode 100755 index 000000000..6e50f6b88 --- /dev/null +++ b/examples/test_netplan_validator.sh @@ -0,0 +1,136 @@ +#!/bin/bash +# Test script for Netplan Validator +# Demonstrates all features of the netplan validator + +set -e + +echo "=====================================" +echo "Netplan Validator Test Script" +echo "=====================================" +echo + +# Create test directory +TEST_DIR="/tmp/cortex_netplan_test_$$" +mkdir -p "$TEST_DIR" +echo "Created test directory: $TEST_DIR" +echo + +# Create a valid config +echo "Creating valid configuration..." +cat > "$TEST_DIR/valid-config.yaml" << 'EOF' +network: + version: 2 + renderer: networkd + ethernets: + eth0: + dhcp4: true + eth1: + addresses: + - 192.168.1.100/24 + gateway4: 192.168.1.1 + nameservers: + addresses: + - 8.8.8.8 + - 8.8.4.4 +EOF + +# Create a config with invalid YAML +echo "Creating invalid YAML configuration..." +cat > "$TEST_DIR/invalid-yaml.yaml" << 'EOF' +network: + version: 2 + ethernets: + eth0: + dhcp4: true + invalid indentation here +EOF + +# Create a config with invalid IPs +echo "Creating configuration with invalid IPs..." +cat > "$TEST_DIR/invalid-ips.yaml" << 'EOF' +network: + version: 2 + ethernets: + eth0: + addresses: + - 999.999.999.999/24 + gateway4: invalid.ip.address + nameservers: + addresses: + - 8.8.8.8.8 +EOF + +# Create a config without CIDR +echo "Creating configuration without CIDR notation..." +cat > "$TEST_DIR/no-cidr.yaml" << 'EOF' +network: + version: 2 + ethernets: + eth0: + addresses: + - 192.168.1.100 + gateway4: 192.168.1.1 +EOF + +# Create a modified config for diff testing +echo "Creating modified configuration..." +cat > "$TEST_DIR/modified-config.yaml" << 'EOF' +network: + version: 2 + renderer: networkd + ethernets: + eth0: + dhcp4: false + addresses: + - 10.0.0.100/24 + gateway4: 10.0.0.1 + eth1: + addresses: + - 192.168.1.100/24 + gateway4: 192.168.1.1 + nameservers: + addresses: + - 8.8.8.8 + - 8.8.4.4 +EOF + +echo "Test configurations created in $TEST_DIR" +echo + +# Run tests +echo "=====================================" +echo "Test 1: Validate valid configuration" +echo "=====================================" +cortex netplan validate "$TEST_DIR/valid-config.yaml" +echo + +echo "=====================================" +echo "Test 2: Validate invalid YAML" +echo "=====================================" +cortex netplan validate "$TEST_DIR/invalid-yaml.yaml" || true +echo + +echo "=====================================" +echo "Test 3: Validate invalid IPs" +echo "=====================================" +cortex netplan validate "$TEST_DIR/invalid-ips.yaml" || true +echo + +echo "=====================================" +echo "Test 4: Validate config without CIDR" +echo "=====================================" +cortex netplan validate "$TEST_DIR/no-cidr.yaml" +echo + +echo "=====================================" +echo "Test 5: Show diff between configs" +echo "=====================================" +cortex netplan diff "$TEST_DIR/valid-config.yaml" --new-config "$TEST_DIR/modified-config.yaml" || true +echo + +echo "=====================================" +echo "All tests completed!" +echo "=====================================" +echo +echo "Test files are in: $TEST_DIR" +echo "To clean up: rm -rf $TEST_DIR" diff --git a/tests/test_netplan_validator.py b/tests/test_netplan_validator.py new file mode 100644 index 000000000..ab651b724 --- /dev/null +++ b/tests/test_netplan_validator.py @@ -0,0 +1,1354 @@ +""" +Tests for Netplan Configuration Validator + +Comprehensive test suite for the netplan_validator module. +""" + +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, Mock, patch + +import pytest +import yaml + +from cortex.netplan_validator import ( + NetplanValidator, + ValidationResult, + validate_netplan_config, +) + + +@pytest.fixture +def temp_netplan_dir(tmp_path): + """Create a temporary netplan directory structure.""" + netplan_dir = tmp_path / "etc" / "netplan" + netplan_dir.mkdir(parents=True) + return netplan_dir + + +@pytest.fixture +def temp_backup_dir(tmp_path): + """Create a temporary backup directory.""" + backup_dir = tmp_path / "backups" + backup_dir.mkdir(parents=True) + return backup_dir + + +@pytest.fixture +def valid_netplan_config(): + """Return a valid netplan configuration.""" + return """network: + version: 2 + renderer: networkd + ethernets: + eth0: + dhcp4: true + eth1: + addresses: + - 192.168.1.100/24 + gateway4: 192.168.1.1 + nameservers: + addresses: + - 8.8.8.8 + - 8.8.4.4 +""" + + +@pytest.fixture +def invalid_yaml_config(): + """Return an invalid YAML configuration.""" + return """network: + version: 2 + ethernets: + eth0: + dhcp4: true + invalid indentation here +""" + + +@pytest.fixture +def invalid_ip_config(): + """Return a config with invalid IP addresses.""" + return """network: + version: 2 + ethernets: + eth0: + addresses: + - 999.999.999.999/24 + gateway4: invalid.ip.address + nameservers: + addresses: + - 8.8.8.8.8 +""" + + +@pytest.fixture +def netplan_validator(temp_netplan_dir, temp_backup_dir, valid_netplan_config, monkeypatch): + """Create a NetplanValidator instance with mocked paths.""" + config_file = temp_netplan_dir / "01-netcfg.yaml" + config_file.write_text(valid_netplan_config) + + # Mock the class attributes + monkeypatch.setattr(NetplanValidator, "NETPLAN_DIR", temp_netplan_dir) + monkeypatch.setattr(NetplanValidator, "BACKUP_DIR", temp_backup_dir) + + return NetplanValidator(config_file) + + +class TestYAMLSyntaxValidation: + """Test YAML syntax validation.""" + + def test_valid_yaml_syntax(self, netplan_validator, valid_netplan_config): + """Test validation of valid YAML syntax.""" + result = netplan_validator.validate_yaml_syntax(valid_netplan_config) + + assert result.is_valid is True + assert len(result.errors) == 0 + assert "✓ YAML syntax is valid" in result.info + + def test_invalid_yaml_syntax(self, netplan_validator, invalid_yaml_config): + """Test detection of invalid YAML syntax.""" + result = netplan_validator.validate_yaml_syntax(invalid_yaml_config) + + assert result.is_valid is False + assert len(result.errors) > 0 + assert any("syntax error" in err.lower() for err in result.errors) + + def test_empty_yaml(self, netplan_validator): + """Test handling of empty YAML.""" + result = netplan_validator.validate_yaml_syntax("") + + assert result.is_valid is False + assert "empty" in result.errors[0].lower() + + def test_yaml_with_tabs(self, netplan_validator): + """Test YAML with tab characters (common mistake).""" + config_with_tabs = "network:\n\tversion: 2\n\tethernets:\n\t\teth0:\n\t\t\tdhcp4: true" + result = netplan_validator.validate_yaml_syntax(config_with_tabs) + + # YAML should reject tabs for indentation + assert result.is_valid is False + + +class TestIPAddressValidation: + """Test IP address validation.""" + + def test_valid_ipv4_address(self, netplan_validator): + """Test validation of valid IPv4 address.""" + valid, error = netplan_validator.validate_ip_address("192.168.1.1", allow_cidr=False) + assert valid is True + assert error == "" + + def test_valid_ipv4_cidr(self, netplan_validator): + """Test validation of valid IPv4 CIDR.""" + valid, error = netplan_validator.validate_ip_address("192.168.1.0/24", allow_cidr=True) + assert valid is True + assert error == "" + + def test_valid_ipv6_address(self, netplan_validator): + """Test validation of valid IPv6 address.""" + valid, error = netplan_validator.validate_ip_address("2001:db8::1", allow_cidr=False) + assert valid is True + assert error == "" + + def test_valid_ipv6_cidr(self, netplan_validator): + """Test validation of valid IPv6 CIDR.""" + valid, error = netplan_validator.validate_ip_address("2001:db8::/32", allow_cidr=True) + assert valid is True + assert error == "" + + def test_invalid_ipv4_address(self, netplan_validator): + """Test detection of invalid IPv4 address.""" + valid, error = netplan_validator.validate_ip_address("999.999.999.999", allow_cidr=False) + assert valid is False + assert "invalid" in error.lower() + + def test_invalid_ipv4_cidr(self, netplan_validator): + """Test detection of invalid IPv4 CIDR.""" + valid, error = netplan_validator.validate_ip_address("192.168.1.0/33", allow_cidr=True) + assert valid is False + + def test_malformed_ip(self, netplan_validator): + """Test detection of malformed IP addresses.""" + invalid_ips = [ + "not.an.ip.address", + "192.168.1", + "192.168.1.1.1", + "", + "....", + ] + + for ip in invalid_ips: + valid, error = netplan_validator.validate_ip_address(ip, allow_cidr=False) + assert valid is False, f"IP '{ip}' should be invalid" + + def test_cidr_when_not_allowed(self, netplan_validator): + """Test CIDR notation when allow_cidr=False.""" + valid, error = netplan_validator.validate_ip_address("192.168.1.1/24", allow_cidr=False) + # CIDR should be invalid when allow_cidr=False + assert valid is False + assert "cidr" in error.lower() or "not allowed" in error.lower() + + +class TestRouteValidation: + """Test route validation.""" + + def test_valid_route(self, netplan_validator): + """Test validation of valid route.""" + route = {"to": "0.0.0.0/0", "via": "192.168.1.1"} + valid, errors = netplan_validator.validate_route(route) + + assert valid is True + assert len(errors) == 0 + + def test_route_missing_to(self, netplan_validator): + """Test detection of route missing 'to' field.""" + route = {"via": "192.168.1.1"} + valid, errors = netplan_validator.validate_route(route) + + assert valid is False + assert any("missing" in err.lower() and "to" in err.lower() for err in errors) + + def test_route_missing_via(self, netplan_validator): + """Test detection of route missing 'via' field.""" + route = {"to": "0.0.0.0/0"} + valid, errors = netplan_validator.validate_route(route) + + assert valid is False + assert any("missing" in err.lower() and "via" in err.lower() for err in errors) + + def test_route_invalid_destination(self, netplan_validator): + """Test detection of invalid route destination.""" + route = {"to": "invalid/24", "via": "192.168.1.1"} + valid, errors = netplan_validator.validate_route(route) + + assert valid is False + assert any("destination" in err.lower() for err in errors) + + def test_route_invalid_gateway(self, netplan_validator): + """Test detection of invalid route gateway.""" + route = {"to": "0.0.0.0/0", "via": "999.999.999.999"} + valid, errors = netplan_validator.validate_route(route) + + assert valid is False + assert any("gateway" in err.lower() for err in errors) + + +class TestSemanticValidation: + """Test semantic validation of network configuration.""" + + def test_valid_config_semantics(self, netplan_validator, valid_netplan_config): + """Test semantic validation of valid configuration.""" + config = yaml.safe_load(valid_netplan_config) + result = netplan_validator.validate_semantics(config) + + assert result.is_valid is True + assert len(result.errors) == 0 + + def test_missing_network_key(self, netplan_validator): + """Test detection of missing 'network' key.""" + config = {"version": 2} + result = netplan_validator.validate_semantics(config) + + assert result.is_valid is False + assert any("network" in err.lower() for err in result.errors) + + def test_missing_version_warning(self, netplan_validator): + """Test warning for missing version.""" + config = {"network": {"ethernets": {}}} + result = netplan_validator.validate_semantics(config) + + assert any("version" in warn.lower() for warn in result.warnings) + + def test_dhcp_and_static_ip_warning(self, netplan_validator): + """Test warning when both DHCP and static IPs are configured.""" + config = { + "network": { + "version": 2, + "ethernets": {"eth0": {"dhcp4": True, "addresses": ["192.168.1.100/24"]}}, + } + } + result = netplan_validator.validate_semantics(config) + + assert any("dhcp" in warn.lower() and "static" in warn.lower() for warn in result.warnings) + + def test_no_ip_configuration_warning(self, netplan_validator): + """Test warning when interface has no IP configuration.""" + config = {"network": {"version": 2, "ethernets": {"eth0": {}}}} + result = netplan_validator.validate_semantics(config) + + assert any("neither" in warn.lower() for warn in result.warnings) + + def test_invalid_interface_name(self, netplan_validator): + """Test detection of invalid interface name.""" + config = { + "network": {"version": 2, "ethernets": {"eth@0": {"dhcp4": True}}} # Invalid character + } + result = netplan_validator.validate_semantics(config) + + assert any("invalid interface name" in err.lower() for err in result.errors) + + def test_address_without_cidr_warning(self, netplan_validator): + """Test warning for IP address without CIDR notation.""" + config = { + "network": { + "version": 2, + "ethernets": {"eth0": {"addresses": ["192.168.1.100"]}}, # Missing /24 + } + } + result = netplan_validator.validate_semantics(config) + + assert any("cidr" in warn.lower() for warn in result.warnings) + + def test_invalid_nameserver(self, netplan_validator): + """Test detection of invalid nameserver IP.""" + config = { + "network": { + "version": 2, + "ethernets": { + "eth0": {"dhcp4": True, "nameservers": {"addresses": ["999.999.999.999"]}} + }, + } + } + result = netplan_validator.validate_semantics(config) + + assert any("nameserver" in err.lower() for err in result.errors) + + +class TestFileValidation: + """Test file validation.""" + + def test_validate_existing_file(self, netplan_validator): + """Test validation of existing file.""" + result = netplan_validator.validate_file() + + assert result.is_valid is True + assert len(result.errors) == 0 + + def test_validate_nonexistent_file(self, netplan_validator): + """Test validation of non-existent file.""" + result = netplan_validator.validate_file("/nonexistent/file.yaml") + + assert result.is_valid is False + assert any("not found" in err.lower() for err in result.errors) + + def test_validate_file_with_invalid_yaml( + self, temp_netplan_dir, netplan_validator, invalid_yaml_config + ): + """Test validation of file with invalid YAML.""" + invalid_file = temp_netplan_dir / "invalid.yaml" + invalid_file.write_text(invalid_yaml_config) + + result = netplan_validator.validate_file(invalid_file) + + assert result.is_valid is False + assert len(result.errors) > 0 + + def test_validate_file_with_invalid_ips( + self, temp_netplan_dir, netplan_validator, invalid_ip_config + ): + """Test validation of file with invalid IP addresses.""" + invalid_file = temp_netplan_dir / "invalid_ips.yaml" + invalid_file.write_text(invalid_ip_config) + + result = netplan_validator.validate_file(invalid_file) + + assert result.is_valid is False + assert any("invalid ip" in err.lower() for err in result.errors) + + +class TestDiffGeneration: + """Test diff generation.""" + + def test_generate_diff_with_changes( + self, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test diff generation when there are changes.""" + # Create a modified config + modified_config = valid_netplan_config.replace("dhcp4: true", "dhcp4: false") + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(modified_config) + + diff = netplan_validator.generate_diff(new_file) + + assert diff != "" + assert "-" in diff # Should have deletions + assert "+" in diff # Should have additions + assert "dhcp4" in diff + + def test_generate_diff_no_changes( + self, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test diff generation when there are no changes.""" + # Create identical config + new_file = temp_netplan_dir / "identical.yaml" + new_file.write_text(valid_netplan_config) + + diff = netplan_validator.generate_diff(new_file) + + # Identical files produce empty diff + assert diff == "" + + def test_generate_diff_nonexistent_new_file(self, netplan_validator): + """Test diff generation with non-existent new file.""" + diff = netplan_validator.generate_diff("/nonexistent/file.yaml") + + assert "does not exist" in diff.lower() + + +class TestBackupAndRestore: + """Test backup and restore functionality.""" + + def test_backup_current_config(self, netplan_validator): + """Test creating a backup of current configuration.""" + backup_path = netplan_validator.backup_current_config() + + assert backup_path.exists() + assert backup_path.parent == netplan_validator.backup_dir + assert backup_path.suffix == ".yaml" + + def test_backup_nonexistent_config(self, temp_netplan_dir, temp_backup_dir, monkeypatch): + """Test backup fails when config doesn't exist.""" + monkeypatch.setattr(NetplanValidator, "NETPLAN_DIR", temp_netplan_dir) + monkeypatch.setattr(NetplanValidator, "BACKUP_DIR", temp_backup_dir) + + nonexistent_file = temp_netplan_dir / "nonexistent.yaml" + validator = NetplanValidator(nonexistent_file) + + with pytest.raises(FileNotFoundError): + validator.backup_current_config() + + def test_multiple_backups_have_unique_names(self, netplan_validator): + """Test that multiple backups create unique filenames.""" + import time + + backup1 = netplan_validator.backup_current_config() + time.sleep(1.1) # Ensure different timestamp + backup2 = netplan_validator.backup_current_config() + + assert backup1 != backup2 + assert backup1.exists() + assert backup2.exists() + + +class TestConfigApplication: + """Test configuration application.""" + + @patch("subprocess.run") + def test_apply_valid_config( + self, mock_run, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test applying a valid configuration.""" + # Mock successful netplan apply + mock_run.return_value = Mock(returncode=0, stderr="") + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + success, message = netplan_validator.apply_config(new_file) + + assert success is True + assert "success" in message.lower() + mock_run.assert_called_once() + + def test_apply_invalid_config(self, temp_netplan_dir, netplan_validator, invalid_yaml_config): + """Test applying an invalid configuration fails validation.""" + new_file = temp_netplan_dir / "invalid.yaml" + new_file.write_text(invalid_yaml_config) + + success, message = netplan_validator.apply_config(new_file) + + assert success is False + assert "validation failed" in message.lower() + + def test_apply_nonexistent_config(self, netplan_validator): + """Test applying a non-existent configuration.""" + success, message = netplan_validator.apply_config("/nonexistent/file.yaml") + + assert success is False + assert "not found" in message.lower() + + @patch("subprocess.run") + def test_apply_config_netplan_fails( + self, mock_run, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test handling of netplan apply failure.""" + # Mock failed netplan apply + mock_run.return_value = Mock(returncode=1, stderr="Error applying config") + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + success, message = netplan_validator.apply_config(new_file) + + assert success is False + assert "failed" in message.lower() + + +class TestConnectivityTesting: + """Test network connectivity testing.""" + + @patch("subprocess.run") + def test_connectivity_success(self, mock_run, netplan_validator): + """Test successful connectivity check.""" + # Mock successful ping + mock_run.return_value = Mock(returncode=0) + + result = netplan_validator._test_connectivity() + + assert result is True + + @patch("subprocess.run") + def test_connectivity_failure(self, mock_run, netplan_validator): + """Test failed connectivity check.""" + # Mock failed ping + mock_run.return_value = Mock(returncode=1) + + result = netplan_validator._test_connectivity() + + assert result is False + + @patch("subprocess.run") + def test_connectivity_timeout(self, mock_run, netplan_validator): + """Test connectivity check timeout.""" + # Mock timeout + mock_run.side_effect = subprocess.TimeoutExpired("ping", 3) + + result = netplan_validator._test_connectivity() + + assert result is False + + +class TestValidationResultPrinting: + """Test validation result printing.""" + + def test_print_valid_results(self, netplan_validator, capsys): + """Test printing valid validation results.""" + result = ValidationResult(is_valid=True, errors=[], warnings=[], info=["Test passed"]) + + netplan_validator.print_validation_results(result) + + # Just verify it doesn't crash - rich output is hard to test + + def test_print_invalid_results(self, netplan_validator, capsys): + """Test printing invalid validation results.""" + result = ValidationResult( + is_valid=False, errors=["Error 1", "Error 2"], warnings=["Warning 1"], info=["Info 1"] + ) + + netplan_validator.print_validation_results(result) + + # Just verify it doesn't crash + + +class TestConvenienceFunction: + """Test the convenience validate_netplan_config function.""" + + @patch("cortex.netplan_validator.NetplanValidator") + def test_validate_netplan_config_success(self, mock_validator_class): + """Test successful validation via convenience function.""" + mock_validator = MagicMock() + mock_validator.validate_file.return_value = ValidationResult( + is_valid=True, errors=[], warnings=[], info=[] + ) + mock_validator_class.return_value = mock_validator + + result = validate_netplan_config("/path/to/config.yaml") + + assert result is True + + @patch("cortex.netplan_validator.NetplanValidator") + def test_validate_netplan_config_failure(self, mock_validator_class): + """Test failed validation via convenience function.""" + mock_validator = MagicMock() + mock_validator.validate_file.return_value = ValidationResult( + is_valid=False, errors=["Error"], warnings=[], info=[] + ) + mock_validator_class.return_value = mock_validator + + result = validate_netplan_config("/path/to/config.yaml") + + assert result is False + + @patch("cortex.netplan_validator.NetplanValidator") + def test_validate_netplan_config_exception(self, mock_validator_class): + """Test exception handling in convenience function.""" + mock_validator_class.side_effect = Exception("Test error") + + result = validate_netplan_config("/path/to/config.yaml") + + assert result is False + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_validator_with_none_config_file( + self, temp_netplan_dir, temp_backup_dir, valid_netplan_config, monkeypatch + ): + """Test validator auto-detection of config file.""" + monkeypatch.setattr(NetplanValidator, "NETPLAN_DIR", temp_netplan_dir) + monkeypatch.setattr(NetplanValidator, "BACKUP_DIR", temp_backup_dir) + + # Create a config file + config_file = temp_netplan_dir / "01-netcfg.yaml" + config_file.write_text(valid_netplan_config) + + validator = NetplanValidator(None) + + assert validator.config_file == config_file + + def test_validator_no_netplan_directory(self, tmp_path, temp_backup_dir, monkeypatch): + """Test validator when netplan directory doesn't exist.""" + nonexistent_dir = tmp_path / "nonexistent" + monkeypatch.setattr(NetplanValidator, "NETPLAN_DIR", nonexistent_dir) + monkeypatch.setattr(NetplanValidator, "BACKUP_DIR", temp_backup_dir) + + with pytest.raises(FileNotFoundError, match="Netplan directory"): + NetplanValidator(None) + + def test_validator_no_yaml_files(self, temp_netplan_dir, temp_backup_dir, monkeypatch): + """Test validator when no YAML files exist.""" + monkeypatch.setattr(NetplanValidator, "NETPLAN_DIR", temp_netplan_dir) + monkeypatch.setattr(NetplanValidator, "BACKUP_DIR", temp_backup_dir) + + with pytest.raises(FileNotFoundError, match="No .yaml or .yml files"): + NetplanValidator(None) + + def test_wifi_interface_validation(self, netplan_validator): + """Test validation of WiFi interfaces.""" + config = { + "network": { + "version": 2, + "wifis": { + "wlan0": { + "dhcp4": True, + "access-points": {"MySSID": {"password": ""}}, + } + }, + } + } + result = netplan_validator.validate_semantics(config) + + # Should validate without errors + assert result.is_valid is True + + def test_bridge_interface_validation(self, netplan_validator): + """Test validation of bridge interfaces.""" + config = { + "network": { + "version": 2, + "bridges": { + "br0": {"addresses": ["192.168.1.1/24"], "interfaces": ["eth0", "eth1"]} + }, + } + } + result = netplan_validator.validate_semantics(config) + + # Should validate without errors + assert result.is_valid is True + + def test_multiple_interfaces(self, netplan_validator): + """Test validation of multiple interfaces.""" + config = { + "network": { + "version": 2, + "ethernets": { + "eth0": {"dhcp4": True}, + "eth1": {"addresses": ["10.0.0.1/24"]}, + "eth2": {"dhcp6": True}, + }, + } + } + result = netplan_validator.validate_semantics(config) + + assert result.is_valid is True + + def test_permission_denied_error_on_read(self, temp_netplan_dir, temp_backup_dir, monkeypatch): + """Test handling of permission denied when reading config file.""" + monkeypatch.setattr(NetplanValidator, "NETPLAN_DIR", temp_netplan_dir) + monkeypatch.setattr(NetplanValidator, "BACKUP_DIR", temp_backup_dir) + + # Create a config file + config_file = temp_netplan_dir / "01-netcfg.yaml" + config_file.write_text("network:\n version: 2\n") + + validator = NetplanValidator(config_file) + + # Mock Path.read_text to raise PermissionError + with patch.object(Path, "read_text", side_effect=PermissionError("Permission denied")): + result = validator.validate_file() + + assert result.is_valid is False + assert any("permission denied" in err.lower() for err in result.errors) + + def test_generic_read_error(self, temp_netplan_dir, temp_backup_dir, monkeypatch): + """Test handling of generic read errors.""" + monkeypatch.setattr(NetplanValidator, "NETPLAN_DIR", temp_netplan_dir) + monkeypatch.setattr(NetplanValidator, "BACKUP_DIR", temp_backup_dir) + + # Create a config file + config_file = temp_netplan_dir / "01-netcfg.yaml" + config_file.write_text("network:\n version: 2\n") + + validator = NetplanValidator(config_file) + + # Mock Path.read_text to raise generic exception + with patch.object(Path, "read_text", side_effect=Exception("Read error")): + result = validator.validate_file() + + assert result.is_valid is False + assert any("error reading file" in err.lower() for err in result.errors) + + +class TestAdvancedDryRunFeatures: + """Test advanced dry-run and interactive features.""" + + @patch("subprocess.run") + @patch("cortex.netplan_validator.NetplanValidator._test_connectivity") + @patch("cortex.netplan_validator.NetplanValidator._countdown_confirmation") + def test_dry_run_user_confirms( + self, + mock_countdown, + mock_connectivity, + mock_run, + temp_netplan_dir, + netplan_validator, + valid_netplan_config, + ): + """Test dry-run when user confirms changes.""" + # Mock successful operations + mock_run.return_value = Mock(returncode=0, stderr="") + mock_connectivity.return_value = True + mock_countdown.return_value = True + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + result = netplan_validator.dry_run_with_revert(new_file, timeout=5) + + assert result is True + mock_connectivity.assert_called_once() + mock_countdown.assert_called_once_with(5) + + @patch("subprocess.run") + @patch("cortex.netplan_validator.NetplanValidator._test_connectivity") + @patch("cortex.netplan_validator.NetplanValidator._countdown_confirmation") + def test_dry_run_user_cancels( + self, + mock_countdown, + mock_connectivity, + mock_run, + temp_netplan_dir, + netplan_validator, + valid_netplan_config, + ): + """Test dry-run when user doesn't confirm (timeout).""" + # Mock successful operations but user doesn't confirm + mock_run.return_value = Mock(returncode=0, stderr="") + mock_connectivity.return_value = True + mock_countdown.return_value = False + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + result = netplan_validator.dry_run_with_revert(new_file, timeout=5) + + assert result is False + mock_countdown.assert_called_once_with(5) + + @patch("subprocess.run") + @patch("cortex.netplan_validator.NetplanValidator._test_connectivity") + def test_dry_run_connectivity_fails( + self, mock_connectivity, mock_run, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test dry-run when connectivity test fails.""" + # Mock successful apply but failed connectivity + mock_run.return_value = Mock(returncode=0, stderr="") + mock_connectivity.return_value = False + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + result = netplan_validator.dry_run_with_revert(new_file, timeout=5) + + assert result is False + mock_connectivity.assert_called_once() + + def test_dry_run_validation_fails( + self, temp_netplan_dir, netplan_validator, invalid_yaml_config + ): + """Test dry-run when initial validation fails.""" + invalid_file = temp_netplan_dir / "invalid.yaml" + invalid_file.write_text(invalid_yaml_config) + + result = netplan_validator.dry_run_with_revert(invalid_file, timeout=5) + + assert result is False + + def test_dry_run_backup_fails(self, temp_netplan_dir, netplan_validator, valid_netplan_config): + """Test dry-run when backup creation fails.""" + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + # Mock backup to fail + with patch.object( + NetplanValidator, "backup_current_config", side_effect=OSError("Backup failed") + ): + result = netplan_validator.dry_run_with_revert(new_file, timeout=5) + + assert result is False + + @patch("subprocess.run") + def test_dry_run_apply_fails( + self, mock_run, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test dry-run when apply fails.""" + # Mock failed netplan apply + mock_run.return_value = Mock(returncode=1, stderr="Apply failed") + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + result = netplan_validator.dry_run_with_revert(new_file, timeout=5) + + assert result is False + + +class TestBackupEdgeCases: + """Test backup-related edge cases.""" + + def test_backup_with_io_error( + self, temp_netplan_dir, temp_backup_dir, valid_netplan_config, monkeypatch + ): + """Test backup when copy operation fails.""" + monkeypatch.setattr(NetplanValidator, "NETPLAN_DIR", temp_netplan_dir) + monkeypatch.setattr(NetplanValidator, "BACKUP_DIR", temp_backup_dir) + + config_file = temp_netplan_dir / "01-netcfg.yaml" + config_file.write_text(valid_netplan_config) + + validator = NetplanValidator(config_file) + + # Mock shutil.copy2 to fail + with patch("shutil.copy2", side_effect=OSError("Disk full")): + with pytest.raises(OSError, match="Failed to create backup"): + validator.backup_current_config() + + +class TestApplyConfigEdgeCases: + """Test apply_config edge cases.""" + + @patch("subprocess.run") + def test_apply_config_with_timeout( + self, mock_run, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test apply_config when netplan times out.""" + # Mock timeout + mock_run.side_effect = subprocess.TimeoutExpired("netplan", 30) + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + success, message = netplan_validator.apply_config(new_file, backup=False) + + assert success is False + assert "timed out" in message.lower() + + @patch("subprocess.run") + @patch("shutil.copy2") + def test_apply_config_copy_fails( + self, mock_copy, mock_run, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test apply_config when copying config fails.""" + mock_copy.side_effect = OSError("Permission denied") + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + success, message = netplan_validator.apply_config(new_file, backup=False) + + assert success is False + assert "failed to apply" in message.lower() + + +class TestRevertConfig: + """Test configuration revert functionality.""" + + @patch("subprocess.run") + @patch("shutil.copy2") + def test_revert_config_success( + self, mock_copy, mock_run, temp_netplan_dir, temp_backup_dir, netplan_validator + ): + """Test successful config revert.""" + mock_copy.return_value = None + mock_run.return_value = Mock(returncode=0) + + backup_path = temp_backup_dir / "backup.yaml" + backup_path.write_text("network:\n version: 2\n") + + # Should not raise + netplan_validator._revert_config(backup_path) + + mock_copy.assert_called_once() + mock_run.assert_called_once() + + @patch("subprocess.run") + @patch("shutil.copy2") + def test_revert_config_fails( + self, mock_copy, mock_run, temp_netplan_dir, temp_backup_dir, netplan_validator + ): + """Test config revert failure.""" + mock_copy.side_effect = OSError("Copy failed") + + backup_path = temp_backup_dir / "backup.yaml" + backup_path.write_text("network:\n version: 2\n") + + # Should handle error gracefully + netplan_validator._revert_config(backup_path) + + mock_copy.assert_called_once() + + +class TestDiffEdgeCases: + """Test diff generation edge cases.""" + + def test_diff_current_file_not_exists(self, temp_netplan_dir, temp_backup_dir, monkeypatch): + """Test diff when current config doesn't exist.""" + monkeypatch.setattr(NetplanValidator, "NETPLAN_DIR", temp_netplan_dir) + monkeypatch.setattr(NetplanValidator, "BACKUP_DIR", temp_backup_dir) + + # Create validator with non-existent current file + nonexistent = temp_netplan_dir / "nonexistent.yaml" + validator = NetplanValidator(nonexistent) + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text("network:\n version: 2\n") + + diff = validator.generate_diff(new_file) + + assert "does not exist" in diff.lower() + + def test_diff_with_read_error(self, temp_netplan_dir, netplan_validator, valid_netplan_config): + """Test diff generation when file read fails.""" + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + # Mock read_text to fail + with patch.object(Path, "read_text", side_effect=Exception("Read error")): + diff = netplan_validator.generate_diff(new_file) + + assert "error generating diff" in diff.lower() + + +class TestShowDiff: + """Test show_diff display functionality.""" + + def test_show_diff_with_changes( + self, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test show_diff with actual changes.""" + modified_config = valid_netplan_config.replace("dhcp4: true", "dhcp4: false") + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(modified_config) + + # Should not raise + netplan_validator.show_diff(new_file) + + def test_show_diff_no_changes(self, temp_netplan_dir, netplan_validator, valid_netplan_config): + """Test show_diff with no changes.""" + new_file = temp_netplan_dir / "identical.yaml" + new_file.write_text(valid_netplan_config) + + # Should not raise + netplan_validator.show_diff(new_file) + + def test_show_diff_with_all_line_types(self, temp_netplan_dir, netplan_validator): + """Test showing diff with all different line types including @@ markers.""" + # Create modified config + modified = """network: + version: 2 + ethernets: + eth0: + dhcp4: false + addresses: + - 192.168.1.100/24 +""" + new_file = temp_netplan_dir / "modified.yaml" + new_file.write_text(modified) + + # Generate and show diff - should handle @@ markers + netplan_validator.show_diff(new_file) + + +class TestNetworkInterfaceInitialization: + """Test NetworkInterface initialization with None values.""" + + def test_network_interface_with_none_values(self): + """Test that None values are properly initialized to empty lists/dicts.""" + from cortex.netplan_validator import NetworkInterface + + # Create interface with None values (which should be converted to empty collections) + iface = NetworkInterface(name="eth0") + + # These should be initialized to empty collections even if not provided + assert isinstance(iface.addresses, list) + assert isinstance(iface.nameservers, dict) + assert isinstance(iface.routes, list) + assert len(iface.addresses) == 0 + assert len(iface.nameservers) == 0 + assert len(iface.routes) == 0 + + +class TestYAMLErrorHandling: + """Test YAML error handling edge cases.""" + + def test_yaml_error_without_mark(self, temp_netplan_dir): + """Test handling of YAML errors without mark information.""" + config_file = temp_netplan_dir / "test.yaml" + # Create a YAML error that won't have mark info + config_file.write_text("network:\n - invalid list where dict expected\n nested: value") + + validator = NetplanValidator(str(config_file)) + + # Read the file content and test + with open(config_file) as f: + content = f.read() + + result = validator.validate_yaml_syntax(content) + + # Should still report error even without mark + assert result.is_valid is False + assert len(result.errors) > 0 + assert "YAML syntax error" in result.errors[0] + + +class TestVersionValidation: + """Test version validation edge cases.""" + + def test_version_not_equal_to_2(self, temp_netplan_dir): + """Test warning when version is not 2.""" + config = """network: + version: 1 + ethernets: + eth0: + dhcp4: true +""" + config_file = temp_netplan_dir / "test.yaml" + config_file.write_text(config) + + validator = NetplanValidator(str(config_file)) + + # Load and parse the config + import yaml + + with open(config_file) as f: + config_dict = yaml.safe_load(f) + + result = validator.validate_semantics(config_dict) + + # Should have warning about version + assert any("version" in w.lower() for w in result.warnings) + assert any("version 1" in w.lower() or "Using version 1" in w for w in result.warnings) + + +class TestGatewayAndNameserverValidation: + """Test gateway6 and nameserver validation.""" + + def test_invalid_gateway6(self, temp_netplan_dir): + """Test validation of invalid gateway6.""" + config = """network: + version: 2 + ethernets: + eth0: + addresses: + - 2001:db8::1/64 + gateway6: invalid::gateway::address +""" + config_file = temp_netplan_dir / "test.yaml" + config_file.write_text(config) + + validator = NetplanValidator(str(config_file)) + + # Load and parse the config + import yaml + + with open(config_file) as f: + config_dict = yaml.safe_load(f) + + result = validator.validate_semantics(config_dict) + + # Should have error about gateway6 + assert result.is_valid is False + assert any("gateway6" in e.lower() for e in result.errors) + + def test_invalid_nameserver_addresses(self, temp_netplan_dir): + """Test validation of invalid nameserver addresses.""" + config = """network: + version: 2 + ethernets: + eth0: + dhcp4: true + nameservers: + addresses: + - 8.8.8.8 + - invalid.nameserver.address + - 1.1.1.1 +""" + config_file = temp_netplan_dir / "test.yaml" + config_file.write_text(config) + + validator = NetplanValidator(str(config_file)) + + # Load and parse the config + import yaml + + with open(config_file) as f: + config_dict = yaml.safe_load(f) + + result = validator.validate_semantics(config_dict) + + # Should have error about nameserver + assert result.is_valid is False + assert any("nameserver" in e.lower() for e in result.errors) + + def test_valid_nameservers(self, temp_netplan_dir): + """Test validation of valid nameserver configuration.""" + config = """network: + version: 2 + ethernets: + eth0: + dhcp4: true + nameservers: + addresses: + - 8.8.8.8 + - 8.8.4.4 + - 2001:4860:4860::8888 +""" + config_file = temp_netplan_dir / "test.yaml" + config_file.write_text(config) + + validator = NetplanValidator(str(config_file)) + + # Load and parse the config + import yaml + + with open(config_file) as f: + config_dict = yaml.safe_load(f) + + result = validator.validate_semantics(config_dict) + + # Should be valid + assert result.is_valid is True + + +class TestRouteValidationInConfig: + """Test route validation within interface configuration.""" + + def test_interface_with_invalid_routes(self, temp_netplan_dir): + """Test interface with invalid route configuration.""" + config = """network: + version: 2 + ethernets: + eth0: + addresses: + - 192.168.1.100/24 + routes: + - to: invalid-destination + via: 192.168.1.1 + - to: 10.0.0.0/8 + via: invalid-gateway +""" + config_file = temp_netplan_dir / "test.yaml" + config_file.write_text(config) + + validator = NetplanValidator(str(config_file)) + + # Load and parse the config + import yaml + + with open(config_file) as f: + config_dict = yaml.safe_load(f) + + result = validator.validate_semantics(config_dict) + + # Should have errors about routes + assert result.is_valid is False + assert any("route" in e.lower() for e in result.errors) + + +class TestBackupFailureInApply: + """Test backup failure during apply_config.""" + + @patch("cortex.netplan_validator.NetplanValidator.backup_current_config") + def test_apply_with_backup_failure( + self, mock_backup, temp_netplan_dir, netplan_validator, valid_netplan_config + ): + """Test apply_config when backup fails.""" + mock_backup.side_effect = Exception("Backup failed due to disk full") + + new_file = temp_netplan_dir / "new.yaml" + new_file.write_text(valid_netplan_config) + + success, message = netplan_validator.apply_config(new_file, backup=True) + + assert success is False + assert "backup failed" in message.lower() + + +class TestShowDiffElseBranch: + """Test the else branch in show_diff for lines that don't start with +, -, or @@.""" + + def test_show_diff_with_unchanged_lines(self, temp_netplan_dir, netplan_validator): + """Test diff display with unchanged context lines.""" + # Create modified config + modified = """network: + version: 2 + ethernets: + eth0: + dhcp4: false + addresses: + - 192.168.1.200/24 +""" + new_file = temp_netplan_dir / "modified.yaml" + new_file.write_text(modified) + + # This will generate a diff with unchanged context lines (the else branch) + netplan_validator.show_diff(new_file) + + +class TestCountdownConfirmationPaths: + """Test countdown confirmation with different execution paths.""" + + @patch("time.sleep") + @patch("builtins.input", return_value="y") + @patch("termios.tcgetattr", side_effect=ImportError("No termios")) + def test_countdown_with_input_fallback_confirms( + self, mock_tcgetattr, mock_input, mock_sleep, temp_netplan_dir, valid_netplan_config + ): + """Test countdown using input() fallback when termios fails - user confirms.""" + # Setup + config_file = temp_netplan_dir / "config.yaml" + config_file.write_text(valid_netplan_config) + validator = NetplanValidator(str(config_file)) + + # Call method - termios will fail, fallback to input() + result = validator._countdown_confirmation(timeout=1) + + # Input was mocked to return 'y' + assert isinstance(result, bool) + + @patch("time.sleep") + @patch("builtins.input", return_value="n") + @patch("termios.tcgetattr", side_effect=ImportError("No termios")) + def test_countdown_with_input_fallback_no_confirm( + self, mock_tcgetattr, mock_input, mock_sleep, temp_netplan_dir, valid_netplan_config + ): + """Test countdown using input() fallback when termios fails - user doesn't confirm.""" + # Setup + config_file = temp_netplan_dir / "config.yaml" + config_file.write_text(valid_netplan_config) + validator = NetplanValidator(str(config_file)) + + # Call method - termios will fail, fallback to input() + result = validator._countdown_confirmation(timeout=1) + + # Input was mocked to return 'n' + assert isinstance(result, bool) + + @patch("time.sleep") + @patch("sys.stdin") + @patch("termios.tcgetattr", return_value=["fake_settings"]) + @patch("termios.tcsetattr") + @patch("tty.setraw") + def test_countdown_with_termios_user_presses_y( + self, + mock_setraw, + mock_tcsetattr, + mock_tcgetattr, + mock_stdin, + mock_sleep, + temp_netplan_dir, + valid_netplan_config, + ): + """Test countdown using termios when user presses 'y'.""" + # Setup + config_file = temp_netplan_dir / "config.yaml" + config_file.write_text(valid_netplan_config) + validator = NetplanValidator(str(config_file)) + + # Mock stdin to simulate user pressing 'y' + mock_stdin.fileno.return_value = 0 + mock_stdin.read.return_value = "y" + + result = validator._countdown_confirmation(timeout=1) + + # Verify method completed + assert isinstance(result, bool) + + @patch("time.sleep") + @patch("sys.stdin") + @patch("termios.tcgetattr", return_value=["fake_settings"]) + @patch("termios.tcsetattr") + @patch("tty.setraw") + def test_countdown_with_termios_user_presses_n( + self, + mock_setraw, + mock_tcsetattr, + mock_tcgetattr, + mock_stdin, + mock_sleep, + temp_netplan_dir, + valid_netplan_config, + ): + """Test countdown using termios when user presses a key other than 'y'.""" + # Setup + config_file = temp_netplan_dir / "config.yaml" + config_file.write_text(valid_netplan_config) + validator = NetplanValidator(str(config_file)) + + # Mock stdin to simulate user pressing 'n' + mock_stdin.fileno.return_value = 0 + mock_stdin.read.return_value = "n" + + result = validator._countdown_confirmation(timeout=1) + + # Verify method completed + assert isinstance(result, bool) + + @patch("time.sleep") + def test_countdown_timeout_expires_no_confirmation( + self, mock_sleep, temp_netplan_dir, valid_netplan_config + ): + """Test countdown when timeout is very short.""" + # Setup + config_file = temp_netplan_dir / "config.yaml" + config_file.write_text(valid_netplan_config) + validator = NetplanValidator(str(config_file)) + + # Very short timeout + result = validator._countdown_confirmation(timeout=0) + + # When timeout is 0, range(0, 0, -1) produces no iterations + assert isinstance(result, bool) + + @patch("time.sleep") + @patch("builtins.input", return_value="y") + @patch("termios.tcgetattr", side_effect=OSError("Terminal error")) + def test_countdown_with_generic_exception_in_termios( + self, mock_tcgetattr, mock_input, mock_sleep, temp_netplan_dir, valid_netplan_config + ): + """Test countdown when termios raises a generic exception.""" + # Setup + config_file = temp_netplan_dir / "config.yaml" + config_file.write_text(valid_netplan_config) + validator = NetplanValidator(str(config_file)) + + # Call method - termios will fail with OSError, fallback to input() + result = validator._countdown_confirmation(timeout=1) + + assert isinstance(result, bool) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])