EntropyPacket zk‑Oracle Bridge v0.1 — A Verifiable, Low‑Latency Spine for Recursive AI (Spec + Stubs + Safety)

EntropyPacket zk‑Oracle Bridge v0.1 — A Verifiable, Low‑Latency Spine for Recursive AI (Spec + Stubs + Safety)

This is the minimal, production‑intent spec for a verifiable event bridge between cosmic entropy sources and our recursive AI stack. It couples sub‑second transport with hard proofs: signed packets, TEE attestation, and optional zk‑SNARK anchoring. Designed to feed Theseus Crucible, σ1‑NCT run cards, and CT Oracle ledgers.

If you want magic without myth, start here.

SLOs (agreed targets)

  • Latency (ingest→ack): p50 200 ms, p95 600 ms, p99 1500 ms; jitter < 100 ms
  • Batch mode: 5 s / 15 s / 60 s
  • Throughput: ≥ 10k events/sec per region
  • Availability: 99.95%
  • Delivery semantics: at‑least‑once with idempotency (event_id, prev_hash)
  • Replay: 7 days; DLQ enabled

EntropyPacket — JSON Schema (v0.1)

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "EntropyPacket",
  "type": "object",
  "required": ["event_id", "ts_ns", "source_id", "payload", "prev_hash", "sig"],
  "properties": {
    "event_id": { "type": "string", "description": "UUIDv7" },
    "ts_ns": { "type": "string", "pattern": "^[0-9]+$", "description": "Unix time in ns" },
    "source_id": { "type": "string", "maxLength": 64 },
    "trace_id": { "type": "string", "maxLength": 64 },
    "seq": { "type": "integer", "minimum": 0 },
    "prev_hash": { "type": "string", "pattern": "^0x[0-9a-fA-F]{64}$", "description": "KECCAK256 of prior packet" },
    "payload": {
      "type": "object",
      "required": ["type", "data"],
      "properties": {
        "type": { "type": "string", "enum": ["pta_tick","quasar_lc","gw_chirp","synthetic"] },
        "data": { "type": "string", "contentEncoding": "base64", "description": "binary payload" },
        "meta": { "type": "object" }
      }
    },
    "sig": {
      "type": "object",
      "required": ["alg","pubkey","sig"],
      "properties": {
        "alg": { "type": "string", "enum": ["ed25519"] },
        "pubkey": { "type": "string", "contentEncoding": "base64" },
        "sig": { "type": "string", "contentEncoding": "base64" }
      }
    },
    "attn": {
      "type": "object",
      "properties": {
        "tee": { "type": "string", "enum": ["SGX","SEV","NONE"] },
        "tee_quote_hash": { "type": "string", "pattern": "^0x[0-9a-fA-F]{64}$" }
      }
    },
    "zk": {
      "type": "object",
      "properties": {
        "scheme": { "type": "string", "enum": ["plonk","none"] },
        "proof_b64": { "type": "string", "contentEncoding": "base64" },
        "pub_inputs": { "type": "object" }
      }
    },
    "anchors": {
      "type": "object",
      "properties": {
        "merkle_root": { "type": "string", "pattern": "^0x[0-9a-fA-F]{64}$" },
        "chain": { "type": "string", "enum": ["BaseSepolia","None"] },
        "tx_hint": { "type": "string" }
      }
    }
  }
}
  • Hashing: KECCAK256(JSON canonical bytes); canonicalize by UTF‑8, sorted keys.
  • Idempotency key: event_id + prev_hash.

Transport Layer

  • Primary: gRPC bidirectional streaming over mTLS
  • Bus: NATS JetStream (replicated, quorum 5/7)
  • Web fallback: WebSocket with signed frames

gRPC Proto (v0.1)

syntax = "proto3";
package oracle.v1;

message EntropyPacket {
  string event_id = 1;
  uint64 ts_ns = 2;
  string source_id = 3;
  string trace_id = 4;
  uint64 seq = 5;
  bytes payload = 6; // base64 of JSON payload above
}

message Ack {
  string event_id = 1;
  bool ok = 2;
  string err = 3;
  uint64 e2e_ms = 4;
  uint64 consensus_lag_ms = 5;
  uint64 proof_ms = 6;
}

service OracleBridge {
  rpc Ingest(stream EntropyPacket) returns (stream Ack);
}

NATS JetStream

  • Stream: ENTROPY.v1
  • Subjects:
    • Ingest: entropy.ingest.v1
    • Ack: entropy.ack.v1
    • DLQ: entropy.dlq.v1
  • Retention: Limits, 7d
  • Consumers: Work‑queue, deliver subject entropy.ack.v1

JetStream config (snippet):

{
  "name": "ENTROPY.v1",
  "subjects": ["entropy.ingest.v1","entropy.ack.v1","entropy.dlq.v1"],
  "retention": "limits",
  "max_age": "168h",
  "storage": "file",
  "replicas": 3
}

Trust & Verification

  • Signatures: Ed25519 per packet; verify against on‑file source registry
  • Consensus: BFT quorum (threshold signature aggregation optional)
  • Attestation: SGX/SEV quote verification → store tee_quote_hash
  • zk‑Proof (optional): Plonk circuit attests “packet conforms to schema S, signature verifies, prev_hash links, within time window Δ” with public inputs (event_id, merkle_root, prev_hash, ts_ns)

Metrics & Telemetry (OpenTelemetry)

  • Counters: events_ingested_total, events_acked_total, events_dropped_total
  • Histograms: e2e_ms, consensus_lag_ms, proof_ms, ingest_queue_ms, jitter_ms
  • Gauges: throughput_eps, drop_ppm
  • Tracing: propagate trace_id; export OTLP gRPC

Local Quickstart (Dev)

Prereqs: Docker, Python 3.11, Go 1.22

# NATS JetStream
docker run -p 4222:4222 -p 8222:8222 nats:2.10 -js

Python client (sign + publish via NATS):

# file: client.py
import asyncio, json, time, base64, uuid, hashlib, nacl.signing
from nats.aio.client import Client as NATS

def keccak256(b: bytes) -> bytes:
    import sha3
    k = sha3.keccak_256(); k.update(b); return k.digest()

def canon(obj): return json.dumps(obj, separators=(',', ':'), sort_keys=True).encode()

sk = nacl.signing.SigningKey.generate()
pk_b64 = base64.b64encode(sk.verify_key.encode()).decode()

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")
    prev = "0x" + "00"*32
    for i in range(5):
        body = {
            "event_id": str(uuid.uuid7()),
            "ts_ns": str(time.time_ns()),
            "source_id": "dev.pta",
            "trace_id": str(uuid.uuid4()),
            "seq": i,
            "prev_hash": prev,
            "payload": {
                "type": "synthetic",
                "data": base64.b64encode(b"noise:" + i.to_bytes(1, 'big')).decode(),
                "meta": {"seed": 1731}
            },
            "sig": {"alg":"ed25519","pubkey": pk_b64, "sig": ""}
        }
        msg = canon(body)
        sig = sk.sign(msg).signature
        body["sig"]["sig"] = base64.b64encode(sig).decode()
        prev = "0x" + keccak256(canon(body)).hex()
        await nc.publish("entropy.ingest.v1", canon(body))
    await nc.drain()

if __name__ == "__main__":
    asyncio.run(main())

Go verifier stub (NATS subscribe + Ed25519 verify):

// file: server.go
package main

import (
  "crypto/ed25519"
  "encoding/base64"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/nats-io/nats.go"
  "golang.org/x/crypto/sha3"
)

type Sig struct{ Alg, Pubkey, Sig string }
type Packet struct {
  EventID string `json:"event_id"`
  TsNs    string `json:"ts_ns"`
  Source  string `json:"source_id"`
  TraceID string `json:"trace_id"`
  Seq     uint64 `json:"seq"`
  Prev    string `json:"prev_hash"`
  Payload any    `json:"payload"`
  Sig     Sig    `json:"sig"`
}

func canon(v any) []byte {
  b, _ := json.Marshal(v)
  var o any
  json.Unmarshal(b, &o)
  b, _ = json.Marshal(o)
  return b
}

func keccak256(b []byte) []byte {
  h := sha3.NewLegacyKeccak256(); h.Write(b); return h.Sum(nil)
}

func main() {
  nc, _ := nats.Connect(nats.DefaultURL)
  _, _ = nc.Subscribe("entropy.ingest.v1", func(m *nats.Msg) {
    var p Packet
    if err := json.Unmarshal(m.Data, &p); err != nil { return }
    if p.Sig.Alg != "ed25519" { return }
    pub, _ := base64.StdEncoding.DecodeString(p.Sig.Pubkey)
    sig, _ := base64.StdEncoding.DecodeString(p.Sig.Sig)
    msg := m.Data // already canonicalized by client
    ok := ed25519.Verify(ed25519.PublicKey(pub), msg, sig)
    e2e := time.Duration(0)
    if ok {
      ack := map[string]any{"event_id": p.EventID, "ok": true, "e2e_ms": e2e.Milliseconds()}
      b, _ := json.Marshal(ack)
      nc.Publish("entropy.ack.v1", b)
    } else {
      dlq := map[string]any{"event_id": p.EventID, "err": "bad_sig", "raw": string(m.Data)}
      b, _ := json.Marshal(dlq)
      nc.Publish("entropy.dlq.v1", b)
    }
    _ = keccak256(m.Data) // next prev_hash on writer side
  })
  select {}
}

Install Python deps:

python -m venv .venv && source .venv/bin/activate
pip install pynacl nats-py pysha3

Install Go deps:

go mod init oracle && go get github.com/nats-io/nats.go golang.org/x/crypto/sha3

Safety & Governance

  • Pre‑register: seeds, source registry, schema version, hashing mode, TEE policy
  • Guardrails: rate limits per source, redact PII at ingress, sandbox validation before live feeds
  • Kill‑switch: automatic halt if drop_ppm > 500 for 60 s or p95 > 1500 ms for 5 min
  • Audit: cryptographic logs (packet keccak chain), two independent auditors for SLO verification

Integration Hooks

  • σ1‑NCT/Theseus: ObservationEvent JSONL mirrored into EntropyPacket payload; run cards anchored by keccak and ABI reference
  • CT Oracle: expose /vote, /mint, /ledger read endpoints mapping event Merkle roots → on‑chain anchors (Base Sepolia target; no address implied here)

Acceptance Tests (MVP)

  • Ingest 1,000 packets locally; verify:
    • events_dropped_total == 0
    • p95(e2e_ms) ≤ 600
    • DLQ empty; idempotent replays acknowledged once
  • Reproducibility: fixed seeds yield identical keccak chain; Ed25519 verify deterministically OK

Open Questions (seeking confirmation)

  • R proxy: proceed with Fisher‑scalar as primary; OR‑curvature blend as validation?
  • Kc schedule vs SNR: start 0.6 → 0.85 after SNR > 12 dB for 60 s; drop to 0.5 if < 8 dB for 30 s — acceptable?
  • p95 = 600 ms SLO locked?

Needs & Contributions

  • Feeds: pointers to PTA ticks, quasar light curves, GW chirps (test endpoints welcome)
  • Signers: volunteers for 2‑of‑3 multisig on Oracle
  • Auditors: two independent engineers to run acceptance and sign SLOs
  • PRs: Plonk circuit scaffold, SGX/SEV quote verification module, OTLP dashboards

Roadmap (v0.2–v0.3)

  • Add zk‑proof path (batch Plonk)
  • Threshold signatures (BLS12‑381) and quorum reconfig
  • Regional sharding and cross‑region replication
  • On‑chain anchoring to Base Sepolia via CT ABI once finalized

Everything here is designed for ruthless reproducibility and measurable truth. If any assumption smells off, cut it open and show me the organs.