Files
kitty-workbench/demo.py
T
2026-03-29 19:22:38 -04:00

226 lines
6.6 KiB
Python

#!/usr/bin/env python3
"""
Kitty-Workbench Demo
====================
Run this in a kitty terminal to see the interactive display panel.
Usage:
python3 demo.py
What it does:
1. Opens a Unix socket server
2. Splits your kitty window and launches the TUI in the right pane
3. Pushes a diagnostic scenario (Heathkit IO-102 oscilloscope focus repair)
4. Leaves the TUI running so you can interact with the checklist
Press Ctrl+C to close.
"""
import asyncio
import os
import sys
import time
from pathlib import Path
# Ensure kitty_workbench is importable
sys.path.insert(0, str(Path(__file__).parent / "src"))
from kitty_workbench.protocol import (
encode_message, decode_message, ReadyEvent, InitCmd,
DisplayCmd, LayoutCmd, LogCmd, ShutdownCmd,
)
from kitty_workbench.project import create_project
def _remove_socket(path):
    """Delete the Unix socket file at *path*, ignoring the case where it is absent."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass


def _run_split(argv):
    """Run a pane-splitting command and report its result.

    Returns the command's stripped stdout when it exits with status 0, and
    ``None`` when it exits non-zero or cannot be started at all (for example
    because the ``kitty`` / ``tmux`` binary is not installed).
    """
    import subprocess

    try:
        result = subprocess.run(argv, capture_output=True, text=True)
    except OSError:
        # Missing or non-executable binary: treat it like a failed launch so
        # the caller can fall through to the next strategy.
        return None
    if result.returncode != 0:
        return None
    return result.stdout.strip()


def _launch_tui_pane(tui_cmd, title):
    """Try to open the TUI in a split pane; return True when one was opened.

    Order of attempts: a native kitty split when the kitty environment
    variables are present, then a tmux split when ``TMUX`` is set, then a
    kitty split as a last resort (remote control may be enabled even when
    the environment variables are missing).
    """
    kitty_argv = ["kitty", "@", "launch", "--location=vsplit",
                  "--title", title] + tui_cmd
    tmux_argv = ["tmux", "split-window", "-h", "-d", "-P", "-F",
                 "#{pane_id}"] + tui_cmd

    inside_kitty = bool(
        os.environ.get("KITTY_PID") or os.environ.get("KITTY_WINDOW_ID")
    )

    if inside_kitty:
        pane = _run_split(kitty_argv)
        if pane is not None:
            print(f" Opened kitty split pane (id: {pane})")
            return True

    if os.environ.get("TMUX"):
        pane = _run_split(tmux_argv)
        if pane is not None:
            print(f" Opened tmux split pane ({pane})")
            return True

    # Only fall back to kitty when it has not already been tried above;
    # repeating the identical command right after it failed gains nothing.
    if not inside_kitty:
        pane = _run_split(kitty_argv)
        if pane is not None:
            print(f" Opened kitty split pane (id: {pane})")
            return True

    return False


async def _push_demo_content(send, project_name, title):
    """Send the demo scenario to the TUI through the *send* coroutine.

    Pushes, in order: the init command, the three-pane layout, the markdown
    write-up for the main pane, the checklist for the sidebar, and four log
    entries.
    """
    await send(InitCmd(
        project=project_name,
        title=title,
        image_protocol="none",
    ))
    await send(LayoutCmd(panes={
        "main": {"ratio": 2},
        "sidebar": {"ratio": 1, "position": "right"},
        "log": {"ratio": 1, "position": "bottom"},
    }))
    await send(DisplayCmd(
        widget="markdown",
        content="""# HV Focus Circuit Diagnostic
## CRT Focus Voltage Divider
The focus voltage is derived from the HV supply through a resistive divider:
- **R412** (910K) + **R413** (2.2M) + **R414** (1M)
- Expected focus voltage: ~2.1kV at CRT pin 6
- Measured: **1.8kV — low by 300V**
## Probable Cause
Carbon composition resistors R412-R414 have drifted with age.
R412 shows **+16.7% drift** — replacing with metal film.
## Circuit
```
HV Supply (5.2kV)
[R414] 1M
├──── Focus pin (CRT pin 6)
[R413] 2.2M
[R412] 910K ◄── DRIFTED to 1.05M
GND
```
""",
        pane="main",
        clear=True,
    ))
    await send(DisplayCmd(
        widget="checklist",
        items=[
            {"label": "Measure R412 (910K)", "checked": True},
            {"label": "Measure R413 (2.2M)", "checked": True},
            {"label": "Measure C201 ESR", "checked": True},
            {"label": "Replace R412", "checked": False},
            {"label": "Re-measure focus voltage", "checked": False},
            {"label": "Verify CRT focus", "checked": False},
        ],
        pane="sidebar",
        clear=True,
    ))
    await send(LogCmd(entry="R412: 1.05M (expected 910K) — FAIL +16.7%", level="warning"))
    await send(LogCmd(entry="R413: 2.18M (expected 2.2M) — PASS", level="success"))
    await send(LogCmd(entry="C201 ESR: 0.3Ω — PASS", level="success"))
    await send(LogCmd(entry="Replacing R412 with 910K 1% metal film", level="info"))


async def main():
    """Run the demo: serve a Unix socket, launch the TUI, push content, wait.

    Steps:
      1. Remove any stale socket file and make sure the demo project exists.
      2. Start a Unix socket server; the TUI connects to it and announces
         itself with a ``ReadyEvent``.
      3. Try to open the TUI in a kitty or tmux split pane, or print the
         command to run manually when no split could be opened.
      4. Wait up to 15 seconds for the TUI, then push the demo scenario.
      5. Stay alive printing TUI events until interrupted, then send a
         shutdown command.

    The server, the TUI connection and the socket file are released on every
    exit path, including the connect timeout and cancellation.
    """
    from dataclasses import asdict
    import shlex

    project_name = "demo-io102"
    title = "Heathkit IO-102 Focus Diagnostic"
    sock_path = f"/tmp/kitt-{project_name}.sock"

    # A socket file left over from a previous run would make bind() fail.
    _remove_socket(sock_path)

    # Ensure project dir exists
    create_project(project_name, title)

    # -- Socket server --
    ready = asyncio.Event()
    tui_writer = None

    async def on_connect(reader, writer):
        """Handle one TUI connection: note readiness, echo all other events."""
        nonlocal tui_writer
        tui_writer = writer
        while True:
            line = await reader.readline()
            if not line:
                # EOF: the peer closed the connection.
                break
            msg = decode_message(line.decode().strip())
            if isinstance(msg, ReadyEvent):
                ready.set()
            elif msg is not None:
                # Print user events to this terminal
                print(f" [event] {asdict(msg)}")

    async def send(cmd):
        """Write one newline-terminated command to the TUI, then pause briefly."""
        tui_writer.write((encode_message(cmd) + "\n").encode())
        await tui_writer.drain()
        # Short pause between commands so the demo content appears step by step.
        await asyncio.sleep(0.4)

    server = await asyncio.start_unix_server(on_connect, path=sock_path)
    try:
        # -- Launch TUI in a split pane --
        print("Launching Kitty-Workbench TUI...")
        tui_cmd = [
            sys.executable, "-m", "kitty_workbench",
            "tui", project_name, "--socket", sock_path,
        ]
        if not _launch_tui_pane(tui_cmd, title):
            print(" Could not auto-split. Run this in another terminal:")
            # Quote each argument so the line can be pasted into a shell even
            # when the interpreter path contains spaces.
            print(f" {' '.join(shlex.quote(part) for part in tui_cmd)}")
        print(" Waiting for TUI to connect...")

        # Wait for TUI to connect
        try:
            await asyncio.wait_for(ready.wait(), timeout=15)
        except asyncio.TimeoutError:
            print("TUI did not connect within 15s. Is kitty remote control enabled?")
            print("Add to ~/.config/kitty/kitty.conf:")
            print(" allow_remote_control yes")
            return
        print(" TUI connected!\n")

        # -- Push demo content --
        print("Pushing diagnostic scenario...")
        await _push_demo_content(send, project_name, title)
        print("Demo loaded! Interact with the checklist in the TUI pane.")
        print("Events from the TUI will appear below.\n")
        print("Press Ctrl+C to close.\n")

        # Keep running and print events
        try:
            while True:
                await asyncio.sleep(1)
        except (KeyboardInterrupt, asyncio.CancelledError):
            print("\nShutting down...")
            try:
                await send(ShutdownCmd())
            except Exception:
                # Deliberately best-effort: the TUI may already have exited,
                # in which case there is nobody left to notify.
                pass
    finally:
        # Runs on normal exit, timeout, errors and cancellation alike.
        if tui_writer is not None:
            tui_writer.close()
        server.close()
        _remove_socket(sock_path)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to quit this demo. Depending on the
        # Python version and on where main() was interrupted, asyncio.run()
        # may re-raise KeyboardInterrupt after main() has finished; exit
        # quietly instead of printing a traceback.
        pass