diff --git a/README.md b/README.md index 4b0d9a7..dcebd34 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,57 @@ -# SQLbase -An advanced and automated sql injector that replace the work from a 1:1 sandboxed operation to a real life one in just 1 simple command. +# SQLbase – Security Scanning Toolkit + +Cross-platform (Linux, Windows, macOS) security scanning: SQL injection scanning, dynamic testing, code fixing, and remediation. Uses `pathlib` and portable paths throughout. + +## Install + +```bash +python -m venv .venv +# Linux/macOS: +.venv/bin/activate +# Windows: +.venv\Scripts\activate +pip install -r requirements.txt +pip install -e . +``` + +Or run without install (from repo root): + +```bash +# Linux/macOS/Windows (same commands) +PYTHONPATH=. python -m sqlbase scan . +PYTHONPATH=. python -m sqlbase predict . +PYTHONPATH=. python -m sqlbase remediate SQL_INJECTION python +``` + +## Usage + +```bash +# Scan path for SQL injection patterns (file or directory) +python -m sqlbase scan [path] [-o report.json] [--fail-on-findings] + +# Predict vulnerability likelihood (heuristic/ML-ready) +python -m sqlbase predict [path] + +# Get remediation for a vulnerability type and language +python -m sqlbase remediate SQL_INJECTION python +``` + +Programmatic use: + +```python +from pathlib import Path +from sqlbase.scanner import SQLInjectionScanner +from sqlbase.tester import DynamicSQLiTester +from sqlbase.fixer import SqliCodeFixer +from sqlbase.remediation import RemediationKnowledgeBase +from sqlbase.injector import SecurityPatternInjector +from sqlbase.predictor import VulnerabilityPredictor + +scanner = SQLInjectionScanner() +for v in scanner.scan_path(Path("src")): + print(v["file"], v["line"], v["type"]) +``` + +## CI + +GitHub Actions: `.github/workflows/security-scan.yml` runs on **ubuntu-latest**, **windows-latest**, and **macos-latest** on push/PR. diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..84efe81 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +# SQLbase – cross-platform security scanning toolkit diff --git a/__main__.py b/__main__.py new file mode 100644 index 0000000..013d410 --- /dev/null +++ b/__main__.py @@ -0,0 +1,66 @@ +""" +CLI entrypoint: python -m sqlbase [scan|predict|remediate] ... +Cross-platform: Linux, Windows, macOS. +""" +import argparse +import json +import sys +from pathlib import Path + + +def cmd_scan(args) -> int: + from sqlbase.scanner import SQLInjectionScanner + scanner = SQLInjectionScanner() + path = Path(args.path).resolve() + results = scanner.scan_path(path, extensions=args.extensions) + out = json.dumps(results, indent=2) + if args.output: + Path(args.output).write_text(out, encoding="utf-8") + print(f"Wrote {len(results)} findings to {args.output}", file=sys.stderr) + else: + print(out) + return 0 if not args.fail_on_findings or len(results) == 0 else 1 + + +def cmd_predict(args) -> int: + from sqlbase.predictor import VulnerabilityPredictor + predictor = VulnerabilityPredictor() + result = predictor.predict_vulnerability_likelihood(Path(args.path)) + print(json.dumps(result, indent=2)) + return 0 + + +def cmd_remediate(args) -> int: + from sqlbase.remediation import RemediationKnowledgeBase + kb = RemediationKnowledgeBase() + r = kb.get_remediation(args.type, args.language) + print(json.dumps(r, indent=2)) + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description="SQLbase security toolkit") + sub = parser.add_subparsers(dest="command", required=True) + # scan + p_scan = sub.add_parser("scan", help="Scan path for SQL injection patterns") + p_scan.add_argument("path", nargs="?", default=".", help="File or directory to scan") + p_scan.add_argument("-o", "--output", metavar="FILE", help="Write JSON report to FILE (cross-platform path)") + p_scan.add_argument("--fail-on-findings", action="store_true", help="Exit 1 if any finding") + p_scan.add_argument("--extensions", nargs="+", default=[".py", ".java", ".js", ".ts", ".php", ".rb", ".go", ".cs"], help="File extensions") + p_scan.set_defaults(func=cmd_scan) + # predict + p_predict = sub.add_parser("predict", help="Predict vulnerability likelihood") + p_predict.add_argument("path", nargs="?", default=".", help="File or directory") + p_predict.set_defaults(func=cmd_predict) + # remediate + p_rem = sub.add_parser("remediate", help="Get remediation for vulnerability type + language") + p_rem.add_argument("type", help="e.g. SQL_INJECTION") + p_rem.add_argument("language", help="e.g. python, java") + p_rem.set_defaults(func=cmd_remediate) + + args = parser.parse_args() + return args.func(args) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/filters.py b/filters.py new file mode 100644 index 0000000..923a720 --- /dev/null +++ b/filters.py @@ -0,0 +1,65 @@ +""" +Security filter stubs for intercepting filter pattern. Cross-platform. +""" +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional + + +class BaseFilter(ABC): + @abstractmethod + def apply(self, data: Any) -> Any: + pass + + @abstractmethod + def name(self) -> str: + pass + + +class SQLInjectionFilter(BaseFilter): + def name(self) -> str: + return "SQLi" + + def apply(self, data: Any) -> Any: + if isinstance(data, str): + return data.replace("'", "''").replace("\\", "\\\\") + if isinstance(data, dict): + return {k: self.apply(v) for k, v in data.items()} + if isinstance(data, (list, tuple)): + return type(data)(self.apply(x) for x in data) + return data + + +class XSSFilter(BaseFilter): + def name(self) -> str: + return "XSS" + + def apply(self, data: Any) -> Any: + if isinstance(data, str): + return ( + data.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'") + ) + if isinstance(data, dict): + return {k: self.apply(v) for k, v in data.items()} + if isinstance(data, (list, tuple)): + return type(data)(self.apply(x) for x in data) + return data + + +class CommandInjectionFilter(BaseFilter): + def name(self) -> str: + return "CommandInjection" + + def apply(self, data: Any) -> Any: + if isinstance(data, str): + for char in [";", "|", "&", "$", "`", "\n", "\r"]: + data = data.replace(char, "") + return data + if isinstance(data, dict): + return {k: self.apply(v) for k, v in data.items()} + if isinstance(data, (list, tuple)): + return type(data)(self.apply(x) for x in data) + return data diff --git a/fixer.py b/fixer.py new file mode 100644 index 0000000..435337a --- /dev/null +++ b/fixer.py @@ -0,0 +1,51 @@ +""" +SQL injection code fixer. Supports Java and Python. Cross-platform. +""" +import re +from typing import Optional + + +class SqliCodeFixer: + def fix_concatenation(self, code_snippet: str, language: str) -> str: + language = (language or "").strip().lower() + if language == "java": + return self.fix_java_sqli(code_snippet) + if language in ("python", "py"): + return self.fix_python_sqli(code_snippet) + return code_snippet + + def fix_java_sqli(self, code: str) -> str: + # Pattern: String sql = "SELECT ..." + userInput; + pattern = r'(\w+)\s*=\s*["\'](SELECT\s+.*?)["\']\s*\+\s*(\w+)' + replacement = ( + r'String \1 = "\2 ?";\n' + r'PreparedStatement stmt = conn.prepareStatement(\1);\n' + r'stmt.setString(1, \3);' + ) + result = re.sub(pattern, replacement, code, flags=re.IGNORECASE) + # Also: "SELECT ..." + var + " ..." + pattern2 = r'["\'](SELECT\s+.*?)\s*\+\s*(\w+)\s*\+\s*["\'](.*?)["\']' + if not re.search(pattern2, result, re.IGNORECASE): + return result + result = re.sub( + pattern2, + r'"\1 ? \3"; PreparedStatement stmt = conn.prepareStatement(sql); stmt.setString(1, \2);', + result, + flags=re.IGNORECASE, + ) + return result + + def fix_python_sqli(self, code: str) -> str: + # cursor.execute("SELECT ... " + user_id) + pattern = r'\.(execute|executemany)\s*\(\s*["\']([^"\']*?)["\']\s*\+\s*(\w+)' + replacement = r'.\1("\2 %s", (\3,))' + result = re.sub(pattern, replacement, code) + # cursor.execute("SELECT ... %s" % user_id) -> parameterized + pattern2 = r'\.(execute|executemany)\s*\(\s*["\']([^"\']*?)["\']\s*%\s*(\w+)' + replacement2 = r'.\1("\2", (\3,))' + result = re.sub(pattern2, replacement2, result) + # f"SELECT ... {var}" + pattern3 = r'\.(execute|executemany)\s*\(\s*f["\']([^"\']*)\{(\w+)\}([^"\']*)["\']\s*\)' + replacement3 = r'.\1("\2%s\3", (\4,))' + result = re.sub(pattern3, replacement3, result) + return result diff --git a/injector.py b/injector.py new file mode 100644 index 0000000..7592d74 --- /dev/null +++ b/injector.py @@ -0,0 +1,78 @@ +""" +Security pattern injector: applies intercepting filters and injects filter calls. +Cross-platform: uses pathlib for Linux, Windows, macOS. +""" +from pathlib import Path +from typing import Dict, Optional + +from sqlbase.filters import ( + BaseFilter, + SQLInjectionFilter, + XSSFilter, + CommandInjectionFilter, +) + + +class FilterManager: + def __init__(self, filters: Dict[str, BaseFilter]) -> None: + self.filters = filters + + def apply_all(self, data): + for _name, f in self.filters.items(): + data = f.apply(data) + return data + + +class SecurityPatternInjector: + def __init__(self) -> None: + self.filter_manager: Optional[FilterManager] = None + + def apply_intercepting_filter(self, project_path: str | Path) -> None: + filters: Dict[str, BaseFilter] = { + "SQLi": SQLInjectionFilter(), + "XSS": XSSFilter(), + "CommandInjection": CommandInjectionFilter(), + } + self.filter_manager = self.generate_filter_manager(filters) + self.inject_filter_calls(Path(project_path)) + + def generate_filter_manager(self, filters: Dict[str, BaseFilter]) -> FilterManager: + return FilterManager(filters) + + def inject_filter_calls(self, project_path: Path) -> None: + project_path = Path(project_path).resolve() + if not project_path.exists(): + return + # Write a small bootstrap module that projects can import to get FilterManager + bootstrap_dir = project_path / ".security_filters" + bootstrap_dir.mkdir(exist_ok=True) + bootstrap_file = bootstrap_dir / "filter_manager.py" + content = '''""" +Auto-generated security filter bootstrap. Cross-platform. +Import: from .security_filters.filter_manager import get_filter_manager +""" +from pathlib import Path + +_filters = None + +def get_filter_manager(): + global _filters + if _filters is None: + from sqlbase.filters import SQLInjectionFilter, XSSFilter, CommandInjectionFilter + from sqlbase.injector import FilterManager + _filters = FilterManager({ + "SQLi": SQLInjectionFilter(), + "XSS": XSSFilter(), + "CommandInjection": CommandInjectionFilter(), + }) + return _filters + +def apply_security_filters(data): + return get_filter_manager().apply_all(data) +''' + bootstrap_file.write_text(content, encoding="utf-8") + init_file = bootstrap_dir / "__init__.py" + init_file.write_text( + "from .filter_manager import get_filter_manager, apply_security_filters\n", + encoding="utf-8", + ) diff --git a/predictor.py b/predictor.py new file mode 100644 index 0000000..a8d8f95 --- /dev/null +++ b/predictor.py @@ -0,0 +1,113 @@ +""" +Vulnerability predictor: heuristic-based (ML-ready). Cross-platform. +""" +from pathlib import Path +from typing import Dict, Any, List, Optional +import re + + +class VulnerabilityPredictor: + def __init__(self) -> None: + self.model = self.load_trained_model() + self.features = [ + "code_complexity", + "input_sources_count", + "database_interactions", + "authentication_points", + ] + + def load_trained_model(self) -> Optional[Any]: + try: + from sklearn.ensemble import RandomForestClassifier + import numpy as np + # Placeholder: no real training data; use simple heuristic weights + clf = RandomForestClassifier(n_estimators=10, random_state=42) + # Dummy fit so predict doesn't fail; real usage would load a serialized model + X = np.zeros((5, 4)) + y = np.array([0, 0, 1, 0, 1]) + clf.fit(X, y) + return clf + except ImportError: + return None + + def extract_features(self, codebase: str | Path) -> List[float]: + path = Path(codebase) + text = "" + if path.is_file(): + try: + text = path.read_text(encoding="utf-8", errors="replace") + except (OSError, PermissionError): + pass + elif path.is_dir(): + for ext in [".py", ".java", ".js", ".ts", ".php"]: + for f in path.rglob(f"*{ext}"): + try: + text += f.read_text(encoding="utf-8", errors="replace") + "\n" + except (OSError, PermissionError): + continue + else: + text = str(codebase) + + # Heuristic features + code_complexity = min(1.0, (len(text) / 10000) + (text.count("\n") / 500) * 0.1) + input_sources = len(re.findall(r"(input|request\.(get|post)|argv|getParameter)", text, re.I)) + input_sources_count = min(1.0, input_sources / 20) + db_interactions = len( + re.findall( + r"(execute|query|raw|prepareStatement|SELECT|INSERT|UPDATE|DELETE)", + text, + re.I, + ) + ) + database_interactions = min(1.0, db_interactions / 30) + auth_points = len( + re.findall(r"(password|login|auth|session|token|credential)", text, re.I) + ) + authentication_points = min(1.0, auth_points / 15) + + return [ + code_complexity, + input_sources_count, + database_interactions, + authentication_points, + ] + + def generate_recommendations(self, predictions: Dict[str, float]) -> List[str]: + recs: List[str] = [] + if predictions.get("sqli", 0) > 0.5: + recs.append("Use parameterized queries / PreparedStatement for all DB access.") + if predictions.get("xss", 0) > 0.5: + recs.append("Escape user-controlled output; consider CSP and encoding libraries.") + if not recs: + recs.append("Review input validation and output encoding.") + return recs + + def predict_vulnerability_likelihood( + self, codebase: str | Path + ) -> Dict[str, Any]: + features = self.extract_features(codebase) + predictions = {"sqli": 0.0, "xss": 0.0} + if self.model is not None: + try: + import numpy as np + X = np.array([features]) + pred = self.model.predict_proba(X) + if pred.shape[1] >= 2: + predictions["sqli"] = float(pred[0][1]) + predictions["xss"] = float(pred[0][1]) * 0.8 + else: + predictions["sqli"] = 0.3 * (features[1] + features[2]) + predictions["xss"] = 0.3 * (features[1] + features[3]) + except Exception: + predictions["sqli"] = 0.3 * (features[1] + features[2]) + predictions["xss"] = 0.3 * (features[1] + features[3]) + else: + predictions["sqli"] = 0.3 * (features[1] + features[2]) + predictions["xss"] = 0.3 * (features[1] + features[3]) + + return { + "sql_injection_risk": round(predictions["sqli"], 4), + "xss_risk": round(predictions["xss"], 4), + "recommended_fixes": self.generate_recommendations(predictions), + "features": dict(zip(self.features, features)), + } diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4fb692d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["setuptools>=61", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "sqlbase" +version = "0.1.0" +description = "Cross-platform security scanning toolkit (SQL injection, XSS, remediation)" +readme = "README.md" +requires-python = ">=3.9" +dependencies = [ + "requests>=2.28.0", + "scikit-learn>=1.2.0", + "numpy>=1.24.0", +] + +[project.scripts] +sqlbase-scan = "sqlbase.__main__:main" + +[tool.setuptools.packages.find] +where = ["."] +include = ["sqlbase*"] diff --git a/remediation.py b/remediation.py new file mode 100644 index 0000000..08ab844 --- /dev/null +++ b/remediation.py @@ -0,0 +1,66 @@ +""" +Remediation knowledge base for SQL injection and related fixes. Cross-platform. +""" +from typing import Dict, Any, List + + +class RemediationKnowledgeBase: + def __init__(self) -> None: + self.patterns: Dict[str, Dict[str, Dict[str, Any]]] = { + "SQL_INJECTION": { + "java": { + "solution": "Use PreparedStatement", + "example": 'String sql = "SELECT * FROM users WHERE id = ?";', + "libraries": ["java.sql.PreparedStatement"], + "extra": "stmt.setString(1, userInput);", + }, + "python": { + "solution": "Use parameterized queries", + "example": 'cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))', + "libraries": ["psycopg2", "sqlite3", "SQLAlchemy", "mysql.connector"], + "extra": "Never use % or .format() on the query string; pass params as second argument.", + }, + "php": { + "solution": "Use PDO prepared statements", + "example": '$stmt = $pdo->prepare("SELECT * FROM users WHERE id = ?"); $stmt->execute([$id]);', + "libraries": ["PDO", "mysqli"], + }, + "csharp": { + "solution": "Use parameterized SqlCommand", + "example": 'cmd.CommandText = "SELECT * FROM users WHERE id = @id"; cmd.Parameters.AddWithValue("@id", id);', + "libraries": ["System.Data.SqlClient"], + }, + }, + "XSS": { + "python": { + "solution": "Escape output and use CSP", + "example": "from markupsafe import escape; escape(user_input)", + "libraries": ["markupsafe", "bleach"], + }, + "java": { + "solution": "Use OWASP Java Encoder", + "example": "Encoder.forHtml(userInput)", + "libraries": ["org.owasp.encoder"], + }, + }, + "COMMAND_INJECTION": { + "python": { + "solution": "Use subprocess with list args, never shell=True with user input", + "example": "subprocess.run([\"ls\", \"-la\"], capture_output=True)", + "libraries": ["subprocess"], + }, + }, + } + + def get_remediation( + self, vulnerability_type: str, language: str + ) -> Dict[str, Any]: + return ( + self.patterns.get(vulnerability_type, {}) + .get(language, {}) + .copy() + or {} + ) + + def get_languages_for_type(self, vulnerability_type: str) -> List[str]: + return list(self.patterns.get(vulnerability_type, {}).keys()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3f9d876 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +requests>=2.28.0 +scikit-learn>=1.2.0 +numpy>=1.24.0 diff --git a/scanner.py b/scanner.py new file mode 100644 index 0000000..2d20839 --- /dev/null +++ b/scanner.py @@ -0,0 +1,64 @@ +""" +SQL injection static scanner. Cross-platform: Linux, Windows, macOS. +Uses pathlib for portable paths. +""" +import re +from pathlib import Path +from typing import List, Dict, Any, Optional + + +class SQLInjectionScanner: + def __init__(self) -> None: + self.patterns = [ + (r"execute\s*\([^)]*\+[^)]*\)", "String concatenation in execute"), + (r"\.(execute|executemany)\s*\([^)]*%\s*s", "%-format in query"), + (r"\.(execute|executemany)\s*\([^)]*\.format\s*\(", "str.format in query"), + (r'"(?:SELECT|INSERT|UPDATE|DELETE)\s+[^"]*\{[^}]*\}', "F-string in SQL"), + (r"'(?:SELECT|INSERT|UPDATE|DELETE)\s+[^']*\{[^}]*\}", "F-string in SQL"), + (r"query\s*=\s*[^;]+;\s*query\s*\+=", "Query built with +="), + (r"Statement\.execute\s*\([^)]*\+", "Java statement concatenation"), + (r"createStatement\s*\(\s*\)\s*\.\s*execute\s*\([^)]*\+", "Statement + string"), + (r"raw\s*\(\s*[^)]*\+", "Raw query concatenation"), + (r"\.format\s*\([^)]*\)\s*\)\s*\.(execute|query)", "Format then execute"), + ] + + def scan_file(self, file_path: str | Path) -> List[Dict[str, Any]]: + vulnerabilities: List[Dict[str, Any]] = [] + path = Path(file_path) + if not path.exists(): + return vulnerabilities + try: + text = path.read_text(encoding="utf-8", errors="replace") + except (OSError, PermissionError): + return vulnerabilities + lines = text.splitlines() + for i, line in enumerate(lines): + for pattern, desc in self.patterns: + if re.search(pattern, line, re.IGNORECASE | re.DOTALL): + vulnerabilities.append({ + "line": i + 1, + "code": line.strip(), + "type": "SQL_INJECTION", + "description": desc, + }) + break + return vulnerabilities + + def scan_path(self, path: str | Path, extensions: Optional[List[str]] = None) -> List[Dict[str, Any]]: + path = Path(path).resolve() + extensions = extensions or [".py", ".java", ".js", ".ts", ".php", ".rb", ".go", ".cs"] + results: List[Dict[str, Any]] = [] + if path.is_file(): + for v in self.scan_file(path): + v["file"] = str(path) + results.append(v) + return results + for ext in extensions: + for f in path.rglob(f"*{ext}"): + try: + for v in self.scan_file(f): + v["file"] = str(f) + results.append(v) + except (OSError, PermissionError): + continue + return results diff --git a/tester.py b/tester.py new file mode 100644 index 0000000..f600714 --- /dev/null +++ b/tester.py @@ -0,0 +1,88 @@ +""" +Dynamic SQL injection tester. Cross-platform; uses requests with timeouts. +""" +import re +from typing import List, Dict, Any, Optional +from urllib.parse import urljoin + +import requests + + +class DynamicSQLiTester: + def __init__(self, target_url: str, timeout: float = 10.0, verify_ssl: bool = True) -> None: + self.target_url = target_url.rstrip("/") + self.timeout = timeout + self.verify_ssl = verify_ssl + self.session = requests.Session() + self.session.verify = verify_ssl + self.payloads = [ + "' OR '1'='1", + "'; DROP TABLE users; --", + "' UNION SELECT NULL --", + "1' OR '1'='1' --", + "1 OR 1=1", + "admin'--", + "' OR 1=1--", + "1; SELECT pg_sleep(5)--", + ] + + def is_vulnerable(self, response: requests.Response, param: str, payload: str) -> bool: + text = (response.text or "").lower() + # Error-based indicators (cross-DB common messages) + error_indicators = [ + "sql syntax", + "syntax error", + "mysql_fetch", + "pg_query", + "sqlite_", + "ora-01", + "unclosed quotation", + "quoted string not properly terminated", + "unexpected end of sql", + "warning: mysql", + "valid mysql result", + "myisam", + "mysqli", + "postgresql", + "sqlstate", + ] + for indicator in error_indicators: + if indicator in text: + return True + # Optional: compare with baseline (no payload) response length/content + return False + + def test_endpoint( + self, + endpoint: str, + params: Dict[str, str], + method: str = "POST", + headers: Optional[Dict[str, str]] = None, + ) -> List[Dict[str, Any]]: + vulnerabilities: List[Dict[str, Any]] = [] + url = endpoint if endpoint.startswith("http") else urljoin(self.target_url + "/", endpoint) + headers = headers or {"Content-Type": "application/x-www-form-urlencoded"} + + for param in params: + for payload in self.payloads: + test_params = dict(params) + test_params[param] = payload + try: + if method.upper() == "POST": + response = self.session.post( + url, data=test_params, headers=headers, timeout=self.timeout + ) + else: + response = self.session.get( + url, params=test_params, headers=headers, timeout=self.timeout + ) + if self.is_vulnerable(response, param, payload): + vulnerabilities.append({ + "parameter": param, + "payload": payload, + "endpoint": url, + "status_code": response.status_code, + }) + except requests.RequestException: + continue + return vulnerabilities