From 834f5927e19eea3b4eb6eac56f9fe84e6b92f052 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 03:10:36 -0300 Subject: [PATCH 01/20] chore(config): migrate validators/config to Pydantic v2 style --- core/config.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/config.py b/core/config.py index e5ef06617..835bf6660 100644 --- a/core/config.py +++ b/core/config.py @@ -1,6 +1,6 @@ from typing import Dict, Optional -from pydantic import __version__ as PYDANTIC_VERSION, validator +from pydantic import ConfigDict, __version__ as PYDANTIC_VERSION, field_validator try: from pydantic_settings import BaseSettings @@ -79,19 +79,20 @@ class Settings(BaseSettings): RAG_RELEVANCE_THRESHOLD: float = 0.3 RAG_ENV: str = "dev" - class Config: - env_file = ".env" - case_sensitive = True - extra = "allow" + model_config = ConfigDict( + env_file=".env", + case_sensitive=True, + extra="allow", + ) - @validator("DATABASE_URL") + @field_validator("DATABASE_URL") @classmethod def validate_database_url(cls, v): if not v: raise ValueError("DATABASE_URL no puede estar vacío") return v - @validator("ML_ENSEMBLE_WEIGHTS") + @field_validator("ML_ENSEMBLE_WEIGHTS") @classmethod def validate_ensemble_weights(cls, v): total = sum(v.values()) From e893cfa1642c29d807187db81289ce99011c4302 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 07:01:24 -0300 Subject: [PATCH 02/20] chore(scope): add human-confirmed churn override --- .guardian/churn_approval.yml | 5 +++ tests/test_guardian_scope_check.py | 27 ++++++++++++- tools/guardian_scope_check.py | 65 +++++++++++++++++++++++++++++- 3 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 .guardian/churn_approval.yml diff --git a/.guardian/churn_approval.yml b/.guardian/churn_approval.yml new file mode 100644 index 000000000..4a34c75d1 --- /dev/null +++ b/.guardian/churn_approval.yml @@ -0,0 +1,5 @@ +approved_by: Cheewye +reason: Pydantic v2 config hygiene: migrate deprecated validators/config; behavior preserved; tests green. +allow_paths: + - core/config.py +pr: 85 diff --git a/tests/test_guardian_scope_check.py b/tests/test_guardian_scope_check.py index 3279cd1a1..5f49db2f6 100644 --- a/tests/test_guardian_scope_check.py +++ b/tests/test_guardian_scope_check.py @@ -1,4 +1,11 @@ -from tools.guardian_scope_check import ScopeConfig, evaluate_files, format_violation_message +from pathlib import Path + +from tools.guardian_scope_check import ( + ScopeConfig, + evaluate_files, + format_violation_message, + _load_churn_approval, +) def _scope_cfg(): @@ -32,3 +39,21 @@ def test_violation_message_contains_out_of_scope(): message = format_violation_message(violations) assert "out of scope changes detected" in message assert "frontend/src/App.tsx" in message + + +def test_churn_approval_loader_requires_fields(tmp_path: Path): + approval_path = tmp_path / "churn_approval.yml" + approval_path.write_text("approved_by: \nreason: \nallow_paths:\n - core/config.py\n", encoding="utf-8") + assert _load_churn_approval(approval_path) is None + + +def test_churn_approval_loader_parses_allow_paths(tmp_path: Path): + approval_path = tmp_path / "churn_approval.yml" + approval_path.write_text( + "approved_by: Cheewye\nreason: ok\nallow_paths:\n - core/config.py\n - tests/test_guardian_scope_check.py\n", + encoding="utf-8", + ) + approval = _load_churn_approval(approval_path) + assert approval is not None + assert approval.approved_by == "Cheewye" + assert "core/config.py" in approval.allow_paths diff --git a/tools/guardian_scope_check.py b/tools/guardian_scope_check.py index 211b81cd0..58f58cd89 100644 --- a/tools/guardian_scope_check.py +++ b/tools/guardian_scope_check.py @@ -7,7 +7,7 @@ import sys from dataclasses import dataclass from pathlib import Path -from typing import Iterable, List +from typing import Iterable, List, Optional @dataclass @@ -18,6 +18,14 @@ class ScopeConfig: allow_globs: List[str] +@dataclass +class ChurnApproval: + approved_by: str + reason: str + allow_paths: List[str] + pr: Optional[str] = None + + def _load_config(path: Path) -> ScopeConfig: data = json.loads(path.read_text(encoding="utf-8")) return ScopeConfig( @@ -64,10 +72,50 @@ def format_violation_message(violations: List[str]) -> str: lines = ["out of scope changes detected:"] for path in violations: lines.append(f"- {path}") - lines.append("tip: limit changes to allowed prefixes or use --allow-churn with human confirmation") + lines.append("tip: limit changes to allowed prefixes or use an approval file (.guardian/churn_approval.yml)") return "\n".join(lines) +def _load_churn_approval(path: Path) -> Optional[ChurnApproval]: + if not path.exists(): + return None + approved_by = "" + reason = "" + pr = None + allow_paths: List[str] = [] + in_allow_paths = False + for raw in path.read_text(encoding="utf-8").splitlines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + if line.startswith("approved_by:"): + approved_by = line.split(":", 1)[1].strip() + in_allow_paths = False + continue + if line.startswith("reason:"): + reason = line.split(":", 1)[1].strip() + in_allow_paths = False + continue + if line.startswith("pr:"): + pr = line.split(":", 1)[1].strip() + in_allow_paths = False + continue + if line.startswith("allow_paths:"): + in_allow_paths = True + continue + if in_allow_paths and line.startswith("-"): + allow_paths.append(line.lstrip("-").strip()) + continue + if not approved_by or not reason: + return None + return ChurnApproval( + approved_by=approved_by, + reason=reason, + allow_paths=allow_paths, + pr=pr, + ) + + def main() -> int: parser = argparse.ArgumentParser(description="guardian scope check") parser.add_argument("--base", default="origin/main") @@ -75,6 +123,7 @@ def main() -> int: parser.add_argument("--config", default="config/guardian_scope.json") parser.add_argument("--mode", default="guardian") parser.add_argument("--allow-churn", action="store_true") + parser.add_argument("--approval-file", default=".guardian/churn_approval.yml") parser.add_argument("--files", nargs="*") args = parser.parse_args() @@ -84,6 +133,18 @@ def main() -> int: if violations: message = format_violation_message(violations) + approval = _load_churn_approval(Path(args.approval_file)) + if approval: + uncovered = [v for v in violations if v not in approval.allow_paths] + if not uncovered: + print( + f"CHURN OVERRIDE ACCEPTED (human confirmed): {approval.approved_by} reason={approval.reason}" + ) + return 0 + print("churn approval present but does not cover all out-of-scope paths") + print("uncovered:") + for path in uncovered: + print(f"- {path}") if args.allow_churn: print("WARNING: scope violations detected but allow-churn is set") print(message) From 7e3ff9ac32b8253ea96c68e12e105744be8b2f09 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 07:04:37 -0300 Subject: [PATCH 03/20] fix(scope): ignore approval file in churn override --- tests/test_guardian_scope_check.py | 16 ++++++++++++++++ tools/guardian_scope_check.py | 9 ++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/tests/test_guardian_scope_check.py b/tests/test_guardian_scope_check.py index 5f49db2f6..848e9ee66 100644 --- a/tests/test_guardian_scope_check.py +++ b/tests/test_guardian_scope_check.py @@ -57,3 +57,19 @@ def test_churn_approval_loader_parses_allow_paths(tmp_path: Path): assert approval is not None assert approval.approved_by == "Cheewye" assert "core/config.py" in approval.allow_paths + + +def test_churn_approval_ignores_approval_file_as_violation(tmp_path: Path): + approval_path = tmp_path / "churn_approval.yml" + approval_path.write_text( + "approved_by: Cheewye\nreason: ok\nallow_paths:\n - core/config.py\n", + encoding="utf-8", + ) + approval = _load_churn_approval(approval_path) + assert approval is not None + approval_rel = approval_path.as_posix().lstrip("./") + violations = [approval_rel, "core/config.py"] + uncovered = [ + v for v in violations if v not in approval.allow_paths and v != approval_rel + ] + assert uncovered == [] diff --git a/tools/guardian_scope_check.py b/tools/guardian_scope_check.py index 58f58cd89..b1e27bbee 100644 --- a/tools/guardian_scope_check.py +++ b/tools/guardian_scope_check.py @@ -135,7 +135,14 @@ def main() -> int: message = format_violation_message(violations) approval = _load_churn_approval(Path(args.approval_file)) if approval: - uncovered = [v for v in violations if v not in approval.allow_paths] + approval_path = Path(args.approval_file).as_posix() + if approval_path.startswith("./"): + approval_path = approval_path[2:] + uncovered = [ + v + for v in violations + if v not in approval.allow_paths and v != approval_path + ] if not uncovered: print( f"CHURN OVERRIDE ACCEPTED (human confirmed): {approval.approved_by} reason={approval.reason}" From fef94b0e318ffcdf8a72b75b31f029726be9b5fb Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 07:15:14 -0300 Subject: [PATCH 04/20] chore(sap): harden churn approval schema + output --- tools/guardian_scope_check.py | 221 ++++++++++++++++++++++++++++------ 1 file changed, 184 insertions(+), 37 deletions(-) diff --git a/tools/guardian_scope_check.py b/tools/guardian_scope_check.py index b1e27bbee..e9f039007 100644 --- a/tools/guardian_scope_check.py +++ b/tools/guardian_scope_check.py @@ -3,11 +3,13 @@ import argparse import json +import re import subprocess import sys from dataclasses import dataclass +from datetime import date, datetime, time, timezone from pathlib import Path -from typing import Iterable, List, Optional +from typing import Dict, Iterable, List, Optional, Tuple @dataclass @@ -24,6 +26,9 @@ class ChurnApproval: reason: str allow_paths: List[str] pr: Optional[str] = None + expires_at: Optional[datetime] = None + base_ref: Optional[str] = None + sha: Optional[str] = None def _load_config(path: Path) -> ScopeConfig: @@ -68,54 +73,177 @@ def _changed_files(base: str, head: str) -> List[str]: return [line for line in result.stdout.splitlines() if line.strip()] -def format_violation_message(violations: List[str]) -> str: +def format_violation_message(violations: List[str], scope: ScopeConfig) -> str: lines = ["out of scope changes detected:"] for path in violations: lines.append(f"- {path}") - lines.append("tip: limit changes to allowed prefixes or use an approval file (.guardian/churn_approval.yml)") + if scope.allow_prefixes: + lines.append("allowed prefixes:") + for prefix in scope.allow_prefixes: + lines.append(f"- {prefix}") + if scope.allow_exact: + lines.append("allowed exact paths:") + for path in scope.allow_exact: + lines.append(f"- {path}") + lines.append("approval template:") + lines.append("approved_by: ") + lines.append("reason: ") + lines.append("allow_paths:") + lines.append(" - path/to/file.py") + lines.append("expires_at: 2026-01-31") return "\n".join(lines) -def _load_churn_approval(path: Path) -> Optional[ChurnApproval]: - if not path.exists(): - return None - approved_by = "" - reason = "" - pr = None +_ALLOWED_APPROVAL_FIELDS = { + "approved_by", + "reason", + "allow_paths", + "pr", + "expires_at", + "base_ref", + "sha", +} + + +def _strip_quotes(value: str) -> str: + if len(value) >= 2 and value[0] == value[-1] and value[0] in {"'", '"'}: + return value[1:-1] + return value + + +def _normalize_relative_path(value: str) -> str: + path = value.replace("\\", "/").strip() + if path.startswith("./"): + path = path[2:] + return path + + +def _validate_allow_path(raw: str) -> Tuple[Optional[str], Optional[str]]: + if not raw: + return None, "allow_paths entry is empty" + value = _normalize_relative_path(raw) + if value.startswith("/") or re.match(r"^[A-Za-z]:/", value): + return None, f"allow_paths entry must be relative: {raw}" + if ".." in Path(value).parts: + return None, f"allow_paths entry cannot contain '..': {raw}" + if any(token in value for token in ["*", "?", "[", "]"]): + return None, f"allow_paths entry cannot contain glob/wildcards: {raw}" + if value.endswith("/"): + return None, f"allow_paths entry must be a file path, not a directory: {raw}" + if value in {".", ""}: + return None, f"allow_paths entry must be a file path: {raw}" + return value, None + + +def _parse_expires_at(value: str) -> Tuple[Optional[datetime], Optional[str]]: + cleaned = _strip_quotes(value.strip()) + if not cleaned: + return None, "expires_at cannot be empty" + try: + if "T" in cleaned: + if cleaned.endswith("Z"): + parsed = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%SZ") + return parsed.replace(tzinfo=timezone.utc), None + parsed = datetime.strptime(cleaned, "%Y-%m-%dT%H:%M:%S") + return parsed.replace(tzinfo=timezone.utc), None + parsed_date = date.fromisoformat(cleaned) + return datetime.combine(parsed_date, time.max, tzinfo=timezone.utc), None + except ValueError: + return None, f"expires_at must be ISO8601 date or datetime: {value}" + + +def _parse_churn_approval_lines(lines: List[str]) -> Tuple[Dict[str, str], List[str], List[str]]: + data: Dict[str, str] = {} allow_paths: List[str] = [] + errors: List[str] = [] in_allow_paths = False - for raw in path.read_text(encoding="utf-8").splitlines(): + for raw in lines: line = raw.strip() if not line or line.startswith("#"): continue - if line.startswith("approved_by:"): - approved_by = line.split(":", 1)[1].strip() - in_allow_paths = False + if in_allow_paths and line.startswith("-"): + allow_paths.append(line.lstrip("-").strip()) continue - if line.startswith("reason:"): - reason = line.split(":", 1)[1].strip() - in_allow_paths = False + if ":" not in line: + errors.append(f"invalid line: {raw}") continue - if line.startswith("pr:"): - pr = line.split(":", 1)[1].strip() - in_allow_paths = False + key, value = line.split(":", 1) + key = key.strip() + value = value.strip() + if key not in _ALLOWED_APPROVAL_FIELDS: + errors.append(f"unknown field: {key}") continue - if line.startswith("allow_paths:"): + if key == "allow_paths": + if value: + errors.append("allow_paths must be a list (use '-' entries)") in_allow_paths = True continue - if in_allow_paths and line.startswith("-"): - allow_paths.append(line.lstrip("-").strip()) - continue - if not approved_by or not reason: + in_allow_paths = False + data[key] = value + if allow_paths: + data["allow_paths"] = "LIST" + return data, allow_paths, errors + + +def _load_churn_approval(path: Path, now: Optional[datetime] = None) -> Optional[ChurnApproval]: + if not path.exists(): return None + lines = path.read_text(encoding="utf-8").splitlines() + data, raw_allow_paths, errors = _parse_churn_approval_lines(lines) + if errors: + raise ValueError("; ".join(errors)) + approved_by = _strip_quotes(data.get("approved_by", "")).strip() + reason = _strip_quotes(data.get("reason", "")).strip() + if not approved_by: + raise ValueError("approved_by is required") + if not reason or len(reason) < 10: + raise ValueError("reason is required and must be at least 10 chars") + if "allow_paths" not in data: + raise ValueError("allow_paths is required") + if not raw_allow_paths: + raise ValueError("allow_paths must include at least one path") + normalized_paths: List[str] = [] + for raw in raw_allow_paths: + normalized, error = _validate_allow_path(raw) + if error: + raise ValueError(error) + normalized_paths.append(normalized) + pr = _strip_quotes(data.get("pr", "")).strip() or None + base_ref = _strip_quotes(data.get("base_ref", "")).strip() or None + sha = _strip_quotes(data.get("sha", "")).strip() or None + if sha and not re.match(r"^[0-9a-fA-F]{7,40}$", sha): + raise ValueError("sha must be a git SHA (7-40 hex chars)") + expires_at_value = data.get("expires_at", "") + expires_at = None + if expires_at_value: + expires_at, error = _parse_expires_at(expires_at_value) + if error: + raise ValueError(error) + now_dt = now or datetime.now(timezone.utc) + if expires_at < now_dt: + raise ValueError("expires_at is in the past") return ChurnApproval( approved_by=approved_by, reason=reason, - allow_paths=allow_paths, + allow_paths=sorted(set(normalized_paths)), pr=pr, + expires_at=expires_at, + base_ref=base_ref, + sha=sha, ) +def _approval_paths(approval: ChurnApproval, approval_path: str) -> List[str]: + approved = list(approval.allow_paths) + if approval_path not in approved: + approved.append(approval_path) + return sorted(set(approved)) + + +def _normalize_violations(violations: List[str]) -> List[str]: + return sorted({_normalize_relative_path(v) for v in violations}) + + def main() -> int: parser = argparse.ArgumentParser(description="guardian scope check") parser.add_argument("--base", default="origin/main") @@ -132,21 +260,40 @@ def main() -> int: violations = evaluate_files(files, scope) if violations: - message = format_violation_message(violations) - approval = _load_churn_approval(Path(args.approval_file)) + message = format_violation_message(violations, scope) + approval_path = _normalize_relative_path(Path(args.approval_file).as_posix()) + approval = None + approval_error = None + try: + approval = _load_churn_approval(Path(args.approval_file)) + except ValueError as exc: + approval_error = str(exc) + if approval_error: + print(f"ERROR: churn approval invalid: {approval_error}") if approval: - approval_path = Path(args.approval_file).as_posix() - if approval_path.startswith("./"): - approval_path = approval_path[2:] - uncovered = [ - v - for v in violations - if v not in approval.allow_paths and v != approval_path - ] - if not uncovered: + if approval.base_ref and approval.base_ref != args.base: print( - f"CHURN OVERRIDE ACCEPTED (human confirmed): {approval.approved_by} reason={approval.reason}" + "ERROR: churn approval base_ref does not match --base " + f"({approval.base_ref} != {args.base})" ) + return 2 + normalized_violations = _normalize_violations(violations) + approved_paths = _approval_paths(approval, approval_path) + uncovered = [v for v in normalized_violations if v not in approved_paths] + if not uncovered: + print("CHURN OVERRIDE ACCEPTED") + print(f"approved_by: {approval.approved_by}") + print(f"reason: {approval.reason}") + if approval.expires_at: + print(f"expires_at: {approval.expires_at.isoformat()}") + else: + print("WARNING: expires_at missing; add an expiry for safety") + print("approved paths:") + for path in approved_paths: + print(f"- {path}") + print("detected out-of-scope paths:") + for path in normalized_violations: + print(f"- {path}") return 0 print("churn approval present but does not cover all out-of-scope paths") print("uncovered:") From 96bba58830941cf10173043e361b1eb9ec1b1b5d Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 07:15:19 -0300 Subject: [PATCH 05/20] test(sap): add churn approval validation tests --- tests/test_guardian_scope_check.py | 88 +++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 8 deletions(-) diff --git a/tests/test_guardian_scope_check.py b/tests/test_guardian_scope_check.py index 848e9ee66..da0617f7c 100644 --- a/tests/test_guardian_scope_check.py +++ b/tests/test_guardian_scope_check.py @@ -5,6 +5,8 @@ evaluate_files, format_violation_message, _load_churn_approval, + _approval_paths, + _normalize_violations, ) @@ -36,7 +38,7 @@ def test_violation_message_contains_out_of_scope(): files = ["frontend/src/App.tsx"] violations = evaluate_files(files, scope) assert violations - message = format_violation_message(violations) + message = format_violation_message(violations, scope) assert "out of scope changes detected" in message assert "frontend/src/App.tsx" in message @@ -44,13 +46,17 @@ def test_violation_message_contains_out_of_scope(): def test_churn_approval_loader_requires_fields(tmp_path: Path): approval_path = tmp_path / "churn_approval.yml" approval_path.write_text("approved_by: \nreason: \nallow_paths:\n - core/config.py\n", encoding="utf-8") - assert _load_churn_approval(approval_path) is None + try: + _load_churn_approval(approval_path) + assert False, "expected ValueError" + except ValueError as exc: + assert "approved_by" in str(exc) or "reason" in str(exc) def test_churn_approval_loader_parses_allow_paths(tmp_path: Path): approval_path = tmp_path / "churn_approval.yml" approval_path.write_text( - "approved_by: Cheewye\nreason: ok\nallow_paths:\n - core/config.py\n - tests/test_guardian_scope_check.py\n", + "approved_by: Cheewye\nreason: ok enough reason\nallow_paths:\n - core/config.py\n - tests/test_guardian_scope_check.py\n", encoding="utf-8", ) approval = _load_churn_approval(approval_path) @@ -62,14 +68,80 @@ def test_churn_approval_loader_parses_allow_paths(tmp_path: Path): def test_churn_approval_ignores_approval_file_as_violation(tmp_path: Path): approval_path = tmp_path / "churn_approval.yml" approval_path.write_text( - "approved_by: Cheewye\nreason: ok\nallow_paths:\n - core/config.py\n", + "approved_by: Cheewye\nreason: ok enough reason\nallow_paths:\n - core/config.py\n", encoding="utf-8", ) approval = _load_churn_approval(approval_path) assert approval is not None - approval_rel = approval_path.as_posix().lstrip("./") + approval_rel = approval_path.as_posix() + approved_paths = _approval_paths(approval, approval_rel) violations = [approval_rel, "core/config.py"] - uncovered = [ - v for v in violations if v not in approval.allow_paths and v != approval_rel - ] + uncovered = [v for v in _normalize_violations(violations) if v not in approved_paths] assert uncovered == [] + + +def test_churn_approval_rejects_unknown_field(tmp_path: Path): + approval_path = tmp_path / "churn_approval.yml" + approval_path.write_text( + "approved_by: Cheewye\nreason: ok enough reason\nunknown: nope\nallow_paths:\n - core/config.py\n", + encoding="utf-8", + ) + try: + _load_churn_approval(approval_path) + assert False, "expected ValueError" + except ValueError as exc: + assert "unknown field" in str(exc) + + +def test_churn_approval_rejects_glob_or_directory_path(tmp_path: Path): + approval_path = tmp_path / "churn_approval.yml" + approval_path.write_text( + "approved_by: Cheewye\nreason: ok enough reason\nallow_paths:\n - frontend/**\n", + encoding="utf-8", + ) + try: + _load_churn_approval(approval_path) + assert False, "expected ValueError" + except ValueError as exc: + assert "glob" in str(exc) + + +def test_churn_approval_rejects_expired(tmp_path: Path): + approval_path = tmp_path / "churn_approval.yml" + approval_path.write_text( + "approved_by: Cheewye\nreason: ok enough reason\nexpires_at: 2000-01-01\nallow_paths:\n - core/config.py\n", + encoding="utf-8", + ) + try: + _load_churn_approval(approval_path) + assert False, "expected ValueError" + except ValueError as exc: + assert "expires_at" in str(exc) + + +def test_churn_approval_covers_all_out_of_scope_paths(tmp_path: Path): + approval_path = tmp_path / "churn_approval.yml" + approval_path.write_text( + "approved_by: Cheewye\nreason: ok enough reason\nallow_paths:\n - core/config.py\n - tools/guardian_scope_check.py\n", + encoding="utf-8", + ) + approval = _load_churn_approval(approval_path) + approval_rel = approval_path.as_posix() + approved_paths = _approval_paths(approval, approval_rel) + violations = [approval_rel, "core/config.py", "tools/guardian_scope_check.py"] + uncovered = [v for v in _normalize_violations(violations) if v not in approved_paths] + assert uncovered == [] + + +def test_churn_approval_partial_coverage_fails(tmp_path: Path): + approval_path = tmp_path / "churn_approval.yml" + approval_path.write_text( + "approved_by: Cheewye\nreason: ok enough reason\nallow_paths:\n - core/config.py\n", + encoding="utf-8", + ) + approval = _load_churn_approval(approval_path) + approval_rel = approval_path.as_posix() + approved_paths = _approval_paths(approval, approval_rel) + violations = [approval_rel, "core/config.py", "tools/guardian_scope_check.py"] + uncovered = [v for v in _normalize_violations(violations) if v not in approved_paths] + assert "tools/guardian_scope_check.py" in uncovered From 95cf445fa93077d21ba58361bc81cfd5b64558b7 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 07:15:22 -0300 Subject: [PATCH 06/20] docs(sap): document churn approval schema --- docs/guardian/churn_approval.md | 53 +++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 docs/guardian/churn_approval.md diff --git a/docs/guardian/churn_approval.md b/docs/guardian/churn_approval.md new file mode 100644 index 000000000..8dcb193e7 --- /dev/null +++ b/docs/guardian/churn_approval.md @@ -0,0 +1,53 @@ +# Churn approval (scope-check) + +Este archivo describe el esquema estricto de `.guardian/churn_approval.yml` y las +reglas de seguridad aplicadas por `tools/guardian_scope_check.py`. + +## Esquema + +Campos requeridos: +- `approved_by`: string no vacío +- `reason`: string no vacío (mínimo 10 caracteres) +- `allow_paths`: lista no vacía de rutas de archivos explícitas + +Campos opcionales (recomendados): +- `pr`: número o string +- `expires_at`: ISO8601 (`YYYY-MM-DD` o `YYYY-MM-DDTHH:MM:SSZ`) +- `base_ref`: string (por ejemplo, `origin/main`) +- `sha`: SHA de git (7-40 hex) + +Reglas estrictas: +- Se rechazan campos desconocidos. +- `allow_paths` no admite globs, wildcards, `..`, rutas absolutas ni directorios. + +## Plantilla mínima + +```yaml +approved_by: Nombre Apellido +reason: Motivo claro y auditable (min 10 chars) +allow_paths: + - core/config.py +expires_at: 2026-01-31 +``` + +## Ejemplo válido + +```yaml +approved_by: Cheewye +reason: Ajuste de higiene pydantic; comportamiento preservado. +allow_paths: + - core/config.py + - tools/guardian_scope_check.py +expires_at: 2026-02-15T23:59:59Z +base_ref: origin/main +pr: 86 +``` + +## Ejemplo inválido + +```yaml +approved_by: +reason: corto +allow_paths: + - frontend/** +``` From 3df8fa6c3078a3066c7e901f81cca4a545fe9059 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 07:46:29 -0300 Subject: [PATCH 07/20] chore(docs): add _sandbox convention + ignore --- .gitignore | 3 +++ docs/_sandbox/README.md | 5 +++++ 2 files changed, 8 insertions(+) create mode 100644 docs/_sandbox/README.md diff --git a/.gitignore b/.gitignore index 366da4ba3..50439560e 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,9 @@ logs/ *.wav generated/ docs/deploy/prod_evidence.json +docs/_sandbox/ +!docs/_sandbox/ +!docs/_sandbox/README.md # Estados y runtime generados en ejecución data/ diff --git a/docs/_sandbox/README.md b/docs/_sandbox/README.md new file mode 100644 index 000000000..cec59d752 --- /dev/null +++ b/docs/_sandbox/README.md @@ -0,0 +1,5 @@ +# Docs Sandbox + +Este directorio es solo para notas locales o borradores temporales. +El contenido aquí se ignora por defecto en git. +No uses esta carpeta para documentación oficial. From 6b6ea13140e5bade92f355c47f74c31984460ef3 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 07:49:43 -0300 Subject: [PATCH 08/20] chore(sap): add churn approval for docs sandbox --- .guardian/churn_approval.yml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.guardian/churn_approval.yml b/.guardian/churn_approval.yml index 4a34c75d1..99d5bdddd 100644 --- a/.guardian/churn_approval.yml +++ b/.guardian/churn_approval.yml @@ -1,5 +1,7 @@ approved_by: Cheewye -reason: Pydantic v2 config hygiene: migrate deprecated validators/config; behavior preserved; tests green. +reason: Ignorar .gitignore para convenio docs/_sandbox sin tocar frontend. allow_paths: - - core/config.py -pr: 85 + - .gitignore +expires_at: 2026-03-01 +base_ref: origin/main +pr: 87 From 95d6a7c3dd45536be426ad75ad81696702b1976b Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 07:58:38 -0300 Subject: [PATCH 09/20] test(sap): add runtime ssot smoke --- tests/test_runtime_smoke_ssot.py | 91 ++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 tests/test_runtime_smoke_ssot.py diff --git a/tests/test_runtime_smoke_ssot.py b/tests/test_runtime_smoke_ssot.py new file mode 100644 index 000000000..b4c6ef515 --- /dev/null +++ b/tests/test_runtime_smoke_ssot.py @@ -0,0 +1,91 @@ +import json +from pathlib import Path + + +def _result_signature(result): + return { + "status": result.status, + "assistant_message": result.assistant_message, + "requires_human_confirmation": result.requires_human_confirmation, + "lane": result.provenance.lane, + "hit_rule_ids": result.provenance.hit_rule_ids, + "model_used": result.provenance.model_used, + "post_rewrites": result.provenance.post_rewrites, + } + + +def _last_telemetry_event(path: Path) -> dict: + lines = path.read_text(encoding="utf-8").strip().splitlines() + return json.loads(lines[-1]) + + +def test_runtime_smoke_ssot_pipeline(monkeypatch, tmp_path: Path): + from backend.core.brain_core import run_brain + import backend.core.brain_core as brain_core_mod + import backend.core.guardian_engine as ge_mod + import backend.core.response_postprocess as post_mod + + telemetry_path = tmp_path / "events.jsonl" + monkeypatch.setenv("IURI_GUARDIAN_TELEMETRY", "1") + monkeypatch.setenv("IURI_GUARDIAN_TELEMETRY_PATH", str(telemetry_path)) + + calls = {"crit_gate": 0, "evaluate": 0, "postprocess": 0} + call_order = [] + + original_crit_gate = brain_core_mod.crit_gate + + def wrapped_crit_gate(payload, *, engine=None): + calls["crit_gate"] += 1 + call_order.append("crit_gate") + return original_crit_gate(payload, engine=engine) + + monkeypatch.setattr(brain_core_mod, "crit_gate", wrapped_crit_gate) + + original_eval = ge_mod.GuardianEngine.evaluate + + def wrapped_eval(self, action, intent="", context=None, flags=None): + calls["evaluate"] += 1 + call_order.append("evaluate") + return original_eval(self, action=action, intent=intent, context=context, flags=flags) + + monkeypatch.setattr(ge_mod.GuardianEngine, "evaluate", wrapped_eval) + + original_post = post_mod.apply + + def wrapped_postprocess(text, decision, context): + calls["postprocess"] += 1 + call_order.append("postprocess") + return original_post(text, decision, context) + + monkeypatch.setattr(ge_mod, "apply_postprocess", wrapped_postprocess) + monkeypatch.setattr(brain_core_mod, "apply_postprocess", wrapped_postprocess) + + action = "abuso sexual infantil" + context = {"test_mode": True} + flags = {"lane": "default", "test_mode": True} + + result = run_brain(action, intent="chat", context=context, flags=flags) + + assert calls["crit_gate"] == 1 + assert calls["evaluate"] == 1 + assert calls["postprocess"] == 1 + assert call_order[:2] == ["crit_gate", "evaluate"] + + assert result.status == "BLOCK" + assert result.provenance.lane == "blocked" + assert isinstance(result.provenance.hit_rule_ids, list) + + signature_one = _result_signature(result) + + calls = {"crit_gate": 0, "evaluate": 0, "postprocess": 0} + call_order.clear() + + result_two = run_brain(action, intent="chat", context=context, flags=flags) + signature_two = _result_signature(result_two) + + assert signature_two == signature_one + + event = _last_telemetry_event(telemetry_path) + forbidden_keys = {"prompt", "input", "user_text", "raw_text", "raw_prompt", "full_text"} + assert forbidden_keys.isdisjoint(event.keys()) + assert action not in json.dumps(event, ensure_ascii=False) From 85a0ef085c13705c415e3e0eb3777512ed0d85ef Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 07:58:42 -0300 Subject: [PATCH 10/20] docs(sap): add ssot runtime smoke runbook --- docs/guardian/runtime_smoke_ssot.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 docs/guardian/runtime_smoke_ssot.md diff --git a/docs/guardian/runtime_smoke_ssot.md b/docs/guardian/runtime_smoke_ssot.md new file mode 100644 index 000000000..c73f2d464 --- /dev/null +++ b/docs/guardian/runtime_smoke_ssot.md @@ -0,0 +1,22 @@ +# Runtime SSOT Smoke Runbook + +## Qué verifica +- El entrypoint SSOT (`backend/core/brain_core.py::run_brain`) pasa por CritGate. +- `GuardianEngine.evaluate` corre exactamente una vez por request. +- `response_postprocess.apply` corre una sola vez por request (ruta bloqueada). +- Lanes de provenance correctos (`blocked` / `guardian_only` / `model_call`). +- Telemetría no registra el prompt por defecto (privacy). + +## Cómo correrlo +- `pytest -q tests/test_runtime_smoke_ssot.py` +- Recomendado junto a: + - `pytest -q tests/test_brain_core_ssot.py` + - `pytest -q tests/test_critgate_entrypoint.py` + - `pytest -q tests/test_antibypass_guardian_callsite.py` + - `pytest -q tests/test_antibypass_entrypoints.py` + +## Si falla +- **CritGate / evaluate**: revisar `backend/core/brain_core.py` y `backend/core/crit_gate.py`. +- **postprocess**: revisar `backend/core/guardian_engine.py` y `backend/core/response_postprocess.py`. +- **provenance**: revisar `backend/core/brain_core.py` (lane y post_rewrites). +- **telemetry**: revisar `backend/core/guardian_telemetry.py` y `tests/test_guardian_telemetry_privacy.py`. From 3c6c7d753b0960369ce2e2cc5f03ac92985016d4 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 08:08:32 -0300 Subject: [PATCH 11/20] chore(sap): add missing CRIT rules --- backend/core/guardian_engine.py | 12 ++++ config/guardian_rules.json | 107 ++++++++++++++++++++++++++++++++ 2 files changed, 119 insertions(+) diff --git a/backend/core/guardian_engine.py b/backend/core/guardian_engine.py index 951f3aaea..ec016856a 100644 --- a/backend/core/guardian_engine.py +++ b/backend/core/guardian_engine.py @@ -272,6 +272,18 @@ def _matches_rule(self, action_lower: str, rule: Dict[str, Any]) -> bool: ): return True + if rule.get("id") == "RG-710": + if self._current_flags.get("bypass_critgate") or self._current_context.get( + "bypass_critgate" + ): + return True + + if rule.get("id") == "RG-740": + if self._current_flags.get("nondeterministic") or self._current_context.get( + "nondeterministic" + ): + return True + return False @staticmethod diff --git a/config/guardian_rules.json b/config/guardian_rules.json index 8d7fad3db..b176ee3ea 100644 --- a/config/guardian_rules.json +++ b/config/guardian_rules.json @@ -1680,6 +1680,113 @@ "require_human_confirmation": true }, "message_to_assistant": "No puedo filtrar por forma ni por fragmentos." + }, + { + "id": "RG-710", + "title": "No bypass de CritGate", + "category": "safety", + "severity": "critical", + "description": "Prohibido pedir o implementar bypass del CritGate o acceso directo al GuardianEngine.", + "rationale": "CritGate es la única puerta de entrada del pipeline SSOT.", + "forbidden_patterns": [ + "bypass critgate", + "saltate critgate", + "salteate critgate", + "sin critgate", + "llama guardianengine directo", + "guardianengine directo", + "direct guardianengine", + "bypass guardian", + "bypassear critgate" + ], + "enforcement": { + "action": "block", + "require_human_confirmation": true + }, + "message_to_assistant": "No puedo omitir CritGate ni acceder directo al GuardianEngine." + }, + { + "id": "RG-720", + "title": "Cambios de sistema requieren confirmación", + "category": "system_config", + "severity": "high", + "description": "Cambios de sistema, paquetes, servicios o permisos deben requerir confirmación explícita.", + "rationale": "Evita modificaciones sensibles sin supervisión humana.", + "forbidden_patterns": [ + "apt-get install", + "apt install", + "yum install", + "dnf install", + "pacman -S", + "pip install", + "pip3 install", + "npm install -g", + "systemctl enable", + "systemctl disable", + "systemctl stop", + "ufw", + "iptables", + "firewall-cmd", + "chmod -R", + "chown -R", + "useradd", + "usermod", + "mkfs", + "fdisk", + "parted", + "dd if=" + ], + "enforcement": { + "action": "warn_and_require_confirmation", + "require_human_confirmation": true + }, + "message_to_assistant": "Esto toca sistema/paquetes/permisos; requerís confirmación explícita." + }, + { + "id": "RG-730", + "title": "Privacidad: no registrar prompts o secretos", + "category": "ethics", + "severity": "high", + "description": "Prohibido pedir registrar prompts, secretos o .env en telemetría/logs.", + "rationale": "Evita fuga de datos sensibles en telemetría.", + "forbidden_patterns": [ + "guardar el prompt", + "guarda el prompt", + "log del prompt", + "registrar el prompt", + "telemetría con el prompt", + "telemetry con prompt", + "dump del prompt", + "registrar .env", + "guardar .env", + "log de .env" + ], + "enforcement": { + "action": "block", + "require_human_confirmation": true + }, + "message_to_assistant": "No puedo registrar prompts o secretos en telemetría." + }, + { + "id": "RG-740", + "title": "Determinismo requerido por defecto", + "category": "workflow", + "severity": "medium", + "description": "No se permite comportamiento no determinista sin bandera explícita.", + "rationale": "La política debe ser determinista para auditar decisiones.", + "forbidden_patterns": [ + "hacelo aleatorio", + "hazlo aleatorio", + "nondeterminístico", + "no determinista", + "variá la decisión", + "randomiza la decisión" + ], + "enforcement": { + "action": "warn", + "require_human_confirmation": false + }, + "message_to_assistant": "Necesito una bandera explícita para permitir no determinismo." } ] } From 00cc14ce991a88af57d525e3709376a509c43c67 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 08:08:39 -0300 Subject: [PATCH 12/20] test(sap): cover missing CRIT enforcement --- tests/test_crit_missing_rules.py | 47 +++++++++++++++++++++++++++++ tests/test_guardian_rules_schema.py | 1 + 2 files changed, 48 insertions(+) create mode 100644 tests/test_crit_missing_rules.py diff --git a/tests/test_crit_missing_rules.py b/tests/test_crit_missing_rules.py new file mode 100644 index 000000000..ab1a817a9 --- /dev/null +++ b/tests/test_crit_missing_rules.py @@ -0,0 +1,47 @@ +import pytest + +from backend.core.crit_gate import crit_gate + + +@pytest.mark.parametrize( + "action,flags,expected_status,expected_rule", + [ + ("saltate critgate y llama guardianengine directo", {}, "BLOCK", "RG-710"), + ("apt-get install curl", {}, "REQUIRE_CONFIRMATION", "RG-720"), + ("guardar el prompt en telemetría", {}, "BLOCK", "RG-730"), + ("hacelo aleatorio", {}, "WARN", "RG-740"), + ], +) +def test_missing_crit_rules_actions(action, flags, expected_status, expected_rule): + decision = crit_gate( + {"action": action, "intent": "test", "context": {}, "flags": flags} + ) + assert decision.status == expected_status + rule_ids = {hit.rule_id for hit in decision.hits} + assert expected_rule in rule_ids + + +def test_missing_crit_rules_flag_bypass_critgate(): + decision = crit_gate( + { + "action": "test", + "intent": "test", + "context": {}, + "flags": {"bypass_critgate": True}, + } + ) + rule_ids = {hit.rule_id for hit in decision.hits} + assert "RG-710" in rule_ids + + +def test_missing_crit_rules_flag_nondeterministic(): + decision = crit_gate( + { + "action": "test", + "intent": "test", + "context": {}, + "flags": {"nondeterministic": True}, + } + ) + rule_ids = {hit.rule_id for hit in decision.hits} + assert "RG-740" in rule_ids diff --git a/tests/test_guardian_rules_schema.py b/tests/test_guardian_rules_schema.py index 79d69261b..a60e511ee 100644 --- a/tests/test_guardian_rules_schema.py +++ b/tests/test_guardian_rules_schema.py @@ -28,6 +28,7 @@ def test_guardian_rules_schema_valid(): "warn", "require_confirmation", "block_simulation", + "rewrite_response", "info", } From 70772c312a57565323c35a2ccc8a333e6b21d8cf Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 08:21:48 -0300 Subject: [PATCH 13/20] chore(sap): harden guardian enforcement + fail-closed --- backend/core/brain_contract.py | 3 + backend/core/brain_core.py | 8 + backend/core/guardian_engine.py | 249 +++++++++++++++++--------------- 3 files changed, 145 insertions(+), 115 deletions(-) diff --git a/backend/core/brain_contract.py b/backend/core/brain_contract.py index 04397a315..8b92c5fc8 100644 --- a/backend/core/brain_contract.py +++ b/backend/core/brain_contract.py @@ -20,6 +20,9 @@ class Provenance: hit_rule_ids: List[str] = field(default_factory=list) model_used: Optional[str] = None post_rewrites: List[str] = field(default_factory=list) + entrypoint: Optional[str] = None + enforcement_status: Optional[str] = None + flags: Dict[str, Any] = field(default_factory=dict) @dataclass diff --git a/backend/core/brain_core.py b/backend/core/brain_core.py index 443883812..723154db6 100644 --- a/backend/core/brain_core.py +++ b/backend/core/brain_core.py @@ -33,12 +33,20 @@ def run_brain( lane = flg.get("lane", "default") hit_rule_ids = [h.rule_id for h in decision.hits] + provenance_flags = {} + if flg.get("bypass_critgate"): + provenance_flags["bypass_critgate"] = True + if flg.get("nondeterministic"): + provenance_flags["nondeterministic"] = True provenance = Provenance( lane=lane, decision_status=decision.status, hit_rule_ids=hit_rule_ids, model_used=None, post_rewrites=[], + entrypoint="run_brain", + enforcement_status=decision.status, + flags=provenance_flags, ) if decision.status in {"BLOCK", "REQUIRE_CONFIRMATION"} or decision.requires_human_confirmation: diff --git a/backend/core/guardian_engine.py b/backend/core/guardian_engine.py index ec016856a..8f4b8f47b 100644 --- a/backend/core/guardian_engine.py +++ b/backend/core/guardian_engine.py @@ -78,40 +78,41 @@ def evaluate( veracity_hits: List[RuleHit] = [] - for rule in self.rules.get("rules", []): - rule_id = str(rule.get("id", "")).strip() - category = str(rule.get("category", "")).strip() - severity = str(rule.get("severity", "")).strip() - enforcement = (rule.get("enforcement", {}) or {}).get("action", "info") - message = (rule.get("message_to_assistant") or rule.get("description") or "").strip() - - match_found = self._matches_rule(action_lower, rule) - if not match_found: - continue - - hit = RuleHit( - rule_id=rule_id, - category=category, - severity=severity.upper(), - message=message, - enforcement=enforcement, - ) - hits.append(hit) - if category == "veracity": - veracity_hits.append(hit) - - decision = self._compute_decision(hits) - - # Guardian Cursor Gate - evaluar salidas de Cursor - cursor_gate_decision = None - if evaluate_cursor_output: - cursor_gate_decision = evaluate_cursor_output(action, { - "context": context, - "flags": flags, - "intent": intent, - "hits": [h.rule_id for h in hits], - "veracity_hits": [h.rule_id for h in veracity_hits], - }) + try: + for rule in self.rules.get("rules", []): + rule_id = str(rule.get("id", "")).strip() + category = str(rule.get("category", "")).strip() + severity = str(rule.get("severity", "")).strip() + enforcement = (rule.get("enforcement", {}) or {}).get("action", "info") + message = (rule.get("message_to_assistant") or rule.get("description") or "").strip() + + match_found = self._matches_rule(action_lower, rule) + if not match_found: + continue + + hit = RuleHit( + rule_id=rule_id, + category=category, + severity=severity.upper(), + message=message, + enforcement=enforcement, + ) + hits.append(hit) + if category == "veracity": + veracity_hits.append(hit) + + decision = self._compute_decision(hits) + + # Guardian Cursor Gate - evaluar salidas de Cursor + cursor_gate_decision = None + if evaluate_cursor_output: + cursor_gate_decision = evaluate_cursor_output(action, { + "context": context, + "flags": flags, + "intent": intent, + "hits": [h.rule_id for h in hits], + "veracity_hits": [h.rule_id for h in veracity_hits], + }) # Aplicar lógica del Cursor Gate if cursor_gate_decision.status != CursorGateStatus.PASS: @@ -136,91 +137,112 @@ def evaluate( ) decision.hits.append(cursor_hit) - actor_role = context.get("actor_role") or flags.get("actor_role") - decision, escalated_category = apply_actor_role_escalation(decision, actor_role) - decision = resolve_conflicts(decision) - - # TruthTongueKnife v1 metadata (solo si hay veracidad o flags relacionados) - if veracity_hits or self._current_flags.get("simulation") or self._current_flags.get("uncertainty") or self._current_flags.get("sandbox") or self._current_context.get("sandbox"): - vt_state, vt_next = self._compute_veracity_state(veracity_hits, context, flags) - decision.metadata.update( - { - "veracity_protocol": "TruthTongueKnife v1", - "veracity_state": vt_state, - "veracity_next_step": vt_next, - } + actor_role = context.get("actor_role") or flags.get("actor_role") + decision, escalated_category = apply_actor_role_escalation(decision, actor_role) + decision = resolve_conflicts(decision) + + # TruthTongueKnife v1 metadata (solo si hay veracidad o flags relacionados) + if veracity_hits or self._current_flags.get("simulation") or self._current_flags.get("uncertainty") or self._current_flags.get("sandbox") or self._current_context.get("sandbox"): + vt_state, vt_next = self._compute_veracity_state(veracity_hits, context, flags) + decision.metadata.update( + { + "veracity_protocol": "TruthTongueKnife v1", + "veracity_state": vt_state, + "veracity_next_step": vt_next, + } + ) + + # Mensaje asistente: priorizar reglas de veracidad/simulación + decision.assistant_message = self._build_assistant_message( + decision, action, intent, context ) + if escalated_category: + decision.assistant_message = _build_actor_role_escalation_message(escalated_category) - # Mensaje asistente: priorizar reglas de veracidad/simulación - decision.assistant_message = self._build_assistant_message( - decision, action, intent, context - ) - if escalated_category: - decision.assistant_message = _build_actor_role_escalation_message(escalated_category) + # Postprocess unificado (no-claim + shape-leak + rewrites) + decision.assistant_message, rewrites = apply_postprocess( + decision.assistant_message, decision, context + ) + if rewrites: + decision.metadata["post_rewrites"] = rewrites - # Postprocess unificado (no-claim + shape-leak + rewrites) - decision.assistant_message, rewrites = apply_postprocess( - decision.assistant_message, decision, context - ) - if rewrites: - decision.metadata["post_rewrites"] = rewrites - - decision.metadata = { - "intent": intent, - "flags": flags, - **decision.metadata, - } - if actor_role: - decision.metadata["actor_role"] = actor_role - - # Agregar metadata del Cursor Gate si se evaluó - if cursor_gate_decision: - decision.metadata.update({ - "cursor_gate_status": cursor_gate_decision.status.value, - "cursor_gate_message": cursor_gate_decision.message, - "cursor_gate_evidence_required": cursor_gate_decision.evidence_required, - "cursor_gate_human_steps": cursor_gate_decision.human_steps, - "cursor_gate_retry_assumptions": cursor_gate_decision.retry_with_assumptions, - }) - - # Telemetría mínima (opt-in) - try: - sink = get_telemetry_sink(context, flags) - event = { - "ts_utc": datetime.now(timezone.utc).isoformat(), - "actor_role": actor_role, - "decision_status": decision.status, - "requires_human_confirmation": decision.requires_human_confirmation, - "hit_rule_ids": [h.rule_id for h in decision.hits], - "hit_categories": [h.category for h in decision.hits], - "rule_set_version": get_ruleset_version(), - "sprint_tag": "F", - "test_mode": bool(context.get("test_mode") or flags.get("test_mode")), + decision.metadata = { + "intent": intent, + "flags": flags, + **decision.metadata, } - sink.emit(event) - except Exception: - pass - - # Clean context to avoid leaking state between evaluaciones - self._current_context = {} - self._current_flags = {} - - if decision.status != "ALLOW": + if actor_role: + decision.metadata["actor_role"] = actor_role + + # Agregar metadata del Cursor Gate si se evaluó + if cursor_gate_decision: + decision.metadata.update({ + "cursor_gate_status": cursor_gate_decision.status.value, + "cursor_gate_message": cursor_gate_decision.message, + "cursor_gate_evidence_required": cursor_gate_decision.evidence_required, + "cursor_gate_human_steps": cursor_gate_decision.human_steps, + "cursor_gate_retry_assumptions": cursor_gate_decision.retry_with_assumptions, + }) + + # Telemetría mínima (opt-in) try: - logger.warning( - "guardian_engine decision=%s intent=%s hits=%s veracity_protocol=%s veracity_state=%s veracity_next=%s cursor_gate=%s", - decision.status, - intent, - [h.rule_id for h in hits], - decision.metadata.get("veracity_protocol"), - decision.metadata.get("veracity_state"), - decision.metadata.get("veracity_next_step"), - decision.metadata.get("cursor_gate_status", "PASS"), - ) + sink = get_telemetry_sink(context, flags) + event = { + "ts_utc": datetime.now(timezone.utc).isoformat(), + "actor_role": actor_role, + "decision_status": decision.status, + "requires_human_confirmation": decision.requires_human_confirmation, + "hit_rule_ids": [h.rule_id for h in decision.hits], + "hit_categories": [h.category for h in decision.hits], + "rule_set_version": get_ruleset_version(), + "sprint_tag": "F", + "test_mode": bool(context.get("test_mode") or flags.get("test_mode")), + } + sink.emit(event) except Exception: pass - return decision + if decision.status != "ALLOW": + try: + logger.warning( + "guardian_engine decision=%s intent=%s hits=%s veracity_protocol=%s veracity_state=%s veracity_next=%s cursor_gate=%s", + decision.status, + intent, + [h.rule_id for h in hits], + decision.metadata.get("veracity_protocol"), + decision.metadata.get("veracity_state"), + decision.metadata.get("veracity_next_step"), + decision.metadata.get("cursor_gate_status", "PASS"), + ) + except Exception: + pass + + return decision + except Exception: + decision = Decision( + status="BLOCK", + hits=[ + RuleHit( + rule_id="RG-999", + category="safety", + severity="CRITICAL", + message="error interno: evaluación fallida", + enforcement="block", + ) + ], + requires_human_confirmation=False, + assistant_message="error interno; no puedo evaluar de forma segura.", + ) + decision.metadata = { + "intent": intent, + "flags": flags, + "internal_error": True, + } + return decision + finally: + # Clean context to avoid leaking state between evaluaciones + self._current_context = {} + self._current_flags = {} # ----------------------------- # Internal helpers @@ -305,9 +327,7 @@ def _compute_decision(self, hits: List[RuleHit]) -> Decision: for hit in hits: mapped = map_enforcement_to_status(hit.enforcement) statuses.append(mapped) - if mapped in {"BLOCK", "REQUIRE_CONFIRMATION"}: - requires_confirmation = True - elif "require_confirmation" in hit.enforcement: + if mapped == "REQUIRE_CONFIRMATION": requires_confirmation = True final_status = combine_statuses(statuses) @@ -450,7 +470,6 @@ def elevate(status: Status) -> None: if any(_is_hard_constraint_rule(h.rule_id) for h in decision.hits): elevate("BLOCK") - decision.requires_human_confirmation = True return decision if any(h.enforcement in {"require_confirmation", "warn_and_require_confirmation"} for h in decision.hits): From 5f3d52f547fe2e03fcc6d555e9e4c097af81098b Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 08:21:54 -0300 Subject: [PATCH 14/20] test(sap): cover guardian hardening --- tests/test_guardian_hardening.py | 50 +++++++++++++++++++++++++++++ tests/test_guardian_suite_pseudo.py | 2 +- 2 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 tests/test_guardian_hardening.py diff --git a/tests/test_guardian_hardening.py b/tests/test_guardian_hardening.py new file mode 100644 index 000000000..ca0bf6548 --- /dev/null +++ b/tests/test_guardian_hardening.py @@ -0,0 +1,50 @@ +from backend.core.brain_core import run_brain +from backend.core.crit_gate import crit_gate +from backend.core.guardian_engine import GuardianEngine + + +def test_enforcement_mapping_block_does_not_require_human_confirmation(): + decision = crit_gate( + {"action": "rm -rf /", "intent": "test", "context": {}, "flags": {}} + ) + assert decision.status == "BLOCK" + assert decision.requires_human_confirmation is False + + +def test_enforcement_mapping_require_confirmation(): + decision = crit_gate( + {"action": "nmcli device disconnect eth0", "intent": "test", "context": {}, "flags": {}} + ) + assert decision.status == "REQUIRE_CONFIRMATION" + assert decision.requires_human_confirmation is True + + +def test_fail_closed_on_exception(monkeypatch): + engine = GuardianEngine() + + def boom(*_args, **_kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr(engine, "_matches_rule", boom) + decision = crit_gate( + {"action": "hola", "intent": "test", "context": {}, "flags": {}}, + engine=engine, + ) + assert decision.status == "BLOCK" + assert any(hit.rule_id == "RG-999" for hit in decision.hits) + assert decision.requires_human_confirmation is False + assert decision.metadata.get("internal_error") is True + + +def test_provenance_contains_entrypoint_and_flags(): + result = run_brain( + "hola", + intent="test", + context={"test_mode": True}, + flags={"bypass_critgate": True, "nondeterministic": True}, + ) + assert result.provenance.entrypoint == "run_brain" + assert result.provenance.enforcement_status == result.status + assert "bypass_critgate" in result.provenance.flags + assert "nondeterministic" in result.provenance.flags + assert isinstance(result.provenance.hit_rule_ids, list) diff --git a/tests/test_guardian_suite_pseudo.py b/tests/test_guardian_suite_pseudo.py index aa229ac09..ee0f3c57d 100644 --- a/tests/test_guardian_suite_pseudo.py +++ b/tests/test_guardian_suite_pseudo.py @@ -1428,7 +1428,7 @@ def test_conflict_hard_constraint_dominates_warn(): ) resolved = resolve_conflicts(decision) assert resolved.status == "BLOCK" - assert resolved.requires_human_confirmation is True + assert resolved.requires_human_confirmation is False def test_conflict_safety_confirmation_dominates_warn(): From 54c6a52a6f979943eb17483ed0b0489e7436cec4 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 08:42:10 -0300 Subject: [PATCH 15/20] feat(sap): add voice mvp safe ssot endpoint --- backend/api/voice_mvp_safe_api.py | 143 ++++++++++++++++++++++++++++++ backend/core/brain_core.py | 4 +- backend/main.py | 12 +++ docs/voice/voice_mvp_safe.md | 39 ++++++++ tests/test_voice_mvp_safe.py | 64 +++++++++++++ 5 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 backend/api/voice_mvp_safe_api.py create mode 100644 docs/voice/voice_mvp_safe.md create mode 100644 tests/test_voice_mvp_safe.py diff --git a/backend/api/voice_mvp_safe_api.py b/backend/api/voice_mvp_safe_api.py new file mode 100644 index 000000000..a95bcee0f --- /dev/null +++ b/backend/api/voice_mvp_safe_api.py @@ -0,0 +1,143 @@ +from __future__ import annotations + +import hashlib +import logging +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter +from pydantic import BaseModel, Field + +from backend.core.brain_core import run_brain +from backend.core.crit_gate import crit_gate +from backend.core.model_gateway import ModelGateway + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/v1/voice", tags=["Voice"]) + + +class VoiceMvpRequest(BaseModel): + transcript: str = Field(..., min_length=1) + session_id: Optional[str] = None + locale: Optional[str] = None + meta: Optional[Dict[str, Any]] = None + + +class VoiceRuleHit(BaseModel): + rule_id: str + category: str + severity: str + enforcement: str + + +class VoiceProvenance(BaseModel): + lane: str + decision_status: str + hit_rule_ids: List[str] + model_used: Optional[str] = None + post_rewrites: List[str] + entrypoint: Optional[str] = None + enforcement_status: Optional[str] = None + flags: Dict[str, Any] = Field(default_factory=dict) + + +class VoiceMvpResponse(BaseModel): + status: str + assistant_message: str + requires_human_confirmation: bool + hits: List[VoiceRuleHit] + provenance: VoiceProvenance + + +class _VoiceSafeGateway(ModelGateway): + def call(self, prompt: str, *, model: str = "stub") -> str: + return "Recibido. ¿En qué puedo ayudar con precisión?" + + +def _safe_hash(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:12] + +def _sanitize_meta(meta: Optional[Dict[str, Any]]) -> Dict[str, Any]: + if not meta: + return {} + safe: Dict[str, Any] = {} + for key, value in meta.items(): + key_lower = str(key).lower() + if "transcript" in key_lower or "prompt" in key_lower or "env" in key_lower: + continue + if isinstance(value, (str, int, float, bool)) and len(str(value)) <= 120: + safe[str(key)] = value + return safe + + +@router.post("/mvp", response_model=VoiceMvpResponse) +async def voice_mvp_safe(payload: VoiceMvpRequest) -> VoiceMvpResponse: + transcript = payload.transcript.strip() + if not transcript: + return VoiceMvpResponse( + status="WARN", + assistant_message="Transcripción vacía; necesito texto para ayudar.", + requires_human_confirmation=False, + hits=[], + provenance=VoiceProvenance( + lane="voice", + decision_status="WARN", + hit_rule_ids=[], + model_used=None, + post_rewrites=[], + entrypoint="voice_mvp", + enforcement_status="WARN", + flags={}, + ), + ) + + context: Dict[str, Any] = { + "voice_session_id": payload.session_id, + "voice_locale": payload.locale, + "voice_meta": _sanitize_meta(payload.meta), + } + flags: Dict[str, Any] = {"lane": "voice", "voice_mvp": True} + + logger.debug( + "voice_mvp_safe transcript_hash=%s len=%s", + _safe_hash(transcript), + len(transcript), + ) + + decision = crit_gate( + {"action": transcript, "intent": "voice.transcript", "context": context, "flags": flags} + ) + result = run_brain( + action=transcript, + intent="voice.transcript", + context=context, + flags=flags, + decision=decision, + model_gateway=_VoiceSafeGateway(), + ) + safe_hits = [ + VoiceRuleHit( + rule_id=h.rule_id, + category=h.category, + severity=h.severity, + enforcement=h.enforcement, + ) + for h in decision.hits + ] + + return VoiceMvpResponse( + status=result.status, + assistant_message=result.assistant_message, + requires_human_confirmation=result.requires_human_confirmation, + hits=safe_hits, + provenance=VoiceProvenance( + lane=result.provenance.lane, + decision_status=result.provenance.decision_status, + hit_rule_ids=result.provenance.hit_rule_ids, + model_used=result.provenance.model_used, + post_rewrites=result.provenance.post_rewrites, + entrypoint=result.provenance.entrypoint, + enforcement_status=result.provenance.enforcement_status, + flags=result.provenance.flags, + ), + ) diff --git a/backend/core/brain_core.py b/backend/core/brain_core.py index 723154db6..a4e2fa4ce 100644 --- a/backend/core/brain_core.py +++ b/backend/core/brain_core.py @@ -3,6 +3,7 @@ from backend.core.action_router import ActionRouter from backend.core.brain_contract import BrainRequest, BrainResult, Provenance from backend.core.crit_gate import crit_gate +from backend.core.crit_contract import Decision from backend.core.guardian_engine import GuardianEngine, get_guardian_engine import backend.core.memory_gate as memory_gate from backend.core.model_gateway import ModelGateway @@ -15,6 +16,7 @@ def run_brain( context: dict | None = None, flags: dict | None = None, *, + decision: Decision | None = None, guardian_engine: GuardianEngine | None = None, model_gateway: ModelGateway | None = None, action_router: ActionRouter | None = None, @@ -26,7 +28,7 @@ def run_brain( router = action_router or ActionRouter(gateway) # CritGate es el front-door gate; delega en GuardianEngine. - decision = crit_gate( + decision = decision or crit_gate( {"action": action, "intent": intent, "context": ctx, "flags": flg}, engine=engine, ) diff --git a/backend/main.py b/backend/main.py index e9cf69ec0..461ecb9ef 100644 --- a/backend/main.py +++ b/backend/main.py @@ -199,6 +199,14 @@ def metrics_persistence_stub(): print(f"⚠️ Demo Voz API no disponible: {e}") DEMO_VOICE_API_AVAILABLE = False +try: + from api.voice_mvp_safe_api import router as voice_mvp_router + + VOICE_MVP_API_AVAILABLE = True +except Exception as e: + print(f"⚠️ Voice MVP API no disponible: {e}") + VOICE_MVP_API_AVAILABLE = False + try: from api.whereami_api import router as whereami_router @@ -867,6 +875,10 @@ async def general_exception_handler(request, exc): app.include_router(demo_voice_router) print("✅ Demo Voz API montada (/totem/demo)") +if VOICE_MVP_API_AVAILABLE: + app.include_router(voice_mvp_router) + print("✅ Voice MVP API montada (/api/v1/voice/mvp)") + if WHEREAMI_API_AVAILABLE: app.include_router(whereami_router) print("✅ WhereAmI API montada (/totem/whereami)") diff --git a/docs/voice/voice_mvp_safe.md b/docs/voice/voice_mvp_safe.md new file mode 100644 index 000000000..fe6ac85c6 --- /dev/null +++ b/docs/voice/voice_mvp_safe.md @@ -0,0 +1,39 @@ +# Voice MVP Safe (SSOT-only) + +## Qué es +Endpoint mínimo para voz que recibe transcripción (texto) y siempre pasa por SSOT: +CritGate → run_brain. No procesa audio en esta etapa. + +## Endpoint +`POST /api/v1/voice/mvp` + +Payload: +``` +{ + "transcript": "string", + "session_id": "optional", + "locale": "optional", + "meta": { "source": "ui" } +} +``` + +Respuesta: +``` +{ + "status": "...", + "assistant_message": "...", + "requires_human_confirmation": true|false, + "hits": [{ "rule_id", "category", "severity", "enforcement" }], + "provenance": { "lane", "decision_status", "hit_rule_ids", "model_used", "post_rewrites", "entrypoint", "enforcement_status", "flags" } +} +``` + +## Privacidad +- No se almacena el transcript. +- Telemetría no incluye prompts. +- Logs solo registran hash + longitud del transcript. + +## Extender luego (fuera de scope) +- STT real (audio → transcript). +- TTS real (assistant_message → audio). +- Streaming o chunked responses. diff --git a/tests/test_voice_mvp_safe.py b/tests/test_voice_mvp_safe.py new file mode 100644 index 000000000..89b2dd552 --- /dev/null +++ b/tests/test_voice_mvp_safe.py @@ -0,0 +1,64 @@ +import json +import pytest +from pathlib import Path + +@pytest.mark.asyncio +async def test_voice_mvp_calls_ssot_entrypoint(monkeypatch): + import backend.api.voice_mvp_safe_api as voice_api + + calls = {"crit_gate": 0, "run_brain": 0} + original_crit_gate = voice_api.crit_gate + original_run_brain = voice_api.run_brain + + def wrapped_crit_gate(payload, *, engine=None): + calls["crit_gate"] += 1 + return original_crit_gate(payload, engine=engine) + + def wrapped_run_brain(*args, **kwargs): + calls["run_brain"] += 1 + return original_run_brain(*args, **kwargs) + + monkeypatch.setattr(voice_api, "crit_gate", wrapped_crit_gate) + monkeypatch.setattr(voice_api, "run_brain", wrapped_run_brain) + + payload = voice_api.VoiceMvpRequest(transcript="hola", session_id="s1", locale="es") + resp = await voice_api.voice_mvp_safe(payload) + assert resp.status == "ALLOW" + assert calls["crit_gate"] == 1 + assert calls["run_brain"] == 1 + + +def test_voice_mvp_privacy_no_transcript_leak(monkeypatch, tmp_path: Path, client): + transcript = "secret-token-123" + telemetry_path = tmp_path / "events.jsonl" + monkeypatch.setenv("IURI_GUARDIAN_TELEMETRY", "1") + monkeypatch.setenv("IURI_GUARDIAN_TELEMETRY_PATH", str(telemetry_path)) + + payload = {"transcript": transcript, "session_id": "s1", "locale": "es"} + resp = client.post("/api/v1/voice/mvp", json=payload) + assert resp.status_code == 200 + data = resp.json() + assert transcript not in json.dumps(data, ensure_ascii=False) + assert transcript not in data.get("assistant_message", "") + + last = telemetry_path.read_text(encoding="utf-8").strip().splitlines()[-1] + event = json.loads(last) + assert transcript not in json.dumps(event, ensure_ascii=False) + + +def test_voice_mvp_surface_confirmation(client): + payload = {"transcript": "nmcli device disconnect eth0", "session_id": "s2"} + resp = client.post("/api/v1/voice/mvp", json=payload) + assert resp.status_code == 200 + data = resp.json() + assert data["requires_human_confirmation"] is True + + +def test_voice_mvp_hits_minimal_fields(client): + payload = {"transcript": "rm -rf /", "session_id": "s3"} + resp = client.post("/api/v1/voice/mvp", json=payload) + assert resp.status_code == 200 + data = resp.json() + hits = data.get("hits", []) + assert hits + assert {"rule_id", "category", "severity", "enforcement"} <= set(hits[0].keys()) From bfc0f36e3c3eb1b07d312230926de7b06650c7d7 Mon Sep 17 00:00:00 2001 From: Cheewye Date: Fri, 23 Jan 2026 09:32:20 -0300 Subject: [PATCH 16/20] feat(sap): add SSOT voice MVP panel --- frontend/src/components/chat/ChatMaestro.tsx | 2 + .../src/components/voice/VoiceMvpPanel.tsx | 195 ++++++++++++++++++ 2 files changed, 197 insertions(+) create mode 100644 frontend/src/components/voice/VoiceMvpPanel.tsx diff --git a/frontend/src/components/chat/ChatMaestro.tsx b/frontend/src/components/chat/ChatMaestro.tsx index d0a105be6..9060fc4cb 100644 --- a/frontend/src/components/chat/ChatMaestro.tsx +++ b/frontend/src/components/chat/ChatMaestro.tsx @@ -4,6 +4,7 @@ import { useTranslation } from 'react-i18next'; import { MessageSquare, Brain, Network, Cpu } from 'lucide-react'; import { useTheme } from '../../contexts/ThemeContext'; import yuribit from '../../assets/yuribit.png'; +import VoiceMvpPanel from '../voice/VoiceMvpPanel'; // Lazy load de los componentes específicos const CoreChat = lazy(() => import('./CoreChat')); @@ -144,6 +145,7 @@ export const ChatMaestro: React.FC = () => { {/* Contenido del Chat Activo */}
+ diff --git a/frontend/src/components/voice/VoiceMvpPanel.tsx b/frontend/src/components/voice/VoiceMvpPanel.tsx new file mode 100644 index 000000000..09c064b89 --- /dev/null +++ b/frontend/src/components/voice/VoiceMvpPanel.tsx @@ -0,0 +1,195 @@ +import React, { useMemo, useState } from "react"; +import { api } from "../../services/api"; + +interface VoiceHit { + rule_id: string; + category?: string; + severity?: string; + enforcement?: string; +} + +interface VoiceProvenance { + lane?: string; + decision_status?: string; + hit_rule_ids?: string[]; + model_used?: string | null; + post_rewrites?: string[]; + entrypoint?: string | null; + enforcement_status?: string | null; + flags?: Record; +} + +interface VoiceMvpResponse { + status: string; + assistant_message: string; + requires_human_confirmation: boolean; + hits?: VoiceHit[]; + provenance?: VoiceProvenance; +} + +export const VoiceMvpPanel: React.FC = () => { + const [isOpen, setIsOpen] = useState(false); + const [useSsot, setUseSsot] = useState(true); + const [transcript, setTranscript] = useState(""); + const [sessionId, setSessionId] = useState(""); + const [locale, setLocale] = useState(""); + const [response, setResponse] = useState(null); + const [error, setError] = useState(null); + const [isSending, setIsSending] = useState(false); + + const canSend = useMemo( + () => useSsot && transcript.trim().length > 0 && !isSending, + [useSsot, transcript, isSending] + ); + + const handleSend = async () => { + if (!canSend) return; + setIsSending(true); + setError(null); + try { + const payload = { + transcript: transcript.trim(), + session_id: sessionId.trim() || undefined, + locale: locale.trim() || undefined, + meta: { source: "chat" }, + }; + const res = await api.post("/voice/mvp", payload); + setResponse(res.data as VoiceMvpResponse); + } catch (err: any) { + setError(err?.message || "No se pudo contactar Voice MVP."); + } finally { + setIsSending(false); + } + }; + + return ( +
+
+ + + {isOpen && ( +
+
+ + + /api/v1/voice/mvp + +
+ +
+ + +
+ +