diff --git a/mgtp/__init__.py b/mgtp/__init__.py new file mode 100644 index 0000000..916c658 --- /dev/null +++ b/mgtp/__init__.py @@ -0,0 +1 @@ +# MGTP package diff --git a/mgtp/decision_record.py b/mgtp/decision_record.py new file mode 100644 index 0000000..39191fb --- /dev/null +++ b/mgtp/decision_record.py @@ -0,0 +1,10 @@ +"""mgtp.decision_record — Re-exports the canonical DecisionRecord. + +The sole DecisionRecord implementation lives in mgtp.types. +This module exists so that ``from mgtp.decision_record import DecisionRecord`` +continues to work without introducing a duplicate or competing class. +""" + +from mgtp.types import DecisionRecord # noqa: F401 + +__all__ = ["DecisionRecord"] diff --git a/mgtp/evaluate.py b/mgtp/evaluate.py new file mode 100644 index 0000000..c474ebb --- /dev/null +++ b/mgtp/evaluate.py @@ -0,0 +1,61 @@ +"""mgtp.evaluate — Canonical registry-free MGTP evaluator. + +Public entrypoint: evaluate() + +This module exposes exactly one evaluation function. For registry-based +evaluation see mgtp.evaluate_transition. + +Invariants: +- evaluate() is pure: same inputs produce same output. +- No file I/O. No network calls. No state mutation. +- No randomness. No clock calls. No logging. +""" + +from authority_gate import AuthorityGate, Decision, Evidence + +__all__ = ["evaluate"] + +from .types import AuthorityContext, DecisionRecord, RiskClass, TransitionOutcome, TransitionRequest + + +def evaluate(request: TransitionRequest, context: AuthorityContext) -> DecisionRecord: + """Evaluate a TransitionRequest against an AuthorityContext without a registry. + + ``context.authority_basis`` names the *required* Evidence level. + ``context.provided_evidence`` carries the *actual* Evidence provided. + + Raises: + ValueError: if ``context.provided_evidence`` is None. + ValueError: if ``context.authority_basis`` is not a recognised Evidence name. + + Returns: + A DecisionRecord with outcome APPROVED, REFUSED, or SUPERVISED. + """ + if context.provided_evidence is None: + raise ValueError("provided_evidence is required for MGTP evaluation") + + try: + required = Evidence[context.authority_basis] + except KeyError: + raise ValueError(f"Unknown authority_basis: {context.authority_basis!r}") + + gate = AuthorityGate(required) + decision = gate.check(context.provided_evidence) + + if decision == Decision.DENY: + reason = "authority_insufficient" + outcome = TransitionOutcome.REFUSED + elif request.irreversible and request.risk_class == RiskClass.CRITICAL: + reason = "critical_irreversible_supervised" + outcome = TransitionOutcome.SUPERVISED + else: + reason = "authority_sufficient" + outcome = TransitionOutcome.APPROVED + + return DecisionRecord( + transition_id=request.transition_id, + outcome=outcome, + authority_basis=context.authority_basis, + risk_class=request.risk_class, + reason=reason, + ) diff --git a/mgtp/evaluate_transition.py b/mgtp/evaluate_transition.py new file mode 100644 index 0000000..2bdc86f --- /dev/null +++ b/mgtp/evaluate_transition.py @@ -0,0 +1,119 @@ +"""mgtp.evaluate_transition — Registry-based MGTP evaluator. + +Public entrypoint: evaluate_transition() + +This function is a thin wrapper around evaluate() (mgtp.evaluate). It adds +registry pre-check and context mapping, then delegates the core gate decision +to evaluate(). There is no divergent gate behaviour between the two paths. + +Evaluation pipeline: + 1. Registry membership check → REFUSED / transition_not_registered + 2. Map (entry["required_authority"], context.authority_basis) to an + (authority_basis=required, provided_evidence=actual) context. + 3. Delegate to evaluate() for the gate decision. + 4. Reconstruct DecisionRecord with the actor's original authority_basis and + extended audit fields (context_hash, actor_id, tenant_id, …). + +Invariants: +- evaluate_transition() is pure: same inputs produce same output. +- No file I/O. No network calls. No state mutation. +- No randomness. No clock calls. No logging. +""" + +import hashlib +import json + +from authority_gate import Evidence + +__all__ = ["evaluate_transition"] + +from .decision_record import DecisionRecord +from .evaluate import evaluate +from .types import AuthorityContext, TransitionOutcome, TransitionRequest + + +def _context_hash(request: TransitionRequest, context: AuthorityContext) -> str: + """Deterministic sha256 over the canonical representation of (request, context).""" + payload = { + "actor_id": context.actor_id, + "authority_basis": context.authority_basis, + "irreversible": request.irreversible, + "override_token": request.override_token, + "resource_identifier": request.resource_identifier, + "risk_class": request.risk_class.value, + "tenant_id": context.tenant_id, + "timestamp": request.timestamp, + "transition_id": request.transition_id, + "trust_boundary_crossed": request.trust_boundary_crossed, + } + canonical = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8") + return hashlib.sha256(canonical).hexdigest() + + +def evaluate_transition( + request: TransitionRequest, + context: AuthorityContext, + registry: dict, +) -> DecisionRecord: + """Evaluate a TransitionRequest against the registry and authority context. + + This is a thin wrapper around evaluate(): registry lookup and context + mapping happen here; the core gate decision is delegated to evaluate(). + + Args: + request: The transition being requested. + context: The authority context of the requesting actor. + ``context.authority_basis`` names the *provided* Evidence + level (looked up in the Evidence enum). + registry: Mapping of transition_id -> entry dict. Each entry must + have a ``required_authority`` key whose value is a valid + Evidence member name. + + Returns: + A DecisionRecord with outcome APPROVED, REFUSED, or SUPERVISED. + """ + ctx_hash = _context_hash(request, context) + + # 1. Registry membership check. + if request.transition_id not in registry: + return DecisionRecord( + transition_id=request.transition_id, + outcome=TransitionOutcome.REFUSED, + authority_basis=context.authority_basis, + risk_class=request.risk_class, + reason="transition_not_registered", + actor_id=context.actor_id, + tenant_id=context.tenant_id, + timestamp=request.timestamp, + context_hash=ctx_hash, + ) + + entry = registry[request.transition_id] + gate_version = entry.get("gate_version", "") + + # 2. Map to evaluate() context: + # authority_basis = required level (from registry) + # provided_evidence = actual evidence (from context.authority_basis) + eval_context = AuthorityContext( + actor_id=context.actor_id, + authority_basis=entry["required_authority"], + tenant_id=context.tenant_id, + provided_evidence=Evidence[context.authority_basis], + ) + + # 3. Delegate to evaluate() for the gate decision. + gate_result = evaluate(request, eval_context) + + # 4. Reconstruct with the actor's original authority_basis and audit fields. + return DecisionRecord( + transition_id=gate_result.transition_id, + outcome=gate_result.outcome, + authority_basis=context.authority_basis, + risk_class=gate_result.risk_class, + reason=gate_result.reason, + actor_id=context.actor_id, + tenant_id=context.tenant_id, + timestamp=request.timestamp, + gate_version=gate_version, + context_hash=ctx_hash, + ) diff --git a/mgtp/registry.py b/mgtp/registry.py new file mode 100644 index 0000000..ff69602 --- /dev/null +++ b/mgtp/registry.py @@ -0,0 +1,67 @@ +"""registry — Load and validate the MGTP transition registry from JSON.""" + +import json + +from authority_gate import Evidence +from mgtp.types import RiskClass + +_REQUIRED_FIELDS = {"id", "irreversible", "risk_class", "required_authority", "gate_version"} + + +def load_registry(path: str) -> dict: + """Load a transition registry JSON file. + + Returns a dict mapping transition_id -> entry dict. + + Raises: + ValueError: if the file is missing the schema_version field, contains duplicate + transition IDs, is missing required fields, or has invalid + enum values for risk_class or required_authority. + """ + with open(path, "r", encoding="utf-8") as fh: + data = json.load(fh) + + if not isinstance(data, dict) or "schema_version" not in data: + raise ValueError("Registry JSON must contain a top-level 'schema_version' field.") + + transitions = data.get("transitions", []) + registry: dict = {} + + for entry in transitions: + tid = entry.get("id") + if tid is None: + raise ValueError(f"Transition entry missing required field 'id': {entry}") + if tid in registry: + raise ValueError(f"Duplicate transition ID in registry: '{tid}'") + + missing = _REQUIRED_FIELDS - set(entry.keys()) + if missing: + raise ValueError( + f"Transition '{tid}' missing required fields: {sorted(missing)}" + ) + + # Validate risk_class against RiskClass enum. + rc = entry["risk_class"] + try: + RiskClass(rc) + except ValueError: + valid = [e.value for e in RiskClass] + raise ValueError( + f"Transition '{tid}' has invalid risk_class '{rc}'. " + f"Must be one of: {valid}" + ) + + # Validate required_authority against Evidence enum names. + ra = entry["required_authority"] + try: + Evidence[ra] + except KeyError: + valid = [e.name for e in Evidence] + raise ValueError( + f"Transition '{tid}' has invalid required_authority '{ra}'. " + f"Must be one of: {valid}" + ) + + registry[tid] = entry + + return registry diff --git a/mgtp/types.py b/mgtp/types.py new file mode 100644 index 0000000..287736b --- /dev/null +++ b/mgtp/types.py @@ -0,0 +1,169 @@ +"""mgtp.types — All MGTP types including the canonical DecisionRecord. + +There is exactly one DecisionRecord implementation in this package; it lives +here. mgtp.decision_record re-exports it for import convenience. +""" + +import hashlib +import json +import uuid +from dataclasses import dataclass +from enum import Enum +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from authority_gate import Evidence + + +class RiskClass(str, Enum): + LOW = "LOW" + MEDIUM = "MEDIUM" + HIGH = "HIGH" + CRITICAL = "CRITICAL" + + +class TransitionOutcome(str, Enum): + APPROVED = "APPROVED" + REFUSED = "REFUSED" + SUPERVISED = "SUPERVISED" + + +@dataclass(frozen=True) +class TransitionRequest: + transition_id: str + risk_class: RiskClass + irreversible: bool + resource_identifier: str + trust_boundary_crossed: bool + override_token: Optional[str] + timestamp: str # injected; do not call a clock + + +@dataclass(frozen=True) +class AuthorityContext: + actor_id: str + authority_basis: str # Evidence member name, e.g. "OWNER" + tenant_id: str + provided_evidence: Optional["Evidence"] = None + + +# Stable UUID5 namespace for MGTP decision IDs (reuses the DNS UUID namespace). +_DECISION_NS = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8") + + +@dataclass(frozen=True) +class DecisionRecord: + """Immutable, deterministic record of a single transition decision. + + This is the canonical and sole DecisionRecord implementation in the MGTP + layer. mgtp.decision_record re-exports this class; no competing version + exists. + + Core fields (always populated): + transition_id, outcome, authority_basis, risk_class, reason + + Extended audit fields (populated by evaluate_transition; default "" when + not applicable): + actor_id, tenant_id, timestamp, gate_version, context_hash + + Computed properties (derived; never stored as fields): + reason_code — alias for ``reason`` + canonical_bytes — deterministic UTF-8 JSON bytes (sorted keys) + to_canonical_json — alias method for canonical_bytes + canonical_hash — sha256 hex digest of canonical_bytes + content_hash — alias for canonical_hash + decision_id — UUID5 derived from context_hash (empty when absent) + """ + + transition_id: str + outcome: TransitionOutcome + authority_basis: str + risk_class: RiskClass + reason: str + # Extended audit trail (optional; defaults allow simple two-arg construction) + actor_id: str = "" + tenant_id: str = "" + timestamp: str = "" + gate_version: str = "" + context_hash: str = "" + + # ------------------------------------------------------------------ + # Computed properties — no stored duplicates, no divergent logic + # ------------------------------------------------------------------ + + @property + def reason_code(self) -> str: + """Alias for ``reason``, used by registry-based evaluation paths.""" + return self.reason + + @property + def canonical_bytes(self) -> bytes: + """Deterministic UTF-8 JSON bytes: sorted keys, compact separators.""" + obj = { + "authority_basis": self.authority_basis, + "outcome": self.outcome.value, + "reason": self.reason, + "risk_class": self.risk_class.value, + "transition_id": self.transition_id, + } + return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8") + + def to_canonical_json(self) -> bytes: + """Return canonical JSON bytes (alias for the canonical_bytes property).""" + return self.canonical_bytes + + @property + def canonical_hash(self) -> str: + """sha256 hex digest (lower-case) of canonical_bytes.""" + return hashlib.sha256(self.canonical_bytes).hexdigest() + + @property + def content_hash(self) -> str: + """Alias for canonical_hash.""" + return self.canonical_hash + + @property + def decision_id(self) -> str: + """UUID5 derived from context_hash; empty string when context_hash absent.""" + if not self.context_hash: + return "" + return str(uuid.uuid5(_DECISION_NS, self.context_hash)) + + # ------------------------------------------------------------------ + # Factory + # ------------------------------------------------------------------ + + @classmethod + def build( + cls, + *, + transition_id: str, + actor_id: str, + tenant_id: str, + authority_basis: str, + risk_class: "str | RiskClass", + outcome: TransitionOutcome, + reason_code: str, + timestamp: str, + gate_version: str, + context_hash: str, + ) -> "DecisionRecord": + """Factory for full audit-trail records (used by evaluate_transition). + + ``reason_code`` is stored as ``reason``; both attributes return the + same value so callers can use either name. + """ + if isinstance(risk_class, str): + risk_class = RiskClass(risk_class) + return cls( + transition_id=transition_id, + outcome=outcome, + authority_basis=authority_basis, + risk_class=risk_class, + reason=reason_code, + actor_id=actor_id, + tenant_id=tenant_id, + timestamp=timestamp, + gate_version=gate_version, + context_hash=context_hash, + ) diff --git a/registry/TRANSITION_REGISTRY_v0.2.json b/registry/TRANSITION_REGISTRY_v0.2.json new file mode 100644 index 0000000..72b81ed --- /dev/null +++ b/registry/TRANSITION_REGISTRY_v0.2.json @@ -0,0 +1,12 @@ +{ + "schema_version": "0.2", + "transitions": [ + { + "id": "TOOL_CALL_HTTP", + "irreversible": true, + "risk_class": "HIGH", + "required_authority": "OWNER", + "gate_version": "v0.2" + } + ] +} diff --git a/tests/test_decision_record.py b/tests/test_decision_record.py new file mode 100644 index 0000000..9a3afa0 --- /dev/null +++ b/tests/test_decision_record.py @@ -0,0 +1,216 @@ +"""Tests for mgtp.decision_record / mgtp.types.DecisionRecord. + +There is exactly one DecisionRecord implementation (mgtp.types.DecisionRecord). +mgtp.decision_record re-exports it; both import paths are tested here to confirm +there is no duplicate or competing class. +""" + +import hashlib +import json +import sys +from pathlib import Path + +import pytest + +# Ensure the repo root is on sys.path so authority_gate can be imported. +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from mgtp.decision_record import DecisionRecord as DR_from_decision_record +from mgtp.types import DecisionRecord, TransitionOutcome, RiskClass + + +_COMMON = dict( + transition_id="TOOL_CALL_HTTP", + actor_id="user-42", + tenant_id="tenant-99", + authority_basis="OWNER", + risk_class="HIGH", + outcome=TransitionOutcome.APPROVED, + reason_code="APPROVED", + timestamp="2025-01-01T00:00:00Z", + gate_version="v0.2", + context_hash="abc123", +) + + +# --------------------------------------------------------------------------- +# Single canonical class (no duplicate) +# --------------------------------------------------------------------------- + + +def test_single_implementation_same_object(): + """mgtp.decision_record.DecisionRecord is the same class as mgtp.types.DecisionRecord.""" + assert DR_from_decision_record is DecisionRecord + + +# --------------------------------------------------------------------------- +# build() factory +# --------------------------------------------------------------------------- + + +def test_build_returns_decision_record(): + rec = DecisionRecord.build(**_COMMON) + assert isinstance(rec, DecisionRecord) + + +def test_build_stores_reason_as_reason_code(): + rec = DecisionRecord.build(**_COMMON) + assert rec.reason == "APPROVED" + assert rec.reason_code == "APPROVED" + + +def test_build_accepts_string_risk_class(): + rec = DecisionRecord.build(**_COMMON) + assert rec.risk_class == RiskClass.HIGH + assert rec.risk_class == "HIGH" # str Enum equality + + +def test_build_accepts_enum_risk_class(): + kwargs = {**_COMMON, "risk_class": RiskClass.HIGH} + rec = DecisionRecord.build(**kwargs) + assert rec.risk_class == RiskClass.HIGH + + +# --------------------------------------------------------------------------- +# Record is immutable +# --------------------------------------------------------------------------- + + +def test_record_is_immutable(): + rec = DecisionRecord.build(**_COMMON) + with pytest.raises((AttributeError, TypeError)): + rec.transition_id = "OTHER" # type: ignore[misc] + + +# --------------------------------------------------------------------------- +# canonical_bytes property +# --------------------------------------------------------------------------- + + +def test_canonical_bytes_are_bytes(): + rec = DecisionRecord.build(**_COMMON) + assert isinstance(rec.canonical_bytes, bytes) + + +def test_canonical_bytes_are_valid_json(): + rec = DecisionRecord.build(**_COMMON) + parsed = json.loads(rec.canonical_bytes) + assert parsed["transition_id"] == "TOOL_CALL_HTTP" + assert parsed["outcome"] == TransitionOutcome.APPROVED.value + + +def test_canonical_bytes_sorted_keys(): + rec = DecisionRecord.build(**_COMMON) + keys = list(json.loads(rec.canonical_bytes).keys()) + assert keys == sorted(keys) + + +def test_canonical_bytes_compact_separators(): + rec = DecisionRecord.build(**_COMMON) + raw = rec.canonical_bytes.decode("utf-8") + assert ": " not in raw + assert ", " not in raw + + +def test_to_canonical_json_alias(): + rec = DecisionRecord.build(**_COMMON) + assert rec.to_canonical_json() == rec.canonical_bytes + + +# --------------------------------------------------------------------------- +# Determinism +# --------------------------------------------------------------------------- + + +def test_determinism_same_inputs_same_canonical_bytes(): + r1 = DecisionRecord.build(**_COMMON) + r2 = DecisionRecord.build(**_COMMON) + assert r1.canonical_bytes == r2.canonical_bytes + + +def test_determinism_same_inputs_same_content_hash(): + r1 = DecisionRecord.build(**_COMMON) + r2 = DecisionRecord.build(**_COMMON) + assert r1.content_hash == r2.content_hash + + +def test_determinism_same_inputs_same_decision_id(): + r1 = DecisionRecord.build(**_COMMON) + r2 = DecisionRecord.build(**_COMMON) + assert r1.decision_id == r2.decision_id + + +def test_different_context_hash_produces_different_decision_id(): + r1 = DecisionRecord.build(**_COMMON) + r2 = DecisionRecord.build(**{**_COMMON, "context_hash": "different"}) + assert r1.decision_id != r2.decision_id + + +# --------------------------------------------------------------------------- +# content_hash is sha256 of canonical_bytes +# --------------------------------------------------------------------------- + + +def test_content_hash_matches_sha256(): + rec = DecisionRecord.build(**_COMMON) + expected = hashlib.sha256(rec.canonical_bytes).hexdigest() + assert rec.content_hash == expected + + +def test_content_hash_is_64_hex_chars(): + rec = DecisionRecord.build(**_COMMON) + assert len(rec.content_hash) == 64 + assert rec.content_hash == rec.content_hash.lower() + + +# --------------------------------------------------------------------------- +# decision_id +# --------------------------------------------------------------------------- + + +def test_decision_id_is_uuid_format(): + rec = DecisionRecord.build(**_COMMON) + assert isinstance(rec.decision_id, str) + assert len(rec.decision_id) == 36 # UUID5 + + +def test_decision_id_empty_when_no_context_hash(): + rec = DecisionRecord( + transition_id="x", + outcome=TransitionOutcome.APPROVED, + authority_basis="OWNER", + risk_class=RiskClass.LOW, + reason="authority_sufficient", + ) + assert rec.decision_id == "" + + +# --------------------------------------------------------------------------- +# Simple 5-field construction (used by evaluate()) +# --------------------------------------------------------------------------- + + +def test_simple_construction(): + rec = DecisionRecord( + transition_id="tx-1", + outcome=TransitionOutcome.REFUSED, + authority_basis="USER", + risk_class=RiskClass.HIGH, + reason="authority_insufficient", + ) + assert rec.reason_code == "authority_insufficient" + assert rec.actor_id == "" + assert rec.context_hash == "" + assert rec.decision_id == "" + + +def test_all_required_fields_present(): + rec = DecisionRecord.build(**_COMMON) + assert rec.transition_id == "TOOL_CALL_HTTP" + assert rec.actor_id == "user-42" + assert rec.tenant_id == "tenant-99" + assert rec.authority_basis == "OWNER" + assert rec.outcome is TransitionOutcome.APPROVED + assert rec.timestamp == "2025-01-01T00:00:00Z" + assert rec.gate_version == "v0.2" + assert rec.context_hash == "abc123" diff --git a/tests/test_evaluate_coherence.py b/tests/test_evaluate_coherence.py new file mode 100644 index 0000000..add6ab4 --- /dev/null +++ b/tests/test_evaluate_coherence.py @@ -0,0 +1,192 @@ +"""Tests proving evaluate() and evaluate_transition() have no divergent behaviour. + +The gate decision (ALLOW/DENY) is performed by AuthorityGate in both paths. +This test suite confirms that for equivalent authority configurations, both +entrypoints produce the same outcome — satisfying the coherence requirement +from the PR #6 conflict-resolution review. +""" + +import sys +import textwrap +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from authority_gate import Evidence +from mgtp.evaluate import evaluate +from mgtp.evaluate_transition import evaluate_transition +from mgtp.types import ( + AuthorityContext, + RiskClass, + TransitionOutcome, + TransitionRequest, +) + +_TS = "2026-01-15T12:00:00Z" + + +def _req( + transition_id="OP", + risk_class=RiskClass.MEDIUM, + irreversible=False, + resource_identifier="res-x", + trust_boundary_crossed=False, + override_token=None, + timestamp=_TS, +): + return TransitionRequest( + transition_id=transition_id, + risk_class=risk_class, + irreversible=irreversible, + resource_identifier=resource_identifier, + trust_boundary_crossed=trust_boundary_crossed, + override_token=override_token, + timestamp=timestamp, + ) + + +def _registry(transition_id="OP", required_authority="OWNER", risk_class="MEDIUM"): + return { + transition_id: { + "id": transition_id, + "irreversible": False, + "risk_class": risk_class, + "required_authority": required_authority, + "gate_version": "v0.1", + } + } + + +def _eval_ctx(required_authority: str, actual_authority: str) -> AuthorityContext: + """Context for evaluate(): authority_basis = required, provided_evidence = actual.""" + return AuthorityContext( + actor_id="actor", + authority_basis=required_authority, + tenant_id="t1", + provided_evidence=Evidence[actual_authority], + ) + + +def _trans_ctx(actual_authority: str) -> AuthorityContext: + """Context for evaluate_transition(): authority_basis = actor's actual authority.""" + return AuthorityContext( + actor_id="actor", + authority_basis=actual_authority, + tenant_id="t1", + ) + + +# --------------------------------------------------------------------------- +# Coherence: same gate decision for APPROVED cases +# --------------------------------------------------------------------------- + + +def test_approved_coherence_owner_required_owner_provided(): + req = _req() + reg = _registry(required_authority="OWNER") + eval_ctx = _eval_ctx("OWNER", "OWNER") + trans_ctx = _trans_ctx("OWNER") + + r_eval = evaluate(req, eval_ctx) + r_trans = evaluate_transition(req, trans_ctx, reg) + + assert r_eval.outcome is TransitionOutcome.APPROVED + assert r_trans.outcome is TransitionOutcome.APPROVED + + +def test_approved_coherence_user_required_owner_provided(): + req = _req() + reg = _registry(required_authority="USER") + eval_ctx = _eval_ctx("USER", "OWNER") + trans_ctx = _trans_ctx("OWNER") + + r_eval = evaluate(req, eval_ctx) + r_trans = evaluate_transition(req, trans_ctx, reg) + + assert r_eval.outcome is TransitionOutcome.APPROVED + assert r_trans.outcome is TransitionOutcome.APPROVED + + +def test_approved_coherence_none_required_user_provided(): + req = _req() + reg = _registry(required_authority="NONE") + eval_ctx = _eval_ctx("NONE", "USER") + trans_ctx = _trans_ctx("USER") + + r_eval = evaluate(req, eval_ctx) + r_trans = evaluate_transition(req, trans_ctx, reg) + + assert r_eval.outcome is TransitionOutcome.APPROVED + assert r_trans.outcome is TransitionOutcome.APPROVED + + +# --------------------------------------------------------------------------- +# Coherence: same gate decision for REFUSED cases +# --------------------------------------------------------------------------- + + +def test_refused_coherence_owner_required_user_provided(): + req = _req() + reg = _registry(required_authority="OWNER") + eval_ctx = _eval_ctx("OWNER", "USER") + trans_ctx = _trans_ctx("USER") + + r_eval = evaluate(req, eval_ctx) + r_trans = evaluate_transition(req, trans_ctx, reg) + + assert r_eval.outcome is TransitionOutcome.REFUSED + assert r_trans.outcome is TransitionOutcome.REFUSED + + +def test_refused_coherence_admin_required_owner_provided(): + req = _req() + reg = _registry(required_authority="ADMIN") + eval_ctx = _eval_ctx("ADMIN", "OWNER") + trans_ctx = _trans_ctx("OWNER") + + r_eval = evaluate(req, eval_ctx) + r_trans = evaluate_transition(req, trans_ctx, reg) + + assert r_eval.outcome is TransitionOutcome.REFUSED + assert r_trans.outcome is TransitionOutcome.REFUSED + + +# --------------------------------------------------------------------------- +# Coherence: SUPERVISED path (CRITICAL + irreversible) +# --------------------------------------------------------------------------- + + +def test_supervised_coherence_critical_irreversible(): + req = _req(risk_class=RiskClass.CRITICAL, irreversible=True) + reg = _registry(required_authority="ADMIN", risk_class="CRITICAL") + eval_ctx = _eval_ctx("ADMIN", "ADMIN") + trans_ctx = _trans_ctx("ADMIN") + + r_eval = evaluate(req, eval_ctx) + r_trans = evaluate_transition(req, trans_ctx, reg) + + assert r_eval.outcome is TransitionOutcome.SUPERVISED + assert r_trans.outcome is TransitionOutcome.SUPERVISED + + +# --------------------------------------------------------------------------- +# No third entrypoint — confirm only evaluate() and evaluate_transition() exist +# --------------------------------------------------------------------------- + + +def test_no_third_entrypoint_in_evaluate_module(): + """mgtp.evaluate exposes only 'evaluate' via __all__.""" + import mgtp.evaluate as mod + + assert mod.__all__ == ["evaluate"], ( + f"Unexpected extra entrypoints in mgtp.evaluate.__all__: {mod.__all__}" + ) + + +def test_no_third_entrypoint_in_evaluate_transition_module(): + """mgtp.evaluate_transition exposes only 'evaluate_transition' via __all__.""" + import mgtp.evaluate_transition as mod + + assert mod.__all__ == ["evaluate_transition"], ( + f"Unexpected extra entrypoints in mgtp.evaluate_transition.__all__: {mod.__all__}" + ) diff --git a/tests/test_mgtp_evaluate.py b/tests/test_mgtp_evaluate.py new file mode 100644 index 0000000..dfdc407 --- /dev/null +++ b/tests/test_mgtp_evaluate.py @@ -0,0 +1,201 @@ +"""Tests for mgtp.evaluate — registry-free evaluator.""" + +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from authority_gate import Evidence +from mgtp.evaluate import evaluate +from mgtp.types import ( + AuthorityContext, + DecisionRecord, + RiskClass, + TransitionOutcome, + TransitionRequest, +) + +_TS = "2026-01-01T00:00:00Z" + + +def _req( + transition_id="tx-001", + risk_class=RiskClass.MEDIUM, + irreversible=False, + resource_identifier="res://alpha", + trust_boundary_crossed=False, + override_token=None, + timestamp=_TS, +): + return TransitionRequest( + transition_id=transition_id, + risk_class=risk_class, + irreversible=irreversible, + resource_identifier=resource_identifier, + trust_boundary_crossed=trust_boundary_crossed, + override_token=override_token, + timestamp=timestamp, + ) + + +def _ctx(authority_basis="OWNER", provided_evidence=Evidence.OWNER, actor_id="alice", tenant_id="t1"): + return AuthorityContext( + actor_id=actor_id, + authority_basis=authority_basis, + tenant_id=tenant_id, + provided_evidence=provided_evidence, + ) + + +# --------------------------------------------------------------------------- +# Happy path — APPROVED +# --------------------------------------------------------------------------- + + +def test_approved_when_authority_sufficient(): + rec = evaluate(_req(), _ctx()) + assert rec.outcome == TransitionOutcome.APPROVED + assert rec.reason == "authority_sufficient" + assert rec.transition_id == "tx-001" + + +def test_approved_returns_decision_record(): + rec = evaluate(_req(), _ctx()) + assert isinstance(rec, DecisionRecord) + + +# --------------------------------------------------------------------------- +# REFUSED paths +# --------------------------------------------------------------------------- + + +def test_refused_when_authority_insufficient(): + ctx = _ctx(authority_basis="OWNER", provided_evidence=Evidence.USER) + rec = evaluate(_req(), ctx) + assert rec.outcome == TransitionOutcome.REFUSED + assert rec.reason == "authority_insufficient" + + +def test_refused_when_provided_none_evidence(): + ctx = _ctx(authority_basis="OWNER", provided_evidence=Evidence.NONE) + rec = evaluate(_req(), ctx) + assert rec.outcome == TransitionOutcome.REFUSED + + +# --------------------------------------------------------------------------- +# SUPERVISED path +# --------------------------------------------------------------------------- + + +def test_supervised_for_critical_irreversible(): + req = _req(risk_class=RiskClass.CRITICAL, irreversible=True) + ctx = _ctx(authority_basis="ADMIN", provided_evidence=Evidence.ADMIN) + rec = evaluate(req, ctx) + assert rec.outcome == TransitionOutcome.SUPERVISED + assert rec.reason == "critical_irreversible_supervised" + + +def test_not_supervised_when_critical_but_reversible(): + req = _req(risk_class=RiskClass.CRITICAL, irreversible=False) + ctx = _ctx(authority_basis="ADMIN", provided_evidence=Evidence.ADMIN) + rec = evaluate(req, ctx) + assert rec.outcome == TransitionOutcome.APPROVED + + +def test_not_supervised_when_irreversible_but_not_critical(): + req = _req(risk_class=RiskClass.HIGH, irreversible=True) + ctx = _ctx(authority_basis="OWNER", provided_evidence=Evidence.OWNER) + rec = evaluate(req, ctx) + assert rec.outcome == TransitionOutcome.APPROVED + + +# --------------------------------------------------------------------------- +# Error cases +# --------------------------------------------------------------------------- + + +def test_raises_value_error_when_provided_evidence_none(): + ctx = AuthorityContext(actor_id="alice", authority_basis="OWNER", tenant_id="t1") + with pytest.raises(ValueError, match="provided_evidence is required"): + evaluate(_req(), ctx) + + +def test_raises_value_error_on_unknown_authority_basis(): + ctx = _ctx(authority_basis="UNKNOWN_LEVEL") + with pytest.raises(ValueError, match="Unknown authority_basis"): + evaluate(_req(), ctx) + + +# --------------------------------------------------------------------------- +# Record fields +# --------------------------------------------------------------------------- + + +def test_authority_basis_reflects_required_level(): + ctx = _ctx(authority_basis="OWNER") + rec = evaluate(_req(), ctx) + assert rec.authority_basis == "OWNER" + + +def test_risk_class_propagated(): + rec = evaluate(_req(risk_class=RiskClass.HIGH), _ctx()) + assert rec.risk_class == RiskClass.HIGH + + +def test_record_is_frozen(): + rec = evaluate(_req(), _ctx()) + try: + rec.outcome = TransitionOutcome.REFUSED # type: ignore[misc] + assert False, "Should raise" + except (AttributeError, TypeError): + pass + + +# --------------------------------------------------------------------------- +# Determinism +# --------------------------------------------------------------------------- + + +def test_deterministic_same_inputs(): + r1 = evaluate(_req(), _ctx()) + r2 = evaluate(_req(), _ctx()) + assert r1 == r2 + assert r1.canonical_bytes == r2.canonical_bytes + + +def test_canonical_bytes_deterministic_across_20_calls(): + results = [evaluate(_req(), _ctx()).canonical_bytes for _ in range(20)] + assert all(r == results[0] for r in results) + + +# --------------------------------------------------------------------------- +# Boundary: importing mgtp does not affect authority_gate behaviour +# --------------------------------------------------------------------------- + + +def test_authority_gate_unaffected_after_mgtp_import(): + from authority_gate import AuthorityGate, Decision + + import mgtp # noqa: F401 + + gate = AuthorityGate(Evidence.OWNER) + assert gate.check(Evidence.ADMIN) is Decision.ALLOW + assert gate.check(Evidence.USER) is Decision.DENY + + +def test_commit_gate_not_imported_by_mgtp(): + import importlib + + mgtp_keys = [k for k in sys.modules if k == "mgtp" or k.startswith("mgtp.")] + saved = {k: sys.modules.pop(k) for k in mgtp_keys} + pre = {k for k in sys.modules if k.startswith("commit_gate")} + try: + importlib.import_module("mgtp") + post = {k for k in sys.modules if k.startswith("commit_gate")} + assert not (post - pre), f"mgtp import introduced commit_gate modules: {post - pre}" + finally: + for k in [k for k in sys.modules if k == "mgtp" or k.startswith("mgtp.")]: + sys.modules.pop(k, None) + sys.modules.update(saved) diff --git a/tests/test_mgtp_evaluate_transition.py b/tests/test_mgtp_evaluate_transition.py new file mode 100644 index 0000000..13317f0 --- /dev/null +++ b/tests/test_mgtp_evaluate_transition.py @@ -0,0 +1,185 @@ +"""Tests for mgtp.evaluate_transition — registry-based thin-wrapper evaluator.""" + +import json +import os +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from mgtp.evaluate_transition import evaluate_transition +from mgtp.registry import load_registry +from mgtp.types import AuthorityContext, DecisionRecord, RiskClass, TransitionOutcome, TransitionRequest + +_REAL_REGISTRY = os.path.join( + os.path.dirname(__file__), "..", "registry", "TRANSITION_REGISTRY_v0.2.json" +) +_TS = "2025-06-01T12:00:00Z" + + +@pytest.fixture() +def real_registry(): + return load_registry(_REAL_REGISTRY) + + +def _req( + transition_id="TOOL_CALL_HTTP", + risk_class=RiskClass.HIGH, + irreversible=True, + override_token=None, + resource_identifier="res-1", + trust_boundary_crossed=False, + timestamp=_TS, +): + return TransitionRequest( + transition_id=transition_id, + risk_class=risk_class, + irreversible=irreversible, + resource_identifier=resource_identifier, + trust_boundary_crossed=trust_boundary_crossed, + override_token=override_token, + timestamp=timestamp, + ) + + +def _ctx(actor_id="alice", authority_basis="OWNER", tenant_id="t1"): + return AuthorityContext(actor_id=actor_id, authority_basis=authority_basis, tenant_id=tenant_id) + + +# --------------------------------------------------------------------------- +# Scenario 1: undeclared transition → REFUSED +# --------------------------------------------------------------------------- +def test_undeclared_transition_refused(real_registry): + req = _req(transition_id="NO_SUCH_TRANSITION") + rec = evaluate_transition(req, _ctx(), real_registry) + assert rec.outcome is TransitionOutcome.REFUSED + assert rec.reason_code == "transition_not_registered" + + +# --------------------------------------------------------------------------- +# Scenario 2: insufficient authority → REFUSED +# --------------------------------------------------------------------------- +def test_insufficient_authority_refused(real_registry): + req = _req() + ctx = _ctx(authority_basis="USER") + rec = evaluate_transition(req, ctx, real_registry) + assert rec.outcome is TransitionOutcome.REFUSED + assert rec.reason_code == "authority_insufficient" + + +def test_none_authority_refused(real_registry): + req = _req() + ctx = _ctx(authority_basis="NONE") + rec = evaluate_transition(req, ctx, real_registry) + assert rec.outcome is TransitionOutcome.REFUSED + assert rec.reason_code == "authority_insufficient" + + +# --------------------------------------------------------------------------- +# Scenario 3: sufficient authority → APPROVED (HIGH risk, not CRITICAL+irreversible) +# --------------------------------------------------------------------------- +def test_owner_approved_for_high_risk(real_registry): + req = _req(risk_class=RiskClass.HIGH, irreversible=False) + ctx = _ctx(authority_basis="OWNER") + rec = evaluate_transition(req, ctx, real_registry) + assert rec.outcome is TransitionOutcome.APPROVED + assert rec.reason_code == "authority_sufficient" + + +# --------------------------------------------------------------------------- +# Scenario 4: CRITICAL + irreversible → SUPERVISED +# --------------------------------------------------------------------------- +def test_critical_irreversible_supervised(tmp_path): + content = json.dumps({ + "schema_version": "0.2", + "transitions": [ + { + "id": "CRITICAL_OP", + "irreversible": True, + "risk_class": "CRITICAL", + "required_authority": "ADMIN", + "gate_version": "v0.2" + } + ] + }) + p = tmp_path / "r.json" + p.write_text(content, encoding="utf-8") + reg = load_registry(str(p)) + req = _req(transition_id="CRITICAL_OP", risk_class=RiskClass.CRITICAL, irreversible=True) + ctx = _ctx(authority_basis="ADMIN") + rec = evaluate_transition(req, ctx, reg) + assert rec.outcome is TransitionOutcome.SUPERVISED + assert rec.reason_code == "critical_irreversible_supervised" + + +# --------------------------------------------------------------------------- +# Scenario 5: LOW risk with sufficient authority → APPROVED +# --------------------------------------------------------------------------- +def test_low_risk_approved(tmp_path): + content = json.dumps({ + "schema_version": "0.2", + "transitions": [ + { + "id": "READ_ONLY", + "irreversible": False, + "risk_class": "LOW", + "required_authority": "USER", + "gate_version": "v0.1" + } + ] + }) + p = tmp_path / "r.json" + p.write_text(content, encoding="utf-8") + reg = load_registry(str(p)) + req = _req(transition_id="READ_ONLY", risk_class=RiskClass.LOW, irreversible=False) + ctx = _ctx(authority_basis="USER") + rec = evaluate_transition(req, ctx, reg) + assert rec.outcome is TransitionOutcome.APPROVED + assert rec.reason_code == "authority_sufficient" + + +# --------------------------------------------------------------------------- +# Result is a DecisionRecord +# --------------------------------------------------------------------------- +def test_returns_decision_record(real_registry): + req = _req(risk_class=RiskClass.HIGH, irreversible=False) + rec = evaluate_transition(req, _ctx(), real_registry) + assert isinstance(rec, DecisionRecord) + + +# --------------------------------------------------------------------------- +# Audit fields populated +# --------------------------------------------------------------------------- +def test_audit_fields_populated(real_registry): + req = _req(risk_class=RiskClass.HIGH, irreversible=False) + ctx = _ctx(actor_id="alice", tenant_id="t1") + rec = evaluate_transition(req, ctx, real_registry) + assert rec.actor_id == "alice" + assert rec.tenant_id == "t1" + assert rec.timestamp == _TS + assert rec.gate_version == "v0.2" + assert rec.context_hash != "" + assert rec.decision_id != "" + assert rec.content_hash != "" + + +def test_authority_basis_is_actor_authority(real_registry): + """authority_basis in the record reflects the actor's provided authority, not required.""" + req = _req(risk_class=RiskClass.HIGH, irreversible=False) + rec = evaluate_transition(req, _ctx(authority_basis="OWNER"), real_registry) + assert rec.authority_basis == "OWNER" + + +# --------------------------------------------------------------------------- +# Determinism +# --------------------------------------------------------------------------- +def test_deterministic_same_inputs(real_registry): + req = _req(risk_class=RiskClass.HIGH, irreversible=False) + ctx = _ctx() + r1 = evaluate_transition(req, ctx, real_registry) + r2 = evaluate_transition(req, ctx, real_registry) + assert r1.canonical_bytes == r2.canonical_bytes + assert r1.content_hash == r2.content_hash + assert r1.decision_id == r2.decision_id