From 71787c6ec9331e29ea73ea2becea156e5b371513 Mon Sep 17 00:00:00 2001 From: nemanjaASE <93867316+nemanjaASE@users.noreply.github.com> Date: Sun, 9 Mar 2025 13:50:18 +0100 Subject: [PATCH 1/2] Add type annotations to functions and methods for improved clarity and maintainabiliy --- agentic_security/core/app.py | 19 +++++++++++++------ agentic_security/misc/banner.py | 17 +++++++++-------- agentic_security/report_chart.py | 6 ++++-- agentic_security/routes/scan.py | 11 ++++++----- 4 files changed, 32 insertions(+), 21 deletions(-) diff --git a/agentic_security/core/app.py b/agentic_security/core/app.py index 3938ba8..f97f380 100644 --- a/agentic_security/core/app.py +++ b/agentic_security/core/app.py @@ -4,11 +4,18 @@ from fastapi import FastAPI from fastapi.responses import ORJSONResponse +from agentic_security.http_spec import LLMSpec +from typing import Any, Dict + tools_inbox: Queue = Queue() stop_event: Event = Event() current_run: str = {"spec": "", "id": ""} -_secrets = {} +_secrets: dict[str, str] = {} +current_run: Dict[str, int | LLMSpec] = { + "spec": "", + "id": "" +} def create_app() -> FastAPI: """Create and configure the FastAPI application.""" @@ -26,29 +33,29 @@ def get_stop_event() -> Event: return stop_event -def get_current_run() -> str: +def get_current_run() -> Dict[str, int | LLMSpec]: """Get the current run id.""" return current_run -def set_current_run(spec): +def set_current_run(spec : LLMSpec) -> Dict[str, int | LLMSpec]: """Set the current run id.""" current_run["id"] = hash(id(spec)) current_run["spec"] = spec return current_run -def get_secrets(): +def get_secrets() -> dict[str, str]: return _secrets -def set_secrets(secrets): +def set_secrets(secrets : dict[str, str]) -> dict[str, str]: _secrets.update(secrets) expand_secrets(_secrets) return _secrets -def expand_secrets(secrets): +def expand_secrets(secrets : dict[str, str]) -> None: for key in secrets: val = secrets[key] if val.startswith("$"): diff --git a/agentic_security/misc/banner.py b/agentic_security/misc/banner.py index f758c2f..308d42a 100644 --- a/agentic_security/misc/banner.py +++ b/agentic_security/misc/banner.py @@ -1,5 +1,6 @@ from pyfiglet import Figlet, FontNotFound from termcolor import colored +from typing import Optional try: from importlib.metadata import version @@ -8,14 +9,14 @@ def generate_banner( - title="Agentic Security", - font="slant", - version="v2.1.0", - tagline="Proactive Threat Detection & Automated Security Protocols", - author="Developed by: [Security Team]", - website="Website: https://github.com/msoedov/agentic_security", - warning="", -): + title: str = "Agentic Security", + font: str = "slant", + version: str = "v2.1.0", + tagline: str = "Proactive Threat Detection & Automated Security Protocols", + author: str = "Developed by: [Security Team]", + website: str = "Website: https://github.com/msoedov/agentic_security", + warning: Optional[str] = "", # Using Optional for warning since it might be None +) -> str: """Generate a visually enhanced banner with dynamic width and borders.""" # Define the text elements diff --git a/agentic_security/report_chart.py b/agentic_security/report_chart.py index c76e576..4f13ae9 100644 --- a/agentic_security/report_chart.py +++ b/agentic_security/report_chart.py @@ -1,5 +1,6 @@ import io import string +from typing import List import matplotlib.pyplot as plt import numpy as np @@ -7,8 +8,9 @@ from matplotlib.cm import ScalarMappable from matplotlib.colors import LinearSegmentedColormap, Normalize +from .primitives import Table -def plot_security_report(table): +def plot_security_report(table: Table) -> io.BytesIO: # Data preprocessing data = pd.DataFrame(table) @@ -141,7 +143,7 @@ def plot_security_report(table): return buf -def generate_identifiers(data): +def generate_identifiers(data : pd.DataFrame) -> List[str]: data_length = len(data) alphabet = string.ascii_uppercase num_letters = len(alphabet) diff --git a/agentic_security/routes/scan.py b/agentic_security/routes/scan.py index f5e2977..b4400f1 100644 --- a/agentic_security/routes/scan.py +++ b/agentic_security/routes/scan.py @@ -1,4 +1,5 @@ from datetime import datetime +from typing import Any, Generator from fastapi import ( APIRouter, @@ -24,7 +25,7 @@ @router.post("/verify") async def verify( info: LLMInfo, secrets: InMemorySecrets = Depends(get_in_memory_secrets) -): +) -> dict[str, int | str | float]: spec = LLMSpec.from_string(info.spec) try: r = await spec.verify() @@ -42,7 +43,7 @@ async def verify( ) -def streaming_response_generator(scan_parameters: Scan): +def streaming_response_generator(scan_parameters: Scan) -> Generator[str, Any, None]: request_factory = LLMSpec.from_string(scan_parameters.llmSpec) set_current_run(request_factory) @@ -63,7 +64,7 @@ async def scan( scan_parameters: Scan, background_tasks: BackgroundTasks, secrets: InMemorySecrets = Depends(get_in_memory_secrets), -): +) -> StreamingResponse: scan_parameters.with_secrets(secrets) return StreamingResponse( streaming_response_generator(scan_parameters), media_type="application/json" @@ -71,7 +72,7 @@ async def scan( @router.post("/stop") -async def stop_scan(): +async def stop_scan() -> dict[str, str]: get_stop_event().set() return {"status": "Scan stopped"} @@ -85,7 +86,7 @@ async def scan_csv( maxBudget: int = Query(10_000), enableMultiStepAttack: bool = Query(False), secrets: InMemorySecrets = Depends(get_in_memory_secrets), -): +) -> StreamingResponse: # TODO: content dataset to fuzzer content = await file.read() # noqa llm_spec = await llmSpec.read() From 5a4b5e11b2302118218e4b8619464d835aa499f7 Mon Sep 17 00:00:00 2001 From: nemanjaASE <93867316+nemanjaASE@users.noreply.github.com> Date: Sun, 9 Mar 2025 15:21:50 +0100 Subject: [PATCH 2/2] Fix: Remove unused imports with pycln --- agentic_security/core/app.py | 15 ++++++--------- agentic_security/misc/banner.py | 4 ++-- agentic_security/report_chart.py | 4 ++-- agentic_security/routes/scan.py | 3 ++- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/agentic_security/core/app.py b/agentic_security/core/app.py index f97f380..3cb306e 100644 --- a/agentic_security/core/app.py +++ b/agentic_security/core/app.py @@ -5,17 +5,14 @@ from fastapi.responses import ORJSONResponse from agentic_security.http_spec import LLMSpec -from typing import Any, Dict tools_inbox: Queue = Queue() stop_event: Event = Event() current_run: str = {"spec": "", "id": ""} _secrets: dict[str, str] = {} -current_run: Dict[str, int | LLMSpec] = { - "spec": "", - "id": "" -} +current_run: dict[str, int | LLMSpec] = {"spec": "", "id": ""} + def create_app() -> FastAPI: """Create and configure the FastAPI application.""" @@ -33,12 +30,12 @@ def get_stop_event() -> Event: return stop_event -def get_current_run() -> Dict[str, int | LLMSpec]: +def get_current_run() -> dict[str, int | LLMSpec]: """Get the current run id.""" return current_run -def set_current_run(spec : LLMSpec) -> Dict[str, int | LLMSpec]: +def set_current_run(spec: LLMSpec) -> dict[str, int | LLMSpec]: """Set the current run id.""" current_run["id"] = hash(id(spec)) current_run["spec"] = spec @@ -49,13 +46,13 @@ def get_secrets() -> dict[str, str]: return _secrets -def set_secrets(secrets : dict[str, str]) -> dict[str, str]: +def set_secrets(secrets: dict[str, str]) -> dict[str, str]: _secrets.update(secrets) expand_secrets(_secrets) return _secrets -def expand_secrets(secrets : dict[str, str]) -> None: +def expand_secrets(secrets: dict[str, str]) -> None: for key in secrets: val = secrets[key] if val.startswith("$"): diff --git a/agentic_security/misc/banner.py b/agentic_security/misc/banner.py index 308d42a..45edba5 100644 --- a/agentic_security/misc/banner.py +++ b/agentic_security/misc/banner.py @@ -1,6 +1,6 @@ + from pyfiglet import Figlet, FontNotFound from termcolor import colored -from typing import Optional try: from importlib.metadata import version @@ -15,7 +15,7 @@ def generate_banner( tagline: str = "Proactive Threat Detection & Automated Security Protocols", author: str = "Developed by: [Security Team]", website: str = "Website: https://github.com/msoedov/agentic_security", - warning: Optional[str] = "", # Using Optional for warning since it might be None + warning: str | None = "", # Using Optional for warning since it might be None ) -> str: """Generate a visually enhanced banner with dynamic width and borders.""" # Define the text elements diff --git a/agentic_security/report_chart.py b/agentic_security/report_chart.py index 4f13ae9..45387cb 100644 --- a/agentic_security/report_chart.py +++ b/agentic_security/report_chart.py @@ -1,6 +1,5 @@ import io import string -from typing import List import matplotlib.pyplot as plt import numpy as np @@ -10,6 +9,7 @@ from .primitives import Table + def plot_security_report(table: Table) -> io.BytesIO: # Data preprocessing data = pd.DataFrame(table) @@ -143,7 +143,7 @@ def plot_security_report(table: Table) -> io.BytesIO: return buf -def generate_identifiers(data : pd.DataFrame) -> List[str]: +def generate_identifiers(data: pd.DataFrame) -> list[str]: data_length = len(data) alphabet = string.ascii_uppercase num_letters = len(alphabet) diff --git a/agentic_security/routes/scan.py b/agentic_security/routes/scan.py index b4400f1..8313702 100644 --- a/agentic_security/routes/scan.py +++ b/agentic_security/routes/scan.py @@ -1,5 +1,6 @@ +from collections.abc import Generator from datetime import datetime -from typing import Any, Generator +from typing import Any from fastapi import ( APIRouter,