Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mgtp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# MGTP package
10 changes: 10 additions & 0 deletions mgtp/decision_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""mgtp.decision_record — Re-exports the canonical DecisionRecord.

The sole DecisionRecord implementation lives in mgtp.types.
This module exists so that ``from mgtp.decision_record import DecisionRecord``
continues to work without introducing a duplicate or competing class.
"""

from mgtp.types import DecisionRecord # noqa: F401

__all__ = ["DecisionRecord"]
61 changes: 61 additions & 0 deletions mgtp/evaluate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""mgtp.evaluate — Canonical registry-free MGTP evaluator.

Public entrypoint: evaluate()

This module exposes exactly one evaluation function. For registry-based
evaluation see mgtp.evaluate_transition.

Invariants:
- evaluate() is pure: same inputs produce same output.
- No file I/O. No network calls. No state mutation.
- No randomness. No clock calls. No logging.
"""

from authority_gate import AuthorityGate, Decision, Evidence

__all__ = ["evaluate"]

from .types import AuthorityContext, DecisionRecord, RiskClass, TransitionOutcome, TransitionRequest


def evaluate(request: TransitionRequest, context: AuthorityContext) -> DecisionRecord:
"""Evaluate a TransitionRequest against an AuthorityContext without a registry.

``context.authority_basis`` names the *required* Evidence level.
``context.provided_evidence`` carries the *actual* Evidence provided.

Raises:
ValueError: if ``context.provided_evidence`` is None.
ValueError: if ``context.authority_basis`` is not a recognised Evidence name.

Returns:
A DecisionRecord with outcome APPROVED, REFUSED, or SUPERVISED.
"""
if context.provided_evidence is None:
raise ValueError("provided_evidence is required for MGTP evaluation")

try:
required = Evidence[context.authority_basis]
except KeyError:
raise ValueError(f"Unknown authority_basis: {context.authority_basis!r}")

gate = AuthorityGate(required)
decision = gate.check(context.provided_evidence)

if decision == Decision.DENY:
reason = "authority_insufficient"
outcome = TransitionOutcome.REFUSED
elif request.irreversible and request.risk_class == RiskClass.CRITICAL:
reason = "critical_irreversible_supervised"
outcome = TransitionOutcome.SUPERVISED
else:
reason = "authority_sufficient"
outcome = TransitionOutcome.APPROVED

return DecisionRecord(
transition_id=request.transition_id,
outcome=outcome,
authority_basis=context.authority_basis,
risk_class=request.risk_class,
reason=reason,
)
119 changes: 119 additions & 0 deletions mgtp/evaluate_transition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""mgtp.evaluate_transition — Registry-based MGTP evaluator.

Public entrypoint: evaluate_transition()

This function is a thin wrapper around evaluate() (mgtp.evaluate). It adds
registry pre-check and context mapping, then delegates the core gate decision
to evaluate(). There is no divergent gate behaviour between the two paths.

Evaluation pipeline:
1. Registry membership check → REFUSED / transition_not_registered
2. Map (entry["required_authority"], context.authority_basis) to an
(authority_basis=required, provided_evidence=actual) context.
3. Delegate to evaluate() for the gate decision.
4. Reconstruct DecisionRecord with the actor's original authority_basis and
extended audit fields (context_hash, actor_id, tenant_id, …).

Invariants:
- evaluate_transition() is pure: same inputs produce same output.
- No file I/O. No network calls. No state mutation.
- No randomness. No clock calls. No logging.
"""

import hashlib
import json

from authority_gate import Evidence

__all__ = ["evaluate_transition"]

from .decision_record import DecisionRecord
from .evaluate import evaluate
from .types import AuthorityContext, TransitionOutcome, TransitionRequest


def _context_hash(request: TransitionRequest, context: AuthorityContext) -> str:
"""Deterministic sha256 over the canonical representation of (request, context)."""
payload = {
"actor_id": context.actor_id,
"authority_basis": context.authority_basis,
"irreversible": request.irreversible,
"override_token": request.override_token,
"resource_identifier": request.resource_identifier,
"risk_class": request.risk_class.value,
"tenant_id": context.tenant_id,
"timestamp": request.timestamp,
"transition_id": request.transition_id,
"trust_boundary_crossed": request.trust_boundary_crossed,
}
canonical = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
return hashlib.sha256(canonical).hexdigest()


def evaluate_transition(
request: TransitionRequest,
context: AuthorityContext,
registry: dict,
) -> DecisionRecord:
"""Evaluate a TransitionRequest against the registry and authority context.

This is a thin wrapper around evaluate(): registry lookup and context
mapping happen here; the core gate decision is delegated to evaluate().

Args:
request: The transition being requested.
context: The authority context of the requesting actor.
``context.authority_basis`` names the *provided* Evidence
level (looked up in the Evidence enum).
registry: Mapping of transition_id -> entry dict. Each entry must
have a ``required_authority`` key whose value is a valid
Evidence member name.

Returns:
A DecisionRecord with outcome APPROVED, REFUSED, or SUPERVISED.
"""
ctx_hash = _context_hash(request, context)

# 1. Registry membership check.
if request.transition_id not in registry:
return DecisionRecord(
transition_id=request.transition_id,
outcome=TransitionOutcome.REFUSED,
authority_basis=context.authority_basis,
risk_class=request.risk_class,
reason="transition_not_registered",
actor_id=context.actor_id,
tenant_id=context.tenant_id,
timestamp=request.timestamp,
context_hash=ctx_hash,
)

entry = registry[request.transition_id]
gate_version = entry.get("gate_version", "")

# 2. Map to evaluate() context:
# authority_basis = required level (from registry)
# provided_evidence = actual evidence (from context.authority_basis)
eval_context = AuthorityContext(
actor_id=context.actor_id,
authority_basis=entry["required_authority"],
tenant_id=context.tenant_id,
provided_evidence=Evidence[context.authority_basis],
)

# 3. Delegate to evaluate() for the gate decision.
gate_result = evaluate(request, eval_context)

# 4. Reconstruct with the actor's original authority_basis and audit fields.
return DecisionRecord(
transition_id=gate_result.transition_id,
outcome=gate_result.outcome,
authority_basis=context.authority_basis,
risk_class=gate_result.risk_class,
reason=gate_result.reason,
actor_id=context.actor_id,
tenant_id=context.tenant_id,
timestamp=request.timestamp,
gate_version=gate_version,
context_hash=ctx_hash,
)
67 changes: 67 additions & 0 deletions mgtp/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""registry — Load and validate the MGTP transition registry from JSON."""

import json

from authority_gate import Evidence
from mgtp.types import RiskClass

_REQUIRED_FIELDS = {"id", "irreversible", "risk_class", "required_authority", "gate_version"}


def load_registry(path: str) -> dict:
"""Load a transition registry JSON file.

Returns a dict mapping transition_id -> entry dict.

Raises:
ValueError: if the file is missing the schema_version field, contains duplicate
transition IDs, is missing required fields, or has invalid
enum values for risk_class or required_authority.
"""
with open(path, "r", encoding="utf-8") as fh:
data = json.load(fh)

if not isinstance(data, dict) or "schema_version" not in data:
raise ValueError("Registry JSON must contain a top-level 'schema_version' field.")

transitions = data.get("transitions", [])
registry: dict = {}

for entry in transitions:
tid = entry.get("id")
if tid is None:
raise ValueError(f"Transition entry missing required field 'id': {entry}")
if tid in registry:
raise ValueError(f"Duplicate transition ID in registry: '{tid}'")

missing = _REQUIRED_FIELDS - set(entry.keys())
if missing:
raise ValueError(
f"Transition '{tid}' missing required fields: {sorted(missing)}"
)

# Validate risk_class against RiskClass enum.
rc = entry["risk_class"]
try:
RiskClass(rc)
except ValueError:
valid = [e.value for e in RiskClass]
raise ValueError(
f"Transition '{tid}' has invalid risk_class '{rc}'. "
f"Must be one of: {valid}"
)

# Validate required_authority against Evidence enum names.
ra = entry["required_authority"]
try:
Evidence[ra]
except KeyError:
valid = [e.name for e in Evidence]
raise ValueError(
f"Transition '{tid}' has invalid required_authority '{ra}'. "
f"Must be one of: {valid}"
)

registry[tid] = entry

return registry
Loading