Files
Mortdecai/agent/tools/script_manager.py
Mortdecai da8f557219 GPU scheduler, 14-tool architecture, plugin deployment, event dispatcher
GPU Scheduler (gpu.sethpc.xyz):
- Live dashboard with 4 GPUs, training monitor, loss sparklines
- Preset-based job scheduler with 3 triggers (time, finish_training, cost)
- Model selection per GPU, pipeline configuration
- Tool self-play and training pipeline types
- Behind Google OAuth, live-refresh without page reload

Tool Architecture (14 tools):
- 3 new tools: world.nearby_entities, memory.read, memory.write
- 7 script.* tools: write, validate, execute, read, list, delete, schedule
- ScriptManager: full mcfunction datapack CRUD with RCON validation
- Training data: 1,430 tool examples (up from 1,159)

Plugin Deployment (paper-ai-25567):
- WorldGuard 7.0.12, CoreProtect CE 23.1, EssentialsX 2.21.2, Vault 1.7.3
- Fresh greenfield world reset
- 104 RCON-validated plugin training examples

Event Dispatcher:
- Watches server log for deaths, joins, advancements, PvP kills
- Configurable trigger probability and cooldowns per event type
- Deployed to dev server, fires god_system prompts on events
- 21 event-response training examples

Training Infrastructure:
- train_lora.py: --save-steps 50, --resume from checkpoint
- run_training.sh: stops Ollama, activates conda, restarts after
- Passwordless sudo for ollama services on steel141
- Dev server added to MCSManager with autoStart

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 03:14:45 -04:00

393 lines
16 KiB
Python

"""
Mortdecai Script Manager — mcfunction datapack CRUD + validation + scheduling.
Gives the model full control over a Minecraft datapack:
script.write — create/overwrite mcfunction files
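script.validate — check commands by sending them through RCON before writing (commands without relative coordinates are really executed on the server, so this is not a side-effect-free dry run)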
script.execute — run a function via RCON
script.read — read an existing script
script.list — list all scripts in the datapack
script.delete — remove a script
script.schedule — add to tick.json or load.json
Datapack structure:
    <server>/world/datapacks/mortdecai/
        pack.mcmeta
        data/mortdecai/function/        ← mcfunction files live here
        data/minecraft/tags/function/   ← tick.json, load.json for scheduling
Usage:
from agent.tools.script_manager import ScriptManager
mgr = ScriptManager(config)
result = mgr.write("build_house", ["fill ~-5 ~ ~-5 ~5 ~4 ~5 oak_planks hollow", ...])
result = mgr.validate(["give @s minecraft:diamond 1", "bad command here"])
result = mgr.execute("build_house", as_player="slingshooter08")
"""
import json
import os
import re
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional
# Module-wide lock; ScriptManager holds it while creating, rewriting or deleting datapack files.
_lock = threading.Lock()
# Upper bound on the number of command lines ScriptManager.write() accepts for one script.
MAX_SCRIPT_LINES = 200
# Upper bound on the number of .mcfunction files; write() refuses to create new scripts beyond it.
MAX_SCRIPTS = 50
# Datapack namespace: names the function directory and prefixes function ids ("<namespace>:<name>").
NAMESPACE = "mortdecai"
# Contents written to pack.mcmeta when the datapack is created for the first time.
# NOTE(review): pack_format 48 presumably targets Minecraft 1.21.x — confirm against the server version.
PACK_MCMETA = {
"pack": {
"pack_format": 48,
"description": "Mortdecai AI-generated functions"
}
}
class ScriptManager:
    """CRUD, validation, execution and scheduling for mcfunction scripts in one datapack.

    Scripts are stored as ``<datapack>/data/<NAMESPACE>/function/<name>.mcfunction``
    and scheduled through ``<datapack>/data/minecraft/tags/function/{tick,load}.json``.
    Script names are sanitized to lowercase ``[a-z0-9_]`` segments separated by
    ``/`` and always resolve to a path inside the function directory.

    Every public method returns a plain dict; failures are reported as
    ``{"ok": False, "error": ...}`` rather than raised (filesystem errors while
    writing still propagate).
    """

    # Tag files managed by this class (one JSON file per schedule type).
    _TAG_NAMES = ("tick", "load")
    # Substrings in an RCON reply that mark a command as rejected by validate().
    _VALIDATE_ERROR_MARKERS = ("<--[HERE]", "Unknown", "Incorrect", "Expected", "Invalid")
    # execute() checks a narrower set than validate(); kept as in the original behaviour.
    _EXECUTE_ERROR_MARKERS = ("<--[HERE]", "Unknown", "Incorrect")
    # Command names accepted by validate() for lines that use relative coordinates
    # and therefore are not sent to the server.
    _KNOWN_COMMANDS = frozenset({
        "give", "tp", "teleport", "effect", "kill", "summon", "setblock",
        "fill", "clone", "weather", "time", "gamemode", "clear", "xp",
        "experience", "execute", "playsound", "title", "particle", "say",
        "tellraw", "scoreboard", "team", "tag", "data", "attribute",
        "enchant", "spreadplayers", "spawnpoint", "setworldspawn",
        "worldborder", "gamerule", "difficulty", "forceload", "locate",
        "place", "ride", "damage", "return", "schedule", "function",
        "bossbar", "recipe", "advancement", "loot", "item", "trigger",
    })

    def __init__(self, config: dict):
        """
        config keys:
            datapack_path: str — path to the datapack root
                (e.g. /opt/paper-ai-25567/world/datapacks/mortdecai)
            rcon_fn: callable — function(command: str) -> str that executes an
                RCON command and returns the result; optional, but validate()
                and execute() fail without it
        """
        self.base = Path(config.get("datapack_path", "/opt/paper-ai-25567/world/datapacks/mortdecai"))
        self.func_dir = self.base / "data" / NAMESPACE / "function"
        self.tags_dir = self.base / "data" / "minecraft" / "tags" / "function"
        self.rcon = config.get("rcon_fn")
        self._ensure_datapack()

    # ── Internal helpers ───────────────────────────────────────────────────

    def _ensure_datapack(self) -> None:
        """Create the datapack structure if it doesn't exist."""
        with _lock:
            self.func_dir.mkdir(parents=True, exist_ok=True)
            self.tags_dir.mkdir(parents=True, exist_ok=True)
            mcmeta = self.base / "pack.mcmeta"
            if not mcmeta.exists():
                self._write_json(mcmeta, PACK_MCMETA)
            # Ensure tick.json and load.json exist
            for tag_name in self._TAG_NAMES:
                tag_file = self._tag_path(tag_name)
                if not tag_file.exists():
                    self._write_json(tag_file, {"values": []})

    @staticmethod
    def _write_json(path: Path, data: Any) -> None:
        """Serialize ``data`` to ``path`` as indented UTF-8 JSON."""
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    def _tag_path(self, tag_name: str) -> Path:
        """Return the path of the tick/load tag file."""
        return self.tags_dir / f"{tag_name}.json"

    def _read_tag(self, tag_name: str) -> Dict[str, Any]:
        """Load a tag file as a dict that always has a list under "values".

        A missing, unreadable or malformed file is treated as an empty tag so
        one bad file cannot break listing, deleting or scheduling.
        """
        try:
            with open(self._tag_path(tag_name), encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):  # ValueError covers JSON and decoding errors
            return {"values": []}
        if not isinstance(data, dict):
            return {"values": []}
        if not isinstance(data.get("values"), list):
            data["values"] = []
        return data

    @staticmethod
    def _tag_entry_id(entry: Any) -> Optional[str]:
        """Return the function id of a tag entry (plain string or {"id": ...} object)."""
        if isinstance(entry, dict):
            entry = entry.get("id")
        return entry if isinstance(entry, str) else None

    @staticmethod
    def _as_command_list(commands: Any) -> List[str]:
        """Normalize tool input to one command per list item.

        Accepts None, a single (possibly multi-line) string, or an iterable of
        items. Embedded newlines are split because each line of an mcfunction
        file is its own command; blank items are kept so line numbers and
        totals still match the caller's input.
        """
        if commands is None:
            return []
        if isinstance(commands, str):
            return commands.splitlines()
        flat: List[str] = []
        for item in commands:
            flat.extend(str(item).splitlines() or [""])
        return flat

    @staticmethod
    def _response_text(result: Any) -> str:
        """Coerce an RCON reply to text (None becomes an empty string)."""
        return "" if result is None else str(result)

    def _reload(self) -> Optional[str]:
        """Best-effort datapack reload via RCON.

        Returns None on success (or when no RCON function is configured) and
        the error text when the reload call raised.
        """
        if not self.rcon:
            return None
        try:
            self.rcon("reload")
        except Exception as exc:  # rcon_fn is caller-supplied; any failure is non-fatal here
            return str(exc) or type(exc).__name__
        return None

    def _sanitize_name(self, name: str) -> str:
        """Sanitize a script name to a valid, relative mcfunction path.

        Lowercases, replaces anything outside ``[a-z0-9_/]`` with ``_``,
        collapses runs of ``_``, trims ``_`` from the ends and drops empty
        ``/`` segments. Dropping empty segments matters: a leading ``/`` would
        otherwise make the joined path absolute and escape the datapack.
        Returns "unnamed" when nothing is left.
        """
        name = ("" if name is None else str(name)).lower().strip()
        name = re.sub(r"[^a-z0-9_/]", "_", name)
        name = re.sub(r"_+", "_", name)
        # Repeat until stable so sanitizing is idempotent (write() and
        # _func_path() both sanitize the same name).
        while True:
            cleaned = "/".join(part for part in name.strip("_").split("/") if part)
            if cleaned == name:
                break
            name = cleaned
        return name or "unnamed"

    def _func_path(self, name: str) -> Path:
        """Return the .mcfunction path for a (sanitized) script name."""
        return self.func_dir / f"{self._sanitize_name(name)}.mcfunction"

    # ── Write ──────────────────────────────────────────────────────────────

    def write(self, name: str, commands: List[str], description: str = "") -> Dict[str, Any]:
        """Write a mcfunction file. Returns {ok, path, lines}.

        Leading slashes are stripped from commands and blank lines dropped;
        comment lines are preserved and counted in "lines". Creating a new
        script fails once MAX_SCRIPTS files exist (subdirectories included);
        overwriting an existing script is always allowed.
        """
        name = self._sanitize_name(name)
        commands = self._as_command_list(commands)
        if len(commands) > MAX_SCRIPT_LINES:
            return {"ok": False, "error": f"Too many lines ({len(commands)}). Max {MAX_SCRIPT_LINES}."}

        cleaned = []
        for cmd in commands:
            cmd = cmd.strip()
            if cmd.startswith("/"):
                cmd = cmd[1:]
            if cmd:
                cleaned.append(cmd)

        lines = []
        # Prefix every description line so a multi-line description cannot
        # smuggle an executable command into the file.
        for desc_line in str(description or "").splitlines():
            if desc_line.strip():
                lines.append(f"# {desc_line.strip()}")
        lines.append("# Generated by Mortdecai")
        lines.append("")
        lines.extend(cleaned)

        path = self._func_path(name)
        with _lock:
            # Count under the lock, recursively, so concurrent writes and
            # scripts in subdirectories cannot exceed the cap.
            if not path.exists():
                existing = list(self.func_dir.glob("**/*.mcfunction"))
                if len(existing) >= MAX_SCRIPTS:
                    return {"ok": False, "error": f"Too many scripts ({len(existing)}). Max {MAX_SCRIPTS}. Delete some first."}
            # Support subdirectories
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                f.write("\n".join(lines) + "\n")
        return {"ok": True, "path": f"{NAMESPACE}:{name}", "lines": len(cleaned)}

    # ── Validate ───────────────────────────────────────────────────────────

    def validate(self, commands: List[str]) -> Dict[str, Any]:
        """Check commands line by line. Returns {valid, total, passed, errors}.

        NOTE: this is not side-effect free. Commands without relative
        coordinates are sent to the server through RCON and take effect if
        they are valid. Commands containing ``~`` or ``^`` are not sent; only
        their command name is checked against a known list. Blank lines and
        comments count as passed.
        """
        if not self.rcon:
            return {"ok": False, "error": "No RCON connection available"}

        commands = self._as_command_list(commands)
        errors = []
        passed = 0
        for line_no, raw in enumerate(commands, start=1):
            cmd = raw.strip()
            if not cmd or cmd.startswith("#"):
                passed += 1
                continue
            if cmd.startswith("/"):
                cmd = cmd[1:]
            if not cmd:
                # A bare "/" is dropped by write(); treat it as blank here too.
                passed += 1
                continue

            # Relative coordinates can't be resolved from the RCON console,
            # so only the command name is checked.
            if "~" in cmd or "^" in cmd:
                parts = cmd.split()
                base_cmd = parts[0] if parts else ""
                if base_cmd in self._KNOWN_COMMANDS:
                    passed += 1
                else:
                    errors.append({"line": line_no, "command": cmd, "error": f"Unknown command: {base_cmd}"})
                continue

            try:
                reply = self._response_text(self.rcon(cmd))
            except Exception as e:  # rcon_fn is caller-supplied; report, don't crash
                errors.append({"line": line_no, "command": cmd, "error": str(e)})
                continue
            if any(marker in reply for marker in self._VALIDATE_ERROR_MARKERS):
                errors.append({"line": line_no, "command": cmd, "error": reply[:200]})
            else:
                passed += 1

        return {
            "valid": len(errors) == 0,
            "total": len(commands),
            "passed": passed,
            "errors": errors,
        }

    # ── Execute ────────────────────────────────────────────────────────────

    def execute(self, name: str, as_player: Optional[str] = None) -> Dict[str, Any]:
        """Execute a mcfunction via RCON.

        The datapack is reloaded first so recent edits are picked up. With
        ``as_player`` the function runs as and at that player. Returns
        {ok, result} (result truncated to 500 chars) or {ok: False, error};
        a failed reload is reported under "reload_error" without aborting.
        """
        name = self._sanitize_name(name)
        path = self._func_path(name)
        if not path.exists():
            return {"ok": False, "error": f"Script '{name}' not found"}
        if not self.rcon:
            return {"ok": False, "error": "No RCON connection available"}

        if as_player:
            as_player = str(as_player).strip()
            # The value is spliced into a console command; whitespace would let
            # it inject extra arguments or sub-commands.
            if not as_player or any(ch.isspace() for ch in as_player):
                return {"ok": False, "error": f"Invalid player: {as_player!r}"}
            cmd = f"execute as {as_player} at @s run function {NAMESPACE}:{name}"
        else:
            cmd = f"function {NAMESPACE}:{name}"

        # Reload datapack to pick up any changes
        reload_error = self._reload()

        try:
            reply = self._response_text(self.rcon(cmd))
        except Exception as e:  # rcon_fn is caller-supplied; report, don't crash
            outcome = {"ok": False, "error": str(e)}
        else:
            is_error = any(marker in reply for marker in self._EXECUTE_ERROR_MARKERS)
            outcome = {"ok": not is_error, "result": reply[:500]}
        if reload_error:
            outcome["reload_error"] = reload_error
        return outcome

    # ── Read ───────────────────────────────────────────────────────────────

    def read(self, name: str) -> Dict[str, Any]:
        """Read script contents.

        Returns {ok, commands, lines, raw}: "commands" excludes comments and
        blank lines, "lines" is its length, "raw" is every line of the file.
        """
        name = self._sanitize_name(name)
        path = self._func_path(name)
        try:
            with open(path, encoding="utf-8", errors="replace") as f:
                lines = f.read().strip().split("\n")
        except FileNotFoundError:
            return {"ok": False, "error": f"Script '{name}' not found"}
        # Filter out comments and blanks for the command list
        commands = [l for l in lines if l.strip() and not l.strip().startswith("#")]
        return {"ok": True, "commands": commands, "lines": len(commands), "raw": lines}

    # ── List ───────────────────────────────────────────────────────────────

    def list_scripts(self) -> Dict[str, Any]:
        """List all scripts in the datapack.

        Returns {"scripts": [{name, lines, scheduled}, ...]} sorted by path.
        "lines" counts non-comment, non-blank lines. "scheduled" is "tick",
        "load", "tick+load" or "none".
        """
        # Read tick/load schedules; a script may appear in both tags.
        prefix = f"{NAMESPACE}:"
        scheduled: Dict[str, List[str]] = {}
        for tag_name in self._TAG_NAMES:
            for entry in self._read_tag(tag_name)["values"]:
                func_id = self._tag_entry_id(entry)
                if func_id is None:
                    continue
                func_name = func_id[len(prefix):] if func_id.startswith(prefix) else func_id
                tags = scheduled.setdefault(func_name, [])
                if tag_name not in tags:
                    tags.append(tag_name)

        scripts = []
        for path in sorted(self.func_dir.glob("**/*.mcfunction")):
            name = path.relative_to(self.func_dir).with_suffix("").as_posix()
            with open(path, encoding="utf-8", errors="replace") as f:
                line_count = sum(1 for l in f if l.strip() and not l.strip().startswith("#"))
            scripts.append({
                "name": name,
                "lines": line_count,
                "scheduled": "+".join(scheduled.get(name, [])) or "none",
            })
        return {"scripts": scripts}

    # ── Context (for prompt injection) ───────────────────────────────────

    def format_script_context(self) -> str:
        """Format available scripts for LLM context injection.

        Injected into the system prompt so the model knows what scripts exist
        without needing to call script.list every time. Returns "" when there
        are no scripts. A script's first line is used as its description when
        it is a comment other than the "Generated by" marker.
        """
        scripts = self.list_scripts().get("scripts", [])
        if not scripts:
            return ""

        lines = ["=== AVAILABLE SCRIPTS ==="]
        for s in scripts:
            sched = f", {s['scheduled']}" if s["scheduled"] != "none" else ""
            desc = ""
            # Use the listed relative name directly; re-sanitizing could point
            # at a different file for hand-made script names.
            path = self.func_dir / f"{s['name']}.mcfunction"
            try:
                with open(path, encoding="utf-8", errors="replace") as f:
                    first_line = f.readline().strip()
            except OSError:
                first_line = ""
            if first_line.startswith("# ") and "Generated by" not in first_line:
                # Separator keeps the description from running into ")".
                desc = f" — {first_line[2:]}"
            lines.append(f" {s['name']} ({s['lines']} lines{sched}){desc}")
        lines.append("Use script.execute to run, script.read to inspect, script.write to create new.")
        return "\n".join(lines)

    # ── Delete ─────────────────────────────────────────────────────────────

    def delete(self, name: str) -> Dict[str, Any]:
        """Delete a script and remove it from the tick/load schedules.

        When the script was scheduled, the datapack is reloaded (best effort)
        so the server stops running it; a failed reload is reported under
        "reload_error". Returns {ok: True} or {ok: False, error}.
        """
        name = self._sanitize_name(name)
        path = self._func_path(name)
        full_name = f"{NAMESPACE}:{name}"
        unscheduled = False
        with _lock:
            try:
                path.unlink()
            except FileNotFoundError:
                return {"ok": False, "error": f"Script '{name}' not found"}
            # Remove from tick/load if scheduled
            for tag_name in self._TAG_NAMES:
                tag_data = self._read_tag(tag_name)
                kept = [e for e in tag_data["values"] if self._tag_entry_id(e) != full_name]
                if len(kept) != len(tag_data["values"]):
                    tag_data["values"] = kept
                    self._write_json(self._tag_path(tag_name), tag_data)
                    unscheduled = True

        outcome: Dict[str, Any] = {"ok": True}
        if unscheduled:
            reload_error = self._reload()
            if reload_error:
                outcome["reload_error"] = reload_error
        return outcome

    # ── Schedule ───────────────────────────────────────────────────────────

    def schedule(self, name: str, schedule_type: str) -> Dict[str, Any]:
        """Add a script to tick.json or load.json.

        ``schedule_type`` must be "tick" or "load". Adding an already
        scheduled script is a no-op. The datapack is then reloaded (best
        effort) to activate the change; a failed reload is reported under
        "reload_error".
        """
        name = self._sanitize_name(name)
        path = self._func_path(name)
        if not path.exists():
            return {"ok": False, "error": f"Script '{name}' not found"}
        if schedule_type not in self._TAG_NAMES:
            return {"ok": False, "error": f"Invalid schedule type: {schedule_type}. Use 'tick' or 'load'."}

        full_name = f"{NAMESPACE}:{name}"
        with _lock:
            tag_data = self._read_tag(schedule_type)
            present = {self._tag_entry_id(e) for e in tag_data["values"]}
            if full_name not in present:
                tag_data["values"].append(full_name)
                self._write_json(self._tag_path(schedule_type), tag_data)

        # Reload to activate
        outcome: Dict[str, Any] = {"ok": True}
        reload_error = self._reload()
        if reload_error:
            outcome["reload_error"] = reload_error
        return outcome
# ── Tool handler dispatch ──────────────────────────────────────────────────
def handle_script_tool(config: dict, tool_name: str, arguments: dict) -> Dict[str, Any]:
    """
    Route a script.* tool call to the matching ScriptManager method.

    Invoked by the inference loop whenever the model emits a script tool call.
    The action is the part of ``tool_name`` after the last dot (or the whole
    name when it has no dot). A fresh ScriptManager is built from ``config``
    for every call, and unknown actions yield ``{"ok": False, "error": ...}``.
    """
    manager = ScriptManager(config)
    # rsplit keeps the whole name when there is no "." in it.
    verb = tool_name.rsplit(".", 1)[-1]

    # Lazily-evaluated handlers: only the selected one touches `arguments`.
    handlers = {
        "write": lambda: manager.write(
            name=arguments.get("name", "unnamed"),
            commands=arguments.get("commands", []),
            description=arguments.get("description", ""),
        ),
        "validate": lambda: manager.validate(arguments.get("commands", [])),
        "execute": lambda: manager.execute(
            name=arguments.get("name", ""),
            as_player=arguments.get("as_player"),
        ),
        "read": lambda: manager.read(arguments.get("name", "")),
        "list": lambda: manager.list_scripts(),
        "delete": lambda: manager.delete(arguments.get("name", "")),
        "schedule": lambda: manager.schedule(
            name=arguments.get("name", ""),
            schedule_type=arguments.get("type", ""),
        ),
    }

    handler = handlers.get(verb)
    if handler is None:
        return {"ok": False, "error": f"Unknown script action: {verb}"}
    return handler()