feat: MCP server with kitt_* tools and Unix socket management
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,57 @@
|
||||
import asyncio
|
||||
import json
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
from kitty_workbench.server import KittWorkbenchServer
|
||||
from kitty_workbench.protocol import encode_message, decode_message, ReadyEvent, ChecklistToggleEvent
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def server(tmp_workbench, socket_path):
|
||||
return KittWorkbenchServer(workbench_dir=tmp_workbench, socket_dir=str(Path(socket_path).parent))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_server_open_creates_project(server, tmp_workbench):
|
||||
with patch.object(server, "_launch_tui", new_callable=AsyncMock):
|
||||
result = await server.kitt_open("test-proj", "Test Project")
|
||||
result_data = json.loads(result)
|
||||
assert result_data["project"] == "test-proj"
|
||||
assert (tmp_workbench / "test-proj" / "session.md").exists()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_server_list_empty(server):
|
||||
result = await server.kitt_list()
|
||||
data = json.loads(result)
|
||||
assert data["projects"] == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_server_log_writes_to_disk(server, tmp_workbench):
|
||||
with patch.object(server, "_launch_tui", new_callable=AsyncMock):
|
||||
await server.kitt_open("test-proj", "Test")
|
||||
server._connections["test-proj"] = AsyncMock()
|
||||
result = await server.kitt_log("test-proj", "R412 measured 1.05M", level="warning")
|
||||
data = json.loads(result)
|
||||
assert data["ok"] is True
|
||||
jsonl = (tmp_workbench / "test-proj" / "session.jsonl").read_text().strip()
|
||||
entry = json.loads(jsonl)
|
||||
assert entry["entry"] == "R412 measured 1.05M"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_server_events_queue(server):
|
||||
server._event_queues["test-proj"] = []
|
||||
server._event_queues["test-proj"].append(
|
||||
{"event": "checklist_toggle", "index": 0, "label": "Test", "checked": True, "pane": "sidebar"}
|
||||
)
|
||||
result = await server.kitt_events("test-proj")
|
||||
data = json.loads(result)
|
||||
assert len(data["events"]) == 1
|
||||
assert data["events"][0]["checked"] is True
|
||||
result2 = await server.kitt_events("test-proj")
|
||||
assert json.loads(result2)["events"] == []
|
||||
Reference in New Issue
Block a user