Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ agentic_security.toml
/venv
*.csv
agentic_security/agents/operator_agno.py
.claude/
plan.md
auto_loop.sh
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ repos:
- id: flake8
language_version: python3.11
additional_dependencies: [flake8-docstrings]
exclude: '^(tests)/'

# - repo: https://github.com/PyCQA/isort
# rev: 7.0.0
Expand Down
8 changes: 6 additions & 2 deletions agentic_security/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from .lib import SecurityScanner
from agentic_security.cache_config import ensure_cache_dir

__all__ = ["SecurityScanner"]
ensure_cache_dir()

from .lib import SecurityScanner # noqa: E402

__all__ = ["SecurityScanner", "ensure_cache_dir"]
23 changes: 23 additions & 0 deletions agentic_security/cache_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Utilities to keep cache-to-disk storage in a writable, predictable location."""

from __future__ import annotations

import os
from pathlib import Path


def ensure_cache_dir(base_dir: Path | None = None) -> Path:
"""Ensure ``DISK_CACHE_DIR`` points to a writable directory and create it if needed."""
env_var = "DISK_CACHE_DIR"
configured_path = os.environ.get(env_var) or os.environ.get(
"AGENTIC_SECURITY_CACHE_DIR"
)
cache_dir = Path(
configured_path or base_dir or Path.cwd() / ".cache" / "agentic_security"
).expanduser()
cache_dir.mkdir(parents=True, exist_ok=True)
os.environ[env_var] = str(cache_dir)
return cache_dir


__all__ = ["ensure_cache_dir"]
23 changes: 16 additions & 7 deletions agentic_security/core/app.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
import os
from asyncio import Event, Queue
from typing import TypedDict

from fastapi import FastAPI
from fastapi.responses import ORJSONResponse

from agentic_security.http_spec import LLMSpec


class CurrentRun(TypedDict):
id: int | None
spec: LLMSpec | None


tools_inbox: Queue = Queue()
stop_event: Event = Event()
current_run: str = {"spec": "", "id": ""}
current_run: CurrentRun = {"spec": None, "id": None}
_secrets: dict[str, str] = {}

current_run: dict[str, int | LLMSpec] = {"spec": "", "id": ""}


def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
Expand All @@ -30,13 +35,13 @@ def get_stop_event() -> Event:
return stop_event


def get_current_run() -> dict[str, int | LLMSpec]:
def get_current_run() -> CurrentRun:
"""Get the current run id."""
return current_run


def set_current_run(spec: LLMSpec) -> dict[str, int | LLMSpec]:
"""Set the current run id."""
def set_current_run(spec: LLMSpec) -> CurrentRun:
"""Set the current run metadata based on a spec instance."""
current_run["id"] = hash(id(spec))
current_run["spec"] = spec
return current_run
Expand All @@ -56,4 +61,8 @@ def expand_secrets(secrets: dict[str, str]) -> None:
for key in secrets:
val = secrets[key]
if val.startswith("$"):
secrets[key] = os.getenv(val.strip("$"))
env_value = os.getenv(val.strip("$"))
if env_value is not None:
secrets[key] = env_value
else:
secrets[key] = None
12 changes: 12 additions & 0 deletions agentic_security/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Advanced concurrent execution package for security scanning."""

from agentic_security.executor.rate_limiter import TokenBucketRateLimiter
from agentic_security.executor.circuit_breaker import CircuitBreaker
from agentic_security.executor.concurrent import ConcurrentExecutor, ExecutorMetrics

__all__ = [
"TokenBucketRateLimiter",
"CircuitBreaker",
"ConcurrentExecutor",
"ExecutorMetrics",
]
109 changes: 109 additions & 0 deletions agentic_security/executor/circuit_breaker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""Circuit breaker pattern for fault tolerance."""

import time
from typing import Literal


CircuitState = Literal["closed", "open", "half_open"]


class CircuitBreaker:
"""Circuit breaker to prevent cascading failures.

Implements the circuit breaker pattern with three states:
- closed: Normal operation, requests pass through
- open: Failure threshold exceeded, requests fail fast
- half_open: Recovery attempt, limited requests allowed

Example:
>>> breaker = CircuitBreaker(failure_threshold=0.5, recovery_timeout=30)
>>> if breaker.is_open():
... raise Exception("Circuit breaker is open")
>>> try:
... result = make_request()
... breaker.record_success()
>>> except Exception:
... breaker.record_failure()
"""

def __init__(self, failure_threshold: float = 0.5, recovery_timeout: int = 30):
"""Initialize circuit breaker.

Args:
failure_threshold: Failure rate (0.0-1.0) that triggers open state
recovery_timeout: Seconds to wait before attempting recovery
"""
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failures = 0
self.successes = 0
self.state: CircuitState = "closed"
self.last_failure_time: float | None = None

def record_success(self):
"""Record a successful request."""
self.successes += 1

# If in half_open state and we have enough successes, close the circuit
if self.state == "half_open" and self.successes >= 3:
self.state = "closed"
self.failures = 0
self.successes = 0

def record_failure(self):
"""Record a failed request."""
self.failures += 1
self.last_failure_time = time.monotonic()

total = self.failures + self.successes

# Need minimum sample size before opening circuit
if total >= 10:
failure_rate = self.failures / total
if failure_rate >= self.failure_threshold:
self.state = "open"

def is_open(self) -> bool:
"""Check if circuit breaker is open.

Returns:
bool: True if circuit is open and requests should be blocked
"""
if self.state == "open":
# Check if we should attempt recovery
if self.last_failure_time is not None:
if time.monotonic() - self.last_failure_time > self.recovery_timeout:
self.state = "half_open"
# Reset counters for half-open state
self.failures = 0
self.successes = 0
return False
return True

return False

def get_state(self) -> CircuitState:
"""Get current circuit breaker state.

Returns:
CircuitState: Current state (closed, open, or half_open)
"""
return self.state

def get_failure_rate(self) -> float:
"""Get current failure rate.

Returns:
float: Failure rate (0.0-1.0), or 0.0 if no requests recorded
"""
total = self.failures + self.successes
if total == 0:
return 0.0
return self.failures / total

def reset(self):
"""Reset circuit breaker to initial state."""
self.failures = 0
self.successes = 0
self.state = "closed"
self.last_failure_time = None
Loading