#!/usr/bin/env python3
"""M-UESS v1.5 Validator - Receipt Ledger Schema Enforcement with Life-Criticality"""

import json
import sys
from datetime import datetime
from typing import Dict, Any, List, Optional

# Required top-level fields of a receipt and the JSON type each must have.
BASE_CORE_REQUIRED = {
    "receipt_id": str,
    "domain": str,
    "jurisdiction": str,
    "gatekeeper": str,
    "burdened_party": str,
    "decision_node": dict,
    "remedy_execution": dict,
}

# Required fields inside receipt["decision_node"].
DECISION_NODE_REQUIRED = {
    "submission_date": str,
    "statutory_sla_days": (int, float),
    "latency_variance_days": (int, float),
}

# Required fields inside receipt["remedy_execution"].
REMEDY_REQUIRED = {
    "auto_expire_triggered": bool,
    "burden_inverted": bool,
    "penalty_accrued_usd": (int, float),
}

# Expected fields per extension entry, keyed by the entry's "module_type".
EXTENSION_MODULES = {
    "structural": {
        "sovereignty_tier": (int,),
        "vendor_concentration": (int, float),
    },
    "social": {
        "demographic_skew_delta": (int, float),
        "contextual_omission_flag": bool,
        "agency_override_success_rate": (int, float),
    },
    "systemic": {
        "contingency_loss_probability": (int, float),
    },
}

VALID_DOMAINS = [
    "grid",
    "utility_interconnection",
    "robotics",
    "housing",
    "healthcare",
    "transit",
    "water",
    "education",
    "arctic",
    "algorithmic",
    "telecommunications",
    "agriculture",
]
VALID_CRITICALITY = ["A", "B", "C"]
VALID_VERDICT_STATUS = ["ACCEPT", "REJECT", "WARN"]
VALID_AUDIT_STREAMS = [
    "raw_telemetry",
    "community_consensus",
    "institutional_certification",
    "NIST_standard",
]

# Consequence weight reported in the verdict for each criticality class.
CRITICALITY_WEIGHTS = {"A": 10.0, "B": 1.0, "C": 0.5}


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _is_number(value: Any) -> bool:
    """Return True for int/float values, excluding bool (a subclass of int)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _number_or(value: Any, default: Any) -> Any:
    """Return *value* when it is a real number, otherwise *default*."""
    return value if _is_number(value) else default


def _matches_type(value: Any, expected: Any) -> bool:
    """isinstance() that only accepts a bool where bool is explicitly expected."""
    if isinstance(value, bool):
        allowed = expected if isinstance(expected, tuple) else (expected,)
        return bool in allowed
    return isinstance(value, expected)


class MuessValidator:
    """Validate a JSON receipt against the schema tables above and score it.

    After ``validate`` runs, ``errors`` and ``warnings`` hold the collected
    messages and ``receipt`` holds the parsed JSON document (``None`` when
    parsing failed).
    """

    def __init__(self):
        self.errors: List[str] = []
        self.warnings: List[str] = []
        self.receipt: Optional[Any] = None

    def validate(self, receipt_json: str) -> Dict[str, Any]:
        """Parse *receipt_json*, validate it, and return a report dict.

        The report always has ``valid``, ``errors``, ``warnings`` and
        ``receipt_id``. ``enforcement_verdict`` is added whenever the document
        parsed to a JSON object, even if validation errors were recorded.
        Nested sections are only validated when the base-core check is clean.
        """
        self.errors, self.warnings, self.receipt = [], [], None
        try:
            parsed = json.loads(receipt_json)
        except (json.JSONDecodeError, TypeError) as e:
            self.errors.append(f"Invalid JSON: {e}")
            return self._build_report()

        self.receipt = parsed
        if not isinstance(parsed, dict):
            # Every later step indexes the receipt by key; stop here instead
            # of raising on a list/number/string root.
            self.errors.append(
                f"Receipt root must be a JSON object, got {type(parsed).__name__}"
            )
            return self._build_report()

        self._validate_base_core()
        if not self.errors:
            self._validate_decision_node()
            self._validate_remedy()
            self._validate_extensions()
            self._validate_provenance()
            self._validate_extraction_metrics()
        verdict = self._compute_verdict()
        return self._build_report(verdict)

    def _validate_base_core(self):
        """Record an error for each missing or mistyped required top-level field."""
        r = self.receipt
        for field, etype in BASE_CORE_REQUIRED.items():
            if field not in r:
                self.errors.append(f"Missing required field: {field}")
            elif not isinstance(r[field], etype):
                self.errors.append(
                    f"Field '{field}' expected {etype.__name__}, "
                    f"got {type(r[field]).__name__}"
                )
        if "domain" in r and r["domain"] not in VALID_DOMAINS:
            self.warnings.append(f"Domain '{r['domain']}' not in standard list")

    def _validate_decision_node(self):
        """Check decision_node's required fields and the submission_date format."""
        dn = _as_dict(self.receipt.get("decision_node"))
        for field, etype in DECISION_NODE_REQUIRED.items():
            if field not in dn:
                self.errors.append(f"decision_node missing: {field}")
            elif not _matches_type(dn[field], etype):
                self.errors.append(f"decision_node.{field} type mismatch")

        submitted = dn.get("submission_date")
        # Only strings are parsed; a non-string was already reported above and
        # would make strptime raise TypeError.
        if isinstance(submitted, str):
            try:
                datetime.strptime(submitted, "%Y-%m-%d")
            except ValueError:
                self.errors.append(f"submission_date invalid: {submitted}")

    def _validate_remedy(self):
        """Check remedy_execution's required fields and the optional deployment_verdict."""
        rem = _as_dict(self.receipt.get("remedy_execution"))
        for field, etype in REMEDY_REQUIRED.items():
            if field not in rem:
                self.errors.append(f"remedy_execution missing: {field}")
            elif not _matches_type(rem[field], etype):
                self.errors.append(f"remedy_execution.{field} type mismatch")

        dv = rem.get("deployment_verdict")
        if not dv:
            return
        if not isinstance(dv, dict):
            self.errors.append("remedy_execution.deployment_verdict type mismatch")
            return
        if "status" in dv and dv["status"] not in VALID_VERDICT_STATUS:
            self.errors.append(f"deployment_verdict.status invalid: {dv['status']}")

    def _validate_extensions(self):
        """Check each extension entry against EXTENSION_MODULES.

        Problems are recorded as warnings, except an out-of-range
        sovereignty_tier, which is an error.
        """
        exts = self.receipt.get("extensions")
        if exts is None:
            return
        if not isinstance(exts, list):
            self.warnings.append("extensions type mismatch: expected list")
            return

        for i, ext in enumerate(exts):
            if not isinstance(ext, dict):
                self.warnings.append(f"Extension {i}: expected object")
                continue
            mt = ext.get("module_type")
            # isinstance guard: an unhashable module_type would make the dict
            # membership test raise.
            if not isinstance(mt, str) or mt not in EXTENSION_MODULES:
                self.warnings.append(f"Extension {i}: unknown module_type '{mt}'")
                continue
            for field, etype in EXTENSION_MODULES[mt].items():
                if field not in ext:
                    self.warnings.append(f"Extension {i} ({mt}): missing {field}")
                elif not _matches_type(ext[field], etype):
                    self.warnings.append(f"Extension {i} ({mt}).{field} type mismatch")

            # Range checks apply only to real numbers; mistyped values were
            # already reported by the loop above.
            if mt == "structural" and _is_number(ext.get("sovereignty_tier")):
                if not 1 <= ext["sovereignty_tier"] <= 3:
                    self.errors.append("sovereignty_tier must be 1-3")
            if mt == "social" and _is_number(ext.get("demographic_skew_delta")):
                if not 0 <= ext["demographic_skew_delta"] <= 1:
                    self.warnings.append("demographic_skew_delta should be 0-1")
            if mt == "systemic" and _is_number(ext.get("contingency_loss_probability")):
                if not 0 <= ext["contingency_loss_probability"] <= 1:
                    self.warnings.append("contingency_loss_probability should be 0-1")

    def _validate_provenance(self):
        """Check provenance_metadata; warn when it is absent or empty."""
        pm = self.receipt.get("provenance_metadata")
        if not pm:
            self.warnings.append(
                "No provenance_metadata - v1.5 confidence weighting unavailable"
            )
            return
        if not isinstance(pm, dict):
            self.errors.append("provenance_metadata type mismatch")
            return
        if "confidence_score" in pm:
            score = pm["confidence_score"]
            if not _is_number(score) or not 0 <= score <= 1:
                self.errors.append("confidence_score must be 0-1")
        if "audit_stream" in pm and pm["audit_stream"] not in VALID_AUDIT_STREAMS:
            self.warnings.append("audit_stream not in standard list")

    def _validate_extraction_metrics(self):
        """Warn when extraction_metrics.criticality_class is not in VALID_CRITICALITY."""
        em = _as_dict(self.receipt.get("extraction_metrics"))
        if "criticality_class" in em and em["criticality_class"] not in VALID_CRITICALITY:
            self.warnings.append(
                f"criticality_class '{em['criticality_class']}' not in standard list"
            )

    def _compute_verdict(self) -> Dict[str, Any]:
        """Score the receipt and derive an enforcement status.

        Severity is a sum of fixed increments capped at 1.0. Missing or
        mistyped inputs contribute nothing instead of raising, because this
        runs even for receipts that failed validation.
        """
        r = _as_dict(self.receipt)
        severity = 0.0
        dn = _as_dict(r.get("decision_node"))
        rem = _as_dict(r.get("remedy_execution"))

        # Latency variance contribution
        lv = abs(_number_or(dn.get("latency_variance_days"), 0))
        if lv > 365:
            severity += 0.3
        elif lv > 90:
            severity += 0.2
        elif lv > 30:
            severity += 0.1

        # Auto-expire contribution
        if rem.get("auto_expire_triggered"):
            severity += 0.2

        # Extension module contributions
        exts = r.get("extensions")
        for ext in exts if isinstance(exts, list) else []:
            if not isinstance(ext, dict):
                continue
            mt = ext.get("module_type")
            if mt == "structural" and _number_or(ext.get("sovereignty_tier"), 0) == 3:
                severity += 0.15
            if mt == "social" and _number_or(ext.get("demographic_skew_delta"), 0) > 0.3:
                severity += 0.15
            if mt == "systemic" and _number_or(ext.get("contingency_loss_probability"), 0) > 0.3:
                severity += 0.1

        # Life-criticality adjustment from extraction_metrics
        em = _as_dict(r.get("extraction_metrics"))
        cc = em.get("criticality_class", "")
        # isinstance guard: an unhashable class value cannot be a dict key.
        cw = CRITICALITY_WEIGHTS.get(cc, 1.0) if isinstance(cc, str) else 1.0
        if cc == "A":
            severity += 0.2

        # Consequence variance flag
        if em.get("consequence_variance_flag", False):
            severity += 0.15

        severity = min(severity, 1.0)
        pm = _as_dict(r.get("provenance_metadata"))
        omega_p = _number_or(pm.get("confidence_score"), 0.5)

        # Verdict computation
        if severity > 0.6 and omega_p > 0.8:
            status, vcode = "REJECT", "EXTRACT_HIGH_CONF"
        elif severity > 0.6 and omega_p < 0.4:
            status, vcode = "WARN", "EXTRACT_LOW_CONF_GRACE"
        elif severity > 0.35:
            status, vcode = "WARN", "EXTRACT_MODERATE"
        else:
            status, vcode = "ACCEPT", "WITHIN_TOLERANCE"

        # Deployment verdict override - higher severity takes precedence.
        # Only a recognised status may override; an unknown one would otherwise
        # rank as 0 and replace a computed ACCEPT.
        rank = {"ACCEPT": 0, "WARN": 1, "REJECT": 2}
        dv = rem.get("deployment_verdict")
        if isinstance(dv, dict):
            dv_status = dv.get("status")
            if (
                isinstance(dv_status, str)
                and dv_status in rank
                and rank[dv_status] >= rank[status]
            ):
                status = dv_status
                vcode = dv.get("verdict_code", vcode)

        return {
            "computed_severity": round(severity, 3),
            "provenance_confidence": omega_p,
            "enforcement_status": status,
            "verdict_code": vcode,
            "criticality_class": cc or "unspecified",
            "consequence_weight": cw,
        }

    def _build_report(self, verdict=None):
        """Assemble the report dict from the collected errors and warnings.

        ``receipt_id`` is "parse_failed" when no document was parsed (or it
        parsed to null), "unknown" when the id is absent or the root is not an
        object, and the receipt's own value otherwise.
        """
        if self.receipt is None:
            receipt_id = "parse_failed"
        elif isinstance(self.receipt, dict):
            receipt_id = self.receipt.get("receipt_id", "unknown")
        else:
            receipt_id = "unknown"

        report = {
            "valid": len(self.errors) == 0,
            "errors": self.errors,
            "warnings": self.warnings,
            "receipt_id": receipt_id,
        }
        if verdict:
            report["enforcement_verdict"] = verdict
        return report


def main(argv: Optional[List[str]] = None) -> int:
    """Run the validator from the command line and return the exit code.

    *argv* defaults to ``sys.argv[1:]``. Returns 0 when the receipt is valid
    and 1 otherwise (including usage errors and unreadable input files).
    """
    args = sys.argv[1:] if argv is None else argv
    if not args:
        print("Usage: muess_validator.py <receipt.json>")
        print(" or pipe JSON: echo '{...}' | python muess_validator.py -")
        return 1

    src = args[0]
    if src == "-":
        data = sys.stdin.read()
    else:
        try:
            with open(src, encoding="utf-8") as f:
                data = f.read()
        except (OSError, UnicodeDecodeError) as exc:
            print(f"Cannot read {src}: {exc}", file=sys.stderr)
            return 1

    report = MuessValidator().validate(data)
    print(json.dumps(report, indent=2))
    return 0 if report["valid"] else 1


if __name__ == "__main__":
    sys.exit(main())