"""MCP server — exposes kitt_* tools, manages socket connections to TUI apps.""" from __future__ import annotations import asyncio import json import os import sys from pathlib import Path from typing import Optional from mcp.server.fastmcp import FastMCP from kitty_workbench.backends import detect_backend, Backend from kitty_workbench.project import ( create_project, project_exists, append_log, read_log, list_projects, log_session_event, project_path, ) from kitty_workbench.protocol import ( InitCmd, DisplayCmd, ImageCmd, LogCmd, ClearCmd, LayoutCmd, ShutdownCmd, encode_message, decode_message, ReadyEvent, ) DEFAULT_WORKBENCH_DIR = Path.home() / "Kitty-Workbench" DEFAULT_SOCKET_DIR = "/tmp" class KittWorkbenchServer: """Core server logic — testable without MCP transport.""" def __init__(self, workbench_dir: Path = DEFAULT_WORKBENCH_DIR, socket_dir: str = DEFAULT_SOCKET_DIR): self.workbench_dir = Path(workbench_dir) self.socket_dir = socket_dir self.backend: Optional[Backend] = None self._connections: dict[str, asyncio.StreamWriter] = {} self._pane_ids: dict[str, object] = {} self._event_queues: dict[str, list] = {} self._socket_servers: dict[str, asyncio.AbstractServer] = {} def _socket_path(self, name: str) -> str: return os.path.join(self.socket_dir, f"kitt-{name}.sock") async def _launch_tui(self, name: str, title: str) -> None: """Start socket server, launch TUI pane, wait for ready.""" if self.backend is None: self.backend = detect_backend() sock_path = self._socket_path(name) if os.path.exists(sock_path): os.unlink(sock_path) ready_event = asyncio.Event() async def handle_tui_connection(reader, writer): self._connections[name] = writer while True: try: line = await reader.readline() if not line: break msg = decode_message(line.decode().strip()) if msg is None: continue if isinstance(msg, ReadyEvent): init_cmd = InitCmd( project=name, title=title, image_protocol=self.backend.image_protocol(), ) writer.write((encode_message(init_cmd) + "\n").encode()) await writer.drain() ready_event.set() else: if name not in self._event_queues: self._event_queues[name] = [] from dataclasses import asdict self._event_queues[name].append(asdict(msg)) except (ConnectionResetError, asyncio.IncompleteReadError): break server = await asyncio.start_unix_server(handle_tui_connection, path=sock_path) self._socket_servers[name] = server command = [sys.executable, "-m", "kitty_workbench", "tui", name, "--socket", sock_path] pane_id = self.backend.launch_pane(command, title) self._pane_ids[name] = pane_id try: await asyncio.wait_for(ready_event.wait(), timeout=10.0) except asyncio.TimeoutError: raise RuntimeError(f"TUI did not connect within 10 seconds for project '{name}'") async def _send_cmd(self, name: str, cmd) -> None: writer = self._connections.get(name) if writer is None: return writer.write((encode_message(cmd) + "\n").encode()) await writer.drain() async def kitt_open(self, name: str, title: str, description: str = "") -> str: if name in self._connections: return json.dumps({ "project": name, "backend": self.backend.name if self.backend else "unknown", "status": "already_open", }) create_project(name, title, workbench_dir=self.workbench_dir) log_session_event(name, "session_start", workbench_dir=self.workbench_dir) self._event_queues[name] = [] await self._launch_tui(name, title) return json.dumps({ "project": name, "backend": self.backend.name if self.backend else "unknown", "image_support": self.backend.image_protocol() if self.backend else "none", "status": "ready", }) async def kitt_display(self, project: str, widget: str, content: str = "", items: str = "", id: str = "", label: str = "", placeholder: str = "", pane: str = "main", clear: bool = False) -> str: if project not in self._connections: return json.dumps({"error": f"Project '{project}' not open. Call kitt_open first."}) cmd = DisplayCmd( widget=widget, pane=pane, clear=clear, content=content or None, items=json.loads(items) if items else None, id=id or None, label=label or None, placeholder=placeholder or None, ) await self._send_cmd(project, cmd) return json.dumps({"ok": True}) async def kitt_image(self, project: str, path: str, pane: str = "main", clear: bool = True) -> str: if project not in self._connections: return json.dumps({"error": f"Project '{project}' not open."}) p = Path(path) if not p.is_absolute(): p = project_path(project, self.workbench_dir) / "assets" / path if not p.exists(): return json.dumps({"error": f"Image not found: {p}"}) cmd = ImageCmd(path=str(p), pane=pane, clear=clear) await self._send_cmd(project, cmd) return json.dumps({"ok": True, "path": str(p), "protocol": self.backend.image_protocol() if self.backend else "none"}) async def kitt_layout(self, project: str, panes: str) -> str: if project not in self._connections: return json.dumps({"error": f"Project '{project}' not open."}) panes_dict = json.loads(panes) cmd = LayoutCmd(panes=panes_dict) await self._send_cmd(project, cmd) return json.dumps({"ok": True, "panes": list(panes_dict.keys())}) async def kitt_log(self, project: str, entry: str, data: str = "{}", level: str = "info") -> str: if not project_exists(project, workbench_dir=self.workbench_dir): return json.dumps({"error": f"Project '{project}' not found."}) data_dict = json.loads(data) if data and data != "{}" else None append_log(project, entry, data=data_dict, level=level, workbench_dir=self.workbench_dir) if project in self._connections: cmd = LogCmd(entry=entry, level=level) await self._send_cmd(project, cmd) return json.dumps({"ok": True}) async def kitt_events(self, project: str) -> str: events = self._event_queues.get(project, []) self._event_queues[project] = [] return json.dumps({"events": events}) async def kitt_read_log(self, project: str, tail: int = 20) -> str: if not project_exists(project, workbench_dir=self.workbench_dir): return json.dumps({"error": f"Project '{project}' not found."}) entries = read_log(project, tail=tail, workbench_dir=self.workbench_dir) return json.dumps({"entries": entries}) async def kitt_list(self) -> str: projects = list_projects(workbench_dir=self.workbench_dir) for p in projects: p["active"] = p["name"] in self._connections return json.dumps({"projects": projects}) async def kitt_close(self, project: str) -> str: if project not in self._connections and project not in self._pane_ids: return json.dumps({"error": f"Project '{project}' is not open."}) if project_exists(project, workbench_dir=self.workbench_dir): entries = read_log(project, tail=999999, workbench_dir=self.workbench_dir) log_session_event(project, "session_end", workbench_dir=self.workbench_dir, log_entries=len(entries)) if project in self._connections: try: await self._send_cmd(project, ShutdownCmd()) except Exception: pass del self._connections[project] if project in self._pane_ids and self.backend: try: self.backend.close_pane(self._pane_ids[project]) except Exception: pass del self._pane_ids[project] if project in self._socket_servers: self._socket_servers[project].close() del self._socket_servers[project] sock_path = self._socket_path(project) if os.path.exists(sock_path): os.unlink(sock_path) self._event_queues.pop(project, None) return json.dumps({"ok": True}) def create_mcp_server(workbench_dir: Path = DEFAULT_WORKBENCH_DIR, socket_dir: str = DEFAULT_SOCKET_DIR) -> FastMCP: """Create the FastMCP instance with all kitt_* tools registered.""" srv = KittWorkbenchServer(workbench_dir=workbench_dir, socket_dir=socket_dir) mcp = FastMCP("kitty-workbench", instructions="Kitty-Workbench — interactive terminal display panel for AI-driven diagnostics. Call kitt_open first to start.") @mcp.tool() async def kitt_open(name: str, title: str, description: str = "") -> str: """Create a new Kitty-Workbench project and open the interactive display pane. Call this first before using any other kitt_ tools. A new pane appears next to the AI CLI — as a split in kitty or tmux, or as a separate terminal window.""" return await srv.kitt_open(name, title, description) @mcp.tool() async def kitt_display(project: str, widget: str, content: str = "", items: str = "", id: str = "", label: str = "", placeholder: str = "", pane: str = "main", clear: bool = False) -> str: """Push content to the display pane. Widget types: 'markdown' (rendered text), 'table' (columns+rows as JSON in content), 'checklist' (items as JSON array), 'button' (needs id+label), 'input' (text field — needs id+placeholder, user must press Enter to submit). Tell the user to press Enter to submit text inputs. Use kitt_events to read user interactions.""" return await srv.kitt_display(project, widget, content, items, id, label, placeholder, pane, clear) @mcp.tool() async def kitt_image(project: str, path: str, pane: str = "main", clear: bool = True) -> str: """Display an image in the display pane. Supports PNG, JPG, GIF. Use absolute path or relative to project assets/ dir. Image quality depends on terminal (kitty > sixel > ASCII art).""" return await srv.kitt_image(project, path, pane, clear) @mcp.tool() async def kitt_layout(project: str, panes: str) -> str: """Change the display layout. Pass a JSON object with region names and options. Example: {"main": {"ratio": 2}, "sidebar": {"ratio": 1, "position": "right"}, "log": {"ratio": 1, "position": "bottom"}}""" return await srv.kitt_layout(project, panes) @mcp.tool() async def kitt_log(project: str, entry: str, data: str = "{}", level: str = "info") -> str: """Record a diagnostic log entry. Saved to disk and shown in log pane if it exists. Levels: info, warning, error, success.""" return await srv.kitt_log(project, entry, data, level) @mcp.tool() async def kitt_events(project: str) -> str: """Read user interactions from the display pane — checklist toggles, button clicks, text input submissions. Returns all events since last call, then clears the queue.""" return await srv.kitt_events(project) @mcp.tool() async def kitt_read_log(project: str, tail: int = 20) -> str: """Read recent session log entries from disk. Use this to resume a previous session.""" return await srv.kitt_read_log(project, tail) @mcp.tool() async def kitt_list() -> str: """List all Kitty-Workbench projects and whether their display pane is currently open.""" return await srv.kitt_list() @mcp.tool() async def kitt_close(project: str) -> str: """Close the display pane and end the session. Logs session end to cost-log.jsonl.""" return await srv.kitt_close(project) return mcp