From 34967b1ab3f747f35647f2f8e42fd53c554940d7 Mon Sep 17 00:00:00 2001 From: ASigma213 Date: Sat, 31 Jan 2026 09:42:01 +0100 Subject: [PATCH] Add files via upload --- README.md | 59 ++++++++++++++++++++++++- __init__.py | 1 + __main__.py | 66 +++++++++++++++++++++++++++ filters.py | 65 +++++++++++++++++++++++++++ fixer.py | 51 +++++++++++++++++++++ injector.py | 78 ++++++++++++++++++++++++++++++++ predictor.py | 113 +++++++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 22 +++++++++ remediation.py | 66 +++++++++++++++++++++++++++ requirements.txt | 3 ++ scanner.py | 64 +++++++++++++++++++++++++++ tester.py | 88 ++++++++++++++++++++++++++++++++++++ 12 files changed, 674 insertions(+), 2 deletions(-) create mode 100644 __init__.py create mode 100644 __main__.py create mode 100644 filters.py create mode 100644 fixer.py create mode 100644 injector.py create mode 100644 predictor.py create mode 100644 pyproject.toml create mode 100644 remediation.py create mode 100644 requirements.txt create mode 100644 scanner.py create mode 100644 tester.py diff --git a/README.md b/README.md index 4b0d9a7..dcebd34 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,57 @@ -# SQLbase -An advanced and automated sql injector that replace the work from a 1:1 sandboxed operation to a real life one in just 1 simple command. +# SQLbase – Security Scanning Toolkit + +Cross-platform (Linux, Windows, macOS) security scanning: SQL injection scanning, dynamic testing, code fixing, and remediation. Uses `pathlib` and portable paths throughout. + +## Install + +```bash +python -m venv .venv +# Linux/macOS: +.venv/bin/activate +# Windows: +.venv\Scripts\activate +pip install -r requirements.txt +pip install -e . +``` + +Or run without install (from repo root): + +```bash +# Linux/macOS/Windows (same commands) +PYTHONPATH=. python -m sqlbase scan . +PYTHONPATH=. python -m sqlbase predict . +PYTHONPATH=. python -m sqlbase remediate SQL_INJECTION python +``` + +## Usage + +```bash +# Scan path for SQL injection patterns (file or directory) +python -m sqlbase scan [path] [-o report.json] [--fail-on-findings] + +# Predict vulnerability likelihood (heuristic/ML-ready) +python -m sqlbase predict [path] + +# Get remediation for a vulnerability type and language +python -m sqlbase remediate SQL_INJECTION python +``` + +Programmatic use: + +```python +from pathlib import Path +from sqlbase.scanner import SQLInjectionScanner +from sqlbase.tester import DynamicSQLiTester +from sqlbase.fixer import SqliCodeFixer +from sqlbase.remediation import RemediationKnowledgeBase +from sqlbase.injector import SecurityPatternInjector +from sqlbase.predictor import VulnerabilityPredictor + +scanner = SQLInjectionScanner() +for v in scanner.scan_path(Path("src")): + print(v["file"], v["line"], v["type"]) +``` + +## CI + +GitHub Actions: `.github/workflows/security-scan.yml` runs on **ubuntu-latest**, **windows-latest**, and **macos-latest** on push/PR. diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..84efe81 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +# SQLbase – cross-platform security scanning toolkit diff --git a/__main__.py b/__main__.py new file mode 100644 index 0000000..013d410 --- /dev/null +++ b/__main__.py @@ -0,0 +1,66 @@ +""" +CLI entrypoint: python -m sqlbase [scan|predict|remediate] ... +Cross-platform: Linux, Windows, macOS. +""" +import argparse +import json +import sys +from pathlib import Path + + +def cmd_scan(args) -> int: + from sqlbase.scanner import SQLInjectionScanner + scanner = SQLInjectionScanner() + path = Path(args.path).resolve() + results = scanner.scan_path(path, extensions=args.extensions) + out = json.dumps(results, indent=2) + if args.output: + Path(args.output).write_text(out, encoding="utf-8") + print(f"Wrote {len(results)} findings to {args.output}", file=sys.stderr) + else: + print(out) + return 0 if not args.fail_on_findings or len(results) == 0 else 1 + + +def cmd_predict(args) -> int: + from sqlbase.predictor import VulnerabilityPredictor + predictor = VulnerabilityPredictor() + result = predictor.predict_vulnerability_likelihood(Path(args.path)) + print(json.dumps(result, indent=2)) + return 0 + + +def cmd_remediate(args) -> int: + from sqlbase.remediation import RemediationKnowledgeBase + kb = RemediationKnowledgeBase() + r = kb.get_remediation(args.type, args.language) + print(json.dumps(r, indent=2)) + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description="SQLbase security toolkit") + sub = parser.add_subparsers(dest="command", required=True) + # scan + p_scan = sub.add_parser("scan", help="Scan path for SQL injection patterns") + p_scan.add_argument("path", nargs="?", default=".", help="File or directory to scan") + p_scan.add_argument("-o", "--output", metavar="FILE", help="Write JSON report to FILE (cross-platform path)") + p_scan.add_argument("--fail-on-findings", action="store_true", help="Exit 1 if any finding") + p_scan.add_argument("--extensions", nargs="+", default=[".py", ".java", ".js", ".ts", ".php", ".rb", ".go", ".cs"], help="File extensions") + p_scan.set_defaults(func=cmd_scan) + # predict + p_predict = sub.add_parser("predict", help="Predict vulnerability likelihood") + p_predict.add_argument("path", nargs="?", default=".", help="File or directory") + p_predict.set_defaults(func=cmd_predict) + # remediate + p_rem = sub.add_parser("remediate", help="Get remediation for vulnerability type + language") + p_rem.add_argument("type", help="e.g. SQL_INJECTION") + p_rem.add_argument("language", help="e.g. python, java") + p_rem.set_defaults(func=cmd_remediate) + + args = parser.parse_args() + return args.func(args) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/filters.py b/filters.py new file mode 100644 index 0000000..923a720 --- /dev/null +++ b/filters.py @@ -0,0 +1,65 @@ +""" +Security filter stubs for intercepting filter pattern. Cross-platform. +""" +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional + + +class BaseFilter(ABC): + @abstractmethod + def apply(self, data: Any) -> Any: + pass + + @abstractmethod + def name(self) -> str: + pass + + +class SQLInjectionFilter(BaseFilter): + def name(self) -> str: + return "SQLi" + + def apply(self, data: Any) -> Any: + if isinstance(data, str): + return data.replace("'", "''").replace("\\", "\\\\") + if isinstance(data, dict): + return {k: self.apply(v) for k, v in data.items()} + if isinstance(data, (list, tuple)): + return type(data)(self.apply(x) for x in data) + return data + + +class XSSFilter(BaseFilter): + def name(self) -> str: + return "XSS" + + def apply(self, data: Any) -> Any: + if isinstance(data, str): + return ( + data.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'") + ) + if isinstance(data, dict): + return {k: self.apply(v) for k, v in data.items()} + if isinstance(data, (list, tuple)): + return type(data)(self.apply(x) for x in data) + return data + + +class CommandInjectionFilter(BaseFilter): + def name(self) -> str: + return "CommandInjection" + + def apply(self, data: Any) -> Any: + if isinstance(data, str): + for char in [";", "|", "&", "$", "`", "\n", "\r"]: + data = data.replace(char, "") + return data + if isinstance(data, dict): + return {k: self.apply(v) for k, v in data.items()} + if isinstance(data, (list, tuple)): + return type(data)(self.apply(x) for x in data) + return data diff --git a/fixer.py b/fixer.py new file mode 100644 index 0000000..435337a --- /dev/null +++ b/fixer.py @@ -0,0 +1,51 @@ +""" +SQL injection code fixer. Supports Java and Python. Cross-platform. +""" +import re +from typing import Optional + + +class SqliCodeFixer: + def fix_concatenation(self, code_snippet: str, language: str) -> str: + language = (language or "").strip().lower() + if language == "java": + return self.fix_java_sqli(code_snippet) + if language in ("python", "py"): + return self.fix_python_sqli(code_snippet) + return code_snippet + + def fix_java_sqli(self, code: str) -> str: + # Pattern: String sql = "SELECT ..." + userInput; + pattern = r'(\w+)\s*=\s*["\'](SELECT\s+.*?)["\']\s*\+\s*(\w+)' + replacement = ( + r'String \1 = "\2 ?";\n' + r'PreparedStatement stmt = conn.prepareStatement(\1);\n' + r'stmt.setString(1, \3);' + ) + result = re.sub(pattern, replacement, code, flags=re.IGNORECASE) + # Also: "SELECT ..." + var + " ..." + pattern2 = r'["\'](SELECT\s+.*?)\s*\+\s*(\w+)\s*\+\s*["\'](.*?)["\']' + if not re.search(pattern2, result, re.IGNORECASE): + return result + result = re.sub( + pattern2, + r'"\1 ? \3"; PreparedStatement stmt = conn.prepareStatement(sql); stmt.setString(1, \2);', + result, + flags=re.IGNORECASE, + ) + return result + + def fix_python_sqli(self, code: str) -> str: + # cursor.execute("SELECT ... " + user_id) + pattern = r'\.(execute|executemany)\s*\(\s*["\']([^"\']*?)["\']\s*\+\s*(\w+)' + replacement = r'.\1("\2 %s", (\3,))' + result = re.sub(pattern, replacement, code) + # cursor.execute("SELECT ... %s" % user_id) -> parameterized + pattern2 = r'\.(execute|executemany)\s*\(\s*["\']([^"\']*?)["\']\s*%\s*(\w+)' + replacement2 = r'.\1("\2", (\3,))' + result = re.sub(pattern2, replacement2, result) + # f"SELECT ... {var}" + pattern3 = r'\.(execute|executemany)\s*\(\s*f["\']([^"\']*)\{(\w+)\}([^"\']*)["\']\s*\)' + replacement3 = r'.\1("\2%s\3", (\4,))' + result = re.sub(pattern3, replacement3, result) + return result diff --git a/injector.py b/injector.py new file mode 100644 index 0000000..7592d74 --- /dev/null +++ b/injector.py @@ -0,0 +1,78 @@ +""" +Security pattern injector: applies intercepting filters and injects filter calls. +Cross-platform: uses pathlib for Linux, Windows, macOS. +""" +from pathlib import Path +from typing import Dict, Optional + +from sqlbase.filters import ( + BaseFilter, + SQLInjectionFilter, + XSSFilter, + CommandInjectionFilter, +) + + +class FilterManager: + def __init__(self, filters: Dict[str, BaseFilter]) -> None: + self.filters = filters + + def apply_all(self, data): + for _name, f in self.filters.items(): + data = f.apply(data) + return data + + +class SecurityPatternInjector: + def __init__(self) -> None: + self.filter_manager: Optional[FilterManager] = None + + def apply_intercepting_filter(self, project_path: str | Path) -> None: + filters: Dict[str, BaseFilter] = { + "SQLi": SQLInjectionFilter(), + "XSS": XSSFilter(), + "CommandInjection": CommandInjectionFilter(), + } + self.filter_manager = self.generate_filter_manager(filters) + self.inject_filter_calls(Path(project_path)) + + def generate_filter_manager(self, filters: Dict[str, BaseFilter]) -> FilterManager: + return FilterManager(filters) + + def inject_filter_calls(self, project_path: Path) -> None: + project_path = Path(project_path).resolve() + if not project_path.exists(): + return + # Write a small bootstrap module that projects can import to get FilterManager + bootstrap_dir = project_path / ".security_filters" + bootstrap_dir.mkdir(exist_ok=True) + bootstrap_file = bootstrap_dir / "filter_manager.py" + content = '''""" +Auto-generated security filter bootstrap. Cross-platform. +Import: from .security_filters.filter_manager import get_filter_manager +""" +from pathlib import Path + +_filters = None + +def get_filter_manager(): + global _filters + if _filters is None: + from sqlbase.filters import SQLInjectionFilter, XSSFilter, CommandInjectionFilter + from sqlbase.injector import FilterManager + _filters = FilterManager({ + "SQLi": SQLInjectionFilter(), + "XSS": XSSFilter(), + "CommandInjection": CommandInjectionFilter(), + }) + return _filters + +def apply_security_filters(data): + return get_filter_manager().apply_all(data) +''' + bootstrap_file.write_text(content, encoding="utf-8") + init_file = bootstrap_dir / "__init__.py" + init_file.write_text( + "from .filter_manager import get_filter_manager, apply_security_filters\n", + encoding="utf-8", + ) diff --git a/predictor.py b/predictor.py new file mode 100644 index 0000000..a8d8f95 --- /dev/null +++ b/predictor.py @@ -0,0 +1,113 @@ +""" +Vulnerability predictor: heuristic-based (ML-ready). Cross-platform. +""" +from pathlib import Path +from typing import Dict, Any, List, Optional +import re + + +class VulnerabilityPredictor: + def __init__(self) -> None: + self.model = self.load_trained_model() + self.features = [ + "code_complexity", + "input_sources_count", + "database_interactions", + "authentication_points", + ] + + def load_trained_model(self) -> Optional[Any]: + try: + from sklearn.ensemble import RandomForestClassifier + import numpy as np + # Placeholder: no real training data; use simple heuristic weights + clf = RandomForestClassifier(n_estimators=10, random_state=42) + # Dummy fit so predict doesn't fail; real usage would load a serialized model + X = np.zeros((5, 4)) + y = np.array([0, 0, 1, 0, 1]) + clf.fit(X, y) + return clf + except ImportError: + return None + + def extract_features(self, codebase: str | Path) -> List[float]: + path = Path(codebase) + text = "" + if path.is_file(): + try: + text = path.read_text(encoding="utf-8", errors="replace") + except (OSError, PermissionError): + pass + elif path.is_dir(): + for ext in [".py", ".java", ".js", ".ts", ".php"]: + for f in path.rglob(f"*{ext}"): + try: + text += f.read_text(encoding="utf-8", errors="replace") + "\n" + except (OSError, PermissionError): + continue + else: + text = str(codebase) + + # Heuristic features + code_complexity = min(1.0, (len(text) / 10000) + (text.count("\n") / 500) * 0.1) + input_sources = len(re.findall(r"(input|request\.(get|post)|argv|getParameter)", text, re.I)) + input_sources_count = min(1.0, input_sources / 20) + db_interactions = len( + re.findall( + r"(execute|query|raw|prepareStatement|SELECT|INSERT|UPDATE|DELETE)", + text, + re.I, + ) + ) + database_interactions = min(1.0, db_interactions / 30) + auth_points = len( + re.findall(r"(password|login|auth|session|token|credential)", text, re.I) + ) + authentication_points = min(1.0, auth_points / 15) + + return [ + code_complexity, + input_sources_count, + database_interactions, + authentication_points, + ] + + def generate_recommendations(self, predictions: Dict[str, float]) -> List[str]: + recs: List[str] = [] + if predictions.get("sqli", 0) > 0.5: + recs.append("Use parameterized queries / PreparedStatement for all DB access.") + if predictions.get("xss", 0) > 0.5: + recs.append("Escape user-controlled output; consider CSP and encoding libraries.") + if not recs: + recs.append("Review input validation and output encoding.") + return recs + + def predict_vulnerability_likelihood( + self, codebase: str | Path + ) -> Dict[str, Any]: + features = self.extract_features(codebase) + predictions = {"sqli": 0.0, "xss": 0.0} + if self.model is not None: + try: + import numpy as np + X = np.array([features]) + pred = self.model.predict_proba(X) + if pred.shape[1] >= 2: + predictions["sqli"] = float(pred[0][1]) + predictions["xss"] = float(pred[0][1]) * 0.8 + else: + predictions["sqli"] = 0.3 * (features[1] + features[2]) + predictions["xss"] = 0.3 * (features[1] + features[3]) + except Exception: + predictions["sqli"] = 0.3 * (features[1] + features[2]) + predictions["xss"] = 0.3 * (features[1] + features[3]) + else: + predictions["sqli"] = 0.3 * (features[1] + features[2]) + predictions["xss"] = 0.3 * (features[1] + features[3]) + + return { + "sql_injection_risk": round(predictions["sqli"], 4), + "xss_risk": round(predictions["xss"], 4), + "recommended_fixes": self.generate_recommendations(predictions), + "features": dict(zip(self.features, features)), + } diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4fb692d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["setuptools>=61", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "sqlbase" +version = "0.1.0" +description = "Cross-platform security scanning toolkit (SQL injection, XSS, remediation)" +readme = "README.md" +requires-python = ">=3.9" +dependencies = [ + "requests>=2.28.0", + "scikit-learn>=1.2.0", + "numpy>=1.24.0", +] + +[project.scripts] +sqlbase-scan = "sqlbase.__main__:main" + +[tool.setuptools.packages.find] +where = ["."] +include = ["sqlbase*"] diff --git a/remediation.py b/remediation.py new file mode 100644 index 0000000..08ab844 --- /dev/null +++ b/remediation.py @@ -0,0 +1,66 @@ +""" +Remediation knowledge base for SQL injection and related fixes. Cross-platform. +""" +from typing import Dict, Any, List + + +class RemediationKnowledgeBase: + def __init__(self) -> None: + self.patterns: Dict[str, Dict[str, Dict[str, Any]]] = { + "SQL_INJECTION": { + "java": { + "solution": "Use PreparedStatement", + "example": 'String sql = "SELECT * FROM users WHERE id = ?";', + "libraries": ["java.sql.PreparedStatement"], + "extra": "stmt.setString(1, userInput);", + }, + "python": { + "solution": "Use parameterized queries", + "example": 'cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))', + "libraries": ["psycopg2", "sqlite3", "SQLAlchemy", "mysql.connector"], + "extra": "Never use % or .format() on the query string; pass params as second argument.", + }, + "php": { + "solution": "Use PDO prepared statements", + "example": '$stmt = $pdo->prepare("SELECT * FROM users WHERE id = ?"); $stmt->execute([$id]);', + "libraries": ["PDO", "mysqli"], + }, + "csharp": { + "solution": "Use parameterized SqlCommand", + "example": 'cmd.CommandText = "SELECT * FROM users WHERE id = @id"; cmd.Parameters.AddWithValue("@id", id);', + "libraries": ["System.Data.SqlClient"], + }, + }, + "XSS": { + "python": { + "solution": "Escape output and use CSP", + "example": "from markupsafe import escape; escape(user_input)", + "libraries": ["markupsafe", "bleach"], + }, + "java": { + "solution": "Use OWASP Java Encoder", + "example": "Encoder.forHtml(userInput)", + "libraries": ["org.owasp.encoder"], + }, + }, + "COMMAND_INJECTION": { + "python": { + "solution": "Use subprocess with list args, never shell=True with user input", + "example": "subprocess.run([\"ls\", \"-la\"], capture_output=True)", + "libraries": ["subprocess"], + }, + }, + } + + def get_remediation( + self, vulnerability_type: str, language: str + ) -> Dict[str, Any]: + return ( + self.patterns.get(vulnerability_type, {}) + .get(language, {}) + .copy() + or {} + ) + + def get_languages_for_type(self, vulnerability_type: str) -> List[str]: + return list(self.patterns.get(vulnerability_type, {}).keys()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3f9d876 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +requests>=2.28.0 +scikit-learn>=1.2.0 +numpy>=1.24.0 diff --git a/scanner.py b/scanner.py new file mode 100644 index 0000000..2d20839 --- /dev/null +++ b/scanner.py @@ -0,0 +1,64 @@ +""" +SQL injection static scanner. Cross-platform: Linux, Windows, macOS. +Uses pathlib for portable paths. +""" +import re +from pathlib import Path +from typing import List, Dict, Any, Optional + + +class SQLInjectionScanner: + def __init__(self) -> None: + self.patterns = [ + (r"execute\s*\([^)]*\+[^)]*\)", "String concatenation in execute"), + (r"\.(execute|executemany)\s*\([^)]*%\s*s", "%-format in query"), + (r"\.(execute|executemany)\s*\([^)]*\.format\s*\(", "str.format in query"), + (r'"(?:SELECT|INSERT|UPDATE|DELETE)\s+[^"]*\{[^}]*\}', "F-string in SQL"), + (r"'(?:SELECT|INSERT|UPDATE|DELETE)\s+[^']*\{[^}]*\}", "F-string in SQL"), + (r"query\s*=\s*[^;]+;\s*query\s*\+=", "Query built with +="), + (r"Statement\.execute\s*\([^)]*\+", "Java statement concatenation"), + (r"createStatement\s*\(\s*\)\s*\.\s*execute\s*\([^)]*\+", "Statement + string"), + (r"raw\s*\(\s*[^)]*\+", "Raw query concatenation"), + (r"\.format\s*\([^)]*\)\s*\)\s*\.(execute|query)", "Format then execute"), + ] + + def scan_file(self, file_path: str | Path) -> List[Dict[str, Any]]: + vulnerabilities: List[Dict[str, Any]] = [] + path = Path(file_path) + if not path.exists(): + return vulnerabilities + try: + text = path.read_text(encoding="utf-8", errors="replace") + except (OSError, PermissionError): + return vulnerabilities + lines = text.splitlines() + for i, line in enumerate(lines): + for pattern, desc in self.patterns: + if re.search(pattern, line, re.IGNORECASE | re.DOTALL): + vulnerabilities.append({ + "line": i + 1, + "code": line.strip(), + "type": "SQL_INJECTION", + "description": desc, + }) + break + return vulnerabilities + + def scan_path(self, path: str | Path, extensions: Optional[List[str]] = None) -> List[Dict[str, Any]]: + path = Path(path).resolve() + extensions = extensions or [".py", ".java", ".js", ".ts", ".php", ".rb", ".go", ".cs"] + results: List[Dict[str, Any]] = [] + if path.is_file(): + for v in self.scan_file(path): + v["file"] = str(path) + results.append(v) + return results + for ext in extensions: + for f in path.rglob(f"*{ext}"): + try: + for v in self.scan_file(f): + v["file"] = str(f) + results.append(v) + except (OSError, PermissionError): + continue + return results diff --git a/tester.py b/tester.py new file mode 100644 index 0000000..f600714 --- /dev/null +++ b/tester.py @@ -0,0 +1,88 @@ +""" +Dynamic SQL injection tester. Cross-platform; uses requests with timeouts. +""" +import re +from typing import List, Dict, Any, Optional +from urllib.parse import urljoin + +import requests + + +class DynamicSQLiTester: + def __init__(self, target_url: str, timeout: float = 10.0, verify_ssl: bool = True) -> None: + self.target_url = target_url.rstrip("/") + self.timeout = timeout + self.verify_ssl = verify_ssl + self.session = requests.Session() + self.session.verify = verify_ssl + self.payloads = [ + "' OR '1'='1", + "'; DROP TABLE users; --", + "' UNION SELECT NULL --", + "1' OR '1'='1' --", + "1 OR 1=1", + "admin'--", + "' OR 1=1--", + "1; SELECT pg_sleep(5)--", + ] + + def is_vulnerable(self, response: requests.Response, param: str, payload: str) -> bool: + text = (response.text or "").lower() + # Error-based indicators (cross-DB common messages) + error_indicators = [ + "sql syntax", + "syntax error", + "mysql_fetch", + "pg_query", + "sqlite_", + "ora-01", + "unclosed quotation", + "quoted string not properly terminated", + "unexpected end of sql", + "warning: mysql", + "valid mysql result", + "myisam", + "mysqli", + "postgresql", + "sqlstate", + ] + for indicator in error_indicators: + if indicator in text: + return True + # Optional: compare with baseline (no payload) response length/content + return False + + def test_endpoint( + self, + endpoint: str, + params: Dict[str, str], + method: str = "POST", + headers: Optional[Dict[str, str]] = None, + ) -> List[Dict[str, Any]]: + vulnerabilities: List[Dict[str, Any]] = [] + url = endpoint if endpoint.startswith("http") else urljoin(self.target_url + "/", endpoint) + headers = headers or {"Content-Type": "application/x-www-form-urlencoded"} + + for param in params: + for payload in self.payloads: + test_params = dict(params) + test_params[param] = payload + try: + if method.upper() == "POST": + response = self.session.post( + url, data=test_params, headers=headers, timeout=self.timeout + ) + else: + response = self.session.get( + url, params=test_params, headers=headers, timeout=self.timeout + ) + if self.is_vulnerable(response, param, payload): + vulnerabilities.append({ + "parameter": param, + "payload": payload, + "endpoint": url, + "status_code": response.status_code, + }) + except requests.RequestException: + continue + return vulnerabilities