#!/usr/bin/env python3
"""Workbench MCP Server — AI-driven hardware diagnostic tool.

Exposes 6 MCP tools for scaffolding, state management, and logging of
interactive hardware diagnostic web pages served over LAN.
"""

import asyncio
import json
import os
import socket
from datetime import datetime, timezone
from pathlib import Path

from aiohttp import web
from mcp.server.fastmcp import FastMCP

WORKBENCH_DIR = Path.home() / "workbench"
SCAFFOLD_HTML = Path(__file__).parent / "scaffold.html"
DEFAULT_PORT = 8070
SETHMUX_URL = os.environ.get("WORKBENCH_SETHMUX_URL", "https://mux.sethpc.xyz")

# Track active projects: {name: {"port": int, "runner": web.AppRunner, "ws_clients": set}}
active_projects: dict = {}


def get_lan_ip() -> str:
    """Get the machine's LAN IP address.

    Connects a UDP socket towards 192.168.0.1 and reads back the local
    address chosen for it. Falls back to "127.0.0.1" on any socket error.
    """
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("192.168.0.1", 80))
            return s.getsockname()[0]
    except OSError:
        return "127.0.0.1"


def find_free_port(start: int = DEFAULT_PORT) -> int:
    """Find a free port starting from the given port.

    Tries to bind each port in ``start .. start + 99`` on all interfaces and
    returns the first one that binds.

    Raises:
        RuntimeError: if none of the 100 candidate ports can be bound.
    """
    for port in range(start, start + 100):
        # One probe socket per candidate; always closed, also on bind failure.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("0.0.0.0", port))
            except OSError:
                continue
            return port
    raise RuntimeError(f"No free port found in range {start}-{start+99}")


def now_iso() -> str:
    """Return the current local time as an ISO-8601 string with UTC offset."""
    return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


def project_path(name: str) -> Path:
    """Return the directory of project *name* under WORKBENCH_DIR."""
    return WORKBENCH_DIR / name


def _is_safe_name(name: str) -> bool:
    """Return True if *name* is a single path component inside WORKBENCH_DIR.

    Rejects empty names, "." / "..", and anything containing a path separator
    or NUL, so a project name can never address a directory outside the
    workbench root.
    """
    if not name or name in {".", ".."}:
        return False
    return not any(ch in name for ch in ("/", "\\", "\0"))


# --- HTTP/WebSocket server per project ---


async def ws_handler(request: web.Request) -> web.WebSocketResponse:
    """WebSocket endpoint for pushing state/log to browser."""
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    project_name = request.app["project_name"]
    if project_name in active_projects:
        active_projects[project_name]["ws_clients"].add(ws)

    try:
        async for _ in ws:
            pass  # Browser doesn't send us anything we need
    finally:
        if project_name in active_projects:
            active_projects[project_name]["ws_clients"].discard(ws)

    return ws


async def static_handler(request: web.Request) -> web.StreamResponse:
    """Serve files from the project directory.

    Responds 404 for anything that is not a regular file located inside the
    project directory after resolving ".." segments, absolute paths and
    symlinks.
    """
    project_name = request.app["project_name"]
    rel = request.match_info.get("path", "index.html") or "index.html"

    try:
        root = project_path(project_name).resolve()
        # Joining an absolute `rel` discards `root`, and ".." can climb out of
        # it, so resolve first and then verify containment.
        target = (root / rel).resolve()
        target.relative_to(root)
    except (OSError, ValueError, RuntimeError):
        return web.Response(status=404, text="Not found")

    if not target.is_file():
        return web.Response(status=404, text="Not found")
    return web.FileResponse(target)


async def start_http_server(name: str, port: int) -> web.AppRunner:
    """Start an HTTP + WebSocket server for a project.

    Binds on all interfaces at *port* and returns the runner, which the
    caller must eventually ``cleanup()``.
    """
    app = web.Application()
    app["project_name"] = name
    app.router.add_get("/ws", ws_handler)
    app.router.add_get("/{path:.*}", static_handler)
    app.router.add_get("/", static_handler)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, "0.0.0.0", port)
        await site.start()
    except Exception:
        # Don't leak a set-up runner when the port cannot be bound.
        await runner.cleanup()
        raise
    return runner


async def broadcast_ws(name: str, message: dict):
    """Send a message to all WebSocket clients of a project.

    Clients whose send fails are dropped from the project's client set.
    Does nothing if the project has no running server.
    """
    project = active_projects.get(name)
    if project is None:
        return

    clients = project["ws_clients"]
    dead = set()
    # Iterate over a snapshot: ws_handler may add/discard clients while we
    # are suspended in send_json(), which would break iteration of the set.
    for ws in list(clients):
        try:
            await ws.send_json(message)
        except Exception:
            dead.add(ws)
    clients.difference_update(dead)


async def _ensure_server(name: str) -> str:
    """Start the project's server if it is not running; return its LAN URL."""
    if name not in active_projects:
        port = find_free_port()
        runner = await start_http_server(name, port)
        active_projects[name] = {"port": port, "runner": runner, "ws_clients": set()}

    ip = get_lan_ip()
    port = active_projects[name]["port"]
    return f"http://{ip}:{port}"


# --- MCP Tools ---
# NOTE: the tool docstrings below are kept as written; maintainer notes go in
# comments instead.

mcp = FastMCP("workbench", instructions="Hardware diagnostic workbench — serve interactive diagnostic pages over LAN")


@mcp.tool()
async def workbench_scaffold(name: str, title: str, description: str = "") -> str:
    """Create a new workbench project with split-pane diagnostic page + sethmux terminal.

    Returns the LAN URL to open on your phone.
    """
    if not _is_safe_name(name):
        return json.dumps({"error": f"Invalid project name '{name}'."})

    pdir = project_path(name)

    if not pdir.exists():
        # Read and fill the template BEFORE creating the directory, so a
        # missing/unreadable template cannot leave behind a half-built project
        # that later calls would treat as "already exists".
        template = SCAFFOLD_HTML.read_text(encoding="utf-8")
        html = template.replace("{{TITLE}}", title)
        html = html.replace("{{DESCRIPTION}}", description)
        html = html.replace("{{SETHMUX_URL}}", SETHMUX_URL)

        pdir.mkdir(parents=True)
        (pdir / "assets").mkdir()
        (pdir / "index.html").write_text(html, encoding="utf-8")

        # Init log files
        (pdir / "session.md").write_text(
            f"# {title} — Diagnostic Session\n\nStarted: {now_iso()}\n\n",
            encoding="utf-8",
        )
        (pdir / "session.jsonl").write_text("", encoding="utf-8")
        (pdir / "cost-log.jsonl").write_text(
            json.dumps({"ts": now_iso(), "event": "session_start", "project": name}) + "\n",
            encoding="utf-8",
        )
        (pdir / "state.json").write_text("{}", encoding="utf-8")

    # Existing and freshly created projects share the same start-up path.
    url = await _ensure_server(name)
    return json.dumps({"path": str(pdir), "url": url})


@mcp.tool()
async def workbench_state(project: str, state: str) -> str:
    """Push state update to the browser via WebSocket.

    The state is arbitrary JSON — the AI decides the schema.
    Include a 'template' field (HTML string) to replace the diagnostic content area.
    Include 'styles' for CSS and 'script' for JS to execute.
    """
    if not _is_safe_name(project):
        return json.dumps({"error": f"Invalid project name '{project}'."})

    # Report malformed input the same way as every other failure of this tool
    # (a JSON object with an "error" key) instead of raising.
    try:
        state_obj = json.loads(state)
    except json.JSONDecodeError as exc:
        return json.dumps({"error": f"Invalid state JSON: {exc}"})

    # Save to disk
    pdir = project_path(project)
    if not pdir.exists():
        return json.dumps({"error": f"Project '{project}' not found. Run workbench_scaffold first."})

    (pdir / "state.json").write_text(json.dumps(state_obj, indent=2), encoding="utf-8")

    # Push to browser
    await broadcast_ws(project, {"type": "state", "state": state_obj})

    return json.dumps({"ok": True})


@mcp.tool()
async def workbench_log(project: str, entry: str, data: str = "{}") -> str:
    """Append a log entry to the session log. Shows in the browser log feed.

    entry: Human-readable markdown string (e.g., "R412 measured 1.05M — drifted +16.7%, FAIL")
    data: Optional JSON for the machine-readable log
    """
    if not _is_safe_name(project):
        return json.dumps({"error": f"Invalid project name '{project}'."})

    pdir = project_path(project)
    if not pdir.exists():
        return json.dumps({"error": f"Project '{project}' not found."})

    # Validate `data` before touching any file, so a bad payload cannot leave
    # session.md and session.jsonl out of sync.
    try:
        data_obj = json.loads(data) if data else {}
    except json.JSONDecodeError as exc:
        return json.dumps({"error": f"Invalid data JSON: {exc}"})
    if not isinstance(data_obj, dict):
        return json.dumps({"error": "data must be a JSON object."})

    ts = now_iso()

    # Append to session.md
    with open(pdir / "session.md", "a", encoding="utf-8") as f:
        f.write(f"\n### {ts}\n{entry}\n")

    # Append to session.jsonl ("ts" and "entry" override same-named data keys)
    data_obj["ts"] = ts
    data_obj["entry"] = entry
    with open(pdir / "session.jsonl", "a", encoding="utf-8") as f:
        f.write(json.dumps(data_obj) + "\n")

    # Push to browser
    await broadcast_ws(project, {"type": "log", "entry": entry})

    return json.dumps({"ok": True})


@mcp.tool()
async def workbench_read_log(project: str, tail: int = 20) -> str:
    """Read recent session log entries so AI can resume a session."""
    if not _is_safe_name(project):
        return json.dumps({"error": f"Invalid project name '{project}'."})

    pdir = project_path(project)
    if not pdir.exists():
        return json.dumps({"error": f"Project '{project}' not found."})

    jsonl_path = pdir / "session.jsonl"
    # A non-positive tail yields no entries; note that `lines[-0:]` would
    # otherwise return the whole log.
    if tail <= 0 or not jsonl_path.exists():
        return json.dumps({"entries": []})

    lines = [
        line
        for line in jsonl_path.read_text(encoding="utf-8").split("\n")
        if line.strip()
    ]

    entries = []
    for line in lines[-tail:]:
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            # Keep unparsable lines visible rather than dropping them.
            entries.append({"raw": line})

    return json.dumps({"entries": entries})


@mcp.tool()
async def workbench_list() -> str:
    """List all workbench projects and their status."""
    if not WORKBENCH_DIR.exists():
        return json.dumps({"projects": []})

    # Look the LAN address up once, and only when some server is running.
    ip = get_lan_ip() if active_projects else None

    projects = []
    for d in sorted(WORKBENCH_DIR.iterdir()):
        if not d.is_dir():
            continue
        running = active_projects.get(d.name)
        info = {"name": d.name, "active": running is not None}
        if running is not None:
            info["url"] = f"http://{ip}:{running['port']}"
            info["ws_clients"] = len(running["ws_clients"])
        projects.append(info)

    return json.dumps({"projects": projects})


@mcp.tool()
async def workbench_stop(project: str) -> str:
    """Stop the HTTP/WebSocket server for a project and log session end."""
    if project not in active_projects:
        return json.dumps({"error": f"Project '{project}' is not running."})

    pdir = project_path(project)
    try:
        # Log session end to cost-log
        jsonl_path = pdir / "session.jsonl"
        entry_count = 0
        if jsonl_path.exists():
            entry_count = sum(
                1
                for line in jsonl_path.read_text(encoding="utf-8").split("\n")
                if line.strip()
            )

        cost_entry = {
            "ts": now_iso(),
            "event": "session_end",
            "project": project,
            "log_entries": entry_count,
        }
        with open(pdir / "cost-log.jsonl", "a", encoding="utf-8") as f:
            f.write(json.dumps(cost_entry) + "\n")
    finally:
        # Shutdown server — also when writing the cost log failed, so a
        # logging problem can never leave the server running.
        stopped = active_projects.pop(project)
        # Close browser sockets first; otherwise cleanup may wait for the
        # still-open WebSocket handlers to finish.
        for ws in list(stopped["ws_clients"]):
            try:
                await ws.close()
            except Exception:
                pass  # best-effort: the connection may already be gone
        await stopped["runner"].cleanup()

    return json.dumps({"ok": True})


if __name__ == "__main__":
    mcp.run(transport="stdio")