#!/usr/bin/env python3
"""Physical Receipt Validator v0.1 - Binds software artifacts to physical reality"""
import argparse
import json
import os
import random
import statistics
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

DEFAULT_THRESHOLDS = {
    "multimodal": {"acoustic_piezo_correlation": 0.85, "thermal_acoustic_correlation": 0.78},
    "power": {"voltage_sag_percent": 2.0},
    "thermal": {"drift_rate_degC_per_hour": 1.5},
    "torque": {"command_actual_delta_percent": 15.0},
}


class SomaticLedger:
    """JSON-lines ledger of physical telemetry records.

    Each non-blank, non-comment line of the file must be a JSON object that
    contains every name in ``REQUIRED_FIELDS``. Accepted records are kept in
    ``records`` (tagged with their 1-based source line under ``"_line"``);
    rejected lines are described in ``parse_errors``.
    """

    REQUIRED_FIELDS = ["power_sag_percent", "torque_command_vs_actual", "sensor_drift",
                       "interlock_state", "local_override_auth"]

    def __init__(self, filepath):
        self.filepath = Path(filepath)
        self.records = []
        self.parse_errors = []

    def load(self) -> bool:
        """Read the ledger file into ``records`` / ``parse_errors``.

        Returns:
            False when the file does not exist, True otherwise (even when
            individual lines were rejected).
        """
        if not self.filepath.exists():
            print(f"ERROR: {self.filepath} not found")
            return False
        with open(self.filepath, encoding="utf-8") as handle:
            for lineno, raw in enumerate(handle, 1):
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError as exc:
                    self.parse_errors.append({"line": lineno, "error": str(exc)})
                    continue
                # A line such as `42` or `[1, 2]` is valid JSON but not a record;
                # reject it instead of crashing on the field checks below.
                if not isinstance(rec, dict):
                    self.parse_errors.append(
                        {"line": lineno, "error": f"Expected JSON object, got {type(rec).__name__}"})
                    continue
                missing = [name for name in self.REQUIRED_FIELDS if name not in rec]
                if missing:
                    self.parse_errors.append({"line": lineno, "error": f"Missing: {missing}"})
                else:
                    rec["_line"] = lineno
                    self.records.append(rec)
        print(f"Loaded {len(self.records)} records")
        return True

    def get_anomalies(self, thresholds: Dict) -> List[Dict]:
        """Return one anomaly dict per threshold exceeded by a loaded record.

        Args:
            thresholds: Mapping shaped like ``DEFAULT_THRESHOLDS``; the
                ``power``, ``torque`` and ``thermal`` sections are read.

        Returns:
            A list of dicts with ``type``, ``severity`` and ``value`` keys
            (power anomalies also carry ``threshold``), in record order.
        """
        sag_limit = thresholds["power"]["voltage_sag_percent"]
        torque_limit = thresholds["torque"]["command_actual_delta_percent"]
        drift_limit = thresholds["thermal"]["drift_rate_degC_per_hour"]

        anomalies = []
        for rec in self.records:
            if rec.get("power_sag_percent", 0) > sag_limit:
                anomalies.append({"type": "HIGH_ENTROPY", "severity": "CRITICAL",
                                  "value": rec["power_sag_percent"], "threshold": sag_limit})
            # The sign of the torque delta is irrelevant; only its magnitude is compared.
            torque = abs(rec.get("torque_command_vs_actual", 0))
            if torque > torque_limit:
                anomalies.append({"type": "PHYSICAL_MISMATCH", "severity": "HIGH", "value": torque})
            drift = rec.get("sensor_drift", {})
            if isinstance(drift, dict) and drift.get("rate_degC_per_hour", 0) > drift_limit:
                anomalies.append({"type": "DEGRADED", "severity": "MEDIUM",
                                  "value": drift["rate_degC_per_hour"]})
        return anomalies


class MultimodalConsensus:
    """Holds named sensor series and checks that they agree with each other."""

    def __init__(self):
        self.modalities = {}

    def add_modality(self, name, values):
        """Register (or replace) the sample series stored under *name*."""
        self.modalities[name] = values

    def cross_correlate(self, m1, m2) -> float:
        """Return the Pearson correlation of two registered series.

        Returns 0.0 when either series is unknown, when fewer than three
        paired samples are available, or when either series is constant.
        Series of unequal length are compared over their common prefix.
        """
        if m1 not in self.modalities or m2 not in self.modalities:
            return 0.0
        n = min(len(self.modalities[m1]), len(self.modalities[m2]))
        if n < 3:
            return 0.0
        # Truncate both series first so means, deviations and covariance are
        # all computed over exactly the same paired samples.
        v1 = self.modalities[m1][:n]
        v2 = self.modalities[m2][:n]
        mean1, mean2 = statistics.mean(v1), statistics.mean(v2)
        std1, std2 = statistics.stdev(v1), statistics.stdev(v2)
        if std1 == 0 or std2 == 0:
            return 0.0
        # statistics.stdev is the sample deviation (n - 1 denominator), so the
        # covariance must use the same denominator or |r| is capped at (n-1)/n.
        cov = sum((a - mean1) * (b - mean2) for a, b in zip(v1, v2)) / (n - 1)
        return max(-1.0, min(1.0, cov / (std1 * std2)))

    def validate_consensus(self, thresholds: Dict) -> Dict:
        """Check the acoustic/piezo pair against its correlation threshold.

        The check only runs when both ``acoustic`` and ``piezo`` series are
        registered. NOTE: ``thermal_acoustic_correlation`` exists in
        ``DEFAULT_THRESHOLDS`` but no check here consumes it.

        Returns:
            Dict with ``status`` ("TRUSTED" or "SENSOR_COMPROMISE"), the
            list of ``checks`` performed and ``compromised_sensors``.
        """
        res = {"status": "TRUSTED", "checks": [], "compromised_sensors": []}
        if "acoustic" in self.modalities and "piezo" in self.modalities:
            limit = thresholds["multimodal"]["acoustic_piezo_correlation"]
            corr = self.cross_correlate("acoustic", "piezo")
            passed = corr >= limit
            res["checks"].append({"check": "acoustic_piezo", "value": round(corr, 4),
                                  "threshold": limit, "status": "PASS" if passed else "FAIL"})
            if not passed:
                res["compromised_sensors"].append("acoustic_or_piezo")
                res["status"] = "SENSOR_COMPROMISE"
        return res


class CopenhagenStandard:
    """Checks that a software manifest carries every required provenance key."""

    # Keys a manifest must contain; a missing key yields "MISSING_<KEY>".
    REQUIRED_KEYS = ("sha256", "commit_hash", "license", "physical_layer_ack")

    def __init__(self):
        self.violations = []

    def validate_blob(self, path, manifest) -> Tuple[bool, List[str]]:
        """Validate *manifest* and report missing keys.

        Args:
            path: Manifest location; accepted for interface compatibility
                and not read by this method.
            manifest: Parsed manifest mapping, or None/empty when absent.

        Returns:
            ``(True, [])`` when all keys are present, otherwise
            ``(False, violations)``. Violations are also accumulated on
            ``self.violations`` and printed.
        """
        violations = [f"MISSING_{key.upper()}" for key in self.REQUIRED_KEYS
                      if not manifest or key not in manifest]
        if violations:
            self.violations.extend(violations)
            print("THERMODYNAMIC MALPRACTICE:")
            for violation in violations:
                print(f" - {violation}")
            return False, violations
        return True, []


class EvidenceBundleGenerator:
    """Combines ledger anomalies, sensor consensus and manifest into one report."""

    def generate(self, somatic, consensus, manifest, thresholds) -> Dict:
        """Build the evidence bundle dict.

        Status precedence: any CRITICAL anomaly, then sensor compromise,
        then any HIGH anomaly; otherwise the bundle is marked validated.
        """
        anomalies = somatic.get_anomalies(thresholds)
        consensus_res = consensus.validate_consensus(thresholds)
        severities = {a["severity"] for a in anomalies}

        if "CRITICAL" in severities:
            status = "HIGH_ENTROPY - DO NOT EXECUTE"
        elif consensus_res["status"] == "SENSOR_COMPROMISE":
            status = "SENSOR_COMPROMISE - DATA UNTRUSTED"
        elif "HIGH" in severities:
            status = "DEGRADED - OPERATE WITH CAUTION"
        else:
            status = "VALIDATED - SAFE TO EXECUTE"

        # Tolerate a null or malformed acknowledgement instead of crashing on .get.
        ack = manifest.get("physical_layer_ack", {})
        if not isinstance(ack, dict):
            ack = {}

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # dropping tzinfo keeps the historical "<naive ISO>Z" output format.
        generated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        return {
            "schema_version": "1.0",
            "generated_at": generated_at,
            "software": {k: manifest.get(k) for k in ["sha256", "commit_hash", "license", "path"]},
            "physical_layer": {
                out_key: ack.get(src_key)
                for out_key, src_key in [("component_type", "type"),
                                         ("serial_number", "serial"),
                                         ("material_spec", "material")]
            },
            "somatic_summary": {"records": len(somatic.records), "anomalies": anomalies},
            "multimodal_consensus": consensus_res,
            "validation_status": status,
        }


def _test_fixture_dir() -> Path:
    """Return the directory for test-mode fixtures: /workspace when it is a
    writable directory, otherwise the system temporary directory."""
    workspace = Path("/workspace")
    if workspace.is_dir() and os.access(workspace, os.W_OK):
        return workspace
    return Path(tempfile.gettempdir())


def _write_test_ledger(path: Path) -> None:
    """Write the three built-in sample records to *path*, one JSON object per line."""
    samples = [
        {"ts_utc_ns": 1e18, "power_sag_percent": 0.5, "torque_command_vs_actual": 2.1,
         "sensor_drift": {"rate_degC_per_hour": 0.3}, "interlock_state": "ENGAGED",
         "local_override_auth": None},
        {"ts_utc_ns": 1e18 + 6e7, "power_sag_percent": 3.2, "torque_command_vs_actual": 1.8,
         "sensor_drift": {"rate_degC_per_hour": 0.4}, "interlock_state": "ENGAGED",
         "local_override_auth": None},
        {"ts_utc_ns": 1e18 + 1.2e8, "power_sag_percent": 0.3, "torque_command_vs_actual": 1.2,
         "sensor_drift": {"rate_degC_per_hour": 2.1}, "interlock_state": "ENGAGED",
         "local_override_auth": None},
    ]
    with open(path, "w", encoding="utf-8") as f:
        # The newline is essential: without it all records share one line and
        # the ledger loader rejects the whole file.
        f.writelines(json.dumps(data) + "\n" for data in samples)


def _write_test_manifest(path: Path) -> None:
    """Write the built-in sample manifest to *path*."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"sha256": "abc123...", "commit_hash": "9dbc1435...", "license": "Apache-2.0",
                   "path": "/opt/grid_ai/predictor.py",
                   "physical_layer_ack": {"type": "silicon_memristor", "serial": "TR-OAK-001",
                                          "material": "grain_oriented_steel"}}, f, indent=2)


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; None (the default) reads ``sys.argv``.

    Returns:
        Process exit code: 0 when the bundle is validated as safe, 2 when it
        says "DO NOT EXECUTE", 1 for every other status or fatal error.
    """
    parser = argparse.ArgumentParser()
    # --ledger is only mandatory outside test mode, where it is generated.
    parser.add_argument("--ledger")
    parser.add_argument("--manifest")
    parser.add_argument("--output", default="evidence_bundle.json")
    parser.add_argument("--test", action="store_true")
    args = parser.parse_args(argv)
    if not args.test and not args.ledger:
        parser.error("the following arguments are required: --ledger")

    print("=" * 70)
    print("PHYSICAL RECEIPT VALIDATOR v0.1")
    print("=" * 70)

    if args.test:
        print("[TEST MODE]")
        fixture_dir = _test_fixture_dir()
        ledger_path = fixture_dir / "test_somatic.jsonl"
        _write_test_ledger(ledger_path)
        args.ledger = str(ledger_path)
        if not args.manifest:
            manifest_path = fixture_dir / "test_manifest.json"
            _write_test_manifest(manifest_path)
            args.manifest = str(manifest_path)

    somatic = SomaticLedger(args.ledger)
    # An unreadable ledger must never fall through to a "safe" verdict on zero records.
    if not somatic.load():
        print("FATAL: ledger could not be loaded")
        return 1
    if somatic.parse_errors:
        print(f"WARNING: {len(somatic.parse_errors)} ledger line(s) rejected")

    manifest = None
    if args.manifest:
        try:
            with open(args.manifest, encoding="utf-8") as f:
                manifest = json.load(f)
        except (OSError, json.JSONDecodeError) as exc:
            print(f"FATAL: cannot read manifest {args.manifest}: {exc}")
            return 1

    copenhagen = CopenhagenStandard()
    passes, _ = copenhagen.validate_blob(args.manifest, manifest)
    if not passes:
        print("FATAL: Copenhagen Standard failed")
        return 1

    consensus = MultimodalConsensus()
    if args.test:
        # Two noisy views of one shared signal, so the pair correlates strongly.
        base = [random.gauss(0, 1) for _ in range(100)]
        consensus.add_modality("acoustic", [x + random.gauss(0, 0.1) for x in base])
        consensus.add_modality("piezo", [x + random.gauss(0, 0.15) for x in base])

    bundle = EvidenceBundleGenerator().generate(somatic, consensus, manifest or {}, DEFAULT_THRESHOLDS)
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(bundle, f, indent=2)

    status = bundle["validation_status"]
    print(f"Status: {status}")
    print(f"Output: {args.output}")
    if "SAFE" in status:
        return 0
    return 2 if "DO NOT EXECUTE" in status else 1


if __name__ == "__main__":
    # SystemExit works even when the site-provided exit() helper is unavailable.
    raise SystemExit(main())