#!/usr/bin/env python3
"""
Mortdecai Ollama Gateway — authenticated proxy with power metering.

Sits in front of Ollama, provides:
- API key authentication
- Power/cost tracking (configured watts × inference time × electricity rate)
- Usage dashboard
- Spending cap enforcement
- Health check endpoint

Usage:
    python3 gateway.py
    OLLAMA_URL=http://localhost:11434 API_KEY=mk_test python3 gateway.py
"""

import json
import os
import re
import time
import threading
import subprocess
import hashlib
import hmac
import uuid
from http.server import HTTPServer, BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs

import requests

# --- Config ---
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
LISTEN_PORT = int(os.environ.get("GATEWAY_PORT", "8434"))
API_KEY = os.environ.get("API_KEY", "mk_mortdecai_default")
STATS_FILE = os.environ.get("STATS_FILE", "/var/lib/mortdecai-gateway/stats.json")
CONFIG_FILE = os.environ.get("CONFIG_FILE", "/var/lib/mortdecai-gateway/cost_config.json")

# Model-update settings (only used when ALLOW_MODEL_UPDATES=true).
# MODELS_DIR is where the gateway writes downloaded files; OLLAMA_MODELS_DIR is
# the path Ollama is given for that directory. NOTE(review): these are assumed
# to be the same directory mounted into the Ollama container — confirm.
MODELS_DIR = os.environ.get("MODELS_DIR", "models")
OLLAMA_MODELS_DIR = os.environ.get("OLLAMA_MODELS_DIR", "/models")
OLLAMA_CONTAINER = os.environ.get("OLLAMA_CONTAINER", "mortdecai-ollama")

# Model names become file names, so restrict them to a safe character set
# (no path separators, no leading dot).
_MODEL_NAME_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._:-]{0,127}")

# Default cost config (overridden by config file or env vars)
_DEFAULT_COST_CONFIG = {
    "electricity_rate": 0.15,         # $/kWh
    "gpu_idle_watts": 15,             # GPU at idle
    "gpu_load_watts": 54,             # GPU during inference
    "system_idle_watts": 45,          # Whole system idle (CPU/RAM/fans/PSU)
    "system_inference_watts": 65,     # Whole system during inference
    "billing_mode": "marginal",       # "marginal" = only extra watts; "dedicated" = all uptime
    "base_rate_per_hour": 0.00,       # $/hr for keeping machine on (dedicated mode only)
    "spending_cap": 10.00,            # $ before refusing requests
    "labor_rate_per_hour": 0.00,      # $/hr for operator's time (setup, maintenance)
    "profit_margin": 0.00,            # multiplier (0.10 = 10% markup)
    "labor_hours_logged": 0.0,        # total hours spent on setup/maintenance
}

_BILLING_MODES = ("marginal", "dedicated")


def _utc_timestamp():
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    ``time.strftime`` defaults to *local* time, which must not carry a ``Z``
    (UTC) suffix — hence the explicit ``time.gmtime()``.
    """
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def _ensure_parent_dir(path):
    """Create the directory containing *path*, if *path* names one."""
    parent = os.path.dirname(path)
    if parent:  # os.makedirs("") raises, so skip bare file names
        os.makedirs(parent, exist_ok=True)


def _atomic_write_json(path, data):
    """Write *data* as indented JSON to *path* via a temp file and rename.

    A crash mid-write leaves the previous file intact instead of a truncated
    one. The temp name is unique per process/thread so concurrent writers do
    not clobber each other's temp file.

    Raises:
        OSError: on filesystem errors.
        TypeError / ValueError: if *data* is not JSON-serializable.
    """
    _ensure_parent_dir(path)
    tmp_path = f"{path}.{os.getpid()}.{threading.get_ident()}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def _load_cost_config():
    """Build the cost config: defaults, then CONFIG_FILE, then env vars.

    Each env override is the upper-cased key (e.g. ``SPENDING_CAP``) and is
    coerced to the type of the built-in default. A missing config file is
    normal; unreadable files and unparsable env values are reported and skipped.
    """
    config = dict(_DEFAULT_COST_CONFIG)

    # Override from file
    saved = {}
    try:
        with open(CONFIG_FILE) as f:
            saved = json.load(f)
    except FileNotFoundError:
        pass
    except (OSError, ValueError) as e:
        print(f"Ignoring unreadable cost config {CONFIG_FILE}: {e}")
    if isinstance(saved, dict):
        config.update(saved)
    else:
        print(f"Ignoring cost config {CONFIG_FILE}: expected a JSON object")

    # Override from env vars
    for key, default in _DEFAULT_COST_CONFIG.items():
        env_key = key.upper()
        val = os.environ.get(env_key)
        if val is None:
            continue
        try:
            config[key] = type(default)(val)
        except ValueError:
            print(f"Ignoring invalid {env_key}={val!r}")
    return config


def _save_cost_config(config):
    """Persist *config* to CONFIG_FILE (best effort — failures are logged)."""
    try:
        _atomic_write_json(CONFIG_FILE, config)
    except (OSError, TypeError, ValueError) as e:
        print(f"Cost config save failed: {e}")


COST_CONFIG = _load_cost_config()

# Power figure reported by _get_gpu_utilization when the GPU tool cannot
# measure draw; defaults to the configured inference wattage.
GPU_TDP_WATTS = float(os.environ.get("GPU_TDP_WATTS", COST_CONFIG["gpu_load_watts"]))


# --- Dual Ledger ---
LEDGER_FILE = os.environ.get("LEDGER_FILE", "/var/lib/mortdecai-gateway/ledger.jsonl")
LEDGER_SECRET = os.environ.get("LEDGER_SECRET", "change_me_shared_secret")
CALLBACK_URL = os.environ.get("CALLBACK_URL", "")  # Seth's server endpoint for transaction logging

_ledger_lock = threading.Lock()


def _ledger_hash(entry):
    """Create a verification hash from transaction data + shared secret.

    Raises KeyError if *entry* lacks any of the hashed fields.
    """
    raw = f"{entry['id']}|{entry['tokens_in']}|{entry['tokens_out']}|{entry['duration']}|{entry['cost']}|{LEDGER_SECRET}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]


def _ledger_write(entry):
    """Append a transaction to the local JSONL ledger (failures are logged)."""
    with _ledger_lock:
        try:
            _ensure_parent_dir(LEDGER_FILE)
            with open(LEDGER_FILE, "a") as f:
                f.write(json.dumps(entry) + "\n")
        except (OSError, TypeError, ValueError) as e:
            print(f"Ledger write failed: {e}")


def _ledger_callback(entry):
    """Send transaction to the client's server for cross-verification.

    Best effort: a down callback endpoint must never fail inference, so
    request errors are only logged.
    """
    if not CALLBACK_URL:
        return
    try:
        requests.post(
            CALLBACK_URL,
            json=entry,
            headers={"Content-Type": "application/json"},
            timeout=5,
        )
    except requests.RequestException as e:
        print(f"Ledger callback failed: {e}")


def _ledger_record(tokens_in, tokens_out, duration, cost, energy_wh, model):
    """Record a transaction in the ledger and notify the client.

    Returns the ledger entry (including its verification ``hash``).
    """
    entry = {
        "id": str(uuid.uuid4())[:12],
        "timestamp": _utc_timestamp(),
        "tokens_in": tokens_in,
        "tokens_out": tokens_out,
        "duration": round(duration, 3),
        "cost": round(cost, 8),
        "energy_wh": round(energy_wh, 4),
        "model": model,
        "billing_mode": COST_CONFIG["billing_mode"],
    }
    entry["hash"] = _ledger_hash(entry)
    _ledger_write(entry)
    # Send to client in background
    threading.Thread(target=_ledger_callback, args=(entry,), daemon=True).start()
    return entry


def _ledger_load():
    """Load all ledger entries (JSON objects), skipping malformed lines.

    A missing ledger yields an empty list. Malformed lines are reported rather
    than aborting the load, so one corrupt line cannot hide every later entry.
    """
    entries = []
    try:
        with open(LEDGER_FILE) as f:
            for line_no, line in enumerate(f, 1):
                if not line.strip():
                    continue
                try:
                    entry = json.loads(line)
                except ValueError:
                    print(f"Skipping malformed ledger line {line_no}")
                    continue
                if isinstance(entry, dict):
                    entries.append(entry)
                else:
                    print(f"Skipping non-object ledger line {line_no}")
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"Ledger read failed: {e}")
    return entries


def _ledger_verify(entries):
    """Verify all ledger entries against their hashes.

    Entries missing hashed fields count as invalid instead of raising.
    """
    results = {"total": len(entries), "valid": 0, "invalid": 0, "invalid_ids": []}
    for entry in entries:
        try:
            expected = _ledger_hash(entry)
        except (KeyError, TypeError):
            expected = None
        if expected is not None and entry.get("hash") == expected:
            results["valid"] += 1
        else:
            results["invalid"] += 1
            results["invalid_ids"].append(entry.get("id", "?"))
    return results


# --- Stats tracking ---
_stats_lock = threading.Lock()
_stats = {
    "total_requests": 0,
    "total_tokens_in": 0,
    "total_tokens_out": 0,
    "total_inference_seconds": 0,
    "total_energy_wh": 0.0,
    "total_cost": 0.0,
    "started_at": _utc_timestamp(),
    "last_request_at": None,
    "requests_rejected": 0,
}


def _load_stats():
    """Merge previously saved stats from STATS_FILE into ``_stats``."""
    try:
        with open(STATS_FILE) as f:
            saved = json.load(f)
    except FileNotFoundError:
        return
    except (OSError, ValueError) as e:
        print(f"Ignoring unreadable stats file {STATS_FILE}: {e}")
        return
    if isinstance(saved, dict):
        with _stats_lock:
            _stats.update(saved)


def _save_stats():
    """Persist ``_stats`` to STATS_FILE (best effort — failures are logged).

    Does not take ``_stats_lock`` itself; every caller in this file already
    holds it (the lock is not reentrant).
    """
    try:
        _atomic_write_json(STATS_FILE, _stats)
    except (OSError, TypeError, ValueError) as e:
        print(f"Stats save failed: {e}")


def _calc_marginal_cost(duration_seconds):
    """Calculate electricity cost for an inference call.

    Returns:
        (watts, energy_wh, cost): billed watts, energy in Wh for
        *duration_seconds*, and the cost in $ including ``profit_margin``.
    """
    c = COST_CONFIG
    if c["billing_mode"] == "marginal":
        # Only charge for extra watts above idle
        marginal_gpu = c["gpu_load_watts"] - c["gpu_idle_watts"]
        marginal_system = c["system_inference_watts"] - c["system_idle_watts"]
        marginal_watts = marginal_gpu + marginal_system
    else:
        # Dedicated: charge for full system draw during inference
        marginal_watts = c["gpu_load_watts"] + c["system_inference_watts"]

    energy_wh = (marginal_watts * duration_seconds) / 3600
    electricity_cost = (energy_wh / 1000) * c["electricity_rate"]  # Wh -> kWh

    # Apply profit margin
    cost = electricity_cost * (1 + c["profit_margin"])
    return marginal_watts, energy_wh, cost


def _track_request(tokens_in, tokens_out, duration_seconds, model="mortdecai-v4"):
    """Track a completed inference request and record in ledger."""
    marginal_watts, energy_wh, cost = _calc_marginal_cost(duration_seconds)

    # Record in dual ledger
    _ledger_record(tokens_in, tokens_out, duration_seconds, cost, energy_wh, model)

    with _stats_lock:
        _stats["total_requests"] += 1
        _stats["total_tokens_in"] += tokens_in
        _stats["total_tokens_out"] += tokens_out
        _stats["total_inference_seconds"] += duration_seconds
        _stats["last_request_at"] = _utc_timestamp()
        _stats["total_energy_wh"] += energy_wh
        _stats["total_cost"] += cost
        # Exponential moving average of billed watts
        _stats["total_marginal_watts_avg"] = (
            _stats.get("total_marginal_watts_avg", marginal_watts) * 0.95
            + marginal_watts * 0.05
        )

        # Base rate for dedicated mode
        if COST_CONFIG["billing_mode"] == "dedicated" and COST_CONFIG["base_rate_per_hour"] > 0:
            # Add base rate proportional to time since last request
            last = _stats.get("_last_base_calc", time.time())
            elapsed_hours = (time.time() - last) / 3600
            _stats["total_cost"] += COST_CONFIG["base_rate_per_hour"] * elapsed_hours
            _stats["_last_base_calc"] = time.time()

        if _stats["total_requests"] % 10 == 0:
            _save_stats()


def _check_budget():
    """Returns True if under spending cap."""
    with _stats_lock:
        return _stats["total_cost"] < COST_CONFIG["spending_cap"]


def _query_nvidia_smi():
    """Return stats for the first NVIDIA GPU via nvidia-smi, or None."""
    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=utilization.gpu,temperature.gpu,power.draw",
             "--format=csv,noheader,nounits"],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    if result.returncode != 0:
        return None
    rows = result.stdout.strip().splitlines()
    if not rows:
        return None
    # nvidia-smi prints one CSV row per GPU; report the first.
    parts = [p.strip() for p in rows[0].split(",")]
    try:
        utilization = float(parts[0])
        temperature = float(parts[1])
    except (ValueError, IndexError):
        return None
    try:
        power_watts = float(parts[2])
    except (ValueError, IndexError):
        # e.g. "[N/A]" on GPUs without power telemetry
        power_watts = GPU_TDP_WATTS
    return {
        "utilization": utilization,
        "temperature": temperature,
        "power_watts": power_watts,
        "source": "nvidia-smi",
    }


def _query_rocm_smi():
    """Return stats for the first AMD GPU via rocm-smi, or None."""
    try:
        result = subprocess.run(
            ["rocm-smi", "--showuse", "--showtemp", "--json"],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    if result.returncode != 0:
        return None
    try:
        data = json.loads(result.stdout)
        # Parse rocm-smi JSON (format varies by version)
        for card_data in data.values():
            if isinstance(card_data, dict):
                return {
                    "utilization": float(card_data.get("GPU use (%)", 0)),
                    "temperature": float(card_data.get("Temperature (Sensor edge) (C)", 0)),
                    "power_watts": GPU_TDP_WATTS,
                    "source": "rocm-smi",
                }
    except (ValueError, TypeError, AttributeError):
        return None
    return None


def _get_gpu_utilization():
    """Get current GPU utilization via nvidia-smi or rocm-smi.

    Tries nvidia-smi first, then rocm-smi; reports zeros with
    ``"source": "unavailable"`` when neither works.
    """
    return (
        _query_nvidia_smi()
        or _query_rocm_smi()
        or {"utilization": 0, "temperature": 0, "power_watts": 0, "source": "unavailable"}
    )


def _final_ndjson_record(text):
    """Return the last non-blank line of *text* parsed as a JSON object, else None.

    Ollama's streaming output is newline-delimited JSON whose final record
    carries the token counts.
    """
    for line in reversed(text.splitlines()):
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except ValueError:
            return None
        return record if isinstance(record, dict) else None
    return None


def _download_file(url, dest):
    """Stream *url* to *dest*, replacing *dest* only once the download completes."""
    _ensure_parent_dir(dest)
    tmp_path = dest + ".part"
    try:
        with requests.get(url, stream=True, timeout=600) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=1 << 20):
                    f.write(chunk)
        os.replace(tmp_path, dest)
    except BaseException:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def _write_modelfile(name, container_gguf_path):
    """Write ``<name>.Modelfile`` into MODELS_DIR whose FROM is *container_gguf_path*.

    If a base ``Modelfile`` exists in MODELS_DIR, its other instructions are
    kept and only its first FROM line (outside triple-quoted blocks) is
    replaced, so the new weights reuse the existing template/parameters.
    Returns the new Modelfile's file name.
    """
    try:
        with open(os.path.join(MODELS_DIR, "Modelfile")) as f:
            base_lines = f.read().splitlines()
    except FileNotFoundError:
        base_lines = []

    from_line = f"FROM {container_gguf_path}"
    lines = []
    replaced = False
    in_block = False
    for line in base_lines:
        words = line.split(None, 1)
        if not in_block and not replaced and words and words[0].upper() == "FROM":
            lines.append(from_line)
            replaced = True
        else:
            lines.append(line)
        if line.count('"""') % 2:  # entering/leaving a TEMPLATE/SYSTEM block
            in_block = not in_block
    if not replaced:
        lines.insert(0, from_line)

    modelfile_name = f"{name}.Modelfile"
    with open(os.path.join(MODELS_DIR, modelfile_name), "w") as f:
        f.write("\n".join(lines) + "\n")
    return modelfile_name


# --- HTTP Handler ---
class GatewayHandler(BaseHTTPRequestHandler):
    """Request handler: auth, metering, admin endpoints and Ollama proxying."""

    def log_message(self, fmt, *args):
        pass  # Quiet

    def _check_auth(self):
        """Return True if the request carries API_KEY, else send 401 and return False.

        Accepts ``Authorization: Bearer <key>`` or the bare key. Comparisons are
        constant-time, and an empty API_KEY fails closed instead of letting a
        missing header match it.
        """
        supplied = self.headers.get("Authorization", "").encode()
        key = API_KEY.encode()
        # Evaluate both comparisons so timing doesn't reveal which form matched.
        bearer_ok = hmac.compare_digest(supplied, b"Bearer " + key)
        bare_ok = hmac.compare_digest(supplied, key)
        if API_KEY and (bearer_ok or bare_ok):
            return True
        self._send_json(401, {"error": "Invalid API key"})
        return False

    def _send_json(self, status, data):
        """Send *data* serialized as a JSON response with *status*."""
        body = json.dumps(data).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_raw(self, status, payload, content_type):
        """Send *payload* bytes unchanged with the given status and content type."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _proxy_to_ollama(self, path, body=None):
        """Proxy request to Ollama and track usage.

        Sends a POST when *body* is truthy, otherwise a GET. Refuses with 402
        once the spending cap is reached. Single-JSON-object replies get a
        ``_gateway`` metering block; other replies (e.g. Ollama's streaming
        NDJSON or plain-text errors) are forwarded byte-for-byte. Upstream
        failures map to 502 (unreachable), 504 (timeout) or 500.
        """
        if not _check_budget():
            with _stats_lock:
                _stats["requests_rejected"] += 1
                total_cost = _stats["total_cost"]
            self._send_json(402, {
                "error": "Spending cap reached",
                "total_cost": total_cost,
                "cap": COST_CONFIG["spending_cap"],
            })
            return

        t0 = time.time()
        try:
            if body:
                r = requests.post(f"{OLLAMA_URL}{path}", json=body, timeout=120)
            else:
                r = requests.get(f"{OLLAMA_URL}{path}", timeout=10)
            duration = time.time() - t0

            try:
                data = r.json()
                is_json = True
            except ValueError:
                data, is_json = None, False

            # Token usage lives on the reply object, or on the final NDJSON record.
            if isinstance(data, dict):
                usage = data
            elif not is_json:
                usage = _final_ndjson_record(r.text) or {}
            else:
                usage = {}
            tokens_in = usage.get("prompt_eval_count") or 0
            tokens_out = usage.get("eval_count") or 0
            model_name = body.get("model", "unknown") if isinstance(body, dict) else "unknown"
            if tokens_in or tokens_out:
                _track_request(tokens_in, tokens_out, duration, model_name)

            if not is_json:
                self._send_raw(
                    r.status_code,
                    r.content,
                    r.headers.get("Content-Type", "application/octet-stream"),
                )
                return

            # Add gateway metadata to response
            if isinstance(data, dict):
                mw, ewh, ecost = _calc_marginal_cost(duration)
                with _stats_lock:
                    total_cost = _stats["total_cost"]
                data["_gateway"] = {
                    "duration_seconds": round(duration, 2),
                    "marginal_watts": round(mw, 1),
                    "energy_wh": round(ewh, 4),
                    "estimated_cost": round(ecost, 6),
                    "total_cost": round(total_cost, 4),
                    "budget_remaining": round(COST_CONFIG["spending_cap"] - total_cost, 4),
                    "billing_mode": COST_CONFIG["billing_mode"],
                }

            self._send_json(r.status_code, data)

        except requests.exceptions.ConnectionError:
            self._send_json(502, {"error": "Ollama is not running"})
        except requests.exceptions.Timeout:
            self._send_json(504, {"error": "Ollama timeout"})
        except Exception as e:
            self._send_json(500, {"error": str(e)})

    def do_GET(self):
        """Serve gateway endpoints; proxy any other GET to Ollama (auth required)."""
        parsed = urlparse(self.path)

        # Public endpoints (no auth)
        if parsed.path == "/health":
            try:
                r = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
                models = [m["name"] for m in r.json().get("models", [])]
                self._send_json(200, {"status": "ok", "ollama": "connected", "models": models})
            except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError):
                self._send_json(503, {"status": "error", "ollama": "disconnected"})
            return

        if parsed.path == "/stats":
            if not self._check_auth():
                return
            gpu = _get_gpu_utilization()
            with _stats_lock:
                stats_copy = {k: v for k, v in _stats.items() if not k.startswith("_")}
            stats_copy["gpu"] = gpu
            stats_copy["cost_config"] = dict(COST_CONFIG)
            self._send_json(200, stats_copy)
            return

        if parsed.path == "/config":
            if not self._check_auth():
                return
            self._send_json(200, COST_CONFIG)
            return

        if parsed.path == "/ledger":
            if not self._check_auth():
                return
            entries = _ledger_load()
            total_cost = sum(e.get("cost", 0) for e in entries)
            self._send_json(200, {
                "entries": len(entries),
                "total_cost": round(total_cost, 6),
                "last_10": entries[-10:],
            })
            return

        if parsed.path == "/reconcile":
            if not self._check_auth():
                return
            entries = _ledger_load()
            verification = _ledger_verify(entries)
            total_cost = sum(e.get("cost", 0) for e in entries)
            with _stats_lock:
                stats_total = _stats.get("total_cost", 0)
            self._send_json(200, {
                "ledger_entries": len(entries),
                "ledger_total_cost": round(total_cost, 6),
                "stats_total_cost": round(stats_total, 6),
                "discrepancy": round(abs(total_cost - stats_total), 6),
                "hash_verification": verification,
                "status": "OK" if verification["invalid"] == 0 else "TAMPERED",
            })
            return

        if parsed.path == "/dashboard":
            self._serve_dashboard()
            return

        # Proxy everything else to Ollama
        if not self._check_auth():
            return
        self._proxy_to_ollama(self.path)

    def do_POST(self):
        """Handle admin POST endpoints; proxy any other POST to Ollama (auth required)."""
        if not self._check_auth():
            return

        try:
            length = int(self.headers.get("Content-Length", 0))
            body = json.loads(self.rfile.read(length)) if length > 0 else None
        except ValueError:  # bad Content-Length, invalid JSON or invalid UTF-8
            self._send_json(400, {"error": "Invalid request body"})
            return

        # Config update endpoint — adjust cost parameters live
        if self.path == "/config" and body:
            self._handle_config_update(body)
            return

        # Model update endpoint — downloads new GGUF and reloads
        if self.path == "/admin/update-model" and body:
            self._handle_model_update(body)
            return

        self._proxy_to_ollama(self.path, body)

    def _handle_config_update(self, body):
        """Apply a partial cost-config update and persist it.

        Known keys are coerced to their default's type; unknown keys are
        ignored. The update is all-or-nothing: any invalid value yields a 400
        and leaves COST_CONFIG untouched.
        """
        if not isinstance(body, dict):
            self._send_json(400, {"error": "Config update must be a JSON object"})
            return

        updates = {}
        for key, value in body.items():
            if key not in COST_CONFIG:
                continue
            try:
                updates[key] = type(_DEFAULT_COST_CONFIG.get(key, ""))(value)
            except (TypeError, ValueError):
                self._send_json(400, {"error": f"Invalid value for {key}: {value!r}"})
                return

        if "billing_mode" in updates and updates["billing_mode"] not in _BILLING_MODES:
            self._send_json(400, {"error": f"billing_mode must be one of {list(_BILLING_MODES)}"})
            return

        COST_CONFIG.update(updates)
        _save_cost_config(COST_CONFIG)
        self._send_json(200, {"status": "updated", "config": COST_CONFIG})

    def _handle_model_update(self, body):
        """Download a new GGUF from a URL and reload the model.

        Request: {"url": "https://mortdec.ai/dl/...", "name": "mortdecai-v5"}

        This is opt-in — the gateway operator must enable ALLOW_MODEL_UPDATES=true.
        The GGUF is written to ``MODELS_DIR/<name>.gguf``, a ``<name>.Modelfile``
        pointing at it is generated, and ``ollama create`` is run inside the
        OLLAMA_CONTAINER container.
        """
        if os.environ.get("ALLOW_MODEL_UPDATES", "false").lower() != "true":
            self._send_json(403, {"error": "Model updates disabled. Set ALLOW_MODEL_UPDATES=true in .env to enable."})
            return

        if not isinstance(body, dict):
            self._send_json(400, {"error": "url is required"})
            return
        url = body.get("url")
        name = body.get("name", "mortdecai-latest")
        if not url:
            self._send_json(400, {"error": "url is required"})
            return
        # name becomes a file name — reject path separators and traversal.
        if not isinstance(name, str) or not _MODEL_NAME_RE.fullmatch(name):
            self._send_json(400, {"error": "name may only contain letters, digits, '.', '_', ':' and '-'"})
            return

        try:
            gguf_file = f"{name}.gguf"
            print(f"Downloading model from {url}...")
            _download_file(url, os.path.join(MODELS_DIR, gguf_file))

            # Create Modelfile and load
            modelfile_name = _write_modelfile(name, f"{OLLAMA_MODELS_DIR}/{gguf_file}")
            subprocess.run(
                ["docker", "exec", OLLAMA_CONTAINER, "ollama", "create", name,
                 "-f", f"{OLLAMA_MODELS_DIR}/{modelfile_name}"],
                timeout=120, check=True,
            )

            self._send_json(200, {"status": "ok", "model": name, "message": "Model updated and loaded"})
        except Exception as e:
            self._send_json(500, {"error": f"Update failed: {e}"})

    def _serve_dashboard(self):
        """Simple HTML dashboard showing usage stats."""
        with _stats_lock:
            s = {k: v for k, v in _stats.items() if not k.startswith("_")}
        gpu = _get_gpu_utilization()
        c = COST_CONFIG
        marginal_w = (c["gpu_load_watts"] - c["gpu_idle_watts"]) + (c["system_inference_watts"] - c["system_idle_watts"])
        active = _check_budget()
        avg_cost_per_req = s["total_cost"] / max(s["total_requests"], 1)
        reqs_remaining = (
            max(0, int((c["spending_cap"] - s["total_cost"]) / max(avg_cost_per_req, 0.000001)))
            if avg_cost_per_req > 0 else "∞"
        )
        # NOTE(review): the template below references none of the values
        # computed above — confirm whether the dashboard markup was lost.

        html = f"""
Config: GET /config | Update: POST /config | Stats: GET /stats (auth required)
"""
        payload = html.encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)


def main():
    """Load saved stats, print the configuration, and serve until interrupted."""
    _load_stats()
    c = COST_CONFIG
    print("Mortdecai Gateway starting")
    print(f"  Ollama: {OLLAMA_URL}")
    print(f"  Listen: 0.0.0.0:{LISTEN_PORT}")
    print(f"  GPU: {c['gpu_idle_watts']}W idle → {c['gpu_load_watts']}W load")
    print(f"  System: {c['system_idle_watts']}W idle → {c['system_inference_watts']}W load")
    print(f"  Rate: ${c['electricity_rate']}/kWh | Mode: {c['billing_mode']}")
    print(f"  Cap: ${c['spending_cap']}")
    print(f"  Dashboard: http://localhost:{LISTEN_PORT}/dashboard")
    if not API_KEY:
        print("  WARNING: API_KEY is empty — all authenticated endpoints will reject requests")
    elif API_KEY == "mk_mortdecai_default":
        print("  WARNING: using the default API_KEY — set API_KEY to a secret value")
    if LEDGER_SECRET == "change_me_shared_secret":
        print("  WARNING: using the default LEDGER_SECRET — ledger hashes can be forged")

    # Save stats periodically
    def _periodic_save():
        while True:
            time.sleep(60)
            with _stats_lock:
                _save_stats()

    threading.Thread(target=_periodic_save, daemon=True).start()

    # Threaded so a long inference doesn't block /health, /stats or other clients.
    server = ThreadingHTTPServer(("0.0.0.0", LISTEN_PORT), GatewayHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        with _stats_lock:
            _save_stats()
        server.server_close()


if __name__ == "__main__":
    main()