diff --git a/.env.example b/.env.example index bdca4a4..489228b 100644 --- a/.env.example +++ b/.env.example @@ -20,5 +20,9 @@ SPENDING_CAP=10.00 # $ before gateway stops accepting LABOR_RATE_PER_HOUR=0.00 # $/hr for setup/maintenance time PROFIT_MARGIN=0.00 # Markup multiplier (0.10 = 10%) +# Dual ledger +LEDGER_SECRET=change_me_to_a_shared_secret # Both sides must match +CALLBACK_URL= # Seth's server (e.g. http://seth_ip:8435/transaction) + # Features ALLOW_MODEL_UPDATES=false # Allow remote model push via /admin/update-model diff --git a/gateway.py b/gateway.py index 18f2899..e00a636 100644 --- a/gateway.py +++ b/gateway.py @@ -19,6 +19,8 @@ import os import time import threading import subprocess +import hashlib +import uuid from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs import requests @@ -74,6 +76,93 @@ def _save_cost_config(config): COST_CONFIG = _load_cost_config() +# --- Dual Ledger --- +LEDGER_FILE = os.environ.get("LEDGER_FILE", "/var/lib/mortdecai-gateway/ledger.jsonl") +LEDGER_SECRET = os.environ.get("LEDGER_SECRET", "change_me_shared_secret") +CALLBACK_URL = os.environ.get("CALLBACK_URL", "") # Seth's server endpoint for transaction logging +_ledger_lock = threading.Lock() + + +def _ledger_hash(entry): + """Create a verification hash from transaction data + shared secret.""" + raw = f"{entry['id']}|{entry['tokens_in']}|{entry['tokens_out']}|{entry['duration']}|{entry['cost']}|{LEDGER_SECRET}" + return hashlib.sha256(raw.encode()).hexdigest()[:16] + + +def _ledger_write(entry): + """Append a transaction to the local ledger.""" + with _ledger_lock: + try: + os.makedirs(os.path.dirname(LEDGER_FILE), exist_ok=True) + with open(LEDGER_FILE, "a") as f: + f.write(json.dumps(entry) + "\n") + except Exception as e: + print(f"Ledger write failed: {e}") + + +def _ledger_callback(entry): + """Send transaction to the client's server for cross-verification.""" + if not CALLBACK_URL: + return + try: + requests.post( + CALLBACK_URL, + json=entry, + headers={"Content-Type": "application/json"}, + timeout=5, + ) + except: + pass # Non-blocking — don't fail inference because callback is down + + +def _ledger_record(tokens_in, tokens_out, duration, cost, energy_wh, model): + """Record a transaction in the ledger and notify the client.""" + entry = { + "id": str(uuid.uuid4())[:12], + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ"), + "tokens_in": tokens_in, + "tokens_out": tokens_out, + "duration": round(duration, 3), + "cost": round(cost, 8), + "energy_wh": round(energy_wh, 4), + "model": model, + "billing_mode": COST_CONFIG["billing_mode"], + } + entry["hash"] = _ledger_hash(entry) + + _ledger_write(entry) + + # Send to client in background + threading.Thread(target=_ledger_callback, args=(entry,), daemon=True).start() + + return entry + + +def _ledger_load(): + """Load all ledger entries.""" + entries = [] + try: + with open(LEDGER_FILE) as f: + for line in f: + if line.strip(): + entries.append(json.loads(line)) + except: + pass + return entries + + +def _ledger_verify(entries): + """Verify all ledger entries against their hashes.""" + results = {"total": len(entries), "valid": 0, "invalid": 0, "invalid_ids": []} + for entry in entries: + expected = _ledger_hash(entry) + if entry.get("hash") == expected: + results["valid"] += 1 + else: + results["invalid"] += 1 + results["invalid_ids"].append(entry.get("id", "?")) + return results + # --- Stats tracking --- _stats_lock = threading.Lock() _stats = { @@ -127,10 +216,13 @@ def _calc_marginal_cost(duration_seconds): return marginal_watts, energy_wh, cost -def _track_request(tokens_in, tokens_out, duration_seconds): - """Track a completed inference request.""" +def _track_request(tokens_in, tokens_out, duration_seconds, model="mortdecai-v4"): + """Track a completed inference request and record in ledger.""" marginal_watts, energy_wh, cost = _calc_marginal_cost(duration_seconds) + # Record in dual ledger + _ledger_record(tokens_in, tokens_out, duration_seconds, cost, energy_wh, model) + with _stats_lock: _stats["total_requests"] += 1 _stats["total_tokens_in"] += tokens_in @@ -250,8 +342,9 @@ class GatewayHandler(BaseHTTPRequestHandler): # Track token usage from response tokens_in = data.get("prompt_eval_count", 0) tokens_out = data.get("eval_count", 0) + model_name = (body or {}).get("model", "unknown") if tokens_in or tokens_out: - _track_request(tokens_in, tokens_out, duration) + _track_request(tokens_in, tokens_out, duration, model_name) # Add gateway metadata to response if isinstance(data, dict): @@ -305,6 +398,34 @@ class GatewayHandler(BaseHTTPRequestHandler): self._send_json(200, COST_CONFIG) return + if parsed.path == "/ledger": + if not self._check_auth(): + return + entries = _ledger_load() + total_cost = sum(e.get("cost", 0) for e in entries) + self._send_json(200, { + "entries": len(entries), + "total_cost": round(total_cost, 6), + "last_10": entries[-10:], + }) + return + + if parsed.path == "/reconcile": + if not self._check_auth(): + return + entries = _ledger_load() + verification = _ledger_verify(entries) + total_cost = sum(e.get("cost", 0) for e in entries) + self._send_json(200, { + "ledger_entries": len(entries), + "ledger_total_cost": round(total_cost, 6), + "stats_total_cost": round(_stats.get("total_cost", 0), 6), + "discrepancy": round(abs(total_cost - _stats.get("total_cost", 0)), 6), + "hash_verification": verification, + "status": "OK" if verification["invalid"] == 0 else "TAMPERED", + }) + return + if parsed.path == "/dashboard": self._serve_dashboard() return diff --git a/ledger_receiver.py b/ledger_receiver.py new file mode 100644 index 0000000..dd6df59 --- /dev/null +++ b/ledger_receiver.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +""" +Ledger Receiver — runs on YOUR server to collect transaction records from remote gateways. + +Each gateway POSTs transactions here. You keep an independent copy of every +transaction with hash verification. If the gateway operator resets their stats, +your ledger still has the full history. + +Usage: + python3 ledger_receiver.py + LEDGER_SECRET=shared_secret python3 ledger_receiver.py + +Endpoints: + POST /transaction — receive a transaction from a gateway + GET /ledger — view all transactions + GET /reconcile/ — compare your ledger against a gateway's + GET /summary — total cost by gateway +""" + +import json +import os +import hashlib +import threading +import time +from http.server import HTTPServer, BaseHTTPRequestHandler +from urllib.parse import urlparse + +LISTEN_PORT = int(os.environ.get("RECEIVER_PORT", "8435")) +LEDGER_DIR = os.environ.get("LEDGER_DIR", "/var/lib/mortdecai-ledger") +LEDGER_SECRET = os.environ.get("LEDGER_SECRET", "change_me_shared_secret") + +_lock = threading.Lock() + + +def _verify_hash(entry): + raw = f"{entry['id']}|{entry['tokens_in']}|{entry['tokens_out']}|{entry['duration']}|{entry['cost']}|{LEDGER_SECRET}" + expected = hashlib.sha256(raw.encode()).hexdigest()[:16] + return entry.get("hash") == expected + + +def _save_transaction(entry, source_ip): + """Save a transaction to the per-gateway ledger file.""" + entry["_received_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ") + entry["_source_ip"] = source_ip + entry["_hash_valid"] = _verify_hash(entry) + + os.makedirs(LEDGER_DIR, exist_ok=True) + # One file per source IP + safe_ip = source_ip.replace(":", "_").replace(".", "_") + path = os.path.join(LEDGER_DIR, f"ledger_{safe_ip}.jsonl") + + with _lock: + with open(path, "a") as f: + f.write(json.dumps(entry) + "\n") + + +def _load_all(): + """Load all ledger entries from all gateways.""" + all_entries = {} + try: + for fname in os.listdir(LEDGER_DIR): + if fname.endswith(".jsonl"): + gateway = fname.replace("ledger_", "").replace(".jsonl", "") + entries = [] + with open(os.path.join(LEDGER_DIR, fname)) as f: + for line in f: + if line.strip(): + entries.append(json.loads(line)) + all_entries[gateway] = entries + except: + pass + return all_entries + + +class ReceiverHandler(BaseHTTPRequestHandler): + def log_message(self, fmt, *args): + pass + + def _send_json(self, status, data): + body = json.dumps(data, indent=2).encode() + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(body) + + def do_POST(self): + if self.path == "/transaction": + length = int(self.headers.get("Content-Length", 0)) + entry = json.loads(self.rfile.read(length)) + source_ip = self.client_address[0] + + valid = _verify_hash(entry) + _save_transaction(entry, source_ip) + + self._send_json(200, { + "status": "recorded", + "id": entry.get("id"), + "hash_valid": valid, + }) + return + + self._send_json(404, {"error": "not found"}) + + def do_GET(self): + parsed = urlparse(self.path) + + if parsed.path == "/summary": + all_data = _load_all() + summary = {} + for gateway, entries in all_data.items(): + total_cost = sum(e.get("cost", 0) for e in entries) + total_tokens = sum(e.get("tokens_out", 0) for e in entries) + valid = sum(1 for e in entries if e.get("_hash_valid", False)) + invalid = len(entries) - valid + summary[gateway] = { + "transactions": len(entries), + "total_cost": round(total_cost, 6), + "total_tokens_out": total_tokens, + "hashes_valid": valid, + "hashes_invalid": invalid, + } + self._send_json(200, summary) + return + + if parsed.path == "/ledger": + all_data = _load_all() + flat = [] + for entries in all_data.values(): + flat.extend(entries) + flat.sort(key=lambda e: e.get("timestamp", "")) + + total = sum(e.get("cost", 0) for e in flat) + self._send_json(200, { + "total_transactions": len(flat), + "total_cost": round(total, 6), + "last_20": flat[-20:], + }) + return + + self._send_json(404, {"error": "not found"}) + + +if __name__ == "__main__": + os.makedirs(LEDGER_DIR, exist_ok=True) + print(f"Ledger Receiver on port {LISTEN_PORT}") + print(f" Ledger dir: {LEDGER_DIR}") + HTTPServer(("0.0.0.0", LISTEN_PORT), ReceiverHandler).serve_forever()