diff --git a/docs/remediation-reporting.md b/docs/remediation-reporting.md new file mode 100644 index 0000000..dcfaa20 --- /dev/null +++ b/docs/remediation-reporting.md @@ -0,0 +1,898 @@ +# Remediation Reporting + +hier_config provides powerful reporting capabilities for aggregating and analyzing remediation configurations from multiple network devices. This enables network engineers to understand the scope of changes across their infrastructure, prioritize work, and generate reports for change management. + +## Overview + +The `RemediationReporter` class allows you to: + +- **Merge multiple device remediations** into a single hierarchical tree +- **Track instances** of each configuration change across devices +- **Generate statistics** about remediation scope and impact +- **Filter by tags** to create category-specific reports +- **Export reports** in multiple formats (JSON, CSV, Markdown, Text) +- **Query specific changes** to understand their impact + +## Quick Start + +### Basic Usage + +```python +from hier_config import ( + RemediationReporter, + WorkflowRemediation, + get_hconfig, + Platform, +) + +# Generate remediations for each device +devices_remediations = [] +for device in devices: + running = get_hconfig(Platform.CISCO_IOS, device.running_config) + generated = get_hconfig(Platform.CISCO_IOS, device.generated_config) + wfr = WorkflowRemediation(running, generated) + devices_remediations.append(wfr.remediation_config) + +# Create reporter and merge all remediations +reporter = RemediationReporter.from_remediations(devices_remediations) + +# Get summary statistics +summary = reporter.summary() +print(f"Total devices: {summary.total_devices}") +print(f"Unique changes: {summary.total_unique_changes}") + +# Print human-readable summary +print(reporter.summary_text()) +``` + +### Output Example + +``` +Remediation Summary +================================================== +Total devices: 150 +Unique changes: 87 + +Top 10 Most Common Changes: +-------------------------------------------------- +1. line vty 0 4 + 145 devices (96.7%) +2. ntp server 10.2.2.2 + 132 devices (88.0%) +3. snmp-server community public RO + 89 devices (59.3%) +... +``` + +## Creating a Reporter + +### Method 1: From Remediations (Recommended) + +```python +reporter = RemediationReporter.from_remediations([ + device1_remediation, + device2_remediation, + device3_remediation, +]) +``` + +### Method 2: Add Incrementally + +```python +reporter = RemediationReporter() + +# Add one at a time +for device_remediation in device_remediations: + reporter.add_remediation(device_remediation) + +# Or add multiple at once +reporter.add_remediations(more_remediations) +``` + +### Method 3: From Pre-Merged Config + +```python +# If you already have a merged configuration +merged = get_hconfig(Platform.CISCO_IOS) +merged.merge([device1, device2, device3]) + +reporter = RemediationReporter.from_merged_config(merged) +``` + +## Querying Changes + +### Count Devices Affected by a Change + +```python +# How many devices need this specific change? 
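+# The lookup is an exact match against a top-level line of the merged
+# remediation (see get_change_detail in reporting.py), not a substring search.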
+count = reporter.get_device_count("line vty 0 4") +print(f"{count} devices need this change") +``` + +### Get Detailed Information + +```python +detail = reporter.get_change_detail("ntp server 10.2.2.2") + +print(f"Line: {detail.line}") +print(f"Affects {detail.device_count} devices") +print(f"Device IDs: {detail.device_ids}") +print(f"Tags: {detail.tags}") +print(f"Comments: {detail.comments}") + +# Access individual instances +for instance in detail.instances: + print(f"Device {instance.id}: {instance.tags}") +``` + +### Find High-Impact Changes + +```python +# Changes affecting at least 50 devices +high_impact = reporter.get_changes_by_threshold(min_devices=50) + +for change in high_impact: + print(f"{change.text}: {len(change.instances)} devices") + +# Changes affecting between 1-5 devices +isolated = reporter.get_changes_by_threshold( + min_devices=1, + max_devices=5, +) +``` + +### Get Top N Most Common Changes + +```python +top_10 = reporter.get_top_changes(10) + +for child, count in top_10: + print(f"{child.text}: {count} devices") +``` + +### Pattern Matching + +```python +# Find all VLAN interface changes +vlan_changes = reporter.get_changes_matching(r"interface Vlan\d+") + +# Find all NTP-related changes +ntp_changes = reporter.get_changes_matching(r"ntp") + +# More complex regex +acl_changes = reporter.get_changes_matching(r"access-list \d+ (permit|deny)") +``` + +## Tag-Based Reporting + +Tags allow you to categorize changes and generate filtered reports. + +### Apply Tag Rules + +```python +from hier_config import TagRule, MatchRule + +# Define tag rules +tag_rules = [ + TagRule( + match_rules=(MatchRule(startswith="ntp"),), + apply_tags=frozenset({"ntp", "time-sync", "safe"}), + ), + TagRule( + match_rules=(MatchRule(startswith="snmp"),), + apply_tags=frozenset({"snmp", "monitoring"}), + ), + TagRule( + match_rules=(MatchRule(startswith="line vty"),), + apply_tags=frozenset({"security", "access", "critical"}), + ), + TagRule( + match_rules=(MatchRule(contains="password"),), + apply_tags=frozenset({"security", "critical"}), + ), +] + +# Apply tags to the merged configuration +reporter.apply_tag_rules(tag_rules) +``` + +### Filter by Tags + +```python +# Get only NTP changes +ntp_changes = reporter.get_all_changes(include_tags=["ntp"]) + +# Get only security-related changes +security_changes = reporter.get_all_changes(include_tags=["security"]) + +# Get all changes except those tagged as "critical" +safe_changes = reporter.get_all_changes(exclude_tags=["critical"]) + +# Combine filters: security changes that are not critical +moderate_security = reporter.get_all_changes( + include_tags=["security"], + exclude_tags=["critical"], +) +``` + +### Tag-Based Summary + +```python +# Get summary breakdown by tags +tag_summary = reporter.summary_by_tags(["security", "ntp", "snmp"]) + +for tag, stats in tag_summary.items(): + print(f"\n{tag.upper()} Changes:") + print(f" Devices affected: {stats['device_count']}") + print(f" Total changes: {stats['change_count']}") + print(f" Changes:") + for change in stats['changes'][:5]: # Show first 5 + print(f" - {change}") +``` + +Output: +``` +SECURITY Changes: + Devices affected: 145 + Total changes: 23 + Changes: + - line vty 0 4 + - enable secret 5 ... 
+ - username admin privilege 15 + +NTP Changes: + Devices affected: 132 + Total changes: 8 + Changes: + - ntp server 10.2.2.2 + - ntp authenticate +``` + +## Analysis and Statistics + +### Summary Statistics + +```python +summary = reporter.summary() + +print(f"Total devices: {summary.total_devices}") +print(f"Unique changes: {summary.total_unique_changes}") + +# Top changes +for line, count in summary.most_common_changes[:5]: + print(f"{line}: {count} devices") + +# Changes by tag +for tag, count in summary.changes_by_tag.items(): + print(f"{tag}: {count} changes") +``` + +### Impact Distribution + +```python +# Get distribution of changes by device impact +distribution = reporter.get_impact_distribution( + bins=[1, 10, 25, 50, 100] +) + +for range_label, count in distribution.items(): + print(f"{range_label} devices: {count} changes") +``` + +Output: +``` +1-10 devices: 15 changes +10-25 devices: 8 changes +25-50 devices: 5 changes +50-100 devices: 3 changes +100+ devices: 2 changes +``` + +### Tag Distribution + +```python +tag_dist = reporter.get_tag_distribution() + +for tag, count in sorted(tag_dist.items(), key=lambda x: x[1], reverse=True): + print(f"{tag}: {count} occurrences") +``` + +### Group by Parent + +```python +# Group changes by their parent configuration context +grouped = reporter.group_by_parent() + +for parent, children in grouped.items(): + print(f"\n{parent}:") + for child in children[:3]: # Show first 3 + print(f" - {child.text} ({len(child.instances)} devices)") +``` + +Output: +``` +interface Vlan2: + - ip address 10.0.0.2 255.255.255.0 (15 devices) + - description Updated (12 devices) + +line vty 0 4: + - transport input ssh (145 devices) + - exec-timeout 5 0 (145 devices) +``` + +## Exporting Reports + +### Export to Text + +```python +# Export with merged style (shows instance counts) +reporter.to_text("remediation.txt", style="merged") + +# Export with comments +reporter.to_text("remediation_comments.txt", style="with_comments") + +# Export without comments (standard config format) +reporter.to_text("remediation_clean.txt", style="without_comments") + +# Export only security changes +reporter.to_text( + "security_remediation.txt", + style="merged", + include_tags=["security"], +) +``` + +**Text Output Example (`style="merged"`):** +``` +interface Vlan2 !15 instances + ip address 10.0.0.2 255.255.255.0 !15 instances +line vty 0 4 !145 instances + transport input ssh !145 instances + exec-timeout 5 0 !145 instances +ntp server 10.2.2.2 !132 instances +``` + +### Export to JSON + +```python +reporter.to_json("remediation_report.json") + +# With tag filters +reporter.to_json( + "ntp_report.json", + include_tags=["ntp"], +) + +# Custom indentation +reporter.to_json("remediation_report.json", indent=4) +``` + +**JSON Output Structure:** +```json +{ + "summary": { + "total_devices": 150, + "total_unique_changes": 87 + }, + "changes": [ + { + "line": "line vty 0 4", + "device_count": 145, + "device_ids": [1, 2, 3, ...], + "tags": ["security", "access", "critical"], + "comments": ["Update VTY settings"], + "instances": [ + { + "id": 1, + "tags": ["security"], + "comments": ["Update VTY settings"] + }, + ... + ] + } + ] +} +``` + +### Export to CSV + +```python +reporter.to_csv("remediation_report.csv") + +# With tag filters +reporter.to_csv( + "security_report.csv", + include_tags=["security"], +) +``` + +**CSV Output:** +```csv +line,device_count,percentage,tags,comments,device_ids +"line vty 0 4",145,96.7,"security,access,critical","Update VTY settings","1,2,3,..." 
+"ntp server 10.2.2.2",132,88.0,"ntp,safe","Update NTP server","1,2,3,..." +``` + +### Export to Markdown + +```python +reporter.to_markdown("remediation_report.md", top_n=20) + +# With tag filters +reporter.to_markdown( + "security_report.md", + include_tags=["security"], + top_n=10, +) +``` + +**Markdown Output:** +```markdown +# Remediation Report + +## Summary + +- **Total Devices**: 150 +- **Unique Changes**: 87 + +## Top 20 Changes by Impact + +| # | Configuration Line | Device Count | Percentage | +|---|-------------------|--------------|------------| +| 1 | `line vty 0 4` | 145 | 96.7% | +| 2 | `ntp server 10.2.2.2` | 132 | 88.0% | +... + +## Changes by Tag + +| Tag | Count | +|-----|-------| +| security | 45 | +| ntp | 32 | +``` + +### Export All Formats + +```python +# Export all formats at once +reporter.export_all( + output_dir="reports/", + formats=["json", "csv", "markdown", "text"], +) + +# With tag filters +reporter.export_all( + output_dir="reports/security/", + formats=["json", "csv", "markdown"], + include_tags=["security"], +) +``` + +This creates: +- `reports/remediation_report.json` +- `reports/remediation_report.csv` +- `reports/remediation_report.md` +- `reports/remediation_report.txt` + +## Real-World Use Cases + +### Use Case 1: Impact Analysis + +**Question**: *"How many devices will be affected if I push NTP server changes?"* + +```python +from hier_config import RemediationReporter + +reporter = RemediationReporter.from_remediations(all_device_remediations) + +ntp_count = reporter.get_device_count("ntp server 10.2.2.2") +print(f"NTP change will affect {ntp_count} devices") + +# Get the specific device IDs +detail = reporter.get_change_detail("ntp server 10.2.2.2") +print(f"Affected device IDs: {sorted(detail.device_ids)}") +``` + +### Use Case 2: Risk-Based Change Management + +**Scenario**: Separate changes into risk categories + +```python +from hier_config import TagRule, MatchRule + +# Define risk-based tagging +tag_rules = [ + TagRule( + match_rules=( + MatchRule(startswith="ntp"), + MatchRule(startswith="logging"), + ), + apply_tags=frozenset({"low-risk", "safe"}), + ), + TagRule( + match_rules=(MatchRule(contains="password"),), + apply_tags=frozenset({"high-risk", "critical"}), + ), + TagRule( + match_rules=( + MatchRule(startswith="interface"), + MatchRule(startswith="routing"), + ), + apply_tags=frozenset({"medium-risk", "requires-review"}), + ), +] + +reporter.apply_tag_rules(tag_rules) + +# Generate separate change windows +reporter.to_text("low_risk_changes.txt", include_tags=["low-risk"]) +reporter.to_text("high_risk_changes.txt", include_tags=["high-risk"]) +reporter.to_text("medium_risk_changes.txt", include_tags=["medium-risk"]) + +# Get statistics +low_risk_changes = reporter.get_all_changes(include_tags=["low-risk"]) +high_risk_changes = reporter.get_all_changes(include_tags=["high-risk"]) + +print(f"Low risk: {len(low_risk_changes)} change types") +print(f"High risk: {len(high_risk_changes)} change types") +``` + +### Use Case 3: Compliance Reporting + +**Scenario**: Generate audit reports for security compliance + +```python +# Tag security-related changes +security_tags = [ + TagRule( + match_rules=(MatchRule(contains="password"),), + apply_tags=frozenset({"security", "authentication"}), + ), + TagRule( + match_rules=(MatchRule(startswith="line vty"),), + apply_tags=frozenset({"security", "access-control"}), + ), + TagRule( + match_rules=(MatchRule(startswith="snmp"),), + apply_tags=frozenset({"security", "monitoring"}), + ), +] + 
+reporter.apply_tag_rules(security_tags) + +# Generate compliance report +security_summary = reporter.summary_by_tags(["security"]) + +print("Security Compliance Remediation Report") +print("=" * 50) +for tag, stats in security_summary.items(): + print(f"\nCategory: {tag}") + print(f"Devices requiring changes: {stats['device_count']}") + print(f"Total change items: {stats['change_count']}") + +# Export for audit trail +reporter.to_markdown( + "security_compliance_report.md", + include_tags=["security"], +) +``` + +### Use Case 4: Prioritization by Scope + +**Scenario**: Focus on changes affecting the most devices + +```python +# Get the top 10 most widespread changes +top_changes = reporter.get_top_changes(10) + +print("Top 10 Changes by Device Count") +print("=" * 60) +for i, (change, count) in enumerate(top_changes, 1): + percentage = (count / reporter.device_count) * 100 + print(f"{i}. {change.text}") + print(f" Affects {count} devices ({percentage:.1f}%)") + print() + +# Focus on changes affecting >80% of devices +high_impact = reporter.get_changes_by_threshold( + min_devices=int(reporter.device_count * 0.8) +) + +print(f"\nFound {len(high_impact)} changes affecting >80% of devices") +``` + +### Use Case 5: Category-Based Rollout + +**Scenario**: Deploy changes in stages by category + +```python +# Tag by functional category +category_tags = [ + TagRule( + match_rules=(MatchRule(startswith="ntp"),), + apply_tags=frozenset({"phase-1", "time-services"}), + ), + TagRule( + match_rules=(MatchRule(startswith="logging"),), + apply_tags=frozenset({"phase-1", "logging"}), + ), + TagRule( + match_rules=(MatchRule(startswith="snmp"),), + apply_tags=frozenset({"phase-2", "monitoring"}), + ), + TagRule( + match_rules=(MatchRule(startswith="interface"),), + apply_tags=frozenset({"phase-3", "interfaces"}), + ), +] + +reporter.apply_tag_rules(category_tags) + +# Generate phase-specific remediations +for phase in ["phase-1", "phase-2", "phase-3"]: + reporter.export_all( + output_dir=f"rollout/{phase}/", + formats=["json", "csv", "text"], + include_tags=[phase], + ) + + # Get statistics for each phase + phase_changes = reporter.get_all_changes(include_tags=[phase]) + print(f"{phase}: {len(phase_changes)} change types") +``` + +### Use Case 6: Exception Reporting + +**Scenario**: Find devices with unique or uncommon changes + +```python +# Find changes affecting only 1-3 devices (potential exceptions) +exceptions = reporter.get_changes_by_threshold( + min_devices=1, + max_devices=3, +) + +print(f"Found {len(exceptions)} exceptional changes") +print("\nUncommon Configuration Changes:") +print("=" * 60) + +for change in exceptions: + detail = reporter.get_change_detail(change.text) + print(f"\n{change.text}") + print(f" Affects {detail.device_count} device(s): {sorted(detail.device_ids)}") + if detail.comments: + print(f" Comments: {', '.join(detail.comments)}") +``` + +## Best Practices + +### 1. Apply Tags for Better Organization + +Always apply tags to enable flexible filtering and reporting: + +```python +tag_rules = [ + # By function + TagRule( + match_rules=(MatchRule(startswith="ntp"),), + apply_tags=frozenset({"ntp", "infrastructure"}), + ), + # By risk + TagRule( + match_rules=(MatchRule(contains="password"),), + apply_tags=frozenset({"critical", "security"}), + ), + # By deployment phase + TagRule( + match_rules=(MatchRule(startswith="logging"),), + apply_tags=frozenset({"phase-1"}), + ), +] + +reporter.apply_tag_rules(tag_rules) +``` + +### 2. 
Use Multiple Report Formats + +Different stakeholders need different formats: + +```python +# Technical team: detailed JSON +reporter.to_json("detailed_report.json") + +# Management: summary markdown +reporter.to_markdown("executive_summary.md", top_n=10) + +# Analysis: CSV for Excel +reporter.to_csv("analysis_data.csv") + +# Implementation: text config +reporter.to_text("deployment_config.txt", style="without_comments") +``` + +### 3. Validate Scope Before Deployment + +Always check impact before pushing changes: + +```python +# Check high-impact changes +high_impact = reporter.get_changes_by_threshold(min_devices=100) + +if high_impact: + print("WARNING: The following changes affect >100 devices:") + for change in high_impact: + print(f" - {change.text}: {len(change.instances)} devices") + + # Require manual approval + approval = input("Proceed? (yes/no): ") + if approval.lower() != "yes": + print("Deployment cancelled") + exit(1) +``` + +### 4. Track Changes Over Time + +Export reports with timestamps for historical tracking: + +```python +from datetime import datetime + +timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") +output_dir = f"reports/{timestamp}/" + +reporter.export_all(output_dir, formats=["json", "markdown", "csv"]) +print(f"Reports saved to {output_dir}") +``` + +### 5. Combine with Existing Workflows + +Integrate reporting into your existing remediation workflow: + +```python +from hier_config import WorkflowRemediation, RemediationReporter + +# Generate remediations +remediations = [] +for device in inventory: + running = get_device_config(device) + generated = generate_desired_config(device) + + wfr = WorkflowRemediation(running, generated) + remediations.append(wfr.remediation_config) + +# Create reports +reporter = RemediationReporter.from_remediations(remediations) + +# Apply your organization's tag rules +reporter.apply_tag_rules(company_tag_rules) + +# Generate required reports +reporter.export_all("reports/current/") + +# Print summary for quick review +print(reporter.summary_text()) +``` + +## API Reference + +### RemediationReporter Class + +#### Constructor + +```python +reporter = RemediationReporter() +``` + +#### Class Methods + +- `from_remediations(remediations)` - Create from iterable of HConfig objects +- `from_merged_config(merged_config)` - Create from pre-merged HConfig + +#### Instance Methods + +**Adding Data:** +- `add_remediation(remediation)` - Add single remediation +- `add_remediations(remediations)` - Add multiple remediations + +**Tagging:** +- `apply_tag_rules(tag_rules)` - Apply TagRule sequence + +**Querying:** +- `get_all_changes(include_tags=[], exclude_tags=[])` - Get all changes +- `get_change_detail(line, tag=None)` - Get detailed info about a line +- `get_device_count(line, tag=None)` - Count devices needing a change +- `get_changes_by_threshold(min_devices=0, max_devices=None, ...)` - Filter by impact +- `get_top_changes(n=10, ...)` - Get N most common changes +- `get_changes_matching(pattern, ...)` - Get changes matching regex + +**Analysis:** +- `summary()` - Get ReportSummary object +- `summary_text(top_n=10)` - Get human-readable summary +- `summary_by_tags(tags=None)` - Get breakdown by tags +- `group_by_parent()` - Group changes by parent line +- `get_impact_distribution(bins=...)` - Get distribution of changes by impact +- `get_tag_distribution()` - Get tag occurrence counts + +**Exporting:** +- `to_text(file_path, style="merged", ...)` - Export to text file +- `to_json(file_path, indent=2, ...)` - Export to JSON +- 
`to_csv(file_path, ...)` - Export to CSV +- `to_markdown(file_path, top_n=20, ...)` - Export to Markdown +- `export_all(output_dir, formats=[], ...)` - Export all formats + +#### Properties + +- `merged_config` - The merged HConfig object +- `device_count` - Number of unique devices + +### Models + +#### ReportSummary + +```python +class ReportSummary: + total_devices: int + total_unique_changes: int + most_common_changes: tuple[tuple[str, int], ...] + changes_by_tag: dict[str, int] +``` + +#### ChangeDetail + +```python +class ChangeDetail: + line: str + full_path: tuple[str, ...] + device_count: int + device_ids: frozenset[int] + tags: frozenset[str] + comments: frozenset[str] + instances: tuple[Instance, ...] + children: tuple[ChangeDetail, ...] +``` + +## Troubleshooting + +### Issue: No changes showing up + +```python +# Check if remediations were added +print(f"Device count: {reporter.device_count}") + +# Check if remediation configs are empty +for i, remediation in enumerate(remediations): + change_count = len(tuple(remediation.all_children())) + print(f"Remediation {i}: {change_count} changes") +``` + +### Issue: Tags not working + +```python +# Verify tags were applied +all_changes = reporter.get_all_changes() +for change in all_changes[:5]: + print(f"{change.text}: tags={change.tags}") + +# Check tag distribution +print(reporter.get_tag_distribution()) +``` + +### Issue: Device count seems wrong + +```python +# Check unique device IDs +all_changes = reporter.get_all_changes() +all_device_ids = set() +for change in all_changes: + for instance in change.instances: + all_device_ids.add(instance.id) + +print(f"Unique device IDs found: {len(all_device_ids)}") +print(f"Reporter device count: {reporter.device_count}") +``` + +## See Also + +- [Tags](tags.md) - Learn more about tagging configuration lines +- [Custom Workflows](custom-workflows.md) - Integrate reporting into your workflows +- [Getting Started](getting-started.md) - Basic hier_config usage diff --git a/hier_config/__init__.py b/hier_config/__init__.py index d307054..9f78fe2 100644 --- a/hier_config/__init__.py +++ b/hier_config/__init__.py @@ -6,14 +6,20 @@ get_hconfig_from_dump, get_hconfig_view, ) -from .models import Platform +from .models import ChangeDetail, MatchRule, Platform, ReportSummary, TagRule +from .reporting import RemediationReporter from .root import HConfig from .workflows import WorkflowRemediation __all__ = ( + "ChangeDetail", "HConfig", "HConfigChild", + "MatchRule", "Platform", + "RemediationReporter", + "ReportSummary", + "TagRule", "WorkflowRemediation", "get_hconfig", "get_hconfig_driver", diff --git a/hier_config/base.py b/hier_config/base.py index 6ca7557..0941c76 100644 --- a/hier_config/base.py +++ b/hier_config/base.py @@ -216,7 +216,7 @@ def add_shallow_copy_of( merged: bool = False, ) -> HConfigChild: """Add a nested copy of a child_to_add to self.children.""" - new_child = self.add_child(child_to_add.text) + new_child = self.add_child(child_to_add.text, return_if_present=merged) if merged: new_child.instances.append(child_to_add.instance) diff --git a/hier_config/models.py b/hier_config/models.py index 3e61e81..d075793 100644 --- a/hier_config/models.py +++ b/hier_config/models.py @@ -109,3 +109,21 @@ class Platform(str, Enum): class Dump(BaseModel): lines: tuple[DumpLine, ...] + + +class ChangeDetail(BaseModel): + line: str + full_path: tuple[str, ...] 
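+    # The fields below are aggregated across the matching instances:
+    # device_ids identifies the devices that need this line, while tags and
+    # comments are unions of the per-instance values.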
+ device_count: NonNegativeInt + device_ids: frozenset[PositiveInt] + tags: frozenset[str] + comments: frozenset[str] + instances: tuple[Instance, ...] + children: tuple["ChangeDetail", ...] = () + + +class ReportSummary(BaseModel): + total_devices: NonNegativeInt + total_unique_changes: NonNegativeInt + most_common_changes: tuple[tuple[str, NonNegativeInt], ...] + changes_by_tag: dict[str, NonNegativeInt] diff --git a/hier_config/reporting.py b/hier_config/reporting.py new file mode 100644 index 0000000..f2036a9 --- /dev/null +++ b/hier_config/reporting.py @@ -0,0 +1,921 @@ +"""Remediation reporting functionality for hier_config. + +This module provides tools for aggregating and analyzing remediation +configurations from multiple network devices. +""" + +import csv +import json +import operator +import re +from collections import Counter, defaultdict +from collections.abc import Iterable, Sequence +from pathlib import Path +from typing import Any + +from hier_config.child import HConfigChild +from hier_config.models import ChangeDetail, ReportSummary, TagRule +from hier_config.root import HConfig + + +class RemediationReporter: # noqa: PLR0904 + """A reporting tool for aggregating and analyzing remediation configurations. + + This class provides methods to merge multiple device remediations, + generate statistics, filter by tags, and export reports in various formats. + + Example: + ```python + from hier_config import RemediationReporter + + reporter = RemediationReporter() + reporter.add_remediations([device1_remediation, device2_remediation]) + + # Get summary statistics + summary = reporter.summary() + + # Export reports + reporter.to_json("report.json") + reporter.to_csv("report.csv") + ``` + + """ + + def __init__(self) -> None: + """Initialize a new RemediationReporter.""" + self._merged_config: HConfig | None = None + self._device_count: int = 0 + self._device_ids: set[int] = set() + + @property + def merged_config(self) -> HConfig: + """Get the merged configuration. + + Raises: + ValueError: If no remediations have been added yet. + + """ + if self._merged_config is None: + msg = "No remediations have been added yet" + raise ValueError(msg) + return self._merged_config + + @property + def device_count(self) -> int: + """Get the number of unique devices that have been added.""" + return self._device_count + + def add_remediation(self, remediation: HConfig) -> None: + """Add a single remediation configuration to the reporter. + + Args: + remediation: An HConfig object representing a device remediation. + + """ + device_id = id(remediation) + if device_id not in self._device_ids: + self._device_ids.add(device_id) + self._device_count += 1 + + if self._merged_config is None: + # Create a new empty HConfig with the same driver + self._merged_config = HConfig(remediation.driver) + + self._merged_config.merge(remediation) + + def add_remediations(self, remediations: Iterable[HConfig]) -> None: + """Add multiple remediation configurations to the reporter. + + Args: + remediations: An iterable of HConfig objects. + + """ + for remediation in remediations: + self.add_remediation(remediation) + + @classmethod + def from_remediations( + cls, + remediations: Iterable[HConfig], + ) -> "RemediationReporter": + """Create a RemediationReporter from an iterable of remediations. + + Args: + remediations: An iterable of HConfig objects. + + Returns: + A new RemediationReporter instance with all remediations merged. 
+ + Example: + ```python + reporter = RemediationReporter.from_remediations( + [ + device1_remediation, + device2_remediation, + ] + ) + ``` + + """ + reporter = cls() + reporter.add_remediations(remediations) + return reporter + + @classmethod + def from_merged_config(cls, merged_config: HConfig) -> "RemediationReporter": + """Create a RemediationReporter from an already merged configuration. + + Args: + merged_config: An HConfig object with merged remediations. + + Returns: + A new RemediationReporter instance. + + Example: + ```python + merged = get_hconfig(Platform.CISCO_IOS) + merged.merge([device1, device2]) + reporter = RemediationReporter.from_merged_config(merged) + ``` + + """ + reporter = cls() + reporter._merged_config = merged_config + + # Count unique device IDs from instances + device_ids: set[int] = set() + for child in merged_config.all_children_sorted(): + device_ids.update(instance.id for instance in child.instances) + + reporter._device_ids = device_ids + reporter._device_count = len(device_ids) + return reporter + + def apply_tag_rules(self, tag_rules: Sequence[TagRule]) -> None: + """Apply tag rules to the merged configuration. + + Args: + tag_rules: A sequence of TagRule objects to apply. + + Example: + ```python + from hier_config import MatchRule, TagRule + + tag_rules = [ + TagRule( + match_rules=(MatchRule(startswith="ntp"),), + apply_tags=frozenset({"ntp", "safe"}), + ) + ] + reporter.apply_tag_rules(tag_rules) + ``` + + """ + for tag_rule in tag_rules: + for child in self.merged_config.get_children_deep(tag_rule.match_rules): + child.tags_add(tag_rule.apply_tags) + + def get_all_changes( + self, + *, + include_tags: Iterable[str] = (), + exclude_tags: Iterable[str] = (), + ) -> tuple[HConfigChild, ...]: + """Get all configuration changes, optionally filtered by tags. + + Args: + include_tags: Only include changes with these tags. + exclude_tags: Exclude changes with these tags. + + Returns: + A tuple of HConfigChild objects representing changes. + + """ + if include_tags or exclude_tags: + return tuple( + self.merged_config.all_children_sorted_by_tags( + include_tags, + exclude_tags, + ) + ) + return tuple(self.merged_config.all_children_sorted()) + + def get_change_detail( + self, + line: str, + *, + tag: str | None = None, + ) -> ChangeDetail | None: + """Get detailed information about a specific configuration line. + + Args: + line: The configuration line to search for. + tag: Optional tag to filter instances. + + Returns: + A ChangeDetail object or None if the line is not found. + + Example: + ```python + detail = reporter.get_change_detail("line vty 0 4") + print(f"Affects {detail.device_count} devices") + ``` + + """ + child = self.merged_config.get_child(equals=line) + if child is None: + return None + + return self._build_change_detail(child, tag=tag) + + def _build_change_detail( + self, + child: HConfigChild, + *, + tag: str | None = None, + ) -> ChangeDetail: + """Build a ChangeDetail object from an HConfigChild. + + Args: + child: The HConfigChild to build details from. + tag: Optional tag to filter instances. + + Returns: + A ChangeDetail object with aggregated information. 
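+            When a tag is given, instances lacking that tag are excluded
+            from the counts, device IDs, tags, and comments.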
+ + """ + # Filter instances by tag if specified + relevant_instances = tuple( + instance + for instance in child.instances + if tag is None or tag in instance.tags + ) + + # Aggregate data + device_ids = frozenset(instance.id for instance in relevant_instances) + all_tags = frozenset( + tag_item for instance in relevant_instances for tag_item in instance.tags + ) + all_comments = frozenset( + comment for instance in relevant_instances for comment in instance.comments + ) + + # Build path + path_parts: list[str] = [] + current: HConfigChild | HConfig | None = child + while current is not None and hasattr(current, "text"): + if isinstance(current, HConfigChild): + path_parts.insert(0, current.text) + current = getattr(current, "parent", None) + + # Get children details + children_details = tuple( + self._build_change_detail(grandchild, tag=tag) + for grandchild in child.children + ) + + return ChangeDetail( + line=child.text, + full_path=tuple(path_parts), + device_count=len(device_ids), + device_ids=device_ids, + tags=all_tags, + comments=all_comments, + instances=relevant_instances, + children=children_details, + ) + + def get_device_count(self, line: str, *, tag: str | None = None) -> int: + """Get the number of devices that need a specific configuration line. + + Args: + line: The configuration line to search for. + tag: Optional tag to filter instances. + + Returns: + The number of devices requiring this change. + + Example: + ```python + count = reporter.get_device_count("line vty 0 4") + print(f"{count} devices need this change") + ``` + + """ + detail = self.get_change_detail(line, tag=tag) + return detail.device_count if detail else 0 + + def get_changes_by_threshold( + self, + *, + min_devices: int = 0, + max_devices: int | None = None, + include_tags: Iterable[str] = (), + exclude_tags: Iterable[str] = (), + ) -> tuple[HConfigChild, ...]: + """Get changes affecting a certain number of devices. + + Args: + min_devices: Minimum number of devices (inclusive). + max_devices: Maximum number of devices (inclusive), or None for unlimited. + include_tags: Only include changes with these tags. + exclude_tags: Exclude changes with these tags. + + Returns: + A tuple of HConfigChild objects matching the criteria. + + Example: + ```python + # Get high-impact changes affecting 50+ devices + high_impact = reporter.get_changes_by_threshold(min_devices=50) + + # Get isolated changes affecting 1-5 devices + isolated = reporter.get_changes_by_threshold( + min_devices=1, + max_devices=5, + ) + ``` + + """ + all_changes = self.get_all_changes( + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + + filtered_changes: list[HConfigChild] = [] + for child in all_changes: + instance_count = len(child.instances) + if instance_count >= min_devices and ( + max_devices is None or instance_count <= max_devices + ): + filtered_changes.append(child) + + result: tuple[HConfigChild, ...] = tuple(filtered_changes) + return result + + def get_top_changes( + self, + n: int = 10, + *, + include_tags: Iterable[str] = (), + exclude_tags: Iterable[str] = (), + ) -> tuple[tuple[HConfigChild, int], ...]: + """Get the top N most common changes across devices. + + Args: + n: Number of top changes to return. + include_tags: Only include changes with these tags. + exclude_tags: Exclude changes with these tags. + + Returns: + A tuple of (HConfigChild, count) pairs, sorted by count descending. 
+ + Example: + ```python + top_10 = reporter.get_top_changes(10) + for child, count in top_10: + print(f"{child.text}: {count} devices") + ``` + + """ + all_changes = self.get_all_changes( + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + + # Create list of (child, instance_count) pairs + changes_with_counts = [(child, len(child.instances)) for child in all_changes] + + # Sort by count descending and take top N + changes_with_counts.sort(key=operator.itemgetter(1), reverse=True) + return tuple(changes_with_counts[:n]) + + def get_changes_matching( + self, + pattern: str, + *, + include_tags: Iterable[str] = (), + exclude_tags: Iterable[str] = (), + ) -> tuple[HConfigChild, ...]: + r"""Get changes matching a regex pattern. + + Args: + pattern: A regex pattern to match against configuration lines. + include_tags: Only include changes with these tags. + exclude_tags: Exclude changes with these tags. + + Returns: + A tuple of HConfigChild objects matching the pattern. + + Example: + ```python + # Get all VLAN interface changes + vlan_changes = reporter.get_changes_matching(r"interface Vlan\\d+") + ``` + + """ + all_changes = self.get_all_changes( + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + + regex = re.compile(pattern) + return tuple(child for child in all_changes if regex.search(child.text)) + + def summary(self) -> ReportSummary: + """Generate a summary of all remediations. + + Returns: + A ReportSummary object with aggregate statistics. + + Example: + ```python + summary = reporter.summary() + print(f"Total devices: {summary.total_devices}") + print(f"Unique changes: {summary.total_unique_changes}") + ``` + + """ + all_changes = self.get_all_changes() + + # Get most common changes + changes_with_counts = [ + (child.text, len(child.instances)) for child in all_changes + ] + changes_with_counts.sort(key=operator.itemgetter(1), reverse=True) + most_common = tuple(changes_with_counts[:10]) + + # Count changes by tag + tag_counter: Counter[str] = Counter() + for child in all_changes: + for tag in child.tags: + tag_counter[tag] += 1 + + return ReportSummary( + total_devices=self.device_count, + total_unique_changes=len(all_changes), + most_common_changes=most_common, + changes_by_tag=dict(tag_counter), + ) + + def summary_by_tags( + self, + tags: Iterable[str] | None = None, + ) -> dict[str, dict[str, Any]]: + """Generate a summary breakdown by tags. + + Args: + tags: Specific tags to include, or None for all tags. + + Returns: + A dictionary mapping tag names to their statistics. + + Example: + ```python + tag_summary = reporter.summary_by_tags(["security", "ntp"]) + for tag, stats in tag_summary.items(): + print(f"{tag}: {stats['device_count']} devices") + ``` + + """ + all_changes = self.get_all_changes() + + # Collect all tags if not specified + all_tags = ( + {tag for child in all_changes for tag in child.tags} + if tags is None + else set(tags) + ) + + result: dict[str, dict[str, Any]] = {} + for tag in all_tags: + tagged_changes = self.get_all_changes(include_tags=[tag]) + + # Count unique devices for this tag + device_ids = { + instance.id + for child in tagged_changes + for instance in child.instances + if tag in instance.tags + } + + result[tag] = { + "device_count": len(device_ids), + "change_count": len(tagged_changes), + "changes": [child.text for child in tagged_changes], + } + + final_result: dict[str, dict[str, Any]] = result + return final_result + + def summary_text(self, *, top_n: int = 10) -> str: + """Generate a human-readable text summary. 
+ + Args: + top_n: Number of top changes to include in the summary. + + Returns: + A formatted string with summary statistics. + + Example: + ```python + print(reporter.summary_text()) + ``` + + """ + summary = self.summary() + lines = [ + "Remediation Summary", + "=" * 50, + f"Total devices: {summary.total_devices}", + f"Unique changes: {summary.total_unique_changes}", + "", + f"Top {min(top_n, len(summary.most_common_changes))} Most Common Changes:", + "-" * 50, + ] + + for i, (line, count) in enumerate(summary.most_common_changes[:top_n], 1): + percentage = ( + (count / summary.total_devices * 100) if summary.total_devices else 0 + ) + lines.extend((f"{i}. {line}", f" {count} devices ({percentage:.1f}%)")) + + if summary.changes_by_tag: + lines.extend(["", "Changes by Tag:", "-" * 50]) + for tag, count in sorted( + summary.changes_by_tag.items(), + key=operator.itemgetter(1), + reverse=True, + ): + lines.append(f" {tag}: {count} changes") + + return "\n".join(lines) + + def group_by_parent(self) -> dict[str, list[HConfigChild]]: + """Group all changes by their parent configuration line. + + Returns: + A dictionary mapping parent lines to their children. + + Example: + ```python + grouped = reporter.group_by_parent() + for parent, children in grouped.items(): + print(f"{parent}: {len(children)} changes") + ``` + + """ + groups: dict[str, list[HConfigChild]] = defaultdict(list) + + for child in self.get_all_changes(): + parent_text = ( + child.parent.text if isinstance(child.parent, HConfigChild) else "root" + ) + groups[parent_text].append(child) + + return dict(groups) + + def get_impact_distribution( + self, + bins: Sequence[int] = (1, 10, 25, 50, 100), + ) -> dict[str, int]: + """Get the distribution of changes by device impact. + + Args: + bins: Boundaries for impact ranges. + + Returns: + A dictionary with range labels as keys and counts as values. + + Example: + ```python + dist = reporter.get_impact_distribution() + # {'1-10': 15, '10-25': 8, '25-50': 5, '50-100': 3, '100+': 2} + ``` + + """ + all_changes = self.get_all_changes() + distribution: dict[str, int] = defaultdict(int) + + for child in all_changes: + instance_count = len(child.instances) + + # Find the appropriate bin + for i, threshold in enumerate(bins): + if instance_count < threshold: + label = f"1-{threshold}" if i == 0 else f"{bins[i - 1]}-{threshold}" + distribution[label] += 1 + break + else: + # Higher than all bins + distribution[f"{bins[-1]}+"] += 1 + + return dict(distribution) + + def get_tag_distribution(self) -> dict[str, int]: + """Get the distribution of tags across all changes. + + Returns: + A dictionary mapping tag names to their occurrence count. + + Example: + ```python + tags = reporter.get_tag_distribution() + # {'security': 45, 'ntp': 32, 'snmp': 28} + ``` + + """ + tag_counter: Counter[str] = Counter() + + for child in self.get_all_changes(): + for tag in child.tags: + tag_counter[tag] += 1 + + return dict(tag_counter) + + def to_text( + self, + file_path: str | Path, + *, + style: str = "merged", + include_tags: Iterable[str] = (), + exclude_tags: Iterable[str] = (), + ) -> None: + """Export remediation configuration to a text file. + + Args: + file_path: Path to the output file. + style: Text style ('merged', 'with_comments', or 'without_comments'). + include_tags: Only include changes with these tags. + exclude_tags: Exclude changes with these tags. 
+ + Example: + ```python + reporter.to_text("remediation.txt", style="merged") + ``` + + """ + changes = self.get_all_changes( + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + + lines = [child.cisco_style_text(style=style) for child in changes] + + output_path = Path(file_path) + output_path.write_text("\n".join(lines), encoding="utf-8") + + def to_json( + self, + file_path: str | Path, + *, + include_tags: Iterable[str] = (), + exclude_tags: Iterable[str] = (), + indent: int = 2, + ) -> None: + """Export remediation data to a JSON file. + + Args: + file_path: Path to the output file. + include_tags: Only include changes with these tags. + exclude_tags: Exclude changes with these tags. + indent: JSON indentation level. + + Example: + ```python + reporter.to_json("remediation.json") + ``` + + """ + changes = self.get_all_changes( + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + + data = { + "summary": { + "total_devices": self.device_count, + "total_unique_changes": len(changes), + }, + "changes": [ + { + "line": child.text, + "device_count": len(child.instances), + "device_ids": sorted(instance.id for instance in child.instances), + "tags": sorted(child.tags), + "comments": sorted(child.comments), + "instances": [ + { + "id": instance.id, + "tags": sorted(instance.tags), + "comments": sorted(instance.comments), + } + for instance in child.instances + ], + } + for child in changes + ], + } + + output_path = Path(file_path) + output_path.write_text( + json.dumps(data, indent=indent), + encoding="utf-8", + ) + + def to_csv( + self, + file_path: str | Path, + *, + include_tags: Iterable[str] = (), + exclude_tags: Iterable[str] = (), + ) -> None: + """Export remediation data to a CSV file. + + Args: + file_path: Path to the output file. + include_tags: Only include changes with these tags. + exclude_tags: Exclude changes with these tags. + + Example: + ```python + reporter.to_csv("remediation.csv") + ``` + + """ + changes = self.get_all_changes( + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + + output_path = Path(file_path) + with output_path.open("w", encoding="utf-8", newline="") as csvfile: + writer = csv.writer(csvfile) + writer.writerow( + [ + "line", + "device_count", + "percentage", + "tags", + "comments", + "device_ids", + ] + ) + + for child in changes: + device_count = len(child.instances) + percentage = ( + (device_count / self.device_count * 100) if self.device_count else 0 + ) + tags = ",".join(sorted(child.tags)) + comments = " | ".join(sorted(child.comments)) + device_ids = ",".join(str(instance.id) for instance in child.instances) + + writer.writerow( + [ + child.text, + device_count, + f"{percentage:.1f}", + tags, + comments, + device_ids, + ] + ) + + def to_markdown( + self, + file_path: str | Path, + *, + include_tags: Iterable[str] = (), + exclude_tags: Iterable[str] = (), + top_n: int = 20, + ) -> None: + """Export remediation report to a Markdown file. + + Args: + file_path: Path to the output file. + include_tags: Only include changes with these tags. + exclude_tags: Exclude changes with these tags. + top_n: Number of top changes to include in detail. 
+ + Example: + ```python + reporter.to_markdown("remediation.md", top_n=15) + ``` + + """ + changes = self.get_all_changes( + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + + lines = [ + "# Remediation Report", + "", + "## Summary", + "", + f"- **Total Devices**: {self.device_count}", + f"- **Unique Changes**: {len(changes)}", + "", + f"## Top {min(top_n, len(changes))} Changes by Impact", + "", + "| # | Configuration Line | Device Count | Percentage |", + "|---|-------------------|--------------|------------|", + ] + + # Sort by instance count + sorted_changes = sorted( + changes, + key=lambda c: len(c.instances), + reverse=True, + ) + + for i, child in enumerate(sorted_changes[:top_n], 1): + device_count = len(child.instances) + percentage = ( + (device_count / self.device_count * 100) if self.device_count else 0 + ) + lines.append( + f"| {i} | `{child.text}` | {device_count} | {percentage:.1f}% |" + ) + + # Add tag summary if tags exist + tag_dist = self.get_tag_distribution() + if tag_dist: + lines.extend( + [ + "", + "## Changes by Tag", + "", + "| Tag | Count |", + "|-----|-------|", + ] + ) + for tag, count in sorted( + tag_dist.items(), + key=operator.itemgetter(1), + reverse=True, + ): + lines.append(f"| {tag} | {count} |") + + output_path = Path(file_path) + output_path.write_text("\n".join(lines), encoding="utf-8") + + def export_all( + self, + output_dir: str | Path, + *, + formats: Iterable[str] = ("json", "csv", "markdown", "text"), + include_tags: Iterable[str] = (), + exclude_tags: Iterable[str] = (), + ) -> None: + """Export reports in multiple formats to a directory. + + Args: + output_dir: Directory to write reports to. + formats: Formats to export ('json', 'csv', 'markdown', 'text'). + include_tags: Only include changes with these tags. + exclude_tags: Exclude changes with these tags. 
+ + Example: + ```python + reporter.export_all( + "reports/", + formats=["json", "csv", "markdown"], + ) + ``` + + """ + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + # Map format names to file extensions + ext_map = {"json": "json", "csv": "csv", "markdown": "md", "text": "txt"} + + for fmt in formats: + file_ext = ext_map.get(fmt, fmt) + file_path_str = str(output_path / f"remediation_report.{file_ext}") + if fmt == "json": + self.to_json( + file_path_str, + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + elif fmt == "csv": + self.to_csv( + file_path_str, + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + elif fmt == "markdown": + self.to_markdown( + file_path_str, + include_tags=include_tags, + exclude_tags=exclude_tags, + ) + elif fmt == "text": + self.to_text( + file_path_str, + include_tags=include_tags, + exclude_tags=exclude_tags, + ) diff --git a/mkdocs.yml b/mkdocs.yml index 5ba601e..1618b8a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -16,3 +16,4 @@ nav: - Unified Diff: unified-diff.md - Utilities: utilities.md - Working with Tags: tags.md +- Remediation Reporting: remediation-reporting.md diff --git a/tests/test_reporting.py b/tests/test_reporting.py new file mode 100644 index 0000000..d5aa498 --- /dev/null +++ b/tests/test_reporting.py @@ -0,0 +1,696 @@ +"""Comprehensive tests for RemediationReporter functionality.""" +# pylint: disable=redefined-outer-name # pytest fixtures +# pylint: disable=too-many-try-statements # test cleanup code + +import json +import tempfile +from pathlib import Path + +import pytest + +from hier_config import ( + HConfig, + MatchRule, + Platform, + RemediationReporter, + TagRule, + WorkflowRemediation, + get_hconfig, +) +from hier_config.models import ChangeDetail, ReportSummary + + +@pytest.fixture +def sample_remediation_1() -> tuple[HConfig, str]: + """Create a sample remediation configuration for device 1.""" + running = get_hconfig( + Platform.CISCO_IOS, + """interface Vlan2 + ip address 10.0.0.1 255.255.255.0 +line vty 0 4 + transport input ssh +ntp server 10.1.1.1""", + ) + + generated = get_hconfig( + Platform.CISCO_IOS, + """interface Vlan2 + ip address 10.0.0.2 255.255.255.0 +line vty 0 4 + transport input ssh + exec-timeout 5 0 +ntp server 10.2.2.2 +snmp-server community public RO""", + ) + + wfr = WorkflowRemediation(running, generated) + return wfr.remediation_config, "device1" + + +@pytest.fixture +def sample_remediation_2() -> tuple[HConfig, str]: + """Create a sample remediation configuration for device 2.""" + running = get_hconfig( + Platform.CISCO_IOS, + """interface Vlan3 + ip address 10.0.1.1 255.255.255.0 +line vty 0 4 + transport input telnet +ntp server 10.1.1.1""", + ) + + generated = get_hconfig( + Platform.CISCO_IOS, + """interface Vlan3 + ip address 10.0.1.2 255.255.255.0 +line vty 0 4 + transport input ssh + exec-timeout 5 0 +ntp server 10.2.2.2 +snmp-server community public RO""", + ) + + wfr = WorkflowRemediation(running, generated) + return wfr.remediation_config, "device2" + + +@pytest.fixture +def sample_remediation_3() -> tuple[HConfig, str]: + """Create a sample remediation configuration for device 3.""" + running = get_hconfig( + Platform.CISCO_IOS, + """interface Vlan4 + ip address 10.0.2.1 255.255.255.0 +ntp server 10.1.1.1""", + ) + + generated = get_hconfig( + Platform.CISCO_IOS, + """interface Vlan4 + ip address 10.0.2.2 255.255.255.0 +ntp server 10.2.2.2""", + ) + + wfr = WorkflowRemediation(running, generated) + return wfr.remediation_config, 
"device3" + + +def test_reporter_initialization() -> None: + """Test RemediationReporter initialization.""" + reporter = RemediationReporter() + assert reporter.device_count == 0 + + with pytest.raises(ValueError, match="No remediations have been added"): + _ = reporter.merged_config + + +def test_add_single_remediation(sample_remediation_1: tuple[HConfig, str]) -> None: + """Test adding a single remediation.""" + remediation, _ = sample_remediation_1 + reporter = RemediationReporter() + reporter.add_remediation(remediation) + + assert reporter.device_count == 1 + assert reporter.merged_config is not None + + +def test_add_multiple_remediations( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], + sample_remediation_3: tuple[HConfig, str], +) -> None: + """Test adding multiple remediations.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + rem3, _ = sample_remediation_3 + + reporter = RemediationReporter() + reporter.add_remediations([rem1, rem2, rem3]) + + assert reporter.device_count == 3 + + +def test_from_remediations( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test creating reporter from remediations.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + assert reporter.device_count == 2 + assert reporter.merged_config is not None + + +def test_from_merged_config( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test creating reporter from already merged config.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + # Create a merged config manually + merged = get_hconfig(Platform.CISCO_IOS) + merged.merge([rem1, rem2]) + + reporter = RemediationReporter.from_merged_config(merged) + + assert reporter.device_count == 2 + + +def test_get_all_changes( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test getting all changes.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + changes = reporter.get_all_changes() + + assert len(changes) > 0 + assert all(hasattr(change, "text") for change in changes) + + +def test_get_device_count( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test getting device count for a specific line.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + # Both devices need "ntp server 10.2.2.2" + ntp_count = reporter.get_device_count("ntp server 10.2.2.2") + assert ntp_count == 2 + + # Both devices need "snmp-server community public RO" + snmp_count = reporter.get_device_count("snmp-server community public RO") + assert snmp_count == 2 + + +def test_get_change_detail( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test getting detailed information about a change.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + detail = reporter.get_change_detail("ntp server 10.2.2.2") + + assert detail is not None + assert isinstance(detail, ChangeDetail) + assert detail.line == "ntp server 10.2.2.2" + assert detail.device_count == 2 + assert len(detail.device_ids) == 
2 + + +def test_get_change_detail_not_found(sample_remediation_1: tuple[HConfig, str]) -> None: + """Test getting detail for a non-existent change.""" + rem1, _ = sample_remediation_1 + + reporter = RemediationReporter.from_remediations([rem1]) + + detail = reporter.get_change_detail("nonexistent line") + assert detail is None + + +def test_get_changes_by_threshold( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], + sample_remediation_3: tuple[HConfig, str], +) -> None: + """Test filtering changes by device threshold.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + rem3, _ = sample_remediation_3 + + reporter = RemediationReporter.from_remediations([rem1, rem2, rem3]) + + # Get changes affecting at least 2 devices + high_impact = reporter.get_changes_by_threshold(min_devices=2) + assert len(high_impact) > 0 + assert all(len(change.instances) >= 2 for change in high_impact) + + # Get changes affecting exactly 1 device + low_impact = reporter.get_changes_by_threshold(min_devices=1, max_devices=1) + assert all(len(change.instances) == 1 for change in low_impact) + + +def test_get_top_changes( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], + sample_remediation_3: tuple[HConfig, str], +) -> None: + """Test getting top N most common changes.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + rem3, _ = sample_remediation_3 + + reporter = RemediationReporter.from_remediations([rem1, rem2, rem3]) + + top_5 = reporter.get_top_changes(5) + + assert len(top_5) > 0 + assert len(top_5) <= 5 + assert all(isinstance(item, tuple) for item in top_5) + assert all(len(item) == 2 for item in top_5) + + # Verify sorted by count descending + counts = [count for _, count in top_5] + assert counts == sorted(counts, reverse=True) + + +def test_get_changes_matching( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test getting changes matching a regex pattern.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + # Find all interface-related changes + interface_changes = reporter.get_changes_matching(r"interface Vlan\d+") + assert len(interface_changes) > 0 + assert all("interface Vlan" in change.text for change in interface_changes) + + # Find NTP changes + ntp_changes = reporter.get_changes_matching(r"ntp") + assert len(ntp_changes) > 0 + assert all("ntp" in change.text.lower() for change in ntp_changes) + + +def test_summary( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test generating a summary.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + summary = reporter.summary() + + assert isinstance(summary, ReportSummary) + assert summary.total_devices == 2 + assert summary.total_unique_changes > 0 + assert len(summary.most_common_changes) > 0 + assert isinstance(summary.changes_by_tag, dict) + + +def test_summary_text( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test generating a text summary.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + summary_text = reporter.summary_text(top_n=5) + + assert isinstance(summary_text, str) + assert "Remediation 
Summary" in summary_text + assert "Total devices: 2" in summary_text + assert "Unique changes:" in summary_text + + +def test_apply_tag_rules( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test applying tag rules.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + # Create tag rules + tag_rules = [ + TagRule( + match_rules=(MatchRule(startswith="ntp"),), + apply_tags=frozenset({"ntp", "time-sync"}), + ), + TagRule( + match_rules=(MatchRule(startswith="snmp"),), + apply_tags=frozenset({"snmp", "monitoring"}), + ), + ] + + reporter.apply_tag_rules(tag_rules) + + # Verify tags were applied + ntp_changes = reporter.get_all_changes(include_tags=["ntp"]) + assert len(ntp_changes) > 0 + + snmp_changes = reporter.get_all_changes(include_tags=["snmp"]) + assert len(snmp_changes) > 0 + + +def test_get_all_changes_with_tag_filters( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test filtering changes by tags.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + # Apply tags + tag_rules = [ + TagRule( + match_rules=(MatchRule(startswith="ntp"),), + apply_tags=frozenset({"ntp"}), + ), + TagRule( + match_rules=(MatchRule(startswith="snmp"),), + apply_tags=frozenset({"snmp"}), + ), + ] + reporter.apply_tag_rules(tag_rules) + + # Get only NTP changes + ntp_changes = reporter.get_all_changes(include_tags=["ntp"]) + assert all(change.text.startswith("ntp") for change in ntp_changes) + + # Get all except SNMP changes + non_snmp_changes = reporter.get_all_changes(exclude_tags=["snmp"]) + assert all(not change.text.startswith("snmp") for change in non_snmp_changes) + + +def test_summary_by_tags( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test generating summary by tags.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + # Apply tags + tag_rules = [ + TagRule( + match_rules=(MatchRule(startswith="ntp"),), + apply_tags=frozenset({"ntp"}), + ), + ] + reporter.apply_tag_rules(tag_rules) + + tag_summary = reporter.summary_by_tags(["ntp"]) + + assert "ntp" in tag_summary + assert "device_count" in tag_summary["ntp"] + assert "change_count" in tag_summary["ntp"] + assert "changes" in tag_summary["ntp"] + + +def test_group_by_parent( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test grouping changes by parent.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + grouped = reporter.group_by_parent() + + assert isinstance(grouped, dict) + assert len(grouped) > 0 + + # Should have some root-level changes + assert "root" in grouped or len(grouped) > 0 + + +def test_get_impact_distribution( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], + sample_remediation_3: tuple[HConfig, str], +) -> None: + """Test getting impact distribution.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + rem3, _ = sample_remediation_3 + + reporter = RemediationReporter.from_remediations([rem1, rem2, rem3]) + + distribution = reporter.get_impact_distribution() + + assert isinstance(distribution, 
dict) + assert len(distribution) > 0 + assert all(isinstance(v, int) for v in distribution.values()) + + +def test_get_tag_distribution( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test getting tag distribution.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + # Apply tags first + tag_rules = [ + TagRule( + match_rules=(MatchRule(startswith="ntp"),), + apply_tags=frozenset({"ntp"}), + ), + TagRule( + match_rules=(MatchRule(startswith="snmp"),), + apply_tags=frozenset({"snmp"}), + ), + ] + reporter.apply_tag_rules(tag_rules) + + tag_dist = reporter.get_tag_distribution() + + assert isinstance(tag_dist, dict) + assert "ntp" in tag_dist or "snmp" in tag_dist + + +def test_to_text( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test exporting to text file.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + with tempfile.NamedTemporaryFile( + encoding="utf-8", + mode="w", + suffix=".txt", + delete=False, + ) as f: + temp_path = f.name + + try: + reporter.to_text(temp_path, style="merged") + + # Verify file was created and has content + output_path = Path(temp_path) + assert output_path.exists() + content = output_path.read_text(encoding="utf-8") + assert len(content) > 0 + assert "instance" in content.lower() + finally: + Path(temp_path).unlink(missing_ok=True) + + +def test_to_json( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test exporting to JSON file.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + with tempfile.NamedTemporaryFile( + encoding="utf-8", + mode="w", + suffix=".json", + delete=False, + ) as f: + temp_path = f.name + + try: + reporter.to_json(temp_path) + + # Verify file was created and has valid JSON + output_path = Path(temp_path) + assert output_path.exists() + content = output_path.read_text(encoding="utf-8") + data = json.loads(content) + assert "summary" in data + assert "changes" in data + assert data["summary"]["total_devices"] == 2 + finally: + Path(temp_path).unlink(missing_ok=True) + + +def test_to_csv( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test exporting to CSV file.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + with tempfile.NamedTemporaryFile( + encoding="utf-8", + mode="w", + suffix=".csv", + delete=False, + ) as f: + temp_path = f.name + + try: + reporter.to_csv(temp_path) + + # Verify file was created and has CSV headers + output_path = Path(temp_path) + assert output_path.exists() + content = output_path.read_text(encoding="utf-8") + assert "line,device_count,percentage,tags,comments,device_ids" in content + finally: + Path(temp_path).unlink(missing_ok=True) + + +def test_to_markdown( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test exporting to Markdown file.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + with tempfile.NamedTemporaryFile( + encoding="utf-8", + mode="w", + suffix=".md", + 
delete=False, + ) as f: + temp_path = f.name + + try: + reporter.to_markdown(temp_path, top_n=5) + + # Verify file was created and has markdown formatting + output_path = Path(temp_path) + assert output_path.exists() + content = output_path.read_text(encoding="utf-8") + assert "# Remediation Report" in content + assert "## Summary" in content + assert "Total Devices" in content + finally: + Path(temp_path).unlink(missing_ok=True) + + +def test_export_all( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test exporting all formats at once.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + with tempfile.TemporaryDirectory() as temp_dir: + reporter.export_all( + temp_dir, + formats=["json", "csv", "markdown", "text"], + ) + + # Verify all files were created + output_path = Path(temp_dir) + assert (output_path / "remediation_report.json").exists() + assert (output_path / "remediation_report.csv").exists() + assert (output_path / "remediation_report.md").exists() + assert (output_path / "remediation_report.txt").exists() + + +def test_export_with_tag_filters( + sample_remediation_1: tuple[HConfig, str], + sample_remediation_2: tuple[HConfig, str], +) -> None: + """Test exporting with tag filters.""" + rem1, _ = sample_remediation_1 + rem2, _ = sample_remediation_2 + + reporter = RemediationReporter.from_remediations([rem1, rem2]) + + # Apply tags + tag_rules = [ + TagRule( + match_rules=(MatchRule(startswith="ntp"),), + apply_tags=frozenset({"ntp"}), + ), + ] + reporter.apply_tag_rules(tag_rules) + + with tempfile.NamedTemporaryFile( + encoding="utf-8", + mode="w", + suffix=".json", + delete=False, + ) as f: + temp_path = f.name + + try: + reporter.to_json(temp_path, include_tags=["ntp"]) + + # Verify only NTP changes were exported + output_path = Path(temp_path) + content = output_path.read_text(encoding="utf-8") + data = json.loads(content) + changes = data["changes"] + assert all("ntp" in change["line"].lower() for change in changes) + finally: + Path(temp_path).unlink(missing_ok=True) + + +def test_empty_reporter() -> None: + """Test reporter with no remediations.""" + reporter = RemediationReporter() + + with pytest.raises(ValueError, match="No remediations have been added"): + reporter.summary() + + +def test_reporter_with_empty_remediation() -> None: + """Test reporter with empty remediation config.""" + empty_config = get_hconfig(Platform.CISCO_IOS) + + reporter = RemediationReporter() + reporter.add_remediation(empty_config) + + assert reporter.device_count == 1 + summary = reporter.summary() + assert summary.total_unique_changes == 0
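

# Supplementary sketch: exercises add_remediation() incrementally, using the
# sample_remediation_* fixtures above. It assumes device_count simply counts
# each add_remediation() call, which is what the surrounding tests suggest.
def test_device_count_tracks_incremental_adds(
    sample_remediation_1: tuple[HConfig, str],
    sample_remediation_2: tuple[HConfig, str],
) -> None:
    """Sketch: device_count should track remediations added one at a time."""
    rem1, _ = sample_remediation_1
    rem2, _ = sample_remediation_2

    reporter = RemediationReporter()

    reporter.add_remediation(rem1)
    assert reporter.device_count == 1

    reporter.add_remediation(rem2)
    assert reporter.device_count == 2

    # The summary should agree with the incremental count
    summary = reporter.summary()
    assert summary.total_devices == 2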