From e9c0cd7f151b0f4261c6cf0b4666423050767deb Mon Sep 17 00:00:00 2001 From: Mortdecai Date: Sun, 29 Mar 2026 19:10:48 -0400 Subject: [PATCH] feat: MCP server with kitt_* tools and Unix socket management Co-Authored-By: Claude Sonnet 4.6 --- src/kitty_workbench/server.py | 258 ++++++++++++++++++++++++++++++++++ tests/test_server.py | 57 ++++++++ 2 files changed, 315 insertions(+) create mode 100644 src/kitty_workbench/server.py create mode 100644 tests/test_server.py diff --git a/src/kitty_workbench/server.py b/src/kitty_workbench/server.py new file mode 100644 index 0000000..319b34a --- /dev/null +++ b/src/kitty_workbench/server.py @@ -0,0 +1,258 @@ +"""MCP server — exposes kitt_* tools, manages socket connections to TUI apps.""" + +from __future__ import annotations + +import asyncio +import json +import os +import sys +from pathlib import Path +from typing import Optional + +from mcp.server.fastmcp import FastMCP + +from kitty_workbench.backends import detect_backend, Backend +from kitty_workbench.project import ( + create_project, project_exists, append_log, read_log, + list_projects, log_session_event, project_path, +) +from kitty_workbench.protocol import ( + InitCmd, DisplayCmd, ImageCmd, LogCmd, ClearCmd, LayoutCmd, + ShutdownCmd, encode_message, decode_message, ReadyEvent, +) + +DEFAULT_WORKBENCH_DIR = Path.home() / "Kitty-Workbench" +DEFAULT_SOCKET_DIR = "/tmp" + + +class KittWorkbenchServer: + """Core server logic — testable without MCP transport.""" + + def __init__(self, workbench_dir: Path = DEFAULT_WORKBENCH_DIR, socket_dir: str = DEFAULT_SOCKET_DIR): + self.workbench_dir = Path(workbench_dir) + self.socket_dir = socket_dir + self.backend: Optional[Backend] = None + self._connections: dict[str, asyncio.StreamWriter] = {} + self._pane_ids: dict[str, object] = {} + self._event_queues: dict[str, list] = {} + self._socket_servers: dict[str, asyncio.AbstractServer] = {} + + def _socket_path(self, name: str) -> str: + return os.path.join(self.socket_dir, f"kitt-{name}.sock") + + async def _launch_tui(self, name: str, title: str) -> None: + """Start socket server, launch TUI pane, wait for ready.""" + if self.backend is None: + self.backend = detect_backend() + + sock_path = self._socket_path(name) + if os.path.exists(sock_path): + os.unlink(sock_path) + + ready_event = asyncio.Event() + + async def handle_tui_connection(reader, writer): + self._connections[name] = writer + while True: + try: + line = await reader.readline() + if not line: + break + msg = decode_message(line.decode().strip()) + if msg is None: + continue + if isinstance(msg, ReadyEvent): + init_cmd = InitCmd( + project=name, title=title, + image_protocol=self.backend.image_protocol(), + ) + writer.write((encode_message(init_cmd) + "\n").encode()) + await writer.drain() + ready_event.set() + else: + if name not in self._event_queues: + self._event_queues[name] = [] + from dataclasses import asdict + self._event_queues[name].append(asdict(msg)) + except (ConnectionResetError, asyncio.IncompleteReadError): + break + + server = await asyncio.start_unix_server(handle_tui_connection, path=sock_path) + self._socket_servers[name] = server + + command = [sys.executable, "-m", "kitty_workbench", "tui", name, "--socket", sock_path] + pane_id = self.backend.launch_pane(command, title) + self._pane_ids[name] = pane_id + + try: + await asyncio.wait_for(ready_event.wait(), timeout=10.0) + except asyncio.TimeoutError: + raise RuntimeError(f"TUI did not connect within 10 seconds for project '{name}'") + + async def _send_cmd(self, name: str, cmd) -> None: + writer = self._connections.get(name) + if writer is None: + return + writer.write((encode_message(cmd) + "\n").encode()) + await writer.drain() + + async def kitt_open(self, name: str, title: str, description: str = "") -> str: + if name in self._connections: + return json.dumps({ + "project": name, + "backend": self.backend.name if self.backend else "unknown", + "status": "already_open", + }) + create_project(name, title, workbench_dir=self.workbench_dir) + log_session_event(name, "session_start", workbench_dir=self.workbench_dir) + self._event_queues[name] = [] + await self._launch_tui(name, title) + return json.dumps({ + "project": name, + "backend": self.backend.name if self.backend else "unknown", + "image_support": self.backend.image_protocol() if self.backend else "none", + "status": "ready", + }) + + async def kitt_display(self, project: str, widget: str, content: str = "", + items: str = "", id: str = "", label: str = "", + placeholder: str = "", pane: str = "main", clear: bool = False) -> str: + if project not in self._connections: + return json.dumps({"error": f"Project '{project}' not open. Call kitt_open first."}) + cmd = DisplayCmd( + widget=widget, pane=pane, clear=clear, + content=content or None, + items=json.loads(items) if items else None, + id=id or None, label=label or None, placeholder=placeholder or None, + ) + await self._send_cmd(project, cmd) + return json.dumps({"ok": True}) + + async def kitt_image(self, project: str, path: str, pane: str = "main", clear: bool = True) -> str: + if project not in self._connections: + return json.dumps({"error": f"Project '{project}' not open."}) + p = Path(path) + if not p.is_absolute(): + p = project_path(project, self.workbench_dir) / "assets" / path + if not p.exists(): + return json.dumps({"error": f"Image not found: {p}"}) + cmd = ImageCmd(path=str(p), pane=pane, clear=clear) + await self._send_cmd(project, cmd) + return json.dumps({"ok": True, "path": str(p), "protocol": self.backend.image_protocol() if self.backend else "none"}) + + async def kitt_layout(self, project: str, panes: str) -> str: + if project not in self._connections: + return json.dumps({"error": f"Project '{project}' not open."}) + panes_dict = json.loads(panes) + cmd = LayoutCmd(panes=panes_dict) + await self._send_cmd(project, cmd) + return json.dumps({"ok": True, "panes": list(panes_dict.keys())}) + + async def kitt_log(self, project: str, entry: str, data: str = "{}", level: str = "info") -> str: + if not project_exists(project, workbench_dir=self.workbench_dir): + return json.dumps({"error": f"Project '{project}' not found."}) + data_dict = json.loads(data) if data and data != "{}" else None + append_log(project, entry, data=data_dict, level=level, workbench_dir=self.workbench_dir) + if project in self._connections: + cmd = LogCmd(entry=entry, level=level) + await self._send_cmd(project, cmd) + return json.dumps({"ok": True}) + + async def kitt_events(self, project: str) -> str: + events = self._event_queues.get(project, []) + self._event_queues[project] = [] + return json.dumps({"events": events}) + + async def kitt_read_log(self, project: str, tail: int = 20) -> str: + if not project_exists(project, workbench_dir=self.workbench_dir): + return json.dumps({"error": f"Project '{project}' not found."}) + entries = read_log(project, tail=tail, workbench_dir=self.workbench_dir) + return json.dumps({"entries": entries}) + + async def kitt_list(self) -> str: + projects = list_projects(workbench_dir=self.workbench_dir) + for p in projects: + p["active"] = p["name"] in self._connections + return json.dumps({"projects": projects}) + + async def kitt_close(self, project: str) -> str: + if project not in self._connections and project not in self._pane_ids: + return json.dumps({"error": f"Project '{project}' is not open."}) + if project_exists(project, workbench_dir=self.workbench_dir): + entries = read_log(project, tail=999999, workbench_dir=self.workbench_dir) + log_session_event(project, "session_end", workbench_dir=self.workbench_dir, log_entries=len(entries)) + if project in self._connections: + try: + await self._send_cmd(project, ShutdownCmd()) + except Exception: + pass + del self._connections[project] + if project in self._pane_ids and self.backend: + try: + self.backend.close_pane(self._pane_ids[project]) + except Exception: + pass + del self._pane_ids[project] + if project in self._socket_servers: + self._socket_servers[project].close() + del self._socket_servers[project] + sock_path = self._socket_path(project) + if os.path.exists(sock_path): + os.unlink(sock_path) + self._event_queues.pop(project, None) + return json.dumps({"ok": True}) + + +def create_mcp_server(workbench_dir: Path = DEFAULT_WORKBENCH_DIR, socket_dir: str = DEFAULT_SOCKET_DIR) -> FastMCP: + """Create the FastMCP instance with all kitt_* tools registered.""" + srv = KittWorkbenchServer(workbench_dir=workbench_dir, socket_dir=socket_dir) + mcp = FastMCP("kitty-workbench", instructions="Kitty-Workbench — interactive terminal display panel for AI-driven diagnostics. Call kitt_open first to start.") + + @mcp.tool() + async def kitt_open(name: str, title: str, description: str = "") -> str: + """Create a new Kitty-Workbench project and open the interactive display pane. Call this first before using any other kitt_ tools. A new pane appears next to the AI CLI — as a split in kitty or tmux, or as a separate terminal window.""" + return await srv.kitt_open(name, title, description) + + @mcp.tool() + async def kitt_display(project: str, widget: str, content: str = "", items: str = "", + id: str = "", label: str = "", placeholder: str = "", + pane: str = "main", clear: bool = False) -> str: + """Push content to the display pane. Widget types: 'markdown' (rendered text), 'table' (columns+rows as JSON in content), 'checklist' (items as JSON array), 'button' (needs id+label), 'input' (needs id+placeholder). Use kitt_events to read user interactions.""" + return await srv.kitt_display(project, widget, content, items, id, label, placeholder, pane, clear) + + @mcp.tool() + async def kitt_image(project: str, path: str, pane: str = "main", clear: bool = True) -> str: + """Display an image in the display pane. Supports PNG, JPG, GIF. Use absolute path or relative to project assets/ dir. Image quality depends on terminal (kitty > sixel > ASCII art).""" + return await srv.kitt_image(project, path, pane, clear) + + @mcp.tool() + async def kitt_layout(project: str, panes: str) -> str: + """Change the display layout. Pass a JSON object with region names and options. Example: {"main": {"ratio": 2}, "sidebar": {"ratio": 1, "position": "right"}, "log": {"ratio": 1, "position": "bottom"}}""" + return await srv.kitt_layout(project, panes) + + @mcp.tool() + async def kitt_log(project: str, entry: str, data: str = "{}", level: str = "info") -> str: + """Record a diagnostic log entry. Saved to disk and shown in log pane if it exists. Levels: info, warning, error, success.""" + return await srv.kitt_log(project, entry, data, level) + + @mcp.tool() + async def kitt_events(project: str) -> str: + """Read user interactions from the display pane — checklist toggles, button clicks, text input submissions. Returns all events since last call, then clears the queue.""" + return await srv.kitt_events(project) + + @mcp.tool() + async def kitt_read_log(project: str, tail: int = 20) -> str: + """Read recent session log entries from disk. Use this to resume a previous session.""" + return await srv.kitt_read_log(project, tail) + + @mcp.tool() + async def kitt_list() -> str: + """List all Kitty-Workbench projects and whether their display pane is currently open.""" + return await srv.kitt_list() + + @mcp.tool() + async def kitt_close(project: str) -> str: + """Close the display pane and end the session. Logs session end to cost-log.jsonl.""" + return await srv.kitt_close(project) + + return mcp diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..6979d89 --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,57 @@ +import asyncio +import json +from pathlib import Path +from unittest.mock import patch, AsyncMock + +import pytest + +from kitty_workbench.server import KittWorkbenchServer +from kitty_workbench.protocol import encode_message, decode_message, ReadyEvent, ChecklistToggleEvent + + +@pytest.fixture +def server(tmp_workbench, socket_path): + return KittWorkbenchServer(workbench_dir=tmp_workbench, socket_dir=str(Path(socket_path).parent)) + + +@pytest.mark.asyncio +async def test_server_open_creates_project(server, tmp_workbench): + with patch.object(server, "_launch_tui", new_callable=AsyncMock): + result = await server.kitt_open("test-proj", "Test Project") + result_data = json.loads(result) + assert result_data["project"] == "test-proj" + assert (tmp_workbench / "test-proj" / "session.md").exists() + + +@pytest.mark.asyncio +async def test_server_list_empty(server): + result = await server.kitt_list() + data = json.loads(result) + assert data["projects"] == [] + + +@pytest.mark.asyncio +async def test_server_log_writes_to_disk(server, tmp_workbench): + with patch.object(server, "_launch_tui", new_callable=AsyncMock): + await server.kitt_open("test-proj", "Test") + server._connections["test-proj"] = AsyncMock() + result = await server.kitt_log("test-proj", "R412 measured 1.05M", level="warning") + data = json.loads(result) + assert data["ok"] is True + jsonl = (tmp_workbench / "test-proj" / "session.jsonl").read_text().strip() + entry = json.loads(jsonl) + assert entry["entry"] == "R412 measured 1.05M" + + +@pytest.mark.asyncio +async def test_server_events_queue(server): + server._event_queues["test-proj"] = [] + server._event_queues["test-proj"].append( + {"event": "checklist_toggle", "index": 0, "label": "Test", "checked": True, "pane": "sidebar"} + ) + result = await server.kitt_events("test-proj") + data = json.loads(result) + assert len(data["events"]) == 1 + assert data["events"][0]["checked"] is True + result2 = await server.kitt_events("test-proj") + assert json.loads(result2)["events"] == []