From eccd71a51aa4be9d5d42511c5db50e6e1931257b Mon Sep 17 00:00:00 2001
From: jmorascalyr <42879226+jmorascalyr@users.noreply.github.com>
Date: Fri, 20 Feb 2026 06:23:06 -0700
Subject: [PATCH 1/8] feat: Add trace ID support and M365 parser improvements
for correlation scenarios (#70)
- Added CorrelationRunRequest model with trace_id, tag_phase, and tag_trace fields for correlation scenario execution
- Implemented /correlation/run endpoint to execute scenarios with SIEM context and trace ID tagging
- Updated start_correlation_scenario() and _execute_correlation_scenario() to accept and pass trace_id via S1_TRACE_ID environment variable
- Added tag_phase and tag_trace boolean flags, exposed to scenarios via the S1_TAG_PHASE and S1_TAG_TRACE environment variables
---
Backend/api/app/routers/scenarios.py | 62 +++++++++++++++++++
Backend/api/app/services/scenario_service.py | 44 +++++++++++--
.../microsoft_365_collaboration.json | 2 +-
.../scenarios/apollo_ransomware_scenario.py | 23 ++++---
.../microsoft_365_collaboration.conf | 16 ++++-
.../microsoft_365_collaboration.conf | 2 +-
6 files changed, 131 insertions(+), 18 deletions(-)
diff --git a/Backend/api/app/routers/scenarios.py b/Backend/api/app/routers/scenarios.py
index eed6dd3..8489a41 100644
--- a/Backend/api/app/routers/scenarios.py
+++ b/Backend/api/app/routers/scenarios.py
@@ -6,6 +6,7 @@
import asyncio
import time
import json
+import sys
from datetime import datetime, timedelta
from pathlib import Path as PathLib
@@ -29,6 +30,17 @@ class SIEMQueryRequest(BaseModel):
end_time_hours: int = 0 # How far back to end (default now)
anchor_configs: Optional[List[Dict[str, Any]]] = None
+
+class CorrelationRunRequest(BaseModel):
+ """Request model for correlation scenario execution"""
+ scenario_id: str
+ destination_id: str
+ siem_context: Optional[Dict[str, Any]] = None
+ trace_id: Optional[str] = None
+ tag_phase: bool = True
+ tag_trace: bool = True
+ workers: int = 10
+
# Initialize scenario service
scenario_service = ScenarioService()
@@ -278,6 +290,56 @@ async def execute_siem_query(
raise HTTPException(status_code=500, detail=str(e))
+@router.post("/correlation/run", response_model=BaseResponse)
+async def run_correlation_scenario(
+ request: CorrelationRunRequest,
+ background_tasks: BackgroundTasks,
+ _: str = Depends(require_write_access)
+):
+ """
+ Execute a correlation scenario with SIEM context and trace ID support
+ """
+ try:
+ # Validate scenario exists and supports correlation
+ scenarios_dir = PathLib(__file__).parent.parent.parent / "scenarios"
+ if str(scenarios_dir) not in sys.path:
+ sys.path.insert(0, str(scenarios_dir))
+
+ scenario_modules = {
+ "apollo_ransomware_scenario": "apollo_ransomware_scenario"
+ }
+
+ if request.scenario_id not in scenario_modules:
+ raise HTTPException(status_code=404, detail=f"Correlation scenario '{request.scenario_id}' not found")
+
+ # Start correlation scenario execution with trace ID
+ execution_id = await scenario_service.start_correlation_scenario(
+ scenario_id=request.scenario_id,
+ siem_context=request.siem_context or {},
+ trace_id=request.trace_id,
+ tag_phase=request.tag_phase,
+ tag_trace=request.tag_trace,
+ background_tasks=background_tasks
+ )
+
+ return BaseResponse(
+ success=True,
+ data={
+ "execution_id": execution_id,
+ "scenario_id": request.scenario_id,
+ "status": "started",
+ "trace_id": request.trace_id,
+ "tag_phase": request.tag_phase,
+ "tag_trace": request.tag_trace,
+ "started_at": datetime.utcnow().isoformat()
+ }
+ )
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=str(e))
+
+
# =============================================================================
# GENERIC SCENARIO ENDPOINTS (catch-all /{scenario_id} routes must be last)
# =============================================================================
diff --git a/Backend/api/app/services/scenario_service.py b/Backend/api/app/services/scenario_service.py
index 9a40a2f..c8e3d07 100644
--- a/Backend/api/app/services/scenario_service.py
+++ b/Backend/api/app/services/scenario_service.py
@@ -5,12 +5,12 @@
import uuid
import time
import asyncio
+import json
from datetime import datetime
import logging
import os
import sys
import importlib
-import json
from pathlib import Path
logger = logging.getLogger(__name__)
@@ -264,11 +264,14 @@ async def start_correlation_scenario(
self,
scenario_id: str,
siem_context: Dict[str, Any],
+ trace_id: Optional[str] = None,
+ tag_phase: bool = True,
+ tag_trace: bool = True,
speed: str = "fast",
dry_run: bool = False,
background_tasks=None
) -> str:
- """Start correlation scenario execution with SIEM context"""
+ """Start correlation scenario execution with SIEM context and trace ID support"""
execution_id = str(uuid.uuid4())
self.running_scenarios[execution_id] = {
@@ -279,16 +282,35 @@ async def start_correlation_scenario(
"speed": speed,
"dry_run": dry_run,
"siem_context": siem_context,
+ "trace_id": trace_id,
+ "tag_phase": tag_phase,
+ "tag_trace": tag_trace,
"progress": 0
}
if background_tasks:
- background_tasks.add_task(self._execute_correlation_scenario, execution_id, scenario_id, siem_context)
+ background_tasks.add_task(
+ self._execute_correlation_scenario,
+ execution_id,
+ scenario_id,
+ siem_context,
+ trace_id,
+ tag_phase,
+ tag_trace
+ )
return execution_id
- async def _execute_correlation_scenario(self, execution_id: str, scenario_id: str, siem_context: Dict[str, Any]):
- """Execute correlation scenario with SIEM context"""
+ async def _execute_correlation_scenario(
+ self,
+ execution_id: str,
+ scenario_id: str,
+ siem_context: Dict[str, Any],
+ trace_id: Optional[str] = None,
+ tag_phase: bool = True,
+ tag_trace: bool = True
+ ):
+ """Execute correlation scenario with SIEM context and trace ID support"""
import sys
import os
from pathlib import Path
@@ -303,6 +325,12 @@ async def _execute_correlation_scenario(self, execution_id: str, scenario_id: st
siem_context_json = json.dumps(siem_context)
os.environ['SIEM_CONTEXT'] = siem_context_json
+ # Set trace ID and tagging environment variables
+ if trace_id:
+ os.environ['S1_TRACE_ID'] = trace_id
+ os.environ['S1_TAG_PHASE'] = '1' if tag_phase else '0'
+ os.environ['S1_TAG_TRACE'] = '1' if tag_trace else '0'
+
# Import and run the scenario
module = __import__(scenario_id)
scenario_result = module.generate_apollo_ransomware_scenario(siem_context=siem_context)
@@ -321,8 +349,12 @@ async def _execute_correlation_scenario(self, execution_id: str, scenario_id: st
self.running_scenarios[execution_id]["error"] = str(e)
self.running_scenarios[execution_id]["completed_at"] = datetime.utcnow().isoformat()
finally:
- # Clean up environment variable
+ # Clean up environment variables
os.environ.pop('SIEM_CONTEXT', None)
+ if trace_id:
+ os.environ.pop('S1_TRACE_ID', None)
+ os.environ.pop('S1_TAG_PHASE', None)
+ os.environ.pop('S1_TAG_TRACE', None)
async def _execute_scenario(self, execution_id: str, scenario: Dict[str, Any]):
"""Execute scenario in background"""
diff --git a/Backend/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.json b/Backend/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.json
index 29e2a89..2d5869a 100644
--- a/Backend/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.json
+++ b/Backend/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.json
@@ -1,6 +1,6 @@
{
"attributes": {
- "dataSource.name": "Microsoft 365 Collaboration",
+ "dataSource.name": "Microsoft O365",
"dataSource.vendor": "Microsoft",
"dataSource.category": "security",
"metadata.product.name": "Microsoft 365 SharePoint/OneDrive",
diff --git a/Backend/scenarios/apollo_ransomware_scenario.py b/Backend/scenarios/apollo_ransomware_scenario.py
index fca6fb0..fcd0f56 100644
--- a/Backend/scenarios/apollo_ransomware_scenario.py
+++ b/Backend/scenarios/apollo_ransomware_scenario.py
@@ -88,7 +88,8 @@
"name": "Apollo Ransomware - STARFLEET Attack",
"description": "Correlates Proofpoint and M365 events with existing EDR/WEL data for the Apollo ransomware attack chain targeting STARFLEET.",
- "default_query": """dataSource.name in ('SentinelOne','Windows Event Logs') endpoint.name contains ("Enterprise", "bridge")
+ "default_query": """dataSource.name in ('SentinelOne','Windows Event Logs') endpoint.name contains ("Enterprise", "bridge") AND (winEventLog.id = 4698 or * contains "apollo")
+
| group newest_timestamp = newest(timestamp), oldest_timestamp = oldest(timestamp) by event.type, src.process.user, endpoint.name, src.endpoint.ip.address, dst.ip.address
| sort newest_timestamp
| columns event.type, src.process.user, endpoint.name, oldest_timestamp, newest_timestamp, src.endpoint.ip.address, dst.ip.address""",
@@ -450,8 +451,8 @@ def generate_m365_email_interaction(base_time: datetime) -> List[Dict]:
"""Generate M365 events for email access and attachment download"""
events = []
- # Email accessed - 5 minutes after delivery (user checks email)
- email_access_time = get_scenario_time(base_time, 5)
+ # Email accessed - 30 seconds after Proofpoint delivery
+ email_access_time = get_scenario_time(base_time, 0, 30)
m365_email_access = microsoft_365_collaboration_log()
m365_email_access['TimeStamp'] = email_access_time
m365_email_access['UserId'] = VICTIM_PROFILE['email']
@@ -467,8 +468,8 @@ def generate_m365_email_interaction(base_time: datetime) -> List[Dict]:
m365_email_access['RequestedBy'] = VICTIM_PROFILE['name'] # -> actor.user.name
events.append(create_event(email_access_time, "microsoft_365_collaboration", "email_interaction", m365_email_access))
- # Attachment preview/download - 6 minutes after delivery
- attachment_time = get_scenario_time(base_time, 6)
+ # Attachment preview/download - 30 seconds after MailItemsAccessed (1 min after delivery)
+ attachment_time = get_scenario_time(base_time, 1)
m365_attachment = microsoft_365_collaboration_log()
m365_attachment['TimeStamp'] = attachment_time
m365_attachment['UserId'] = VICTIM_PROFILE['email']
@@ -483,8 +484,8 @@ def generate_m365_email_interaction(base_time: datetime) -> List[Dict]:
m365_attachment['RequestedBy'] = VICTIM_PROFILE['name'] # -> actor.user.name
events.append(create_event(attachment_time, "microsoft_365_collaboration", "email_interaction", m365_attachment))
- # File opened in Excel Online / locally - 7 minutes after delivery
- file_open_time = get_scenario_time(base_time, 7)
+ # File opened in Excel Online / locally - 30 seconds after FileDownloaded (1 min 30 sec after delivery)
+ file_open_time = get_scenario_time(base_time, 1, 30)
m365_file_open = microsoft_365_collaboration_log()
m365_file_open['TimeStamp'] = file_open_time
m365_file_open['UserId'] = VICTIM_PROFILE['email']
@@ -785,9 +786,15 @@ def send_to_hec(event_data: dict, event_type: str, trace_id: str = None, phase:
product = type_to_product.get(event_type, event_type)
+ # Special handling for dataSource.name to match expected values
+ if event_type == "microsoft_365_collaboration":
+ data_source_name = "Microsoft O365"
+ else:
+ data_source_name = event_type.replace('_', ' ').title()
+
attr_fields = {
"dataSource.vendor": event_type.split('_')[0].title() if '_' in event_type else event_type.title(),
- "dataSource.name": event_type.replace('_', ' ').title(),
+ "dataSource.name": data_source_name,
"dataSource.category": "security"
}
diff --git a/Backend/utilities/parsers/community_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf b/Backend/utilities/parsers/community_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf
index 29e2a89..5158a5e 100644
--- a/Backend/utilities/parsers/community_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf
+++ b/Backend/utilities/parsers/community_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf
@@ -1,6 +1,6 @@
{
"attributes": {
- "dataSource.name": "Microsoft 365 Collaboration",
+ "dataSource.name": "Microsoft O365",
"dataSource.vendor": "Microsoft",
"dataSource.category": "security",
"metadata.product.name": "Microsoft 365 SharePoint/OneDrive",
@@ -93,11 +93,23 @@
}
},
{
- "rename": {
+ "copy": {
"from": "unmapped.Operation",
"to": "activity_name"
}
},
+ {
+ "copy": {
+ "from": "unmapped.Operation",
+ "to": "unmapped.operation"
+ }
+ },
+ {
+ "copy": {
+ "from": "unmapped.Operation",
+ "to": "event.type"
+ }
+ },
{
"rename": {
"from": "unmapped.SiteUrl",
diff --git a/Backend/utilities/parsers/sentinelone_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf b/Backend/utilities/parsers/sentinelone_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf
index 29e2a89..2d5869a 100644
--- a/Backend/utilities/parsers/sentinelone_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf
+++ b/Backend/utilities/parsers/sentinelone_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf
@@ -1,6 +1,6 @@
{
"attributes": {
- "dataSource.name": "Microsoft 365 Collaboration",
+ "dataSource.name": "Microsoft O365",
"dataSource.vendor": "Microsoft",
"dataSource.category": "security",
"metadata.product.name": "Microsoft 365 SharePoint/OneDrive",
From 22b3b253a6bb6269d11a473589730a1fc089d488 Mon Sep 17 00:00:00 2001
From: jmorascalyr <42879226+jmorascalyr@users.noreply.github.com>
Date: Fri, 20 Feb 2026 06:41:09 -0700
Subject: [PATCH 2/8] feat: Add user.email_addr field to M365 collaboration
events for actor email correlation
- Added user.email_addr field to M365 email interaction events (MailItemsAccessed, FileDownloaded, FileAccessed) using VICTIM_PROFILE['email']
- Updated microsoft_365_collaboration parser to copy unmapped.user.email_addr to user.email_addr for OCSF actor.user.email_addr mapping
- Enables correlation of M365 collaboration events with email security events via actor email address
---
Backend/scenarios/apollo_ransomware_scenario.py | 3 +++
.../microsoft_365_collaboration.conf | 6 ++++++
2 files changed, 9 insertions(+)
diff --git a/Backend/scenarios/apollo_ransomware_scenario.py b/Backend/scenarios/apollo_ransomware_scenario.py
index fcd0f56..fb7895b 100644
--- a/Backend/scenarios/apollo_ransomware_scenario.py
+++ b/Backend/scenarios/apollo_ransomware_scenario.py
@@ -466,6 +466,7 @@ def generate_m365_email_interaction(base_time: datetime) -> List[Dict]:
m365_email_access['SiteUrl'] = f"https://outlook.office365.com/mail/inbox"
# Parser-mapped fields for OCSF synthetic columns
m365_email_access['RequestedBy'] = VICTIM_PROFILE['name'] # -> actor.user.name
+ m365_email_access['user.email_addr'] = VICTIM_PROFILE['email'] # -> actor.user.email_addr
events.append(create_event(email_access_time, "microsoft_365_collaboration", "email_interaction", m365_email_access))
# Attachment preview/download - 30 seconds after MailItemsAccessed (1 min after delivery)
@@ -482,6 +483,7 @@ def generate_m365_email_interaction(base_time: datetime) -> List[Dict]:
m365_attachment['SiteUrl'] = f"https://outlook.office365.com/mail/inbox"
# Parser-mapped fields for OCSF synthetic columns
m365_attachment['RequestedBy'] = VICTIM_PROFILE['name'] # -> actor.user.name
+ m365_attachment['user.email_addr'] = VICTIM_PROFILE['email'] # -> actor.user.email_addr
events.append(create_event(attachment_time, "microsoft_365_collaboration", "email_interaction", m365_attachment))
# File opened in Excel Online / locally - 30 seconds after FileDownloaded (1 min 30 sec after delivery)
@@ -498,6 +500,7 @@ def generate_m365_email_interaction(base_time: datetime) -> List[Dict]:
m365_file_open['SiteUrl'] = f"https://starfleet-my.sharepoint.com/personal/{VICTIM_PROFILE['username']}"
# Parser-mapped fields for OCSF synthetic columns
m365_file_open['RequestedBy'] = VICTIM_PROFILE['name'] # -> actor.user.name
+ m365_file_open['user.email_addr'] = VICTIM_PROFILE['email'] # -> actor.user.email_addr
events.append(create_event(file_open_time, "microsoft_365_collaboration", "file_access", m365_file_open))
return events
diff --git a/Backend/utilities/parsers/community_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf b/Backend/utilities/parsers/community_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf
index 5158a5e..a6932cd 100644
--- a/Backend/utilities/parsers/community_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf
+++ b/Backend/utilities/parsers/community_new/ai-siem-main/parsers/community/microsoft_365_collaboration-latest/microsoft_365_collaboration.conf
@@ -140,6 +140,12 @@
"to": "actor.user.name"
}
},
+ {
+ "copy": {
+ "from": "unmapped.user.email_addr",
+ "to": "user.email_addr"
+ }
+ },
{
"rename": {
"from": "unmapped.Details",
From 8d13ad555dac6d4cdeb5d65b1fa47c02c9788e8e Mon Sep 17 00:00:00 2001
From: jmorascalyr <42879226+jmorascalyr@users.noreply.github.com>
Date: Fri, 20 Feb 2026 06:47:57 -0700
Subject: [PATCH 3/8] feat: Add object_id field to M365 email interaction
events for mail items analysis
- Added object_id field to MailItemsAccessed, FileDownloaded, and FileAccessed events with contextual paths (/Inbox/, /Attachments/, /Documents/)
- Enables mail items analysis and tracking of malicious attachment flow through M365 collaboration events
- Maps to OCSF object_id field for consistent object identification across email interaction phases
---
Backend/scenarios/apollo_ransomware_scenario.py | 3 +++
1 file changed, 3 insertions(+)
diff --git a/Backend/scenarios/apollo_ransomware_scenario.py b/Backend/scenarios/apollo_ransomware_scenario.py
index fb7895b..cc2b46b 100644
--- a/Backend/scenarios/apollo_ransomware_scenario.py
+++ b/Backend/scenarios/apollo_ransomware_scenario.py
@@ -467,6 +467,7 @@ def generate_m365_email_interaction(base_time: datetime) -> List[Dict]:
# Parser-mapped fields for OCSF synthetic columns
m365_email_access['RequestedBy'] = VICTIM_PROFILE['name'] # -> actor.user.name
m365_email_access['user.email_addr'] = VICTIM_PROFILE['email'] # -> actor.user.email_addr
+ m365_email_access['object_id'] = f"/Inbox/{ATTACKER_PROFILE['malicious_xlsx']}" # -> object_id for mail items analysis
events.append(create_event(email_access_time, "microsoft_365_collaboration", "email_interaction", m365_email_access))
# Attachment preview/download - 30 seconds after MailItemsAccessed (1 min after delivery)
@@ -484,6 +485,7 @@ def generate_m365_email_interaction(base_time: datetime) -> List[Dict]:
# Parser-mapped fields for OCSF synthetic columns
m365_attachment['RequestedBy'] = VICTIM_PROFILE['name'] # -> actor.user.name
m365_attachment['user.email_addr'] = VICTIM_PROFILE['email'] # -> actor.user.email_addr
+ m365_attachment['object_id'] = f"/Attachments/{ATTACKER_PROFILE['malicious_xlsx']}" # -> object_id for mail items analysis
events.append(create_event(attachment_time, "microsoft_365_collaboration", "email_interaction", m365_attachment))
# File opened in Excel Online / locally - 30 seconds after FileDownloaded (1 min 30 sec after delivery)
@@ -501,6 +503,7 @@ def generate_m365_email_interaction(base_time: datetime) -> List[Dict]:
# Parser-mapped fields for OCSF synthetic columns
m365_file_open['RequestedBy'] = VICTIM_PROFILE['name'] # -> actor.user.name
m365_file_open['user.email_addr'] = VICTIM_PROFILE['email'] # -> actor.user.email_addr
+ m365_file_open['object_id'] = f"/Documents/{ATTACKER_PROFILE['malicious_xlsx']}" # -> object_id for mail items analysis
events.append(create_event(file_open_time, "microsoft_365_collaboration", "file_access", m365_file_open))
return events
From d309967c56c3e8ea1934bb1fffd9a195cf9469aa Mon Sep 17 00:00:00 2001
From: jmorascalyr <42879226+jmorascalyr@users.noreply.github.com>
Date: Fri, 20 Feb 2026 10:38:11 -0700
Subject: [PATCH 4/8] feat: Add overwrite_parser flag to enable parser updates
across all execution paths
- Added overwrite_parser boolean field to GeneratorExecuteRequest, ScenarioExecuteRequest, CorrelationRunRequest, ParserSyncRequest, and SingleParserSyncRequest models
- Updated parser sync service to skip existence check when overwrite=True, allowing parser updates instead of skipping existing parsers
- Added JARVIS_OVERWRITE_PARSER environment variable support in hec_sender for generator-level parser overwrite control
---
Backend/api/app/models/requests.py | 2 +
Backend/api/app/routers/generators.py | 3 +-
Backend/api/app/routers/parser_sync.py | 6 +-
Backend/api/app/routers/scenarios.py | 4 +
Backend/api/app/services/generator_service.py | 3 +-
.../api/app/services/parser_sync_service.py | 28 ++++--
Backend/api/app/services/scenario_service.py | 26 ++++--
Backend/event_generators/shared/hec_sender.py | 2 +
Frontend/log_generator_ui.py | 92 ++++++++++++++++++-
Frontend/templates/log_generator.html | 36 +++++++-
10 files changed, 179 insertions(+), 23 deletions(-)
diff --git a/Backend/api/app/models/requests.py b/Backend/api/app/models/requests.py
index 2d379a7..0f07f2a 100644
--- a/Backend/api/app/models/requests.py
+++ b/Backend/api/app/models/requests.py
@@ -14,6 +14,7 @@ class GeneratorExecuteRequest(BaseModel):
continuous: bool = Field(default=False, description="Run indefinitely (ignores count)")
eps: Optional[float] = Field(None, ge=0.1, le=10000, description="Events per second rate")
speed_mode: bool = Field(False, description="Pre-generate 1K events and loop for max throughput (auto-enabled for EPS > 1000)")
+ overwrite_parser: bool = Field(False, description="Overwrite existing parsers during execution")
options: Dict[str, Any] = Field(default_factory=dict, description="Generator-specific options")
@validator('count')
@@ -52,6 +53,7 @@ class ScenarioExecuteRequest(BaseModel):
"""Scenario execution request"""
speed: str = Field("fast", pattern="^(realtime|fast|instant)$")
dry_run: bool = Field(False)
+ overwrite_parser: bool = Field(False, description="Overwrite existing parsers during execution")
class Config:
extra = "forbid"
diff --git a/Backend/api/app/routers/generators.py b/Backend/api/app/routers/generators.py
index 8f575f1..8f0e155 100644
--- a/Backend/api/app/routers/generators.py
+++ b/Backend/api/app/routers/generators.py
@@ -190,7 +190,8 @@ async def execute_generator(
count=request.count,
format=request.format,
star_trek_theme=request.star_trek_theme,
- options=request.options
+ options=request.options,
+ overwrite_parser=request.overwrite_parser
)
execution_time = (time.time() - start_time) * 1000 # Convert to ms
diff --git a/Backend/api/app/routers/parser_sync.py b/Backend/api/app/routers/parser_sync.py
index d68c8a3..7cb2ba5 100644
--- a/Backend/api/app/routers/parser_sync.py
+++ b/Backend/api/app/routers/parser_sync.py
@@ -22,6 +22,7 @@ class ParserSyncRequest(BaseModel):
github_repo_urls: Optional[List[str]] = Field(None, description="Optional GitHub repository URLs to fetch parsers from")
github_token: Optional[str] = Field(None, description="Optional GitHub token for private repositories")
selected_parsers: Optional[Dict[str, Dict]] = Field(None, description="Optional user-selected parsers for similar name resolution")
+ overwrite_parser: bool = Field(False, description="If true, overwrite existing parsers instead of skipping them")
class ParserSyncResponse(BaseModel):
@@ -77,7 +78,8 @@ async def sync_parsers(
config_write_token=request.config_write_token,
github_repo_urls=request.github_repo_urls,
github_token=request.github_token,
- selected_parsers=request.selected_parsers
+ selected_parsers=request.selected_parsers,
+ overwrite=request.overwrite_parser
)
# Count results (include uploaded_from_github in uploaded count)
@@ -107,6 +109,7 @@ class SingleParserSyncRequest(BaseModel):
config_write_token: str = Field(..., description="Config API token for reading and writing parsers")
github_repo_urls: Optional[List[str]] = Field(None, description="Optional GitHub repository URLs to fetch parsers from")
github_token: Optional[str] = Field(None, description="Optional GitHub token for private repositories")
+ overwrite_parser: bool = Field(False, description="If true, overwrite existing parser instead of skipping")
class SingleParserSyncResponse(BaseModel):
@@ -141,6 +144,7 @@ async def sync_single_parser(
config_write_token=request.config_write_token,
github_repo_urls=request.github_repo_urls,
github_token=request.github_token,
+ overwrite=request.overwrite_parser,
)
status = result.get("status", "no_parser")
message = result.get("message", "Unknown status")
diff --git a/Backend/api/app/routers/scenarios.py b/Backend/api/app/routers/scenarios.py
index 8489a41..c0d118c 100644
--- a/Backend/api/app/routers/scenarios.py
+++ b/Backend/api/app/routers/scenarios.py
@@ -40,6 +40,7 @@ class CorrelationRunRequest(BaseModel):
tag_phase: bool = True
tag_trace: bool = True
workers: int = 10
+ overwrite_parser: bool = False
# Initialize scenario service
scenario_service = ScenarioService()
@@ -319,6 +320,7 @@ async def run_correlation_scenario(
trace_id=request.trace_id,
tag_phase=request.tag_phase,
tag_trace=request.tag_trace,
+ overwrite_parser=request.overwrite_parser,
background_tasks=background_tasks
)
@@ -371,6 +373,7 @@ async def execute_scenario(
scenario_id: str = Path(..., description="Scenario identifier"),
speed: str = Query("fast", description="Execution speed: realtime, fast, instant"),
dry_run: bool = Query(False, description="Simulate without generating events"),
+ overwrite_parser: bool = Query(False, description="Overwrite existing parsers during execution"),
_: str = Depends(require_write_access)
):
"""Execute an attack scenario"""
@@ -385,6 +388,7 @@ async def execute_scenario(
scenario_id=scenario_id,
speed=speed,
dry_run=dry_run,
+ overwrite_parser=overwrite_parser,
background_tasks=background_tasks
)
diff --git a/Backend/api/app/services/generator_service.py b/Backend/api/app/services/generator_service.py
index 2d8438e..26ee8b0 100644
--- a/Backend/api/app/services/generator_service.py
+++ b/Backend/api/app/services/generator_service.py
@@ -172,7 +172,8 @@ async def execute_generator(
count: int = 1,
format: str = "json",
star_trek_theme: bool = True,
- options: Dict[str, Any] = None
+ options: Dict[str, Any] = None,
+ overwrite_parser: bool = False
) -> List[Dict[str, Any]]:
"""Execute a generator and return events"""
if generator_id not in self.generator_metadata:
diff --git a/Backend/api/app/services/parser_sync_service.py b/Backend/api/app/services/parser_sync_service.py
index dd7d429..a79c20b 100644
--- a/Backend/api/app/services/parser_sync_service.py
+++ b/Backend/api/app/services/parser_sync_service.py
@@ -136,12 +136,9 @@ def get_parser_path_in_siem(self, sourcetype: str) -> str:
Returns:
The parser path in SIEM (e.g., '/logParsers/okta_authentication-latest')
"""
- # In the Scalyr/SentinelOne config tree, log parsers are stored as JSON files
- # under /logParsers.
- leaf = sourcetype
- if not leaf.endswith(".json"):
- leaf = f"{leaf}.json"
- return f"/logParsers/{leaf}"
+ # In the Scalyr/SentinelOne config tree, log parsers are stored
+ # under /logParsers without a file extension.
+ return f"/logParsers/{sourcetype}"
def _local_parser_directories_for_sourcetype(self, sourcetype: str) -> List[Path]:
local_name = LOCAL_PARSER_ALIASES.get(sourcetype, sourcetype)
@@ -207,15 +204,19 @@ def ensure_parser_for_sourcetype(
github_repo_urls: Optional[List[str]] = None,
github_token: Optional[str] = None,
selected_parser: Optional[Dict] = None,
+ overwrite: bool = False,
) -> Dict[str, str]:
parser_path = self.get_parser_path_in_siem(sourcetype)
exists, _ = self.check_parser_exists(config_write_token, parser_path)
- if exists:
+ if exists and not overwrite:
return {
"status": "exists",
"message": f"Parser already exists: {parser_path}",
}
+
+ if exists and overwrite:
+ logger.info(f"Overwriting existing parser: {parser_path}")
parser_content = self.load_local_parser(sourcetype)
from_github = False
@@ -434,7 +435,8 @@ def ensure_parsers_for_sources(
config_write_token: str,
github_repo_urls: Optional[List[str]] = None,
github_token: Optional[str] = None,
- selected_parsers: Optional[Dict[str, Dict]] = None
+ selected_parsers: Optional[Dict[str, Dict]] = None,
+ overwrite: bool = False
) -> Dict[str, dict]:
"""
Ensure all required parsers exist in the destination SIEM
@@ -445,6 +447,7 @@ def ensure_parsers_for_sources(
github_repo_urls: Optional list of GitHub repository URLs to fetch parsers from
github_token: Optional GitHub personal access token for private repos
selected_parsers: Optional dict mapping sourcetype to user-selected parser info
+ overwrite: If True, overwrite existing parsers instead of skipping them
Returns:
Dict with status for each source:
@@ -492,7 +495,7 @@ def ensure_parsers_for_sources(
# Check if parser exists (write token can also read)
exists, _ = self.check_parser_exists(config_write_token, parser_path)
- if exists:
+ if exists and not overwrite:
results[source] = {
"status": "exists",
"sourcetype": sourcetype,
@@ -500,6 +503,13 @@ def ensure_parsers_for_sources(
}
continue
+ if exists and overwrite:
+ # Parser exists but we need to overwrite it
+ logger.info(f"Overwriting existing parser: {parser_path}")
+ else:
+ # Parser doesn't exist, we need to create it
+ logger.info(f"Creating new parser: {parser_path}")
+
# Parser doesn't exist, try to find it
parser_content = None
from_github = False
diff --git a/Backend/api/app/services/scenario_service.py b/Backend/api/app/services/scenario_service.py
index c8e3d07..cbfbd35 100644
--- a/Backend/api/app/services/scenario_service.py
+++ b/Backend/api/app/services/scenario_service.py
@@ -1,16 +1,15 @@
"""
Scenario service for managing attack scenarios
"""
-from typing import Dict, Any, List, Optional
-import uuid
-import time
-import asyncio
-import json
-from datetime import datetime
import logging
import os
import sys
+import json
+import uuid
+import asyncio
import importlib
+from datetime import datetime
+from typing import Dict, Any, Optional, List
from pathlib import Path
logger = logging.getLogger(__name__)
@@ -129,6 +128,17 @@ def __init__(self):
]
}
,
+ "apollo_ransomware_scenario": {
+ "id": "apollo_ransomware_scenario",
+ "name": "Apollo Ransomware Scenario",
+ "description": "Proofpoint phishing, M365 email interaction, SharePoint recon & exfiltration",
+ "phases": [
+ {"name": "Phishing Delivery", "generators": ["proofpoint"], "duration": 5},
+ {"name": "Email Interaction", "generators": ["microsoft_365_collaboration"], "duration": 5},
+ {"name": "SharePoint Recon", "generators": ["microsoft_365_collaboration"], "duration": 15},
+ {"name": "Data Exfiltration", "generators": ["microsoft_365_collaboration"], "duration": 10}
+ ]
+ },
"hr_phishing_pdf_c2": {
"id": "hr_phishing_pdf_c2",
"name": "HR Phishing PDF -> PowerShell -> Scheduled Task -> C2",
@@ -236,6 +246,7 @@ async def start_scenario(
scenario_id: str,
speed: str = "fast",
dry_run: bool = False,
+ overwrite_parser: bool = False,
background_tasks=None
) -> str:
"""Start scenario execution"""
@@ -252,6 +263,7 @@ async def start_scenario(
"started_at": datetime.utcnow().isoformat(),
"speed": speed,
"dry_run": dry_run,
+ "overwrite_parser": overwrite_parser,
"progress": 0
}
@@ -269,6 +281,7 @@ async def start_correlation_scenario(
tag_trace: bool = True,
speed: str = "fast",
dry_run: bool = False,
+ overwrite_parser: bool = False,
background_tasks=None
) -> str:
"""Start correlation scenario execution with SIEM context and trace ID support"""
@@ -285,6 +298,7 @@ async def start_correlation_scenario(
"trace_id": trace_id,
"tag_phase": tag_phase,
"tag_trace": tag_trace,
+ "overwrite_parser": overwrite_parser,
"progress": 0
}
diff --git a/Backend/event_generators/shared/hec_sender.py b/Backend/event_generators/shared/hec_sender.py
index 1cbc99b..14e358c 100644
--- a/Backend/event_generators/shared/hec_sender.py
+++ b/Backend/event_generators/shared/hec_sender.py
@@ -958,6 +958,7 @@ def _send_batch(lines: list, is_json: bool, product: str):
# Optional: ensure parsers exist in the destination SIEM before sending events
_ENSURE_PARSER = os.getenv("JARVIS_ENSURE_PARSER", "false").lower() == "true"
+_OVERWRITE_PARSER = os.getenv("JARVIS_OVERWRITE_PARSER", "false").lower() == "true"
_JARVIS_API_BASE_URL = os.getenv("JARVIS_API_BASE_URL", "http://localhost:8000").rstrip("/")
_JARVIS_API_KEY = os.getenv("JARVIS_API_KEY")
_S1_CONFIG_API_URL = os.getenv("S1_CONFIG_API_URL")
@@ -1002,6 +1003,7 @@ def _ensure_parser_in_destination(product: str) -> None:
"sourcetype": sync_sourcetype,
"config_api_url": _S1_CONFIG_API_URL,
"config_write_token": _S1_CONFIG_WRITE_TOKEN,
+ "overwrite_parser": _OVERWRITE_PARSER,
}
try:
diff --git a/Frontend/log_generator_ui.py b/Frontend/log_generator_ui.py
index fc7bc91..ffde8e1 100644
--- a/Frontend/log_generator_ui.py
+++ b/Frontend/log_generator_ui.py
@@ -544,6 +544,7 @@ def run_correlation_scenario():
tag_trace = data.get('tag_trace', True)
trace_id = (data.get('trace_id') or '').strip()
local_token = data.get('hec_token')
+ overwrite_parser = data.get('overwrite_parser', False)
if not scenario_id:
return jsonify({'error': 'scenario_id is required'}), 400
@@ -551,6 +552,10 @@ def run_correlation_scenario():
return jsonify({'error': 'destination_id is required'}), 400
# Resolve destination
+ config_write_token = None
+ config_api_url = None
+ github_repo_urls = []
+ github_token = None
try:
dest_resp = requests.get(
f"{API_BASE_URL}/api/v1/destinations/{destination_id}",
@@ -580,6 +585,35 @@ def run_correlation_scenario():
if not hec_url or not hec_token:
return jsonify({'error': 'HEC destination incomplete'}), 400
+
+ # Fetch config token and URL for parser sync if available
+ config_api_url = chosen.get('config_api_url')
+ if chosen.get('has_config_write_token') and config_api_url:
+ try:
+ config_resp = requests.get(
+ f"{API_BASE_URL}/api/v1/destinations/{destination_id}/config-tokens",
+ headers=_get_api_headers(),
+ timeout=10
+ )
+ if config_resp.status_code == 200:
+ config_tokens = config_resp.json()
+ config_write_token = config_tokens.get('config_write_token')
+ except Exception as ce:
+ logger.warning(f"Failed to retrieve config token for correlation: {ce}")
+
+ # Fetch GitHub parser repositories from settings
+ try:
+ repos_resp = requests.get(
+ f"{API_BASE_URL}/api/v1/settings/parser-repositories",
+ headers=_get_api_headers(),
+ timeout=10
+ )
+ if repos_resp.status_code == 200:
+ repos_data = repos_resp.json()
+ github_repo_urls = [url for url in repos_data.get('repositories', []) if url]
+ github_token = repos_data.get('github_token')
+ except Exception as ge:
+ logger.warning(f"Failed to retrieve GitHub parser repositories: {ge}")
except Exception as e:
return jsonify({'error': f'Failed to resolve destination: {str(e)}'}), 500
@@ -608,6 +642,50 @@ def generate_and_stream():
try:
yield "INFO: Starting correlation scenario execution...\n"
+ # Parser sync: Check and upload required parsers before running scenario
+ if config_write_token and config_api_url:
+ if overwrite_parser:
+ yield "INFO: Checking required parsers in destination SIEM (overwrite mode ON)...\n"
+ else:
+ yield "INFO: Checking required parsers in destination SIEM...\n"
+ try:
+ sync_payload = {
+ "scenario_id": scenario_id,
+ "config_api_url": config_api_url,
+ "config_write_token": config_write_token,
+ "overwrite_parser": overwrite_parser
+ }
+ if github_repo_urls:
+ sync_payload["github_repo_urls"] = github_repo_urls
+ if github_token:
+ sync_payload["github_token"] = github_token
+
+ sync_resp = requests.post(
+ f"{API_BASE_URL}/api/v1/parser-sync/sync",
+ headers=_get_api_headers(),
+ json=sync_payload,
+ timeout=120
+ )
+ if sync_resp.status_code == 200:
+ sync_result = sync_resp.json()
+ for source, info in sync_result.get('results', {}).items():
+ status = info.get('status', 'unknown')
+ message = info.get('message', '')
+ sourcetype = info.get('sourcetype', 'unknown')
+ if status == 'exists':
+ yield f"INFO: Parser exists: {source} -> {sourcetype}\n"
+ elif status in ('uploaded', 'uploaded_from_github'):
+ yield f"INFO: Parser uploaded: {source} -> {sourcetype}\n"
+ elif status == 'failed':
+ yield f"WARN: Parser sync failed: {source} -> {sourcetype} - {message}\n"
+ elif status == 'no_parser':
+ yield f"WARN: No parser mapping: {source}\n"
+ yield "INFO: Parser sync complete\n"
+ else:
+ yield f"WARN: Parser sync API returned {sync_resp.status_code}, continuing without sync\n"
+ except Exception as pe:
+ yield f"WARN: Parser sync failed: {pe}, continuing without sync\n"
+
if siem_context and siem_context.get('results'):
yield f"INFO: Using SIEM context with {len(siem_context.get('results', []))} results\n"
if siem_context.get('anchors'):
@@ -843,6 +921,7 @@ def run_scenario():
local_token = data.get('hec_token') # Token from browser localStorage
sync_parsers = data.get('sync_parsers', True) # Enable parser sync by default
debug_mode = data.get('debug_mode', False) # Verbose logging mode
+ overwrite_parser = data.get('overwrite_parser', False) # Overwrite existing parsers
if not scenario_id:
return jsonify({'error': 'scenario_id is required'}), 400
@@ -952,13 +1031,17 @@ def generate_and_stream():
# Parser sync: Check and upload required parsers before running scenario
if sync_parsers and config_write_token and config_api_url:
- yield "INFO: Checking required parsers in destination SIEM...\n"
+ if overwrite_parser:
+ yield "INFO: Checking required parsers in destination SIEM (overwrite mode ON)...\n"
+ else:
+ yield "INFO: Checking required parsers in destination SIEM...\n"
try:
# Call the parser sync API with GitHub repos
sync_payload = {
"scenario_id": scenario_id,
"config_api_url": config_api_url,
- "config_write_token": config_write_token
+ "config_write_token": config_write_token,
+ "overwrite_parser": overwrite_parser
}
if github_repo_urls:
sync_payload["github_repo_urls"] = github_repo_urls
@@ -1686,7 +1769,8 @@ def generate_logs():
eps = float(data.get('eps', 1.0))
continuous = data.get('continuous', False)
speed_mode = data.get('speed_mode', False)
- ensure_parser = bool(data.get('ensure_parser', False))
+ overwrite_parser = bool(data.get('overwrite_parser', False))
+ ensure_parser = bool(data.get('ensure_parser', False)) or overwrite_parser
syslog_ip = data.get('ip')
syslog_port = int(data.get('port')) if data.get('port') is not None else None
syslog_protocol = data.get('protocol')
@@ -1916,6 +2000,8 @@ def _normalize_hec_url(u: str) -> str:
if ensure_parser:
env['JARVIS_ENSURE_PARSER'] = 'true'
+ if overwrite_parser:
+ env['JARVIS_OVERWRITE_PARSER'] = 'true'
env['JARVIS_API_BASE_URL'] = API_BASE_URL
if BACKEND_API_KEY:
env['JARVIS_API_KEY'] = BACKEND_API_KEY
diff --git a/Frontend/templates/log_generator.html b/Frontend/templates/log_generator.html
index 5bfe58a..0afa7ea 100644
--- a/Frontend/templates/log_generator.html
+++ b/Frontend/templates/log_generator.html
@@ -379,6 +379,14 @@
For HEC destinations: checks & uploads the parser to the destination SIEM before sending events (requires Config API URL + write token on the destination).
+
+
+
If checked, will overwrite existing parsers in the destination SIEM instead of skipping them. Use with caution.
+
+
+
+ Parser Options
+
+
+
If checked, will overwrite existing parsers in the destination SIEM instead of skipping them. Use with caution.
+
+
+
+
+
If checked, will overwrite existing parsers in the destination SIEM instead of skipping them. Use with caution.