Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions agentic_security/attack_rules/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Public API of the agentic_security.attack_rules package.
from agentic_security.attack_rules.models import AttackRule, AttackRuleSeverity
from agentic_security.attack_rules.loader import RuleLoader, load_rules_from_directory
from agentic_security.attack_rules.dataset import (
    rules_to_dataset,
    load_rules_as_dataset,
    YAMLRulesDatasetLoader,
)

# Names exported via `from agentic_security.attack_rules import *`.
__all__ = [
    "AttackRule",
    "AttackRuleSeverity",
    "RuleLoader",
    "load_rules_from_directory",
    "rules_to_dataset",
    "load_rules_as_dataset",
    "YAMLRulesDatasetLoader",
]
128 changes: 128 additions & 0 deletions agentic_security/attack_rules/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from pathlib import Path

from agentic_security.attack_rules.loader import RuleLoader
from agentic_security.attack_rules.models import AttackRule, AttackRuleSeverity
from agentic_security.probe_data.models import ProbeDataset


def rules_to_dataset(
    rules: list[AttackRule],
    name: str = "YAML Rules",
    variables: dict[str, str] | None = None,
) -> ProbeDataset:
    """Wrap a list of AttackRule objects in a ProbeDataset.

    Each rule's prompt is rendered with *variables* (placeholder
    substitution); `tokens` is a whitespace-word approximation.
    """
    rendered: list[str] = []
    word_total = 0
    for rule in rules:
        prompt = rule.render_prompt(variables)
        rendered.append(prompt)
        word_total += len(prompt.split())

    distinct_types = list({rule.type for rule in rules})
    return ProbeDataset(
        dataset_name=name,
        metadata={
            "source": "yaml_rules",
            "rule_count": len(rules),
            "types": distinct_types,
        },
        prompts=rendered,
        tokens=word_total,
        approx_cost=0.0,
    )


def load_rules_as_dataset(
    directory: str | Path,
    types: list[str] | None = None,
    severities: list[str] | None = None,
    recursive: bool = True,
    variables: dict[str, str] | None = None,
) -> ProbeDataset:
    """Load YAML rules from *directory*, filter them, and return a ProbeDataset.

    *severities* are case-insensitive strings; the dataset name reflects
    the type filter when one is given, otherwise the directory name.
    """
    loader = RuleLoader()
    loaded = loader.load_rules_from_directory(directory, recursive)

    enums = (
        [AttackRuleSeverity.from_string(s) for s in severities]
        if severities
        else None
    )
    selected = loader.filter_rules(loaded, types=types, severities=enums)

    if types:
        dataset_name = f"YAML Rules [{', '.join(types)}]"
    else:
        dataset_name = f"YAML Rules ({Path(directory).name})"

    return rules_to_dataset(selected, name=dataset_name, variables=variables)


class YAMLRulesDatasetLoader:
def __init__(
self,
directories: list[str | Path] | None = None,
types: list[str] | None = None,
severities: list[str] | None = None,
recursive: bool = True,
):
self.directories = directories or []
self.types = types
self.severities = severities
self.recursive = recursive
self._loader = RuleLoader()

def add_directory(self, directory: str | Path):
self.directories.append(directory)

def add_builtin_rules(self, rules_subdir: str = "rules"):
builtin = Path(__file__).parent / rules_subdir
if builtin.exists():
self.directories.append(builtin)

def load(self, variables: dict[str, str] | None = None) -> list[ProbeDataset]:
datasets = []

for directory in self.directories:
directory = Path(directory)
if not directory.exists():
continue

rules = self._loader.load_rules_from_directory(directory, self.recursive)

severity_enums = None
if self.severities:
severity_enums = [
AttackRuleSeverity.from_string(s) for s in self.severities
]

filtered = self._loader.filter_rules(
rules, types=self.types, severities=severity_enums
)

if not filtered:
continue

dataset = rules_to_dataset(
filtered,
name=f"YAML Rules ({directory.name})",
variables=variables,
)
datasets.append(dataset)

return datasets

def load_merged(self, variables: dict[str, str] | None = None) -> ProbeDataset:
all_rules = []

for directory in self.directories:
directory = Path(directory)
if not directory.exists():
continue
rules = self._loader.load_rules_from_directory(directory, self.recursive)
all_rules.extend(rules)

severity_enums = None
if self.severities:
severity_enums = [
AttackRuleSeverity.from_string(s) for s in self.severities
]

filtered = self._loader.filter_rules(
all_rules, types=self.types, severities=severity_enums
)

return rules_to_dataset(
filtered, name="YAML Rules (merged)", variables=variables
)
156 changes: 156 additions & 0 deletions agentic_security/attack_rules/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from pathlib import Path

import yaml

from agentic_security.attack_rules.models import AttackRule, AttackRuleSeverity
from agentic_security.logutils import logger


class RuleValidationError(Exception):
    """Raised when a YAML rule definition fails validation.

    NOTE(review): defined but not raised anywhere in this module —
    validation failures are currently logged and the rule skipped.
    """
    pass


class RuleLoader:
REQUIRED_FIELDS = {"name", "prompt"}
VALID_EXTENSIONS = {".yaml", ".yml"}

def __init__(self, rules_dir: str | Path | None = None):
self.rules_dir = Path(rules_dir) if rules_dir else None
self._rules: list[AttackRule] = []

def validate_rule_data(self, data: dict, filepath: str | None = None) -> list[str]:
errors = []
for field in self.REQUIRED_FIELDS:
if field not in data or not data[field]:
errors.append(f"Missing required field: {field}")

if "severity" in data and data["severity"]:
if data["severity"].lower() not in {"low", "medium", "high"}:
errors.append(f"Invalid severity: {data['severity']}")

if filepath:
errors = [f"{filepath}: {e}" for e in errors]
return errors

def load_rule_from_file(self, filepath: str | Path) -> AttackRule | None:
filepath = Path(filepath)
if filepath.suffix.lower() not in self.VALID_EXTENSIONS:
return None

try:
with open(filepath, encoding="utf-8") as f:
data = yaml.safe_load(f)

if not isinstance(data, dict):
logger.warning(f"Invalid YAML structure in {filepath}")
return None

errors = self.validate_rule_data(data, str(filepath))
if errors:
for error in errors:
logger.warning(error)
return None

rule = AttackRule.from_dict(data)
rule.metadata["source_file"] = str(filepath)
return rule

except yaml.YAMLError as e:
logger.error(f"YAML parsing error in {filepath}: {e}")
return None
except Exception as e:
logger.error(f"Error loading rule from {filepath}: {e}")
return None

def load_rule_from_string(self, yaml_content: str) -> AttackRule | None:
try:
data = yaml.safe_load(yaml_content)
if not isinstance(data, dict):
return None

errors = self.validate_rule_data(data)
if errors:
for error in errors:
logger.warning(error)
return None

return AttackRule.from_dict(data)
except yaml.YAMLError as e:
logger.error(f"YAML parsing error: {e}")
return None

def load_rules_from_directory(
self, directory: str | Path | None = None, recursive: bool = True
) -> list[AttackRule]:
directory = Path(directory) if directory else self.rules_dir
if not directory or not directory.exists():
logger.warning(f"Rules directory does not exist: {directory}")
return []

rules = []
# pattern = "**/*.yaml" if recursive else "*.yaml"

for ext in [".yaml", ".yml"]:
glob_pattern = f"**/*{ext}" if recursive else f"*{ext}"
for filepath in directory.glob(glob_pattern):
rule = self.load_rule_from_file(filepath)
if rule:
rules.append(rule)

logger.info(f"Loaded {len(rules)} rules from {directory}")
self._rules.extend(rules)
return rules

def load_multiple_directories(
self, directories: list[str | Path], recursive: bool = True
) -> list[AttackRule]:
all_rules = []
for directory in directories:
rules = self.load_rules_from_directory(directory, recursive)
all_rules.extend(rules)
return all_rules

def filter_rules(
self,
rules: list[AttackRule] | None = None,
types: list[str] | None = None,
severities: list[AttackRuleSeverity] | None = None,
name_pattern: str | None = None,
) -> list[AttackRule]:
rules = rules if rules is not None else self._rules
result = rules

if types:
result = [r for r in result if r.type in types]

if severities:
result = [r for r in result if r.severity in severities]

if name_pattern:
import re

pattern = re.compile(name_pattern, re.IGNORECASE)
result = [r for r in result if pattern.search(r.name)]

return result

def get_rules_by_type(self, rule_type: str) -> list[AttackRule]:
return self.filter_rules(types=[rule_type])

def get_rules_by_severity(self, severity: AttackRuleSeverity) -> list[AttackRule]:
return self.filter_rules(severities=[severity])

@property
def rules(self) -> list[AttackRule]:
return self._rules

@property
def rule_types(self) -> set[str]:
return {r.type for r in self._rules}


def load_rules_from_directory(
    directory: str | Path, recursive: bool = True
) -> list[AttackRule]:
    """Convenience wrapper: load rules from *directory* via a throwaway RuleLoader."""
    return RuleLoader().load_rules_from_directory(directory, recursive)
81 changes: 81 additions & 0 deletions agentic_security/attack_rules/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class AttackRuleSeverity(Enum):
    """Severity level attached to an attack rule."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

    @classmethod
    def from_string(cls, value: str) -> "AttackRuleSeverity":
        """Parse a severity case-insensitively; unknown values map to MEDIUM."""
        normalized = value.lower()
        for member in cls:
            if member.value == normalized:
                return member
        return cls.MEDIUM


@dataclass
class AttackRule:
    """A single attack probe parsed from a YAML rule definition."""

    name: str
    type: str
    prompt: str
    severity: AttackRuleSeverity = AttackRuleSeverity.MEDIUM
    pass_conditions: list[str] = field(default_factory=list)
    fail_conditions: list[str] = field(default_factory=list)
    source: str | None = None
    # Any YAML keys beyond the core fields are preserved here.
    metadata: dict[str, Any] = field(default_factory=dict)

    # Keys mapped to dedicated fields above; everything else in an input
    # dict lands in `metadata`. (Unannotated, so not a dataclass field.)
    _CORE_KEYS = frozenset(
        {
            "name",
            "type",
            "prompt",
            "severity",
            "pass_conditions",
            "fail_conditions",
            "source",
        }
    )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AttackRule":
        """Build a rule from a parsed YAML mapping.

        Raises KeyError if "name" or "prompt" is absent. Unknown severity
        strings fall back to MEDIUM; an explicit null/empty severity is
        treated like a missing one (previously crashed on `None.lower()`).
        """
        severity = AttackRuleSeverity.from_string(data.get("severity") or "medium")
        return cls(
            name=data["name"],
            type=data.get("type", "unknown"),
            prompt=data["prompt"],
            severity=severity,
            pass_conditions=data.get("pass_conditions", []),
            fail_conditions=data.get("fail_conditions", []),
            source=data.get("source"),
            metadata={k: v for k, v in data.items() if k not in cls._CORE_KEYS},
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize back to a YAML-ready mapping (inverse of `from_dict`).

        Empty condition lists and a missing source are omitted.
        NOTE(review): metadata entries are merged at top level and could
        shadow core keys if a caller put e.g. "name" into metadata.
        """
        result: dict[str, Any] = {
            "name": self.name,
            "type": self.type,
            "prompt": self.prompt,
            "severity": self.severity.value,
        }
        if self.pass_conditions:
            result["pass_conditions"] = self.pass_conditions
        if self.fail_conditions:
            result["fail_conditions"] = self.fail_conditions
        if self.source:
            result["source"] = self.source
        if self.metadata:
            result.update(self.metadata)
        return result

    def render_prompt(self, variables: dict[str, str] | None = None) -> str:
        """Return the prompt with `{key}` and `{{ key }}` placeholders substituted."""
        if not variables:
            return self.prompt
        rendered = self.prompt
        for key, value in variables.items():
            rendered = rendered.replace(f"{{{key}}}", value)
            rendered = rendered.replace(f"{{{{ {key} }}}}", value)
        return rendered
Loading
Loading