9dd2c2fbc7
MCP server (Python/aiohttp) that lets any AI CLI spin up interactive hardware diagnostic web pages served over LAN, with WebSocket live updates and dual-format session logging (markdown + JSONL). 6 tools: scaffold, state, log, read_log, list, stop. Split-pane scaffold: diagnostic content + sethmux terminal iframe. CLI wrapper: ~/bin/workbench (serve, list, mcp, help). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
294 lines
9.2 KiB
Python
294 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Workbench MCP Server — AI-driven hardware diagnostic tool.
|
|
|
|
Exposes 6 MCP tools for scaffolding, state management, and logging
|
|
of interactive hardware diagnostic web pages served over LAN.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import socket
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from aiohttp import web
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
# Root directory that holds one sub-directory per workbench project.
WORKBENCH_DIR = Path.home().joinpath("workbench")
# HTML template used for new projects; it lives next to this module.
SCAFFOLD_HTML = Path(__file__).parent.joinpath("scaffold.html")
# First port probed when looking for a free port.
DEFAULT_PORT = 8070
# Terminal URL for the scaffold page; overridable via the environment.
SETHMUX_URL = os.getenv("WORKBENCH_SETHMUX_URL", "https://mux.sethpc.xyz")

# Registry of running project servers, keyed by project name:
# {name: {"port": int, "runner": web.AppRunner, "ws_clients": set}}
active_projects: dict = {}
|
|
|
|
|
|
def get_lan_ip() -> str:
    """Return this machine's LAN IPv4 address, or "127.0.0.1" if it cannot be determined.

    A UDP socket is connected to a private-range address and the local
    address the OS picked for that socket is read back.
    """
    try:
        # The context manager closes the socket even when connect() or
        # getsockname() fails, so no descriptor is leaked on the error path.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("192.168.0.1", 80))
            return probe.getsockname()[0]
    except OSError:
        # No usable route/interface: fall back to loopback.
        return "127.0.0.1"
|
|
|
|
|
|
def find_free_port(start: int = DEFAULT_PORT) -> int:
    """Return the first TCP port in ``start .. start+99`` that can be bound on all interfaces.

    The port is only probed, not reserved: the probe socket is closed before
    returning, so another process may still grab the port afterwards.

    Raises:
        RuntimeError: If none of the probed ports is free.
    """
    # Port numbers above 65535 make bind() raise OverflowError rather than
    # OSError, so the probe range is capped at the highest valid port.
    for port in range(start, min(start + 100, 65536)):
        # The context manager closes the probe whether or not bind() succeeds.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            try:
                probe.bind(("0.0.0.0", port))
            except OSError:
                continue  # Port is in use; try the next one.
            return port
    raise RuntimeError(f"No free port found in range {start}-{start+99}")
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current local time as an ISO-8601 string with UTC offset, to the second."""
    utc_now = datetime.now(timezone.utc)
    local_now = utc_now.astimezone()
    return local_now.isoformat(timespec="seconds")
|
|
|
|
|
|
def project_path(name: str) -> Path:
    """Return the directory of project *name* under WORKBENCH_DIR (it need not exist)."""
    return WORKBENCH_DIR.joinpath(name)
|
|
|
|
|
|
# --- HTTP/WebSocket server per project ---
|
|
|
|
async def ws_handler(request):
    """Accept a browser WebSocket and keep it registered for broadcasts until it closes."""
    socket_response = web.WebSocketResponse()
    await socket_response.prepare(request)

    name = request.app["project_name"]
    entry = active_projects.get(name)
    if entry is not None:
        entry["ws_clients"].add(socket_response)

    try:
        # Incoming messages are ignored; the loop only ends when the
        # connection does.
        async for _ in socket_response:
            continue
    finally:
        # Look the project up again: the registry may have changed while
        # this handler was waiting on the socket.
        entry = active_projects.get(name)
        if entry is not None:
            entry["ws_clients"].discard(socket_response)
    return socket_response
|
|
|
|
|
|
async def static_handler(request):
    """Serve a file from the project directory, defaulting to ``index.html``.

    Returns a 404 response when the requested path does not name a regular
    file located inside the project directory.
    """
    project_name = request.app["project_name"]
    rel_path = request.match_info.get("path", "index.html") or "index.html"

    root = project_path(project_name).resolve()
    target = (root / rel_path).resolve()
    try:
        # The server listens on the LAN, so the path is untrusted input.
        # "../" segments, or an absolute path (which replaces `root` when
        # joined), would otherwise expose arbitrary files on the host.
        target.relative_to(root)
    except ValueError:
        return web.Response(status=404, text="Not found")

    # exists() is also true for directories; only regular files are served.
    if not target.is_file():
        return web.Response(status=404, text="Not found")
    return web.FileResponse(target)
|
|
|
|
|
|
async def start_http_server(name: str, port: int) -> web.AppRunner:
    """Start the HTTP + WebSocket server for project *name* on *port*, bound to all interfaces.

    Returns the ``web.AppRunner`` that was set up; call ``cleanup()`` on it
    to stop the server. Errors from starting the listener propagate to the
    caller after the runner has been released.
    """
    app = web.Application()
    app["project_name"] = name
    app.router.add_get("/ws", ws_handler)
    app.router.add_get("/{path:.*}", static_handler)
    app.router.add_get("/", static_handler)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        await web.TCPSite(runner, "0.0.0.0", port).start()
    except BaseException:
        # E.g. the port was taken between probing and binding: release the
        # runner instead of leaking it, then let the caller see the error.
        await runner.cleanup()
        raise
    return runner
|
|
|
|
|
|
async def broadcast_ws(name: str, message: dict):
    """Send *message* as JSON to every WebSocket client of project *name*.

    Does nothing when the project is not in ``active_projects``. Clients
    whose send fails are removed from the project's client set.
    """
    project = active_projects.get(name)
    if project is None:
        return

    clients = project["ws_clients"]
    dead = set()
    # Iterate over a snapshot: ws_handler adds and discards clients on the
    # same set, and it can run while we are suspended in send_json().
    # Mutating a set during iteration raises RuntimeError.
    for ws in tuple(clients):
        try:
            await ws.send_json(message)
        except Exception:
            # Any send failure marks the client as unusable; keep going so
            # one bad connection does not block the others.
            dead.add(ws)
    clients.difference_update(dead)
|
|
|
|
|
|
# --- MCP Tools ---
|
|
|
|
# Single FastMCP instance; the tool functions in this module are registered on it.
mcp = FastMCP(
    "workbench",
    instructions="Hardware diagnostic workbench — serve interactive diagnostic pages over LAN",
)
|
|
|
|
|
|
async def _ensure_project_server(name: str) -> int:
    """Start the HTTP server for *name* unless one is already registered; return its port."""
    if name not in active_projects:
        port = find_free_port()
        runner = await start_http_server(name, port)
        active_projects[name] = {"port": port, "runner": runner, "ws_clients": set()}
    return active_projects[name]["port"]


@mcp.tool()
async def workbench_scaffold(name: str, title: str, description: str = "") -> str:
    """Create a new workbench project with split-pane diagnostic page + sethmux terminal.

    Returns the LAN URL to open on your phone.
    """
    # NOTE: the docstring above is kept verbatim on purpose; FastMCP exposes
    # it to clients as the tool description.
    #
    # Returns a JSON string {"path": <project dir>, "url": <LAN URL>}.
    # If the project directory already exists it is left untouched and only
    # the server is (re)started when needed.
    pdir = project_path(name)
    if not pdir.exists():
        # Read the template before touching the disk so that a missing or
        # unreadable scaffold does not leave a half-created project behind
        # (which later calls would mistake for an existing project).
        template = SCAFFOLD_HTML.read_text()

        pdir.mkdir(parents=True)
        (pdir / "assets").mkdir()

        # Fill the scaffold placeholders.
        html = (
            template.replace("{{TITLE}}", title)
            .replace("{{DESCRIPTION}}", description)
            .replace("{{SETHMUX_URL}}", SETHMUX_URL)
        )
        (pdir / "index.html").write_text(html)

        # Init log files
        (pdir / "session.md").write_text(f"# {title} — Diagnostic Session\n\nStarted: {now_iso()}\n\n")
        (pdir / "session.jsonl").write_text("")
        (pdir / "cost-log.jsonl").write_text(
            json.dumps({"ts": now_iso(), "event": "session_start", "project": name}) + "\n"
        )
        (pdir / "state.json").write_text("{}")

    # One code path for new and existing projects: an already-registered
    # server is reused instead of being overwritten (and its runner leaked).
    port = await _ensure_project_server(name)
    ip = get_lan_ip()
    return json.dumps({"path": str(pdir), "url": f"http://{ip}:{port}"})
|
|
|
|
|
|
@mcp.tool()
async def workbench_state(project: str, state: str) -> str:
    """Push state update to the browser via WebSocket.

    The state is arbitrary JSON — the AI decides the schema.
    Include a 'template' field (HTML string) to replace the diagnostic content area.
    Include 'styles' for CSS and 'script' for JS to execute.
    """
    # Returns a JSON string: {"ok": true} on success, {"error": ...} otherwise.
    pdir = project_path(project)
    if not pdir.exists():
        return json.dumps({"error": f"Project '{project}' not found. Run workbench_scaffold first."})

    # Malformed JSON is reported the same way as the other tool errors
    # instead of escaping as an exception.
    try:
        state_obj = json.loads(state)
    except json.JSONDecodeError as exc:
        return json.dumps({"error": f"Invalid JSON in 'state': {exc}"})

    # Save to disk
    (pdir / "state.json").write_text(json.dumps(state_obj, indent=2))

    # Push to browser
    await broadcast_ws(project, {"type": "state", "state": state_obj})

    return json.dumps({"ok": True})
|
|
|
|
|
|
@mcp.tool()
async def workbench_log(project: str, entry: str, data: str = "{}") -> str:
    """Append a log entry to the session log. Shows in the browser log feed.

    entry: Human-readable markdown string (e.g., "R412 measured 1.05M — drifted +16.7%, FAIL")
    data: Optional JSON for the machine-readable log
    """
    # Returns a JSON string: {"ok": true} on success, {"error": ...} otherwise.
    pdir = project_path(project)
    if not pdir.exists():
        return json.dumps({"error": f"Project '{project}' not found."})

    # Validate `data` before writing anything, so a bad payload cannot leave
    # session.md with an entry that session.jsonl lacks.
    try:
        data_obj = json.loads(data) if data else {}
    except json.JSONDecodeError as exc:
        return json.dumps({"error": f"Invalid JSON in 'data': {exc}"})
    if not isinstance(data_obj, dict):
        # "ts" and "entry" are stored as keys, so only a JSON object works.
        return json.dumps({"error": "'data' must be a JSON object."})

    ts = now_iso()

    # Append to session.md
    with open(pdir / "session.md", "a") as f:
        f.write(f"\n### {ts}\n{entry}\n")

    # Append to session.jsonl; "ts" and "entry" override same-named keys in data.
    data_obj["ts"] = ts
    data_obj["entry"] = entry
    with open(pdir / "session.jsonl", "a") as f:
        f.write(json.dumps(data_obj) + "\n")

    # Push to browser
    await broadcast_ws(project, {"type": "log", "entry": entry})

    return json.dumps({"ok": True})
|
|
|
|
|
|
@mcp.tool()
async def workbench_read_log(project: str, tail: int = 20) -> str:
    """Read recent session log entries so AI can resume a session."""
    # Returns a JSON string {"entries": [...]} holding at most `tail` of the
    # newest entries, oldest first, or {"error": ...} for an unknown project.
    # Lines that are not valid JSON are returned as {"raw": <line>}.
    pdir = project_path(project)
    if not pdir.exists():
        return json.dumps({"error": f"Project '{project}' not found."})

    jsonl_path = pdir / "session.jsonl"
    if not jsonl_path.exists():
        return json.dumps({"entries": []})

    # Drop blank lines first so that `tail` counts real entries.
    lines = [line for line in jsonl_path.read_text().split("\n") if line.strip()]
    # lines[-0:] would be the *whole* list and a negative tail would slice
    # from the front, so non-positive values yield no entries.
    recent = lines[-tail:] if tail > 0 else []

    entries = []
    for line in recent:
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            entries.append({"raw": line})

    return json.dumps({"entries": entries})
|
|
|
|
|
|
@mcp.tool()
async def workbench_list() -> str:
    """List all workbench projects and their status."""
    # Returns a JSON string {"projects": [{"name", "active"[, "url", "ws_clients"]}]},
    # one item per sub-directory of WORKBENCH_DIR, sorted by path.
    if not WORKBENCH_DIR.exists():
        return json.dumps({"projects": []})

    # The LAN address is the same for every project, so it is resolved at
    # most once (and not at all when nothing is running).
    lan_ip = None
    projects = []
    for d in sorted(WORKBENCH_DIR.iterdir()):
        if not d.is_dir():
            continue
        running = active_projects.get(d.name)
        info = {"name": d.name, "active": running is not None}
        if running is not None:
            if lan_ip is None:
                lan_ip = get_lan_ip()
            info["url"] = f"http://{lan_ip}:{running['port']}"
            info["ws_clients"] = len(running["ws_clients"])
        projects.append(info)

    return json.dumps({"projects": projects})
|
|
|
|
|
|
@mcp.tool()
async def workbench_stop(project: str) -> str:
    """Stop the HTTP/WebSocket server for a project and log session end."""
    # Returns a JSON string: {"ok": true} on success, {"error": ...} when the
    # project has no running server.
    if project not in active_projects:
        return json.dumps({"error": f"Project '{project}' is not running."})

    pdir = project_path(project)
    jsonl_path = pdir / "session.jsonl"
    try:
        # Log session end to cost-log
        entry_count = 0
        if jsonl_path.exists():
            entry_count = sum(1 for line in jsonl_path.read_text().split("\n") if line.strip())

        cost_entry = {
            "ts": now_iso(),
            "event": "session_end",
            "project": project,
            "log_entries": entry_count,
        }
        with open(pdir / "cost-log.jsonl", "a") as f:
            f.write(json.dumps(cost_entry) + "\n")
    finally:
        # Shut down even if logging failed (e.g. the project directory was
        # removed); otherwise the server would keep running and stay
        # registered. A logging error still propagates afterwards.
        server = active_projects.pop(project)

        # Close browser sockets explicitly before cleanup.
        # NOTE(review): aiohttp's graceful-shutdown docs recommend this;
        # handlers blocked on an open WebSocket can otherwise delay
        # cleanup() until its shutdown timeout. Confirm for the pinned version.
        for ws in tuple(server["ws_clients"]):
            try:
                await ws.close()
            except Exception:
                # Best effort: a broken connection must not block shutdown.
                continue

        await server["runner"].cleanup()

    return json.dumps({"ok": True})
|
|
|
|
|
|
def main() -> None:
    """Run the MCP server, talking to the client over stdio."""
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()
|