From f22db3cbb3e342cbf46b00e1c370623832ed0058 Mon Sep 17 00:00:00 2001
From: roost-io
Date: Thu, 4 Dec 2025 11:15:55 +0530
Subject: [PATCH] Add API Tests (Pytest Framework, Open AI) generated by RoostGPT Using AI Model gpt-5

---
 Roost-README.md | 25 ++
 requirements-roost.txt | 14 ++
 tests/AUTH_API/api.json | 105 ++++++++
 tests/AUTH_API/api_auth_login.json | 72 ++++++
 tests/AUTH_API/config.yml | 17 ++
 tests/AUTH_API/conftest.py | 272 +++++++++++++++++++++
 tests/AUTH_API/test_api_auth_login_post.py | 263 ++++++++++++++++++++
 tests/AUTH_API/validator.py | 226 +++++++++++++++++
 8 files changed, 994 insertions(+)
 create mode 100644 Roost-README.md
 create mode 100644 requirements-roost.txt
 create mode 100644 tests/AUTH_API/api.json
 create mode 100644 tests/AUTH_API/api_auth_login.json
 create mode 100644 tests/AUTH_API/config.yml
 create mode 100644 tests/AUTH_API/conftest.py
 create mode 100644 tests/AUTH_API/test_api_auth_login_post.py
 create mode 100644 tests/AUTH_API/validator.py

diff --git a/Roost-README.md b/Roost-README.md
new file mode 100644
index 00000000..dd5bcc8d
--- /dev/null
+++ b/Roost-README.md
@@ -0,0 +1,25 @@
+
+# RoostGPT generated pytest code for API Testing
+
+RoostGPT generates code in the `tests` folder within the given project path.
+A dependency file, i.e. `requirements-roost.txt`, is also created in the given project path.
+
+Below are the sample steps to run the generated tests. The sample commands use the package manager `uv`. Alternatively, python and pip can be used directly.
+1. ( Optional ) Create virtual Env.
+2. Install dependencies
+```
+uv venv // Create virtual Env
+uv pip install -r requirements-roost.txt // Install all dependencies
+
+```
+
+Test configurations and test_data are loaded from config.yml, e.g. API HOST, auth, common path parameters of the endpoint.
+Either set a default value in this config.yml file OR use ENV, e.g. export API_HOST="https://example.com/api/v2"
+
+Once configuration values are set, use below commands to run the tests. 
+``` +// Run generated tests +uv run pytest -m smoke // Run only smoke tests +uv run pytest -s tests/generated-test.py // Run specific test file +``` + \ No newline at end of file diff --git a/requirements-roost.txt b/requirements-roost.txt new file mode 100644 index 00000000..bdaa6d20 --- /dev/null +++ b/requirements-roost.txt @@ -0,0 +1,14 @@ + +connexion +Flask +flask_testing +jsonschema +pytest +python_dateutil +PyYAML +referencing +Requests +setuptools +six +urllib3 +xmltodict \ No newline at end of file diff --git a/tests/AUTH_API/api.json b/tests/AUTH_API/api.json new file mode 100644 index 00000000..a53698b7 --- /dev/null +++ b/tests/AUTH_API/api.json @@ -0,0 +1,105 @@ +{ + "openapi": "3.0.0", + "info": { + "title": "Auth API", + "version": "1.0.0" + }, + "paths": { + "/api/auth/login": { + "post": { + "summary": "User login", + "operationId": "login", + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/LoginRequest" + } + } + } + }, + "responses": { + "200": { + "description": "Successful login", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/LoginResponse" + } + } + } + }, + "400": { + "description": "Invalid credentials" + } + } + } + } + }, + "components": { + "schemas": { + "LoginRequest": { + "type": "object", + "required": [ + "email", + "password" + ], + "properties": { + "email": { + "type": "string", + "format": "email", + "example": "user@example.com" + }, + "password": { + "type": "string", + "format": "password", + "example": "P@ssw0rd" + } + } + }, + "LoginResponse": { + "type": "object", + "properties": { + "token": { + "type": "string", + "example": "jwt-token-123" + }, + "refreshToken": { + "type": "string", + "example": "refresh-token-123" + }, + "user": { + "$ref": "#/components/schemas/User" + } + } + }, + "User": { + "type": "object", + "properties": { + "id": { + "type": "string", + "example": "12345" + }, + "email": { + 
"type": "string", + "format": "email", + "example": "user@example.com" + }, + "firstname": { + "type": "string", + "example": "John" + }, + "role": { + "type": "string", + "example": "ADMIN" + }, + "accountId": { + "type": "string", + "example": "acct-8749" + } + } + } + } + } +} \ No newline at end of file diff --git a/tests/AUTH_API/api_auth_login.json b/tests/AUTH_API/api_auth_login.json new file mode 100644 index 00000000..5259d6f8 --- /dev/null +++ b/tests/AUTH_API/api_auth_login.json @@ -0,0 +1,72 @@ +[ + { + "email": "user@example.com", + "password": "StrongPassw0rd!", + "statusCode": 200, + "scenario": "Successful responses: OK" + }, + { + "email": "admin@contoso.io", + "password": "Adm1n#Secure#2025", + "statusCode": 200, + "scenario": "Successful responses: OK" + }, + { + "password": "NoEmailButPass123!", + "statusCode": 400, + "scenario": "Client error responses: Bad Request" + }, + { + "email": "missing.password@example.com", + "statusCode": 400, + "scenario": "Client error responses: Bad Request" + }, + { + "email": "invalid-email", + "password": "ValidLikePass123!", + "statusCode": 400, + "scenario": "Client error responses: Bad Request" + }, + { + "email": "short.pass@example.com", + "password": "123", + "statusCode": 400, + "scenario": "Client error responses: Bad Request" + }, + { + "email": "unknown.user@nomail.zzz", + "password": "WrongPassword!234", + "statusCode": 400, + "scenario": "Client error responses: Bad Request" + }, + { + "email": "user@example.com", + "password": "' OR '1'='1", + "statusCode": 400, + "scenario": "Client error responses: Bad Request" + }, + { + "email": "super.long.username.with.many.parts.and.dots.and.plus+aliasing.for.testing.purposes.2025.12.04@very-long-subdomain.example-verylongdomain-name.co", + "password": "LongEmailPass123!", + "statusCode": 400, + "scenario": "Client error responses: Bad Request" + }, + { + "email": "john.doe+login@test-mail.org", + "password": "S3curePass!@#2025", + "statusCode": 200, + 
"scenario": "Successful responses: OK" + }, + { + "email": "space.trailing@example.com ", + "password": "SpaceyPass123!", + "statusCode": 400, + "scenario": "Client error responses: Bad Request" + }, + { + "email": "bad email@example.com", + "password": "PassWith Spaces123!", + "statusCode": 400, + "scenario": "Client error responses: Bad Request" + } +] \ No newline at end of file diff --git a/tests/AUTH_API/config.yml b/tests/AUTH_API/config.yml new file mode 100644 index 00000000..94272606 --- /dev/null +++ b/tests/AUTH_API/config.yml @@ -0,0 +1,17 @@ + +# This config.yml contains user provided data for api testing. Allows to define values here or use ENV to load values. e.g. ENV[API_HOST] = "https://exampl2.com" +# api: +# host: "${API_HOST:-https://example.com/api/v2}" # includes base path +# auth: +# api_key: "${API_KEY:-}" +# api_key_header: "${KEYNAME:-DefaultValue}" # openapi.spec.security.KEY_NAME +# basic_auth: "${username:-}:${password:-}" +# test_data: +# id: "${TEST_ID:-282739-1238371-219393-2833}" # Any test data key value pair e.g. 
GET /api/v1/cart/:id +# context-id: "${TEST_context-id:-}" # GET /api/v1/{context-id}/summary + + + +api: + host: "${AUTH_API_API_HOST:-}" +test_data: {} diff --git a/tests/AUTH_API/conftest.py b/tests/AUTH_API/conftest.py new file mode 100644 index 00000000..2d54d1de --- /dev/null +++ b/tests/AUTH_API/conftest.py @@ -0,0 +1,272 @@ +# conftest.py +import json +import os +import re +from pathlib import Path +from typing import Any, Callable, Dict, Mapping, Optional, Union + +import pytest +import requests +import yaml +from requests import Response, Session +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + + +# ---------------------------- +# Utilities +# ---------------------------- + +_ENV_WITH_DEFAULT_PATTERN = re.compile(r"\$\{([^}:s]+):-([^}]+)\}") +_ENV_NO_DEFAULT_PATTERN = re.compile(r"\$\{([^}]+)\}") + + +def _expand_env_in_string(value: str) -> str: + """ + Expand environment variables in the given string using patterns: + - ${VAR:-default} + - ${VAR} + + The first uses default when VAR is unset or empty. The second uses + empty string when VAR is unset. + """ + + def repl_with_default(match: re.Match) -> str: + var = match.group(1) + default = match.group(2) + env_val = os.environ.get(var) + if env_val is None or env_val == "": + return default + return env_val + + def repl_no_default(match: re.Match) -> str: + var = match.group(1) + return os.environ.get(var, "") + + # Handle ${VAR:-default} + value = _ENV_WITH_DEFAULT_PATTERN.sub(repl_with_default, value) + # Handle ${VAR} + value = _ENV_NO_DEFAULT_PATTERN.sub(repl_no_default, value) + return value + + +def _expand_env(obj: Any) -> Any: + """ + Recursively expand environment variables in strings within + dictionaries and lists. 
+ """ + if isinstance(obj, dict): + return {k: _expand_env(v) for k, v in obj.items()} + if isinstance(obj, list): + return [_expand_env(i) for i in obj] + if isinstance(obj, str): + return _expand_env_in_string(obj).strip() + return obj + + +def _load_yaml(path: Path) -> Dict[str, Any]: + """ + Load a YAML file from the given path with error handling. + """ + if not path.exists(): + raise FileNotFoundError(f"Config file not found at: {path}") + + try: + with path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + except yaml.YAMLError as e: + raise ValueError(f"Failed to parse YAML config at {path}: {e}") from e + + if not isinstance(data, dict): + raise TypeError(f"Expected YAML root to be a mapping/dict, got: {type(data).__name__}") + + return data + + +def load_test_data_json(path: Union[Path, str]) -> Any: + """ + Load and return JSON test data from a file path. + """ + p = Path(path) + if not p.exists(): + raise FileNotFoundError(f"Test data JSON file not found: {p}") + try: + with p.open("r", encoding="utf-8") as f: + return json.load(f) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in test data file {p}: {e}") from e + + +# ---------------------------- +# API Client +# ---------------------------- + +class ApiClient: + def __init__( + self, + base_url: str, + session: Optional[Session] = None, + default_headers: Optional[Mapping[str, str]] = None, + timeout: Union[int, float] = 30, + max_retries: int = 3, + backoff_factor: float = 0.3, + status_forcelist: Optional[tuple] = (429, 500, 502, 503, 504), + ) -> None: + base_url = (base_url or "").strip().rstrip("/") + if not base_url: + raise ValueError("Base URL for ApiClient cannot be empty.") + self.base_url = base_url + self.timeout = timeout + self.default_headers = dict(default_headers) if default_headers else {} + + self.session = session or requests.Session() + # Configure retries for robust HTTP communication + retry = Retry( + total=max_retries, + 
read=max_retries, + connect=max_retries, + status=max_retries, + backoff_factor=backoff_factor, + status_forcelist=status_forcelist, + allowed_methods=None, # retry on all methods by default + raise_on_status=False, + ) + adapter = HTTPAdapter(max_retries=retry) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + + def _build_url(self, endpoint: str) -> str: + endpoint = (endpoint or "").strip() + if not endpoint: + return self.base_url + if endpoint.startswith("http://") or endpoint.startswith("https://"): + return endpoint.strip() + return f"{self.base_url}/{endpoint.lstrip('/')}" + + def make_request( + self, + endpoint: str, + method: str = "GET", + headers: Optional[Mapping[str, str]] = None, + timeout: Optional[Union[int, float]] = None, + allow_redirects: bool = True, + **kwargs: Any, + ) -> Response: + url = self._build_url(endpoint) + req_headers = self.default_headers.copy() + if headers: + req_headers.update(headers) + + # Respect per-call timeout; fallback to client default + effective_timeout = timeout if timeout is not None else self.timeout + + response = self.session.request( + method=method.upper(), + url=url, + headers=req_headers, + timeout=effective_timeout, + allow_redirects=allow_redirects, + **kwargs, + ) + return response + + def get(self, endpoint: str, **kwargs: Any) -> Response: + return self.make_request(endpoint, method="GET", **kwargs) + + def post(self, endpoint: str, **kwargs: Any) -> Response: + return self.make_request(endpoint, method="POST", **kwargs) + + def put(self, endpoint: str, **kwargs: Any) -> Response: + return self.make_request(endpoint, method="PUT", **kwargs) + + def patch(self, endpoint: str, **kwargs: Any) -> Response: + return self.make_request(endpoint, method="PATCH", **kwargs) + + def delete(self, endpoint: str, **kwargs: Any) -> Response: + return self.make_request(endpoint, method="DELETE", **kwargs) + + def head(self, endpoint: str, **kwargs: Any) -> Response: + return 
self.make_request(endpoint, method="HEAD", **kwargs) + + def options(self, endpoint: str, **kwargs: Any) -> Response: + return self.make_request(endpoint, method="OPTIONS", **kwargs) + + +# ---------------------------- +# Pytest hooks +# ---------------------------- + +def pytest_configure(config: pytest.Config) -> None: + config.addinivalue_line("markers", "smoke: mark test as smoke for success scenarios") + + +# ---------------------------- +# Fixtures +# ---------------------------- + +@pytest.fixture(scope="session") +def config() -> Dict[str, Any]: + """ + Load configuration from config.yml located in the same directory as this file. + Supports environment variable expansion within string values. + """ + # per requirement: use os.path.join to find config in same directory + cfg_path_str = os.path.join(os.path.dirname(__file__), "config.yml") + cfg_path = Path(cfg_path_str).resolve() + + try: + raw = _load_yaml(cfg_path) + except Exception as e: + pytest.fail(f"Failed to load config.yml at {cfg_path}: {e}") + + expanded = _expand_env(raw) + + # Validate expected structure strictly as provided + # Configuration Structure: + # api: + # host: "${AUTH_API_API_HOST:-}" + # test_data: {} + if "api" not in expanded or not isinstance(expanded["api"], dict): + pytest.fail("Invalid config.yml: missing 'api' mapping.") + if "host" not in expanded["api"]: + pytest.fail("Invalid config.yml: missing 'api.host' key.") + if "test_data" not in expanded: + pytest.fail("Invalid config.yml: missing 'test_data' key.") + + return expanded + + +@pytest.fixture(scope="session") +def api_host(config: Dict[str, Any]) -> str: + """ + Provides the API host string, stripped of whitespace and trailing slash. + """ + host = (config["api"]["host"] or "").strip().rstrip("/") + if host == "": + pytest.fail("Config 'api.host' is empty. 
Ensure AUTH_API_API_HOST is set or default is provided.") + return host + + +@pytest.fixture(scope="session") +def api_client(api_host: str) -> ApiClient: + """ + Provides a configured ApiClient using the host from config. + """ + client = ApiClient(base_url=api_host) + return client + + +@pytest.fixture(scope="session") +def config_test_data(config: Dict[str, Any]) -> Any: + """ + Provides test_data loaded from the configuration. + """ + return config.get("test_data", {}) + + +@pytest.fixture(scope="session") +def load_test_data() -> Callable[[Union[Path, str]], Any]: + """ + Provides a helper to load JSON test data from a given path. + """ + return load_test_data_json diff --git a/tests/AUTH_API/test_api_auth_login_post.py b/tests/AUTH_API/test_api_auth_login_post.py new file mode 100644 index 00000000..6325a8ed --- /dev/null +++ b/tests/AUTH_API/test_api_auth_login_post.py @@ -0,0 +1,263 @@ +# ********RoostGPT******** + +# Test generated by RoostGPT for test api-test-pytest-circleci using AI Type Open AI and AI Model gpt-5 +# +# Test file generated for /api/auth/login_post for http method type POST +# RoostTestHash=05f67556da +# +# + +# ********RoostGPT******** +""" +Pytest suite for /api/auth/login endpoint based on the provided OpenAPI spec. + +Summary of analysis: +- Endpoints: + - POST /api/auth/login +- Security Schemas: + - None defined in the given spec for this endpoint. +- Response status codes per operation: + - 200: Successful login (application/json body with token, refreshToken, user object) + - 400: Invalid credentials (no schema/content defined) +- Request schema: + - application/json body required with fields: + - email (string, email format) [required] + - password (string, password format) [required] + +Test plan highlights: +- Table-driven tests parameterized using ENDPOINT_TEST_DATA loaded from api_auth_login.json. +- Use config_test_data fixture for baseline/common values, and override/merge with ENDPOINT_TEST_DATA per scenario. 
+- Validate request (best-effort using validator components where available) before sending. +- Validate all responses using SwaggerSchemaValidator.validate_schema_by_response. +- Cover success (200) and client error (400) pathways; if test data does not include some scenarios, those cases are skipped with pytest.skip. +- Authentication/security tests are skipped because no security schemes are defined for this endpoint. +- Each test function iterates over all objects in the JSON array and conditionally runs/skip per scenario requirements. + +Instructions: +- Ensure the following files exist in the same directory as this test module: + - conftest.py (provided) + - config.yml (provided) + - api.json (OpenAPI spec) + - api_auth_login.json (endpoint test data) +- Run tests: + - pytest -q +""" + +from pathlib import Path +from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple + +import pytest +from requests import Response + +from validator import SwaggerSchemaValidator +# Load endpoint test data using the same loader as exposed by conftest utilities (module-level to avoid lazy fixture issues) +from conftest import load_test_data_json as _load_test_data_json + + +# ---------------------------- +# Constants and module-level data setup +# ---------------------------- + +TEST_DATA_FILENAME = "api_auth_login.json" +OPENAPI_SPEC_FILENAME = "api.json" + +ENDPOINT = "/api/auth/login" +METHOD = "POST" + +_here = Path(__file__).resolve().parent +_spec_path = _here / OPENAPI_SPEC_FILENAME +_endpoint_data_path = _here / TEST_DATA_FILENAME + +# Load ENDPOINT_TEST_DATA at import-time to support pytest parametrize without lazy fixtures +try: + _RAW_ENDPOINT_DATA: Any = _load_test_data_json(_endpoint_data_path) +except Exception as _e: + # If file is missing or invalid, keep an empty list; tests will skip with a clear message later. 
+ _RAW_ENDPOINT_DATA = [] + +# Normalize to list of dicts +_ENDPOINT_DATA: List[Dict[str, Any]] = _RAW_ENDPOINT_DATA if isinstance(_RAW_ENDPOINT_DATA, list) else [] + +def _param_ids(cases: Iterable[Mapping[str, Any]]) -> List[str]: + ids: List[str] = [] + for idx, c in enumerate(cases): + scenario = str(c.get("scenario") or "").strip() or f"case-{idx+1}" + status = str(c.get("statusCode") or "").strip() or "unknown" + ids.append(f"{idx+1}:{status}:{scenario}") + return ids + +_PARAM_CASES = _ENDPOINT_DATA +_PARAM_IDS = _param_ids(_ENDPOINT_DATA) + + +# ---------------------------- +# Validators and helpers +# ---------------------------- + +@pytest.fixture(scope="session") +def swagger_validator() -> SwaggerSchemaValidator: + """ + Session-scoped Swagger/OpenAPI validator against api.json. + """ + if not _spec_path.exists(): + pytest.fail(f"OpenAPI spec file not found at: {_spec_path}") + return SwaggerSchemaValidator(str(_spec_path)) + + +def _merge_login_payload(base: Mapping[str, Any], override: Mapping[str, Any]) -> Dict[str, Any]: + """ + Construct payload for /api/auth/login by merging base (config_test_data) with + scenario-specific override (ENDPOINT_TEST_DATA). Only required fields are considered. + """ + payload: Dict[str, Any] = {} + + # Only required properties per spec: email, password + if "email" in base and base["email"] is not None: + payload["email"] = base["email"] + if "password" in base and base["password"] is not None: + payload["password"] = base["password"] + + if "email" in override and override["email"] is not None: + payload["email"] = override["email"] + if "password" in override and override["password"] is not None: + payload["password"] = override["password"] + + return payload + + +def _try_validate_request_payload( + validator: SwaggerSchemaValidator, + payload: Mapping[str, Any], + schema_candidates: Tuple[str, ...] = ( + # Fallback candidate names; these may not be present in the spec. 
+ # We rely solely on validator methods and do not define schemas here. + "LoginRequest", + "AuthLoginRequest", + "LoginBody", + "login_request", + ), +) -> None: + """ + Best-effort request validation using available component schemas in the spec. + If none of the candidate schemas exist, continue without failing the test. + """ + for name in schema_candidates: + try: + res = validator.validate_json(dict(payload), name) + except Exception: + # Schema not found or other validator internals; try next candidate + continue + # If validator returned a dict with "valid": bool, accept its result + if isinstance(res, dict): + if not res.get("valid", False): + pytest.fail(f"Request payload validation failed for schema '{name}': {res}") + return + # No candidate matched; proceed without failing (inline requestBody schema cannot be referenced by name) + + +def _assert_response_schema( + validator: SwaggerSchemaValidator, response: Response, endpoint: str, method: str, status_code: int +) -> None: + """ + Validate the response schema for a given endpoint/method/status using the swagger validator. + """ + res = validator.validate_schema_by_response(endpoint, method, str(status_code), response) + if not isinstance(res, dict): + pytest.fail("Validator returned an unexpected result type") + + if not res.get("valid", False): + pytest.fail(f"Response schema validation failed: {res}") + + +def _ensure_endpoint_data_available(): + if not _PARAM_CASES: + pytest.skip(f"No ENDPOINT_TEST_DATA loaded from {TEST_DATA_FILENAME}") + + +# ---------------------------- +# Tests +# ---------------------------- + +@pytest.mark.parametrize("scenario_case", _PARAM_CASES, ids=_PARAM_IDS) +@pytest.mark.smoke +def test_auth_login_smoke_success(api_client, config_test_data: Mapping[str, Any], swagger_validator: SwaggerSchemaValidator, scenario_case: Mapping[str, Any]) -> None: + """ + Smoke: Happy path using only required fields for POST /api/auth/login. 
+ Uses scenario-specific data merged with default config_test_data. + """ + _ensure_endpoint_data_available() + + expected_status = scenario_case.get("statusCode") + if expected_status != 200: + pytest.skip(f"Scenario not for success (expected 200), got {expected_status}") + + base_defaults = config_test_data or {} + payload = _merge_login_payload(base_defaults, scenario_case) + + # Guard: required fields must be present; skip if missing, do not error during request send. + if "email" not in payload or "password" not in payload: + pytest.skip("Missing required fields 'email' or 'password' for success scenario in provided test data") + + # Best-effort request validation (component schema name may not exist; handled gracefully) + _try_validate_request_payload(swagger_validator, payload) + + resp = api_client.post(ENDPOINT, json=payload) + + assert resp.status_code == expected_status, f"Unexpected status code. Body: {resp.text}" + _assert_response_schema(swagger_validator, resp, ENDPOINT, METHOD, expected_status) + + +@pytest.mark.parametrize("scenario_case", _PARAM_CASES, ids=_PARAM_IDS) +def test_auth_login_invalid_credentials_400(api_client, config_test_data: Mapping[str, Any], swagger_validator: SwaggerSchemaValidator, scenario_case: Mapping[str, Any]) -> None: + """ + Negative: Invalid credentials should yield 400 per spec. + This test only executes when scenario_case['statusCode'] == 400 in ENDPOINT_TEST_DATA; otherwise skipped. + """ + _ensure_endpoint_data_available() + + expected_status = scenario_case.get("statusCode") + if expected_status != 400: + pytest.skip(f"Scenario not for invalid credentials (expected 400), got {expected_status}") + + base_defaults = config_test_data or {} + payload = _merge_login_payload(base_defaults, scenario_case) + + # If required fields are missing for the negative case, we still send the request if possible + # because server may respond 400 for bad/missing data. If payload is empty, send empty dict. 
+ _try_validate_request_payload(swagger_validator, payload) + + resp = api_client.post(ENDPOINT, json=payload if payload else {}) + + assert resp.status_code == expected_status, f"Unexpected status code. Body: {resp.text}" + # 400 response has no schema/content in spec; validator should return valid=True (no schema) + _assert_response_schema(swagger_validator, resp, ENDPOINT, METHOD, expected_status) + + +@pytest.mark.parametrize("scenario_case", _PARAM_CASES, ids=_PARAM_IDS) +def test_auth_login_missing_body_bad_request(api_client, swagger_validator: SwaggerSchemaValidator, scenario_case: Mapping[str, Any]) -> None: + """ + Edge case: Missing body. Only executes for scenarios explicitly indicating this pathway via statusCode==400. + Otherwise, skipped per case. Ensures table iteration across all ENDPOINT_TEST_DATA objects. + """ + _ensure_endpoint_data_available() + + expected_status = scenario_case.get("statusCode") + if expected_status != 400: + pytest.skip(f"Scenario not indicating missing/invalid body (expected 400), got {expected_status}") + + # Send request with no body to simulate missing payload case + resp = api_client.post(ENDPOINT) + + # Server might respond with 400; enforce spec expectation of scenario's statusCode + assert resp.status_code == expected_status, f"Unexpected status code. Body: {resp.text}" + _assert_response_schema(swagger_validator, resp, ENDPOINT, METHOD, expected_status) + + +@pytest.mark.parametrize("scenario_case", _PARAM_CASES, ids=_PARAM_IDS) +def test_auth_login_security_schemes(api_client, scenario_case: Mapping[str, Any]) -> None: + """ + Security schema validations for the endpoint. Since spec defines no security for /api/auth/login, + this test is skipped for all scenarios. 
+ """ + _ensure_endpoint_data_available() + pytest.skip("No security schemes defined for /api/auth/login in the provided API spec") diff --git a/tests/AUTH_API/validator.py b/tests/AUTH_API/validator.py new file mode 100644 index 00000000..76cf47fb --- /dev/null +++ b/tests/AUTH_API/validator.py @@ -0,0 +1,226 @@ + +import json +import yaml +from jsonschema import ( + Draft202012Validator, + Draft7Validator, + Draft4Validator, + ValidationError, +) +from referencing import Registry, Resource +from pathlib import Path +from typing import Dict, Any +import requests + + +class SwaggerSchemaValidator: + """ + Validates JSON, XML, and text responses + """ + + def __init__(self, swagger_source: str): + self.spec = self._load_spec(swagger_source) + self.is_swagger2 = False + self.schemas = self._extract_schemas() + self.registry = Registry() + + for name, schema in self.schemas.items(): + pointer = ( + f"#/definitions/{name}" if self.is_swagger2 + else f"#/components/schemas/{name}" + ) + + wrapped = { + "$schema": "https://json-schema.org/draft/2020-12/schema", + **schema, + } + self.registry = self.registry.with_resource( + pointer, + Resource.from_contents(wrapped) + ) + + def _load_spec(self, source: str) -> Dict[str, Any]: + # Convert Path to string if needed + if isinstance(source, Path): + source = str(source) + + if source.startswith(("http://", "https://")): + resp = requests.get(source) + resp.raise_for_status() + text = resp.text + + try: + return yaml.safe_load(text) + except yaml.YAMLError: + try: + return json.loads(text) + except json.JSONDecodeError: + raise ValueError("URL does not contain valid YAML or JSON") + + with open(source, "r") as f: + text = f.read() + + if source.endswith((".yaml", ".yml")): + return yaml.safe_load(text) + if source.endswith(".json"): + return json.loads(text) + + raise ValueError("File must be YAML or JSON") + + def _extract_schemas(self): + if "components" in self.spec and "schemas" in self.spec["components"]: + 
self.is_swagger2 = False + return self.spec["components"]["schemas"] + + if "definitions" in self.spec: + self.is_swagger2 = True + return self.spec["definitions"] + + raise ValueError("No schemas found under components/schemas or definitions") + + def get_version(self): + return self.spec.get("openapi") or self.spec.get("swagger") or "" + + def select_validator(self): + v = self.get_version() + + if v.startswith("2."): + return Draft4Validator + if v.startswith("3.0"): + return Draft7Validator + if v.startswith("3.1"): + return Draft202012Validator + + return Draft202012Validator + + def resolve_ref(self, ref): + if ref.startswith("#/"): + parts = ref.lstrip("#/").split("/") + node = self.spec + for p in parts: + node = node[p] + return node + + raise ValueError(f"External refs not supported: {ref}") + + def deref(self, schema): + if isinstance(schema, dict): + if "$ref" in schema: + resolved = self.resolve_ref(schema["$ref"]) + return self.deref(resolved) + return {k: self.deref(v) for k, v in schema.items()} + + if isinstance(schema, list): + return [self.deref(v) for v in schema] + + return schema + + def detect_format(self, response): + ctype = response.headers.get("Content-Type", "").lower() + if "json" in ctype: + return "json" + if "xml" in ctype: + return "xml" + if "text" in ctype: + return "text" + return "binary" + + def parse_body(self, response, fmt): + if fmt == "json": + return json.loads(response.text) + + if fmt == "xml": + import xmltodict + return xmltodict.parse(response.text) + + if fmt == "text": + return response.text + + return response.content + + def extract_schema_for_media_type(self, response_block, content_type): + content = response_block.get("content", {}) + + if content_type in content: + return content[content_type].get("schema") + + if "json" in content_type: + for k, v in content.items(): + if k == "application/json" or k.endswith("+json"): + return v.get("schema") + + if "xml" in content_type: + for k, v in content.items(): + if 
"xml" in k: + return v.get("schema") + + if "text/plain" in content: + return content["text/plain"].get("schema") + + return None + + + def validate_json(self, data, schema_name): + if schema_name not in self.schemas: + raise ValueError(f"Schema '{schema_name}' not found") + + schema = self.deref(self.schemas[schema_name]) + validator_cls = self.select_validator() + validator = validator_cls(schema, registry=self.registry) + + try: + validator.validate(data) + return {"valid": True} + except ValidationError as e: + return { + "valid": False, + "message": e.message, + "path": list(e.path), + "schema_path": list(e.schema_path), + } + + def validate_schema_by_response(self, endpoint, method, status_code, response): + fmt = self.detect_format(response) + + paths = self.spec.get("paths", {}) + op = paths.get(endpoint, {}).get(method.lower()) + + if not op: + return {"valid": False, "message": f"Method {method} not found at path {endpoint}"} + + responses = op.get("responses", {}) + response_block = responses.get(status_code) + + if not response_block: + return {"valid": False, "message": f"No response block for {status_code}"} + + ctype = response.headers.get("Content-Type", "").split(";")[0].strip() + + if "content" in response_block: + schema = self.extract_schema_for_media_type(response_block, ctype) + else: + schema = response_block.get("schema") + + if schema is None: + return {"valid": True, "message": "No schema defined for this content type"} + + try: + data = self.parse_body(response, fmt) + except Exception as e: + return {"valid": False, "message": f"Body parsing failed: {e}"} + + schema = self.deref(schema) + + validator_cls = self.select_validator() + validator = validator_cls(schema, registry=self.registry) + + try: + validator.validate(data) + return {"valid": True} + except ValidationError as e: + return { + "valid": False, + "message": e.message, + "path": list(e.path), + "schema_path": list(e.schema_path), + }