From 98be10ea44ae9b4fa9d542eb985b10b0c68c673d Mon Sep 17 00:00:00 2001 From: "codebeaver-ai[bot]" <192081515+codebeaver-ai[bot]@users.noreply.github.com> Date: Sun, 9 Mar 2025 13:16:34 +0000 Subject: [PATCH 1/5] test: Update coverage improvement test for agentic_security/core/test_app.py --- agentic_security/core/test_app.py | 195 +++++++++++++++++++++++++++++- 1 file changed, 194 insertions(+), 1 deletion(-) diff --git a/agentic_security/core/test_app.py b/agentic_security/core/test_app.py index 1f03239..ddd6d4e 100644 --- a/agentic_security/core/test_app.py +++ b/agentic_security/core/test_app.py @@ -10,7 +10,23 @@ def setup_env_vars(): # Set up environment variables for testing os.environ["TEST_ENV_VAR"] = "test_value" - +@pytest.fixture(autouse=True) +def reset_globals(): + """Reset global state between tests.""" + from agentic_security.core.app import current_run, _secrets, tools_inbox, get_stop_event + # Reset current_run to initial empty state. + current_run["spec"] = "" + current_run["id"] = "" + # Clear _secrets. + _secrets.clear() + # Drain the tools_inbox queue. + while not tools_inbox.empty(): + try: + tools_inbox.get_nowait() + except Exception: + break + # Ensure stop_event is cleared. + get_stop_event().clear() def test_expand_secrets_with_env_var(): secrets = {"secret_key": "$TEST_ENV_VAR"} expand_secrets(secrets) @@ -27,3 +43,180 @@ def test_expand_secrets_without_dollar_sign(): secrets = {"secret_key": "plain_value"} expand_secrets(secrets) assert secrets["secret_key"] == "plain_value" + +def test_create_app(): + """Test that FastAPI app is created with ORJSONResponse as the default response class.""" + from fastapi.responses import ORJSONResponse + from agentic_security.core.app import create_app + app = create_app() + assert app is not None + # Add a dummy route to verify that ORJSONResponse is used as the default response class + @app.get("/dummy") + def dummy(): + return {"message": "dummy"} + # Retrieve the dummy route from the application's routes + dummy_route = next((route for route in app.routes if getattr(route, 'path', None) == "/dummy"), None) + assert dummy_route is not None + # Assert that the route's default response class is ORJSONResponse + from fastapi.responses import ORJSONResponse + assert dummy_route.response_class == ORJSONResponse + +def test_get_tools_inbox(): + """Test that the global tools inbox is a Queue and works as expected.""" + from asyncio import Queue + from agentic_security.core.app import get_tools_inbox, tools_inbox + inbox = get_tools_inbox() + assert isinstance(inbox, Queue) + # Test that the returned inbox is the same global instance. + assert inbox is tools_inbox + # Enqueue and then dequeue a message. + inbox.put_nowait("test_message") + assert inbox.get_nowait() == "test_message" + +def test_get_stop_event(): + """Test that the global stop event is returned correctly and can be set and cleared.""" + from asyncio import Event + from agentic_security.core.app import get_stop_event, stop_event + event = get_stop_event() + assert isinstance(event, Event) + event.set() + # Verify that the global stop event is set. + assert stop_event.is_set() + event.clear() + assert not stop_event.is_set() + +def test_current_run_initial_and_set(): + """Test getting and setting of the global current_run variable.""" + from agentic_security.core.app import get_current_run, set_current_run + # Because global state might be mutated by other tests, we focus on the update logic. + class DummyLLMSpec: + pass + dummy_spec = DummyLLMSpec() + updated_run = set_current_run(dummy_spec) + assert updated_run["spec"] is dummy_spec + assert updated_run["id"] == hash(id(dummy_spec)) + +def test_get_and_set_secrets(): + """Test that secrets are set and retrieved correctly, including environment variable expansion.""" + from agentic_security.core.app import get_secrets, set_secrets + import os + # Set up an environment variable for expansion. + os.environ["NEW_SECRET"] = "secret_value" + new_secrets = {"plain": "value", "env": "$NEW_SECRET"} + set_secrets(new_secrets) + secrets = get_secrets() + assert secrets["plain"] == "value" + assert secrets["env"] == "secret_value" + +def test_set_secrets_update(): + """Test that setting secrets multiple times updates the secrets without losing existing keys.""" + from agentic_security.core.app import get_secrets, set_secrets + import os + # Initialize secrets with a plain value. + set_secrets({"key1": "initial"}) + # Update key1 with an environment variable and add key2. + os.environ["KEY1"] = "updated_value" + set_secrets({"key1": "$KEY1", "key2": "new_value"}) + secrets = get_secrets() + assert secrets["key1"] == "updated_value" + assert secrets["key2"] == "new_value" +def test_get_current_run_initial_value(): + """Test that the global current_run returns empty values initially.""" + from agentic_security.core.app import get_current_run + run = get_current_run() + # Since reset_globals fixture runs before each test, the initial values should be empty. + assert run["spec"] == "" + assert run["id"] == "" + +def test_expand_secrets_with_whitespace(): + """Test expand_secrets when the secret value has extra whitespace after the dollar sign.""" + from agentic_security.core.app import expand_secrets + # Provide a secret with extra whitespace after '$'; lookup will likely fail. + secrets = {"secret_key": "$ NON_EXISTENT_VAR"} + expand_secrets(secrets) + # os.getenv(" NON_EXISTENT_VAR") returns None, so the secret value should be None. + assert secrets["secret_key"] is None + +def test_set_secrets_empty(): + """Test that setting secrets with an empty dictionary does not change the global secrets.""" + from agentic_security.core.app import get_secrets, set_secrets + # First, set a secret. + set_secrets({"key": "value"}) + secrets_before = get_secrets().copy() + # Then call set_secrets with an empty dict; expect no change to the existing secrets. + set_secrets({}) + secrets_after = get_secrets() + assert secrets_after == secrets_before +def test_expand_secrets_empty_dict(): + """Test that calling expand_secrets with an empty dictionary does not change it and does not error.""" + from agentic_security.core.app import expand_secrets + secrets = {} + expand_secrets(secrets) + assert secrets == {} + +def test_expand_secrets_env_empty(): + """Test expand_secrets with an environment variable that exists but has an empty string as value.""" + from agentic_security.core.app import expand_secrets + os.environ["EMPTY_VAR"] = "" + secrets = {"secret_key": "$EMPTY_VAR"} + expand_secrets(secrets) + assert secrets["secret_key"] == "" + +def test_get_tools_inbox_multiple_messages(): + """Test that the global tools_inbox queue correctly handles multiple messages in FIFO order.""" + from asyncio import Queue + from agentic_security.core.app import get_tools_inbox, tools_inbox + inbox = get_tools_inbox() + # Enqueue multiple messages. + messages = ["first", "second", "third"] + for msg in messages: + inbox.put_nowait(msg) + # Dequeue the messages and test the order. + for expected_msg in messages: + assert inbox.get_nowait() == expected_msg + +def test_get_stop_event_multiple_calls(): + """Test that get_stop_event returns the same global event instance across multiple calls and that its modify operations are consistent.""" + from asyncio import Event + from agentic_security.core.app import get_stop_event, stop_event + event1 = get_stop_event() + event2 = get_stop_event() + # They should be the same instance. + assert event1 is event2 is stop_event + # Set the event using event1 and check that event2 is set. + event1.set() + assert event2.is_set() + # Now clear using event2 and verify both are cleared. + event2.clear() + assert not event1.is_set() + +def test_create_app_with_testclient(): + """ + Test that the FastAPI app created with create_app() works with TestClient, + returns a valid HTTP response, and uses ORJSONResponse as the default response class. + """ + from fastapi.testclient import TestClient + from agentic_security.core.app import create_app + app = create_app() + @app.get("/hello") + def hello(): + return {"hello": "world"} + + client = TestClient(app) + response = client.get("/hello") + assert response.status_code == 200 + # ORJSONResponse should produce a response with application/json content-type. + assert "application/json" in response.headers["content-type"] + +def test_get_current_run_mutability(): + """ + Test that modifying the dictionary returned by get_current_run directly affects the global state. + """ + from agentic_security.core.app import get_current_run + cr = get_current_run() + cr["spec"] = "modified" + cr["id"] = 12345 + # Verify that subsequent calls reflect the change (i.e., the dictionaries are the same reference). + cr2 = get_current_run() + assert cr2["spec"] == "modified" + assert cr2["id"] == 12345 \ No newline at end of file From a59552c40b56662a7ec7aa7c9b96ace63774154f Mon Sep 17 00:00:00 2001 From: "codebeaver-ai[bot]" <192081515+codebeaver-ai[bot]@users.noreply.github.com> Date: Sun, 9 Mar 2025 13:16:36 +0000 Subject: [PATCH 2/5] test: Add coverage improvement test for agentic_security/probe_actor/test_refusal.py --- agentic_security/probe_actor/test_refusal.py | 140 +++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 agentic_security/probe_actor/test_refusal.py diff --git a/agentic_security/probe_actor/test_refusal.py b/agentic_security/probe_actor/test_refusal.py new file mode 100644 index 0000000..1a176c7 --- /dev/null +++ b/agentic_security/probe_actor/test_refusal.py @@ -0,0 +1,140 @@ +from agentic_security.probe_actor.refusal import DefaultRefusalClassifier, refusal_heuristic, RefusalClassifierManager, refusal_classifier_manager, RefusalClassifierPlugin + + +class TestCheckRefusal: + # The function correctly identifies a refusal phrase in the response. + def test_identify_refusal_phrase(self): + response = "I'm sorry, but I cannot provide that information." + assert DefaultRefusalClassifier().is_refusal(response) + + # The response is an empty string. + def test_empty_response(self): + response = "" + assert not DefaultRefusalClassifier().is_refusal(response) + def test_manager_with_always_refuse_plugin(self): + """Test that the manager returns True when a dummy plugin that always refuses is registered.""" + class AlwaysRefusePlugin(RefusalClassifierPlugin): + def is_refusal(self, response: str) -> bool: + return True + manager = RefusalClassifierManager() + manager.register_plugin('always', AlwaysRefusePlugin()) + # Even if the response doesn't contain any default refusal phrases, the manager should detect refusal + assert manager.is_refusal("This is a normal response.") + + def test_refusal_heuristic_with_json_input(self, monkeypatch): + """Patch the ml_classifier to return False so that a normal JSON input is not flagged as a refusal.""" + monkeypatch.setattr(refusal_classifier_manager.plugins["ml_classifier"], "is_refusal", lambda response: False) + """Test the refusal_heuristic function with a JSON-like input; it should convert to string and evaluate correctly.""" + # A JSON-like dictionary that contains a refusal phrase + request_input = {"message": "I'm sorry, I cannot comply."} + assert refusal_heuristic(request_input) + # A JSON-like dictionary without a refusal phrase + request_input = {"message": "Hello, how can I help you?"} + assert not refusal_heuristic(request_input) + + def test_concatenated_refusal_phrase(self): + """Test that a concatenated refusal phrase (due to the missing comma) is detected properly.""" + # The refusal phrases list contains "I am really sorry" "I cannot create" which is concatenated in the code. + response = "Here is a message: I am really sorryI cannot create because of policy." + assert DefaultRefusalClassifier().is_refusal(response) + + def test_non_refusal_similar_pattern(self): + """Test that a response similar to a refusal phrase but not exactly matching is not flagged as refusal.""" + response = "I'm so sorry, but I'll try to help as best as I can." + assert not DefaultRefusalClassifier().is_refusal(response) + + def test_ml_classifier_plugin(self, monkeypatch): + """Test that monkey-patching the ml_classifier plugin's is_refusal method affects the global refusal check.""" + original_ml_method = refusal_classifier_manager.plugins["ml_classifier"].is_refusal + monkeypatch.setattr(refusal_classifier_manager.plugins["ml_classifier"], "is_refusal", lambda response: True) + # Now, regardless of the actual content, the heuristic should return True since the ml_classifier plugin is patched. + assert refusal_heuristic("Any response without a default refusal phrase.") + # No need to call monkeypatch.undo() because the monkeypatch fixture automatically reverts changes after the test. + # Verify that when using a non-refusal input (with default plugins), it returns False. + monkeypatch.setattr(refusal_classifier_manager.plugins["ml_classifier"], "is_refusal", lambda response: False) + def test_refusal_heuristic_with_none_input(self, monkeypatch): + """Test that passing None to refusal_heuristic returns False after patching ml_classifier to avoid false positives.""" + monkeypatch.setattr(refusal_classifier_manager.plugins["ml_classifier"], "is_refusal", lambda response: False) + # None will be converted to the string "None" which does not match any refusal phrase. + assert not refusal_heuristic(None) + + def test_manager_no_plugins_registered(self): + """Test that a newly created RefusalClassifierManager with no plugins returns False for any input.""" + empty_manager = RefusalClassifierManager() + assert not empty_manager.is_refusal("Any random text that might normally trigger a refusal.") + + def test_multiple_plugins_interaction(self): + """Test that the manager returns True if any one of multiple registered plugins identifies a refusal.""" + class FalsePlugin(RefusalClassifierPlugin): + def is_refusal(self, response: str) -> bool: + return False + + class ForbiddenPlugin(RefusalClassifierPlugin): + def is_refusal(self, response: str) -> bool: + # Returns True only if the response contains the word "FORBIDDEN" + return "FORBIDDEN" in response + + multi_manager = RefusalClassifierManager() + multi_manager.register_plugin("false", FalsePlugin()) + multi_manager.register_plugin("forbidden", ForbiddenPlugin()) + + # When the response does not contain "FORBIDDEN", both plugins return False. + assert not multi_manager.is_refusal("This text appears normal.") + + # When the response contains "FORBIDDEN", the ForbiddenPlugin returns True. + assert multi_manager.is_refusal("This message includes FORBIDDEN content.") + + def test_default_plugin_partial_match(self): + """Test that a slight variation in a refusal phrase does not trigger a false refusal detection.""" + # "I cannot" is an exact phrase in the refusal phrases list, but "I can not" (with a space) is slightly different. + response = "I can not provide the details you requested." + assert not DefaultRefusalClassifier().is_refusal(response) + + def test_default_plugin_whitespace(self): + """Test that the default refusal classifier detects a refusal phrase even when wrapped in extra whitespace.""" + # "I apologize" is a refusal phrase in the list. + response = " I apologize, but that's not allowed. " + assert DefaultRefusalClassifier().is_refusal(response) + def test_custom_refusal_classifier(self): + """Test that a custom refusal phrase list properly detects a refusal.""" + custom_phrases = ["deny", "refuse"] + custom_classifier = DefaultRefusalClassifier(refusal_phrases=custom_phrases) + response = "I will refuse to do that." + # "refuse" is present in the custom phrases, so the response should be flagged. + assert custom_classifier.is_refusal(response) + + def test_manager_duplicate_plugin(self): + """Test that registering two plugins under the same name gives priority to the last registered plugin.""" + manager = RefusalClassifierManager() + class PluginA(RefusalClassifierPlugin): + def is_refusal(self, response: str) -> bool: + return "A" in response + + class PluginB(RefusalClassifierPlugin): + def is_refusal(self, response: str) -> bool: + return "B" in response + + manager.register_plugin("dup", PluginA()) + # Re-register with a different plugin; this should override PluginA. + manager.register_plugin("dup", PluginB()) + # The response is flagged only if "B" is present. + assert manager.is_refusal("This has B in it.") + assert not manager.is_refusal("This has A in it.") + + def test_non_string_input_in_default(self): + """Test that passing a non-string input to DefaultRefusalClassifier raises an exception.""" + classifier = DefaultRefusalClassifier() + try: + # Pass an integer; since "in" operator is used, this should raise a TypeError. + classifier.is_refusal(12345) + except TypeError: + pass + else: + assert False, "Expected a TypeError when a non-string input is provided" + + def test_case_sensitivity_in_default(self): + """Test that the refusal detection is case sensitive so that lower-case variations do not falsely trigger.""" + classifier = DefaultRefusalClassifier() + # Default refusal phrase "I cannot" is case sensitive so "i cannot" should not be flagged. + response = "i cannot provide the details." + assert not classifier.is_refusal(response) \ No newline at end of file From 265029b19962bab7d3a47a12a5374ab91c671932 Mon Sep 17 00:00:00 2001 From: "codebeaver-ai[bot]" <192081515+codebeaver-ai[bot]@users.noreply.github.com> Date: Sun, 9 Mar 2025 13:16:38 +0000 Subject: [PATCH 3/5] test: Add coverage improvement test for tests/test_report_chart.py --- tests/test_report_chart.py | 162 +++++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 tests/test_report_chart.py diff --git a/tests/test_report_chart.py b/tests/test_report_chart.py new file mode 100644 index 0000000..f6a0533 --- /dev/null +++ b/tests/test_report_chart.py @@ -0,0 +1,162 @@ +import io +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +import pytest + +from agentic_security.report_chart import plot_security_report, generate_identifiers + +def test_generate_identifiers_short(): + """Test generate_identifiers for a short DataFrame with 3 rows.""" + df = pd.DataFrame({ + 'failureRate': [10, 20, 30], + 'tokens': [100, 200, 300], + 'module': ['Module1', 'Module2', 'Module3'] + }) + ids = generate_identifiers(df) + assert ids == ["A1", "A2", "A3"] + +def test_generate_identifiers_long(): + """Test generate_identifiers for a longer DataFrame to validate wrapping of identifiers beyond 26 rows.""" + num_rows = 30 + data = { + "failureRate": np.linspace(0, 100, num_rows), + "tokens": np.linspace(50, 150, num_rows), + "module": [f"module_{i}" for i in range(num_rows)] + } + df = pd.DataFrame(data) + ids = generate_identifiers(df) + assert ids[0] == "A1" + # For row 26: letter index = 26//26 = 1, (26 %26)+1 = 1 so identifier is "B1" + assert ids[26] == "B1" + # For row 29: expected "B4" + assert ids[29] == "B4" + +def test_plot_security_report_output(): + """Test that plot_security_report returns a valid PNG image in a BytesIO object.""" + table = [ + {"failureRate": 50, "tokens": 100, "module": "ModuleA"}, + {"failureRate": 70, "tokens": 150, "module": "ModuleB"}, + {"failureRate": 30, "tokens": 80, "module": "ModuleC"} + ] + buf = plot_security_report(table) + assert isinstance(buf, io.BytesIO) + content = buf.getvalue() + # PNG signature: 0x89 50 4E 47 0D 0A 1A 0A + assert content.startswith(b'\211PNG\r\n\032\n') + # Ensure that the output has a reasonable size + assert len(content) > 1000 + +def test_plot_report_with_more_rows(): + """Test plot_security_report with multiple rows to ensure proper handling of angles and color mapping.""" + num_rows = 10 + table = [] + for i in range(num_rows): + table.append({ + "failureRate": np.random.uniform(10, 90), + "tokens": np.random.randint(50, 200), + "module": f"Module_{i}" + }) + buf = plot_security_report(table) + content = buf.getvalue() + assert content.startswith(b'\211PNG\r\n\032\n') + assert len(content) > 1000 + +def test_plot_closure(): + """Test that the matplotlib figure is closed after generating the plot.""" + table = [{"failureRate": 40, "tokens": 120, "module": "ModuleX"}] + buf = plot_security_report(table) + # After plot generation, there should be no open matplotlib figures. + assert plt.get_fignums() == [] +def test_generate_identifiers_empty(): + """Test that generate_identifiers returns an empty list when given an empty DataFrame.""" + df = pd.DataFrame(columns=["failureRate", "tokens", "module"]) + ids = generate_identifiers(df) + assert ids == [] + +def test_plot_security_report_empty(): + """Test that plot_security_report with an empty table raises a KeyError due to missing required columns.""" + with pytest.raises(KeyError): + plot_security_report([]) + +def test_plot_security_report_missing_tokens(): + """Test that plot_security_report raises a KeyError when the 'tokens' field is missing.""" + table = [{"failureRate": 50, "module": "ModuleA"}] + with pytest.raises(KeyError): + plot_security_report(table) + +def test_plot_security_report_missing_module(): + """Test that plot_security_report raises a KeyError when the 'module' field is missing.""" + table = [{"failureRate": 50, "tokens": 100}] + with pytest.raises(KeyError): + plot_security_report(table) + +def test_plot_security_report_non_numeric(): + """Test that plot_security_report raises an error when non-numeric values are provided for 'failureRate' and 'tokens'.""" + table = [{"failureRate": "high", "tokens": "many", "module": "ModuleA"}] + with pytest.raises(Exception): + plot_security_report(table) +def test_generate_identifiers_exact_twenty_six(): + """Test generate_identifiers for a DataFrame with exactly 26 rows.""" + df = pd.DataFrame({ + 'failureRate': list(range(26)), + 'tokens': list(range(26)), + 'module': [f'Module{i}' for i in range(26)] + }) + ids = generate_identifiers(df) + # The 26th row (index 25) should be "A26" + assert ids[25] == "A26" + +def test_plot_security_report_negative_failure_rate(): + """Test plot_security_report with a negative failureRate value to ensure valid output.""" + table = [ + {"failureRate": -20, "tokens": 80, "module": "ModuleNegative"}, + {"failureRate": 10, "tokens": 100, "module": "ModulePositive"} + ] + buf = plot_security_report(table) + content = buf.getvalue() + # Check PNG signature + assert content.startswith(b'\211PNG\r\n\032\n') + assert len(content) > 1000 + +def test_plot_security_report_with_equal_tokens(): + """Test plot_security_report with tokens having the same value to check normalization edge cases.""" + table = [ + {"failureRate": 30, "tokens": 100, "module": "Module1"}, + {"failureRate": 60, "tokens": 100, "module": "Module2"}, + {"failureRate": 45, "tokens": 100, "module": "Module3"} + ] + buf = plot_security_report(table) + content = buf.getvalue() + assert content.startswith(b'\211PNG\r\n\032\n') + assert len(content) > 1000 +def test_generate_identifiers_double_wrap(): + """Test generate_identifiers with 52 rows to validate wrapping of identifiers into two-letter sequences.""" + num_rows = 52 + data = { + "failureRate": np.linspace(0, 100, num_rows), + "tokens": np.linspace(10, 100, num_rows), + "module": [f"module_{i}" for i in range(num_rows)] + } + df = pd.DataFrame(data) + ids = generate_identifiers(df) + assert ids[0] == "A1" + assert ids[25] == "A26" + assert ids[26] == "B1" + assert ids[-1] == "B26" + +def test_plot_security_report_dataframe_input(): + """Test that plot_security_report accepts a DataFrame as input and produces a valid PNG image.""" + # Create a DataFrame with the required columns. + df = pd.DataFrame({ + "failureRate": [55, 65, 75], + "tokens": [120, 130, 140], + "module": ["ModuleDF1", "ModuleDF2", "ModuleDF3"] + }) + buf = plot_security_report(df) + assert isinstance(buf, io.BytesIO) + content = buf.getvalue() + # Verify the image is a PNG by checking its signature: + assert content.startswith(b'\211PNG\r\n\032\n') + # Check that the image has a reasonable size. + assert len(content) > 1000 \ No newline at end of file From 97db2dfda77232c5201f898c450844354c494037 Mon Sep 17 00:00:00 2001 From: "codebeaver-ai[bot]" <192081515+codebeaver-ai[bot]@users.noreply.github.com> Date: Sun, 9 Mar 2025 13:16:42 +0000 Subject: [PATCH 4/5] test: Add coverage improvement test for tests/test_scan.py --- tests/test_scan.py | 89 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 tests/test_scan.py diff --git a/tests/test_scan.py b/tests/test_scan.py new file mode 100644 index 0000000..145e85d --- /dev/null +++ b/tests/test_scan.py @@ -0,0 +1,89 @@ +import asyncio +from datetime import timedelta + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +# Import the router to test from our original code +from agentic_security.routes.scan import router + +# Define a dummy specification for testing the /verify endpoint +class DummySpecification: + async def verify(self): + class DummyResponse: + status_code = 200 + text = "OK" + elapsed = timedelta(seconds=0.5) + return DummyResponse() + +def dummy_from_string(spec_str): + # For testing, we simply return our dummy specification + return DummySpecification() + +# Create a FastAPI app and include the router under test +app = FastAPI() +app.include_router(router) + +@pytest.fixture(autouse=True) +def override_dependencies(monkeypatch): + """ + Override dependencies in the scan module for testing purposes. + This includes plugging in a dummy stop event, dummy tools inbox, + patching LLMSpec.from_string, and replacing fuzzer.scan_router with a dummy async generator. + """ + # Override get_stop_event to return an asyncio.Event + monkeypatch.setattr("agentic_security.routes.scan.get_stop_event", lambda: asyncio.Event()) + + # Override get_tools_inbox to return None (or a dummy value) + monkeypatch.setattr("agentic_security.routes.scan.get_tools_inbox", lambda: None) + + # Patch LLMSpec.from_string to use our dummy specification + from agentic_security.http_spec import LLMSpec + monkeypatch.setattr(LLMSpec, "from_string", staticmethod(dummy_from_string)) + + # Patch fuzzer.scan_router to return a dummy asynchronous generator + async def dummy_scan_router(*args, **kwargs): + # Yield several dummy results + for i in range(3): + yield f"test_result_{i}" + DummyFuzzer = type("DummyFuzzer", (), {"scan_router": dummy_scan_router}) + monkeypatch.setattr("agentic_security.routes.scan.fuzzer", DummyFuzzer) + yield + +def test_verify_endpoint(): + """Test the /verify endpoint returns the expected dummy response.""" + payload = {"spec": "dummy_spec"} + client = TestClient(app) + response = client.post("/verify", json=payload) + assert response.status_code == 200 + data = response.json() + assert data["status_code"] == 200 + assert data["body"] == "OK" + assert "elapsed" in data + assert "timestamp" in data + +def test_stop_scan_endpoint(): + """Test that the /stop endpoint returns a success message.""" + client = TestClient(app) + response = client.post("/stop") + assert response.status_code == 200 + data = response.json() + assert data == {"status": "Scan stopped"} + +def test_scan_endpoint(): + """Test the /scan endpoint returns a streaming response with dummy scan results.""" + payload = { + "llmSpec": "dummy_spec", + "optimize": False, + "maxBudget": 1000, + "enableMultiStepAttack": False, + } + client = TestClient(app) + with client.stream("POST", "/scan", json=payload) as response: + assert response.status_code == 200 + # Read all chunks from the streaming response + content = b"".join(response.iter_bytes()) + # Check that the dummy scan results are present in the output + for i in range(3): + assert f"test_result_{i}".encode("utf-8") in content \ No newline at end of file From 83ba6e579796a5bc7ce74bee1738618fbc245237 Mon Sep 17 00:00:00 2001 From: "codebeaver-ai[bot]" <192081515+codebeaver-ai[bot]@users.noreply.github.com> Date: Sun, 9 Mar 2025 13:16:44 +0000 Subject: [PATCH 5/5] Adding codebeaver.yml --- codebeaver.yml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 codebeaver.yml diff --git a/codebeaver.yml b/codebeaver.yml new file mode 100644 index 0000000..145ae1d --- /dev/null +++ b/codebeaver.yml @@ -0,0 +1,2 @@ +from: python-pytest-poetry +# This file was generated automatically by CodeBeaver based on your repository. Learn how to customize it here: https://docs.codebeaver.ai/configuration/ \ No newline at end of file