From 67b8f9669ac6d5026a03b1253f64c6ad3a7a5871 Mon Sep 17 00:00:00 2001 From: Kailas Date: Sat, 21 Feb 2026 04:05:34 +0530 Subject: [PATCH] Upgrade correlation engine to behavior-based progression model --- src/correlation/attack_chain_engine.py | 70 ++++++++++++++++++-------- 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/src/correlation/attack_chain_engine.py b/src/correlation/attack_chain_engine.py index 83b5cb4..3748478 100644 --- a/src/correlation/attack_chain_engine.py +++ b/src/correlation/attack_chain_engine.py @@ -1,7 +1,5 @@ -from collections import defaultdict - - class AttackChainEngine: + def __init__(self): pass @@ -9,31 +7,63 @@ def correlate(self, alerts): if not alerts: return [] - normalized_alerts = [] + TACTIC_ORDER = [ + "Initial Access", + "Execution", + "Privilege Escalation", + "Persistence", + "Defense Evasion", + "Command and Control", + "Collection" + ] + # Group alerts by host + hosts = {} for alert in alerts: - alert["timestamp"] = alert.get("timestamp", "") - alert["host"] = alert.get("host", "unknown") - normalized_alerts.append(alert) + host = alert.get("host", "unknown") + hosts.setdefault(host, []).append(alert) + + incidents = [] - host_groups = defaultdict(list) + for host, host_alerts in hosts.items(): - for alert in normalized_alerts: - host_groups[alert["host"]].append(alert) + # Sort by timestamp + host_alerts.sort(key=lambda x: x.get("timestamp", "")) - incidents = [] + # Extract unique tactics in appearance order + progression = [] + for alert in host_alerts: + tactic = alert.get("tactic") + if tactic and tactic not in progression: + progression.append(tactic) + + # Convert tactics to index order + order_indices = [ + TACTIC_ORDER.index(t) + for t in progression + if t in TACTIC_ORDER + ] + + is_progressive = order_indices == sorted(order_indices) + unique_tactics = len(progression) - for host, host_alerts in host_groups.items(): - sorted_alerts = sorted( - host_alerts, - key=lambda x: x.get("timestamp", "") - ) + if unique_tactics >= 4 and is_progressive: + confidence = "high" + severity = "critical" + elif unique_tactics >= 3: + confidence = "medium" + severity = "high" + else: + confidence = "low" + severity = "medium" - if len(sorted_alerts) > 1: + if unique_tactics >= 2: incidents.append({ "host": host, - "alert_count": len(sorted_alerts), - "alerts": sorted_alerts + "attack_progression": progression, + "total_alerts": len(host_alerts), + "confidence": confidence, + "severity": severity }) - return incidents \ No newline at end of file + return incidents