8b740f3ec1
CLI can now BE the brain: - brain_mode_set: switch gateway to external mode - queue_get: poll incoming player commands - queue_complete: send response back to player - tool_execute: call any gateway tool directly (rcon, display, npc, etc.) 28 MCP tools total. When in external mode, Opus in the CLI processes player commands instead of Codex/Anthropic. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
792 lines
24 KiB
Python
792 lines
24 KiB
Python
"""
|
|
Mortdecai Gateway MCP Server.
|
|
|
|
Wraps the gateway HTTP API as MCP tools so Claude can operate
|
|
Mortdecai natively. All game operations go through the gateway —
|
|
this server never touches Minecraft directly.
|
|
|
|
Tool groups:
|
|
- Gateway lifecycle (start, stop, restart, status, health)
|
|
- Player commands (gateway_command)
|
|
- Brain management (hot-swap providers)
|
|
- Session management
|
|
- Bot playtesting (run profiles against the gateway)
|
|
- Diagnostics (read interactions, analyze errors)
|
|
- Escalation (write notes for architect sessions)
|
|
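- Brain mode + command queue (external CLI brain, direct tool execution)
- Session notes and run log (persistent memory across runs)
- Logs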
|
|
"""
|
|
|
|
import json
|
|
import subprocess
|
|
import time as _time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
import yaml
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
# Base URL that every HTTP helper in this module prefixes to its request path.
GATEWAY_URL = "http://localhost:8500"

# Project layout, derived from this file's location (the parent of the
# directory that contains this file).
CLI_DIR = Path(__file__).parent.parent
SCRIPTS_DIR = CLI_DIR / "scripts"
CONFIG_DIR = CLI_DIR / "config"
DATA_DIR = CLI_DIR / "data"
ESCALATION_DIR = DATA_DIR / "escalations"
PLAYTEST_DIR = DATA_DIR / "playtests"
# Interaction logs are read from a fixed path under the user's home directory.
INTERACTION_DIR = Path.home() / "bin" / "Mortdecai-2.0" / "data" / "interactions"

# Ensure the writable data directories exist before any tool tries to use them.
for d in (DATA_DIR, ESCALATION_DIR, PLAYTEST_DIR):
    d.mkdir(parents=True, exist_ok=True)

# The server instance all @mcp.tool() functions below register against.
mcp = FastMCP("mortdecai-gateway")
|
|
|
|
|
|
async def _get(path: str) -> dict:
    """GET ``GATEWAY_URL + path`` and return the decoded JSON body.

    Args:
        path: Request path (may include a query string) appended to GATEWAY_URL.

    Raises:
        httpx.HTTPStatusError: Raised by ``raise_for_status`` on an error status.
    """
    url = f"{GATEWAY_URL}{path}"
    # A fresh client per call, with a 10 second timeout.
    async with httpx.AsyncClient(timeout=10) as http:
        response = await http.get(url)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
async def _post(path: str, body: dict | None = None) -> dict:
    """POST a JSON body to ``GATEWAY_URL + path`` and return the decoded JSON reply.

    Args:
        path: Request path appended to GATEWAY_URL.
        body: JSON payload; a missing or empty body is sent as ``{}``.

    Raises:
        httpx.HTTPStatusError: Raised by ``raise_for_status`` on an error status.
    """
    url = f"{GATEWAY_URL}{path}"
    payload = body or {}
    # POSTs get a 90 second timeout, much longer than the 10 seconds used for
    # GET and PATCH.
    async with httpx.AsyncClient(timeout=90) as http:
        response = await http.post(url, json=payload)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
async def _patch(path: str, body: dict | None = None) -> dict:
    """PATCH a JSON body to ``GATEWAY_URL + path`` and return the decoded JSON reply.

    Args:
        path: Request path (may include a query string) appended to GATEWAY_URL.
        body: JSON payload; a missing or empty body is sent as ``{}``.

    Raises:
        httpx.HTTPStatusError: Raised by ``raise_for_status`` on an error status.
    """
    url = f"{GATEWAY_URL}{path}"
    payload = body or {}
    async with httpx.AsyncClient(timeout=10) as http:
        response = await http.patch(url, json=payload)
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
# --- Gateway lifecycle ---
|
|
|
|
|
|
@mcp.tool()
def gateway_start() -> str:
    """Start the Mortdecai gateway if not running."""
    script = SCRIPTS_DIR / "start-gateway.sh"
    # Run the start script through bash and hand back everything it printed.
    proc = subprocess.run(
        ["bash", str(script)],
        capture_output=True,
        text=True,
        timeout=30,
    )
    return f"{proc.stdout}{proc.stderr}"
|
|
|
|
|
|
@mcp.tool()
def gateway_stop() -> str:
    """Stop the Mortdecai gateway."""
    script = SCRIPTS_DIR / "stop-gateway.sh"
    # Run the stop script through bash and hand back everything it printed.
    proc = subprocess.run(
        ["bash", str(script)],
        capture_output=True,
        text=True,
        timeout=10,
    )
    return f"{proc.stdout}{proc.stderr}"
|
|
|
|
|
|
@mcp.tool()
def gateway_restart() -> str:
    """Restart the Mortdecai gateway (stop + start)."""
    stop = subprocess.run(
        ["bash", str(SCRIPTS_DIR / "stop-gateway.sh")],
        capture_output=True, text=True, timeout=10,
    )
    # Fixed pause between stop and start (presumably to let the old process
    # exit before a new one is launched).
    _time.sleep(2)
    start = subprocess.run(
        ["bash", str(SCRIPTS_DIR / "start-gateway.sh")],
        capture_output=True, text=True, timeout=30,
    )
    # Report the stop script's stderr as well; previously it was dropped, so a
    # failed stop was invisible in the tool output.
    return stop.stdout + stop.stderr + start.stdout + start.stderr
|
|
|
|
|
|
@mcp.tool()
async def gateway_status() -> str:
    """Get full gateway status: providers, sessions, oracle state."""
    try:
        status = await _get("/v2/status")
        return json.dumps(status, indent=2)
    except httpx.ConnectError:
        # Connection refused or unreachable is reported as "down", not as a
        # generic error.
        return "Gateway is DOWN — not reachable on port 8500"
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
@mcp.tool()
async def gateway_health() -> str:
    """Quick health check — is the gateway alive?"""
    try:
        health = await _get("/v2/health")
        return json.dumps(health, indent=2)
    except httpx.ConnectError:
        # A connection failure is the "not alive" answer.
        return "DOWN"
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
# --- Brain mode + command queue ---
|
|
|
|
|
|
@mcp.tool()
async def brain_mode_set(mode: str) -> str:
    """Switch gateway between internal (AI loop) and external (CLI brain) mode.

    In external mode, player commands are queued for YOU to handle.
    In internal mode, the gateway's own AI (Codex/Anthropic) handles them.

    Args:
        mode: "internal" or "external"
    """
    # The mode is passed as a query parameter; the PATCH body stays empty.
    endpoint = f"/v2/brain-mode?mode={mode}"
    try:
        reply = await _patch(endpoint)
        return json.dumps(reply, indent=2)
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
@mcp.tool()
async def queue_get() -> str:
    """Get pending player commands waiting for you to handle.

    Only works when brain_mode is 'external'. Returns commands with
    player name, mode, message, world context, and player context.
    Process each command by calling tools, then complete it with queue_complete.
    """
    try:
        pending = await _get("/v2/queue")
        return json.dumps(pending, indent=2)
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
@mcp.tool()
async def queue_complete(cmd_id: str, response_text: str) -> str:
    """Complete a queued command — sends the response back to the player.

    Args:
        cmd_id: Command ID from queue_get
        response_text: The message to send to the player
    """
    # The same text is sent under both keys of the completion payload.
    payload = {
        "response_text": response_text,
        "player_message": response_text,
    }
    try:
        reply = await _post(f"/v2/queue/complete/{cmd_id}", payload)
        return json.dumps(reply, indent=2)
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
@mcp.tool()
async def tool_execute(
    tool: str,
    params: str = "{}",
    player: str = "",
    server: str = "dev",
) -> str:
    """Execute a Mortdecai gateway tool directly. YOU are the brain.

    Available tools: rcon_execute, rcon_query, display_send, display_interactive,
    npc_spawn, npc_bulk_spawn, npc_despawn, npc_bulk_despawn, npc_list,
    npc_command, npc_script_write, npc_script_read, eye_players, eye_world,
    eye_events, memory_read, memory_write, history_read, logs_read,
    sound_play, world_query, schem_list, schem_place, schem_download,
    creative_name, perms_manage

    Args:
        tool: Tool name (e.g. "rcon_execute")
        params: JSON string of tool parameters
        player: Player context (for session lookup)
        server: Server target — dev or prod
    """
    # Decode string params; anything already decoded is passed through as-is.
    if isinstance(params, str):
        try:
            tool_params = json.loads(params)
        except json.JSONDecodeError:
            return f"Invalid JSON params: {params}"
    else:
        tool_params = params

    request = {
        "tool": tool,
        "params": tool_params,
        "player": player,
        "server": server,
    }
    try:
        reply = await _post("/v2/tools/execute", request)
        return json.dumps(reply, indent=2)
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
# --- Player commands (through gateway) ---
|
|
|
|
|
|
@mcp.tool()
async def gateway_command(
    player: str,
    text: str,
    mode: str = "sudo",
    server: str = "dev",
) -> str:
    """Send a command through the gateway as if a player typed it in-game.

    Args:
        player: Minecraft player name
        text: The command text (e.g. "give me a diamond")
        mode: Command mode — sudo, pray, ask, or raw
        server: Server target — dev or prod
    """
    # The gateway field for the mode is named "command_type".
    request = {
        "player": player,
        "text": text,
        "server": server,
        "command_type": mode,
    }
    try:
        reply = await _post("/v2/quick", request)
        return json.dumps(reply, indent=2)
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
# --- Brain management (hot-swap providers) ---
|
|
|
|
|
|
@mcp.tool()
async def gateway_brain_set(
    role: str,
    provider: str,
    model: str,
) -> str:
    """Hot-swap the AI provider and model for a gateway role.

    Args:
        role: Agent role — eye, hand, voice, opus, architect, orchestrator
        provider: Provider name — anthropic, codex, openai, ollama, regex
        model: Model identifier (e.g. "gpt-5.1-codex-mini", "claude-opus-4-20250514")
    """
    # The role selects the endpoint; provider and model travel in the body.
    override = {"provider": provider, "model": model}
    try:
        reply = await _patch(f"/v2/brain/{role}", override)
        return json.dumps(reply, indent=2)
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
@mcp.tool()
async def gateway_brain_save(role: str) -> str:
    """Persist a brain's current live override to agents.yaml on disk.

    Args:
        role: Agent role to save
    """
    endpoint = f"/v2/brain/{role}/save"
    try:
        reply = await _post(endpoint)
        return json.dumps(reply, indent=2)
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
@mcp.tool()
async def gateway_brain_reload(role: str) -> str:
    """Clear in-memory override, reload brain config from agents.yaml.

    Args:
        role: Agent role to reload
    """
    endpoint = f"/v2/brain/{role}/reload"
    try:
        reply = await _post(endpoint)
        return json.dumps(reply, indent=2)
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
# --- Session management ---
|
|
|
|
|
|
@mcp.tool()
async def gateway_sessions_clear(player: str, mode: str = "") -> str:
    """Clear sessions for a player. Optionally filter by mode.

    Args:
        player: Player name
        mode: Optional mode filter (sudo, pray, ask, raw). Empty = all modes.
    """
    # Local import: only this tool needs URL quoting.
    from urllib.parse import quote

    try:
        # Percent-encode caller-supplied values so characters such as "/",
        # "?", "#" or "&" cannot change which endpoint or query is requested.
        url = f"/v2/sessions/clear/{quote(player, safe='')}"
        if mode:
            url += f"?mode={quote(mode, safe='')}"
        data = await _post(url)
        return json.dumps(data, indent=2)
    except Exception as e:
        return f"Error: {e}"
|
|
|
|
|
|
@mcp.tool()
async def gateway_sessions_reset() -> str:
    """Clear ALL sessions for ALL players. Use with caution."""
    try:
        reply = await _post("/v2/sessions/reset")
        return json.dumps(reply, indent=2)
    except Exception as exc:
        return f"Error: {exc}"
|
|
|
|
|
|
# --- Bot Playtesting ---
|
|
|
|
|
|
@mcp.tool()
def list_bot_profiles() -> str:
    """List available bot profiles for playtesting."""
    profiles_path = CONFIG_DIR / "bot-profiles.yaml"
    if not profiles_path.exists():
        return "No bot profiles found at config/bot-profiles.yaml"
    with open(profiles_path) as f:
        # An empty YAML document loads as None; treat it as "no profiles"
        # instead of crashing on `.get`.
        data = yaml.safe_load(f) or {}
    profiles = data.get("profiles") or {}
    lines = []
    for name, profile in profiles.items():
        # A profile key with no body (`name:`) also loads as None.
        profile = profile or {}
        cmd_count = len(profile.get("commands") or [])
        lines.append(f" {name}: {profile.get('description', '')} ({cmd_count} commands)")
    return f"Available profiles ({len(profiles)}):\n" + "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
async def run_playtest(profile: str, server: str = "dev") -> str:
    """Run a bot profile's commands through the gateway and collect results.

    Sends each command sequentially, records status/response/tools for each.
    Results are saved to data/playtests/ for later analysis.

    Args:
        profile: Bot profile name (e.g. "noob", "griefer", "builder")
        server: Server target — dev or prod
    """
    # Local import: only this tool needs asyncio directly.
    import asyncio

    profiles_path = CONFIG_DIR / "bot-profiles.yaml"
    if not profiles_path.exists():
        return "No bot profiles config found"
    with open(profiles_path) as f:
        # An empty YAML document loads as None; treat it as "no profiles".
        data = yaml.safe_load(f) or {}

    profiles = data.get("profiles") or {}
    if profile not in profiles:
        return f"Unknown profile: {profile}. Available: {list(profiles.keys())}"

    bot = profiles[profile] or {}
    player = bot.get("player_name", f"Test{profile.title()}")
    commands = bot.get("commands") or []
    results = []

    for index, cmd in enumerate(commands):
        mode = cmd.get("mode", "sudo")
        # `text: null` in the YAML would otherwise break the slicing below.
        text = cmd.get("text") or ""
        try:
            resp = await _post("/v2/quick", {
                "player": player,
                "text": text,
                "server": server,
                "command_type": mode,
            })
            results.append({
                "mode": mode,
                "text": text,
                "status": resp.get("status", "unknown"),
                "response": (resp.get("response_text") or "")[:200],
                # tool_trace may be missing or null in the gateway reply.
                "tools_used": [t.get("tool") for t in resp.get("tool_trace") or []],
                "commands_executed": resp.get("commands_executed", []),
                "error": None,
            })
        except Exception as e:
            # A failed request is recorded as a result rather than aborting
            # the whole playtest.
            results.append({
                "mode": mode,
                "text": text,
                "status": "error",
                "response": "",
                "tools_used": [],
                "commands_executed": [],
                "error": str(e),
            })
        # Brief pause between commands to avoid overwhelming the gateway.
        # asyncio.sleep yields to the event loop; time.sleep would block every
        # other coroutine for the duration. No pause after the last command.
        if index < len(commands) - 1:
            await asyncio.sleep(1)

    # Summarize. "passed" and "failed" are exact complements.
    total = len(results)
    passed = sum(1 for r in results if r["status"] == "completed" and not r["error"])
    failed = total - passed
    no_tools = sum(1 for r in results if r["status"] == "completed" and not r["tools_used"])

    now = datetime.now()
    report = {
        "profile": profile,
        "player": player,
        "timestamp": now.isoformat(),
        "summary": {
            "total": total,
            "passed": passed,
            "failed": failed,
            "no_tools_used": no_tools,
        },
        "results": results,
    }

    # Save report
    filename = f"{now.strftime('%Y%m%d-%H%M')}-{profile}.json"
    report_path = PLAYTEST_DIR / filename
    report_path.write_text(json.dumps(report, indent=2))

    # Return summary
    summary_lines = [f"Playtest: {profile} ({player}) — {passed}/{total} passed, {failed} failed, {no_tools} no-tool-use"]
    for r in results:
        status_icon = "OK" if r["status"] == "completed" and not r["error"] else "FAIL"
        # str() guards against a trace entry without a "tool" key (None),
        # which would make str.join raise after the report was already saved.
        tool_str = ",".join(str(t) for t in r["tools_used"]) if r["tools_used"] else "NO_TOOLS"
        summary_lines.append(f" [{status_icon}] /{r['mode']} {r['text'][:50]} → {tool_str}")
        if r["error"]:
            summary_lines.append(f" error: {r['error'][:100]}")

    summary_lines.append(f"\nReport saved: {report_path}")
    return "\n".join(summary_lines)
|
|
|
|
|
|
@mcp.tool()
def list_playtest_reports(limit: int = 10) -> str:
    """List recent playtest reports.

    Args:
        limit: Max number of reports to show (default 10)
    """
    # Report names start with a timestamp, so a reverse name sort is newest first.
    newest_first = sorted(PLAYTEST_DIR.glob("*.json"), reverse=True)
    reports = newest_first[:limit]
    if not reports:
        return "No playtest reports found"

    lines = []
    for report_file in reports:
        try:
            data = json.loads(report_file.read_text())
            s = data.get("summary", {})
            lines.append(f" {report_file.name}: {data.get('profile')} — {s.get('passed',0)}/{s.get('total',0)} passed")
        except Exception:
            # Keep listing even if one report is corrupt or has an unexpected shape.
            lines.append(f" {report_file.name}: (unreadable)")
    return f"Recent reports ({len(reports)}):\n" + "\n".join(lines)
|
|
|
|
|
|
# --- Diagnostics ---
|
|
|
|
|
|
@mcp.tool()
def read_interactions(date: str = "", limit: int = 20) -> str:
    """Read recent gateway interaction logs for analysis.

    Args:
        date: Date string YYYY-MM-DD (default: today)
        limit: Max interactions to return (default 20)
    """
    day = date if date else datetime.now().strftime("%Y-%m-%d")
    log_path = INTERACTION_DIR / f"{day}.jsonl"
    if not log_path.exists():
        return f"No interaction log for {day}"

    # One JSON object per line; only the last `limit` lines are summarized.
    all_lines = log_path.read_text().strip().split("\n")
    summaries = []
    for raw in all_lines[-limit:]:
        try:
            record = json.loads(raw)
            tools = [step.get("tool") for step in record.get("tool_trace", [])]
            summaries.append({
                "player": record.get("player"),
                "mode": record.get("mode"),
                "message": (record.get("message") or "")[:80],
                "status": record.get("status"),
                "tools": tools,
                "has_commands": bool(record.get("commands_executed")),
                "response_preview": (record.get("response_text") or "")[:100],
            })
        except Exception:
            # Lines that are not valid JSON or not the expected shape are skipped.
            continue

    return json.dumps(summaries, indent=2)
|
|
|
|
|
|
def _interaction_example(interaction: dict, include_status: bool = False) -> dict:
    """Build the compact example record embedded in an analyze_errors issue."""
    example = {
        "player": interaction.get("player"),
        "mode": interaction.get("mode"),
    }
    if include_status:
        example["status"] = interaction.get("status")
    # `message` can be present but null, so coerce before slicing (the same
    # guard read_interactions uses).
    example["msg"] = (interaction.get("message") or "")[:60]
    return example


@mcp.tool()
def analyze_errors(date: str = "", hours: int = 4) -> str:
    """Analyze recent gateway logs and interactions for error patterns.

    Checks for: repeated errors, tool-use failures, timeouts, empty responses,
    session poisoning (text-only responses with no tool calls).

    Args:
        date: Date string YYYY-MM-DD (default: today)
        hours: How many hours back to analyze (default 4)
    """
    issues = []

    # Check gateway log for errors. Note: this scans the whole log file; the
    # `hours` window is applied only to the interaction log below.
    log_path = Path("/tmp/mortdecai-gateway.log")
    if log_path.exists():
        try:
            log_text = log_path.read_text(errors="replace")
        except OSError:
            # Best effort: an unreadable log must not stop the interaction checks.
            log_text = ""
        error_lines = [l for l in log_text.split("\n") if "ERROR" in l or "Traceback" in l]
        if error_lines:
            issues.append({
                "type": "gateway_errors",
                "count": len(error_lines),
                "recent": error_lines[-3:],
            })

    # Check interaction logs
    if not date:
        date = datetime.now().strftime("%Y-%m-%d")
    interaction_path = INTERACTION_DIR / f"{date}.jsonl"
    if interaction_path.exists():
        cutoff = _time.time() - (hours * 3600)
        interactions = []
        for line in interaction_path.read_text().strip().split("\n"):
            try:
                d = json.loads(line)
                # assumes `timestamp` is epoch seconds — TODO confirm against
                # the gateway's log writer; non-numeric values are skipped.
                if d.get("timestamp", 0) > cutoff:
                    interactions.append(d)
            except (ValueError, AttributeError, TypeError):
                # Invalid JSON, a non-object line, or an incomparable timestamp.
                continue

        # Check for text-only responses (no tool calls)
        no_tools = [i for i in interactions if not i.get("tool_trace") and i.get("status") == "completed"]
        if no_tools:
            issues.append({
                "type": "no_tool_use",
                "count": len(no_tools),
                "description": "Completed responses with no tool calls (model responded with text only)",
                "examples": [_interaction_example(i) for i in no_tools[:3]],
            })

        # Check for errors/timeouts
        errors = [i for i in interactions if i.get("status") in ("error", "timeout")]
        if errors:
            issues.append({
                "type": "request_failures",
                "count": len(errors),
                "examples": [_interaction_example(i, include_status=True) for i in errors[:3]],
            })

        # Check for empty responses
        empty = [i for i in interactions if not i.get("response_text") and i.get("status") == "completed"]
        if empty:
            issues.append({
                "type": "empty_responses",
                "count": len(empty),
                "examples": [_interaction_example(i) for i in empty[:3]],
            })

    if not issues:
        return f"No issues found in the last {hours} hours."

    return json.dumps({"issues_found": len(issues), "issues": issues}, indent=2)
|
|
|
|
|
|
# --- Escalation ---
|
|
|
|
|
|
@mcp.tool()
def write_escalation(
    title: str,
    severity: str,
    description: str,
    evidence: str = "",
    suggested_fix: str = "",
) -> str:
    """Write an escalation note for the architect session (Seth + Claude).

    Use this when you find an issue you cannot or should not fix yourself.

    Args:
        title: Short title for the issue
        severity: low, medium, high, critical
        description: What's wrong and how you discovered it
        evidence: Log lines, interaction data, or other evidence
        suggested_fix: Your recommendation for how to fix it (optional)
    """
    # Local import: only this tool needs regular expressions.
    import re

    now = datetime.now()
    note = {
        "title": title,
        "severity": severity,
        "description": description,
        "evidence": evidence,
        "suggested_fix": suggested_fix,
        "timestamp": now.isoformat(),
        "status": "open",
    }

    # Build a filesystem-safe slug from the title. Replacing only spaces (as
    # before) let "/" and other separators through, which either failed the
    # write or placed the file outside the escalation directory.
    slug = re.sub(r"[^\w.-]+", "-", title[:40].lower()).strip("-.") or "untitled"
    stamp = now.strftime('%Y%m%d-%H%M')
    path = ESCALATION_DIR / f"{stamp}-{slug}.json"

    # The timestamp has minute resolution, so add a numeric suffix rather than
    # overwrite an earlier note with the same title.
    suffix = 2
    while path.exists():
        path = ESCALATION_DIR / f"{stamp}-{slug}-{suffix}.json"
        suffix += 1

    path.write_text(json.dumps(note, indent=2))
    return f"Escalation written: {path}"
|
|
|
|
|
|
@mcp.tool()
def list_escalations(status: str = "open") -> str:
    """List escalation notes, optionally filtered by status.

    Args:
        status: Filter by status — open, resolved, all (default: open)
    """
    # Filenames start with a timestamp, so a reverse name sort is newest first.
    note_files = sorted(ESCALATION_DIR.glob("*.json"), reverse=True)
    if not note_files:
        return "No escalations found"

    notes = []
    for note_file in note_files:
        try:
            record = json.loads(note_file.read_text())
            # "all" disables the filter; otherwise the status must match exactly.
            if status == "all" or record.get("status") == status:
                notes.append(f" [{record.get('severity','?').upper()}] {record.get('title','')} ({note_file.name})")
        except Exception:
            # Unreadable or malformed notes are left out of the listing.
            continue

    if not notes:
        return f"No {status} escalations"
    return f"Escalations ({len(notes)}):\n" + "\n".join(notes)
|
|
|
|
|
|
@mcp.tool()
def read_escalation(filename: str) -> str:
    """Read a specific escalation note.

    Args:
        filename: Escalation filename (from list_escalations)
    """
    root = ESCALATION_DIR.resolve()
    path = (root / filename).resolve()
    # Refuse anything that resolves outside the escalation directory ("../",
    # absolute paths, symlinks); otherwise this tool could read any file the
    # process can access.
    if not path.is_relative_to(root):
        return f"Not found: {filename}"
    # is_file() also rejects directories, which exists() used to let through
    # to a failing read_text().
    if not path.is_file():
        return f"Not found: {filename}"
    return path.read_text()
|
|
|
|
|
|
# --- Logs ---
|
|
|
|
|
|
# --- Session Notes (persistent memory across runs) ---
|
|
|
|
|
|
@mcp.tool()
def write_note(topic: str, content: str) -> str:
    """Save a learning or observation that should persist across runs.

    Use for: patterns discovered, things that work, things that don't,
    provider quirks, player behavior patterns, diagnostic findings.
    Keep notes focused and factual. One topic per note.

    Args:
        topic: Short topic key (e.g. "codex-tool-compliance", "griefer-patterns")
        content: The observation or learning
    """
    # The topic becomes a filename, so reject values that would escape the
    # notes directory or fail on a missing subdirectory. An empty topic is
    # rejected too: read_notes("") lists topics, so such a note could never be
    # read back by topic.
    if not topic or "/" in topic or "\\" in topic or topic in (".", ".."):
        return f"Invalid topic: {topic!r} (use a short key without path separators)"

    notes_dir = DATA_DIR / "notes"
    notes_dir.mkdir(parents=True, exist_ok=True)
    note_path = notes_dir / f"{topic}.md"

    entry = f"\n## {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n{content}\n"

    # A new topic file starts with a title line; existing files just get the
    # dated entry appended.
    is_new_topic = not note_path.exists()
    with open(note_path, "a") as f:
        if is_new_topic:
            f.write(f"# {topic}\n")
        f.write(entry)

    return f"Note saved: {note_path}"
|
|
|
|
|
|
@mcp.tool()
def read_notes(topic: str = "") -> str:
    """Read session notes. If topic is empty, lists all topics.

    Args:
        topic: Topic key to read, or empty to list all
    """
    notes_dir = DATA_DIR / "notes"
    if not notes_dir.exists():
        return "No notes yet"

    if not topic:
        # Listing mode: one line per topic file with its size.
        files = sorted(notes_dir.glob("*.md"))
        if not files:
            return "No notes yet"
        lines = []
        for f in files:
            size = f.stat().st_size
            lines.append(f" {f.stem} ({size} bytes)")
        return f"Topics ({len(files)}):\n" + "\n".join(lines)

    # The topic becomes a filename; anything containing a path separator or a
    # dot-directory cannot name a note, and must not be used to read files
    # outside the notes directory.
    if "/" in topic or "\\" in topic or topic in (".", ".."):
        return f"No notes for topic: {topic}"

    note_path = notes_dir / f"{topic}.md"
    if not note_path.is_file():
        return f"No notes for topic: {topic}"
    return note_path.read_text()
|
|
|
|
|
|
@mcp.tool()
def write_session_summary(summary: str) -> str:
    """Write a summary of this run's findings and actions.

    Call this at the end of every scheduled run. Keeps a rolling log
    of what happened, what was fixed, what was escalated.

    Args:
        summary: Brief summary of this run (findings, actions, escalations)
    """
    separator = "\n---\n"
    log_path = DATA_DIR / "run-log.md"
    entry = f"\n## {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n{summary}\n\n---\n"

    with open(log_path, "a") as f:
        f.write(entry)

    # Keep run log under 50KB (trim oldest entries)
    if log_path.stat().st_size > 50_000:
        # Every entry ends with the separator, so a plain split yields an empty
        # trailing section. Counting it (as before) kept one entry fewer than
        # intended and erased the log entirely when it held a single oversized
        # entry. Drop blank sections before halving.
        sections = [s for s in log_path.read_text().split(separator) if s.strip()]
        # Drop the oldest half, but always keep the entry just written.
        keep = max(1, len(sections) // 2)
        log_path.write_text(separator.join(sections[-keep:]) + separator)

    return f"Session summary saved to {log_path}"
|
|
|
|
|
|
@mcp.tool()
def read_run_log(entries: int = 5) -> str:
    """Read recent run summaries.

    Args:
        entries: Number of recent entries to show (default 5)
    """
    log_path = DATA_DIR / "run-log.md"
    if not log_path.exists():
        return "No run log yet — this is the first run"

    # Each entry is written with a trailing separator, so a plain split ends
    # with an empty section. Counting it made this return one entry fewer than
    # requested; drop blank sections first.
    sections = [s for s in log_path.read_text().split("\n---\n") if s.strip()]
    if not sections:
        return "No run log yet — this is the first run"

    # A slice of [-0:] would return the whole log, so show at least one entry.
    count = max(entries, 1)
    return "\n---\n".join(sections[-count:])
|
|
|
|
|
|
# --- Logs ---
|
|
|
|
|
|
@mcp.tool()
def gateway_logs(lines: int = 50) -> str:
    """Read recent gateway log output.

    Args:
        lines: Number of lines to read from the end (default 50)
    """
    log_path = Path("/tmp/mortdecai-gateway.log")
    if not log_path.exists():
        return "No gateway log file found"
    # A zero or negative count used to become "tail -0" / "tail --5", which
    # printed nothing and was then reported as an empty log.
    if lines < 1:
        return f"Error reading logs: lines must be at least 1 (got {lines})"
    try:
        result = subprocess.run(
            # "-n N" is the portable spelling of the obsolete "-N" option.
            ["tail", "-n", str(lines), str(log_path)],
            capture_output=True, text=True, timeout=5,
        )
    except Exception as e:
        return f"Error reading logs: {e}"
    if result.returncode != 0:
        # Surface tail's own error instead of claiming the log is empty.
        detail = result.stderr.strip() or f"tail exited with status {result.returncode}"
        return f"Error reading logs: {detail}"
    return result.stdout or "Log file is empty"
|
|
|
|
|
|
if __name__ == "__main__":
    # Run as a script: serve the registered tools over the stdio transport.
    mcp.run(transport="stdio")
|