From f16f16ef77122d904105cb769ded017fde41e665 Mon Sep 17 00:00:00 2001 From: Mortdecai Date: Sun, 29 Mar 2026 19:12:52 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20Textual=20TUI=20app=20=E2=80=94=20widge?= =?UTF-8?q?t=20rendering,=20socket=20client,=20user=20events?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- src/kitty_workbench/tui.py | 240 +++++++++++++++++++++++++++++++++++++ tests/test_tui.py | 57 +++++++++ 2 files changed, 297 insertions(+) create mode 100644 src/kitty_workbench/tui.py create mode 100644 tests/test_tui.py diff --git a/src/kitty_workbench/tui.py b/src/kitty_workbench/tui.py new file mode 100644 index 0000000..d50ccc9 --- /dev/null +++ b/src/kitty_workbench/tui.py @@ -0,0 +1,240 @@ +"""Textual TUI application — the display pane.""" + +from __future__ import annotations + +import asyncio +import json +import sys +from pathlib import Path + +from textual.app import App, ComposeResult +from textual.containers import Container, Horizontal, Vertical +from textual.widgets import ( + Static, Markdown, DataTable, Button, Input, Header, Footer, + Checkbox, ListView, ListItem, RichLog, Label, +) +from textual.css.query import NoMatches + +from kitty_workbench.protocol import ( + decode_message, encode_message, ReadyEvent, + DisplayCmd, ImageCmd, LogCmd, ClearCmd, LayoutCmd, InitCmd, + NotifyCmd, ShutdownCmd, + ChecklistToggleEvent, ButtonClickEvent, InputSubmitEvent, +) + + +class KittWorkbenchApp(App): + """Kitty-Workbench display TUI.""" + + CSS = """ + Screen { + layout: grid; + grid-size: 1 1; + } + + #main { + overflow-y: auto; + } + + #sidebar { + overflow-y: auto; + display: none; + } + + #log { + display: none; + border-top: solid $accent; + } + + #status-bar { + dock: bottom; + height: 1; + background: $surface; + color: $text-muted; + padding: 0 1; + } + + .log-info { color: $text; } + .log-warning { color: yellow; } + .log-error { color: red; } + .log-success { color: green; } + """ + + def __init__(self, socket_path: str, **kwargs): + super().__init__(**kwargs) + self.socket_path = socket_path + self._writer: asyncio.StreamWriter | None = None + self._project = "" + self._image_protocol = "none" + + def compose(self) -> ComposeResult: + yield Container(id="main") + yield Container(id="sidebar") + yield RichLog(id="log", wrap=True, markup=True) + yield Static("Connecting...", id="status-bar") + + async def on_mount(self) -> None: + self.run_worker(self._connect_socket(), exclusive=True) + + async def _connect_socket(self) -> None: + try: + reader, writer = await asyncio.open_unix_connection(self.socket_path) + self._writer = writer + writer.write((encode_message(ReadyEvent()) + "\n").encode()) + await writer.drain() + self.query_one("#status-bar", Static).update("Connected") + while True: + line = await reader.readline() + if not line: + break + msg = decode_message(line.decode().strip()) + if msg is not None: + self.handle_command(msg) + except Exception as e: + self.query_one("#status-bar", Static).update(f"Error: {e}") + + def handle_command(self, cmd) -> None: + if isinstance(cmd, InitCmd): + self._project = cmd.project + self._image_protocol = cmd.image_protocol + self.title = cmd.title + self.query_one("#status-bar", Static).update( + f"● {self._project} │ {self._image_protocol}" + ) + elif isinstance(cmd, DisplayCmd): + self._handle_display(cmd) + elif isinstance(cmd, ImageCmd): + self._handle_image(cmd) + elif isinstance(cmd, LogCmd): + self._handle_log(cmd) + elif isinstance(cmd, ClearCmd): + self._handle_clear(cmd) + elif isinstance(cmd, LayoutCmd): + self._handle_layout(cmd) + elif isinstance(cmd, NotifyCmd): + self.notify(cmd.message, severity=cmd.level) + elif isinstance(cmd, ShutdownCmd): + self.exit() + + def _get_pane(self, pane_name: str) -> Container | RichLog: + try: + return self.query_one(f"#{pane_name}") + except NoMatches: + return self.query_one("#main") + + def _handle_display(self, cmd: DisplayCmd) -> None: + pane = self._get_pane(cmd.pane) + if cmd.clear and not isinstance(pane, RichLog): + pane.remove_children() + + if cmd.widget == "markdown": + pane.mount(Markdown(cmd.content or "")) + elif cmd.widget == "table": + data = json.loads(cmd.content) if cmd.content else {"columns": [], "rows": []} + table = DataTable() + for col in data.get("columns", []): + table.add_column(col, key=col) + for row in data.get("rows", []): + table.add_row(*row) + pane.mount(table) + elif cmd.widget == "checklist": + items = cmd.items or [] + lv = ListView(id=f"checklist-{cmd.pane}") + for i, item in enumerate(items): + cb = Checkbox(item["label"], value=item.get("checked", False), id=f"check-{cmd.pane}-{i}") + lv.append(ListItem(cb)) + pane.mount(lv) + elif cmd.widget == "button": + btn = Button(cmd.label or cmd.id or "Button", id=f"btn-{cmd.id}") + pane.mount(btn) + elif cmd.widget == "input": + inp = Input(placeholder=cmd.placeholder or "", id=f"input-{cmd.id}") + pane.mount(inp) + + def _handle_image(self, cmd: ImageCmd) -> None: + pane = self._get_pane(cmd.pane) + if cmd.clear and not isinstance(pane, RichLog): + pane.remove_children() + # Placeholder — image rendering added in Task 7 + pane.mount(Static(f"[Image: {cmd.path}]")) + + def _handle_log(self, cmd: LogCmd) -> None: + try: + log_widget = self.query_one("#log", RichLog) + except NoMatches: + return + log_widget.write(f"[{cmd.level.upper()}] {cmd.entry}") + + def _handle_clear(self, cmd: ClearCmd) -> None: + pane = self._get_pane(cmd.pane) + if isinstance(pane, RichLog): + pane.clear() + else: + pane.remove_children() + + def _handle_layout(self, cmd: LayoutCmd) -> None: + pane_names = set(cmd.panes.keys()) + for name in ("main", "sidebar", "log"): + try: + widget = self.query_one(f"#{name}") + if name in pane_names: + widget.styles.display = "block" + else: + widget.styles.display = "none" + except NoMatches: + pass + + has_sidebar = "sidebar" in pane_names + has_log = "log" in pane_names + + cols = 2 if has_sidebar else 1 + rows = 2 if has_log else 1 + + self.screen.styles.grid_size_columns = cols + self.screen.styles.grid_size_rows = rows + + if has_sidebar: + main_ratio = cmd.panes.get("main", {}).get("ratio", 2) + side_ratio = cmd.panes.get("sidebar", {}).get("ratio", 1) + self.screen.styles.grid_columns = f"{main_ratio}fr {side_ratio}fr" + + if has_log: + content_ratio = 3 + log_ratio = cmd.panes.get("log", {}).get("ratio", 1) + self.screen.styles.grid_rows = f"{content_ratio}fr {log_ratio}fr" + + async def _send_event(self, evt) -> None: + if self._writer: + self._writer.write((encode_message(evt) + "\n").encode()) + await self._writer.drain() + + def on_checkbox_changed(self, event: Checkbox.Changed) -> None: + cb = event.checkbox + parts = cb.id.split("-") if cb.id else [] + if len(parts) >= 3: + pane = parts[1] + index = int(parts[2]) + evt = ChecklistToggleEvent(pane=pane, index=index, label=str(cb.label), checked=cb.value) + self.run_worker(self._send_event(evt)) + + def on_button_pressed(self, event: Button.Pressed) -> None: + btn = event.button + btn_id = btn.id.removeprefix("btn-") if btn.id else "" + pane = "main" + for parent in btn.ancestors: + if hasattr(parent, "id") and parent.id in ("main", "sidebar", "log"): + pane = parent.id + break + evt = ButtonClickEvent(pane=pane, id=btn_id) + self.run_worker(self._send_event(evt)) + + def on_input_submitted(self, event: Input.Submitted) -> None: + inp = event.input + inp_id = inp.id.removeprefix("input-") if inp.id else "" + pane = "main" + for parent in inp.ancestors: + if hasattr(parent, "id") and parent.id in ("main", "sidebar", "log"): + pane = parent.id + break + evt = InputSubmitEvent(pane=pane, id=inp_id, value=event.value) + self.run_worker(self._send_event(evt)) diff --git a/tests/test_tui.py b/tests/test_tui.py new file mode 100644 index 0000000..58d188a --- /dev/null +++ b/tests/test_tui.py @@ -0,0 +1,57 @@ +import asyncio +import json + +import pytest +from textual.app import App + +from kitty_workbench.tui import KittWorkbenchApp +from kitty_workbench.protocol import ( + encode_message, InitCmd, DisplayCmd, LayoutCmd, LogCmd, ClearCmd, ShutdownCmd, +) + + +@pytest.mark.asyncio +async def test_app_instantiates(): + app = KittWorkbenchApp(socket_path="/tmp/nonexistent.sock") + assert isinstance(app, App) + + +@pytest.mark.asyncio +async def test_app_handles_markdown_display(): + app = KittWorkbenchApp(socket_path="/tmp/nonexistent.sock") + async with app.run_test(size=(80, 24)) as pilot: + app.handle_command(DisplayCmd( + widget="markdown", + content="# Hello World\n\nThis is a test.", + pane="main", + )) + await pilot.pause() + assert app.is_running + + +@pytest.mark.asyncio +async def test_app_handles_layout(): + app = KittWorkbenchApp(socket_path="/tmp/nonexistent.sock") + async with app.run_test(size=(80, 24)) as pilot: + app.handle_command(LayoutCmd(panes={ + "main": {"ratio": 2}, + "sidebar": {"ratio": 1, "position": "right"}, + "log": {"ratio": 1, "position": "bottom"}, + })) + await pilot.pause() + assert app.query_one("#sidebar") is not None + assert app.query_one("#log") is not None + + +@pytest.mark.asyncio +async def test_app_handles_log(): + app = KittWorkbenchApp(socket_path="/tmp/nonexistent.sock") + async with app.run_test(size=(80, 24)) as pilot: + app.handle_command(LayoutCmd(panes={ + "main": {"ratio": 2}, + "log": {"ratio": 1, "position": "bottom"}, + })) + await pilot.pause() + app.handle_command(LogCmd(entry="R412 measured 1.05M — FAIL", level="warning")) + await pilot.pause() + assert app.is_running