"""
Mortdecai Script Manager — mcfunction datapack CRUD + validation + scheduling.

Gives the model full control over a Minecraft datapack:
  script.write    — create/overwrite mcfunction files
  script.validate — check commands through RCON before writing
  script.execute  — run a function via RCON
  script.read     — read an existing script
  script.list     — list all scripts in the datapack
  script.delete   — remove a script
  script.schedule — add to tick.json or load.json

Datapack structure:
  /world/datapacks/mortdecai/
    pack.mcmeta
    data/mortdecai/function/        ← mcfunction files live here
    data/minecraft/tags/function/   ← tick.json, load.json for scheduling

Usage:
    from agent.tools.script_manager import ScriptManager
    mgr = ScriptManager(config)
    result = mgr.write("build_house", ["fill ~-5 ~ ~-5 ~5 ~4 ~5 oak_planks hollow", ...])
    result = mgr.validate(["give @s minecraft:diamond 1", "bad command here"])
    result = mgr.execute("build_house", as_player="slingshooter08")
"""

import json
import os
import re
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional

# Serialises every filesystem mutation made by this module (process-wide).
_lock = threading.Lock()

MAX_SCRIPT_LINES = 200
MAX_SCRIPTS = 50
NAMESPACE = "mortdecai"

PACK_MCMETA = {
    "pack": {
        "pack_format": 48,
        "description": "Mortdecai AI-generated functions"
    }
}

# Function tags managed by schedule()/delete()/list_scripts().
_TAG_NAMES = ("tick", "load")

# Substrings in an RCON reply that are treated as a failure.
_VALIDATE_ERROR_MARKERS = ("<--[HERE]", "Unknown", "Incorrect", "Expected", "Invalid")
_EXECUTE_ERROR_MARKERS = ("<--[HERE]", "Unknown", "Incorrect")

# Command names accepted by validate() for lines that use relative coordinates.
_KNOWN_COMMANDS = frozenset({
    "give", "tp", "teleport", "effect", "kill", "summon", "setblock",
    "fill", "clone", "weather", "time", "gamemode", "clear", "xp",
    "experience", "execute", "playsound", "title", "particle", "say",
    "tellraw", "scoreboard", "team", "tag", "data", "attribute",
    "enchant", "spreadplayers", "spawnpoint", "setworldspawn",
    "worldborder", "gamerule", "difficulty", "forceload", "locate",
    "place", "ride", "damage", "return", "schedule", "function",
    "bossbar", "recipe", "advancement", "loot", "item", "trigger",
})

# A bare player name / UUID, or a single target selector with optional [...]
# arguments. Anything else (notably embedded whitespace followed by more
# command text) is rejected so `as_player` cannot smuggle extra commands.
_TARGET_RE = re.compile(r"(?:[A-Za-z0-9_.*\-]{1,36}|@[aeprsn](?:\[[^\]\r\n]*\])?)")


class ScriptManager:
    """CRUD, validation, execution and scheduling for one mcfunction datapack."""

    def __init__(self, config: dict):
        """
        config keys:
            datapack_path: str — path to the datapack root
                (e.g. /opt/paper-ai-25567/world/datapacks/mortdecai)
            rcon_fn: callable — function(command: str) -> str that executes
                an RCON command and returns the result
        """
        self.base = Path(config.get("datapack_path", "/opt/paper-ai-25567/world/datapacks/mortdecai"))
        self.func_dir = self.base / "data" / NAMESPACE / "function"
        # tick/load tags must live in the "minecraft" namespace, not our own.
        self.tags_dir = self.base / "data" / "minecraft" / "tags" / "function"
        self.rcon = config.get("rcon_fn")
        self._ensure_datapack()

    # ── Internal helpers ───────────────────────────────────────────────────

    def _ensure_datapack(self):
        """Create the datapack structure if it doesn't exist."""
        with _lock:
            self.func_dir.mkdir(parents=True, exist_ok=True)
            self.tags_dir.mkdir(parents=True, exist_ok=True)

            mcmeta = self.base / "pack.mcmeta"
            if not mcmeta.exists():
                self._write_json(mcmeta, PACK_MCMETA)

            # Ensure tick.json and load.json exist
            for tag_name in _TAG_NAMES:
                tag_file = self._tag_path(tag_name)
                if not tag_file.exists():
                    self._write_json(tag_file, {"values": []})

    @staticmethod
    def _write_json(path: Path, payload: Any) -> None:
        """Write *payload* as JSON via a temp file + rename.

        The rename means a reader never observes a half-written file.
        Callers are expected to hold ``_lock``.
        """
        tmp = path.with_name(path.name + ".tmp")
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        os.replace(tmp, path)

    def _tag_path(self, tag_name: str) -> Path:
        """Return the path of ``<tag_name>.json`` inside the tags directory."""
        return self.tags_dir / f"{tag_name}.json"

    def _read_tag(self, tag_name: str) -> Dict[str, Any]:
        """Load a tag file; always returns a dict whose "values" is a list.

        A missing, unreadable or malformed file yields ``{"values": []}``
        instead of raising, so one bad tag file cannot break every tool call.
        """
        try:
            with open(self._tag_path(tag_name), encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):  # ValueError covers JSON and decode errors
            return {"values": []}
        if not isinstance(data, dict):
            return {"values": []}
        if not isinstance(data.get("values"), list):
            data["values"] = []
        return data

    @staticmethod
    def _tag_entry_id(entry: Any) -> str:
        """Return the function id of a tag entry.

        Entries may be plain strings or objects of the form
        ``{"id": "...", "required": bool}``; anything else maps to "".
        """
        if isinstance(entry, dict):
            entry = entry.get("id", "")
        return entry if isinstance(entry, str) else ""

    def _sanitize_name(self, name: str) -> str:
        """Sanitize a script name to a valid, relative mcfunction path.

        Lower-cases, replaces every character outside ``[a-z0-9_/]`` with
        ``_``, collapses runs of ``_`` and ``/``, and strips both from the
        ends. Stripping ``/`` matters: a leading slash would make
        ``func_dir / name`` resolve to an absolute path outside the datapack.
        Returns "unnamed" when nothing is left.
        """
        name = str(name or "").lower().strip()
        name = re.sub(r"[^a-z0-9_/]", "_", name)
        name = re.sub(r"_+", "_", name)
        name = re.sub(r"/+", "/", name)
        name = name.strip("_/")
        return name or "unnamed"

    def _func_path(self, name: str) -> Path:
        """Return the .mcfunction path for *name* (sanitized) under func_dir."""
        return self.func_dir / f"{self._sanitize_name(name)}.mcfunction"

    # ── Write ──────────────────────────────────────────────────────────────

    def write(self, name: str, commands: List[str], description: str = "") -> Dict[str, Any]:
        """Write a mcfunction file. Returns {ok, path, lines}.

        Leading slashes are stripped from commands, blank entries are
        dropped and comment lines are preserved. "lines" counts every line
        written from *commands* (comments included). On a limit violation
        returns {ok: False, error} and writes nothing.
        """
        name = self._sanitize_name(name)

        # A single string would otherwise be iterated character by character.
        if isinstance(commands, str):
            commands = commands.splitlines()
        commands = commands or []

        if len(commands) > MAX_SCRIPT_LINES:
            return {"ok": False, "error": f"Too many lines ({len(commands)}). Max {MAX_SCRIPT_LINES}."}

        # Strip leading slashes; split embedded newlines so each file line
        # is exactly one command and the line limit cannot be bypassed.
        cleaned = []
        for raw in commands:
            if raw is None:
                continue
            for piece in str(raw).splitlines():
                piece = piece.strip()
                if piece.startswith("/"):
                    piece = piece[1:].strip()
                if piece:
                    cleaned.append(piece)

        if len(cleaned) > MAX_SCRIPT_LINES:
            return {"ok": False, "error": f"Too many lines ({len(cleaned)}). Max {MAX_SCRIPT_LINES}."}

        lines = []
        if description:
            # Collapse whitespace: a newline here would start an uncommented
            # line, i.e. an extra command in the function file.
            lines.append("# " + " ".join(str(description).split()))
        lines.append("# Generated by Mortdecai")
        lines.append("")
        lines.extend(cleaned)

        path = self._func_path(name)
        with _lock:
            # Count recursively, matching list_scripts(), so scripts in
            # subdirectories count toward the limit too.
            existing = list(self.func_dir.glob("**/*.mcfunction"))
            if len(existing) >= MAX_SCRIPTS and not path.exists():
                return {"ok": False, "error": f"Too many scripts ({len(existing)}). Max {MAX_SCRIPTS}. Delete some first."}

            # Support subdirectories
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                f.write("\n".join(lines) + "\n")

        return {"ok": True, "path": f"{NAMESPACE}:{name}", "lines": len(cleaned)}

    # ── Validate ───────────────────────────────────────────────────────────

    def validate(self, commands: List[str]) -> Dict[str, Any]:
        """Validate commands through RCON. Returns per-line results.

        NOTE: commands without relative coordinates are actually sent to
        the server via ``rcon_fn`` — this is not a side-effect-free dry run.
        Commands containing ``~`` or ``^`` are only checked against a list
        of known command names, because they cannot run without a position.

        Returns {valid, total, passed, errors}, or {ok: False, valid: False,
        error} when no RCON function is configured.
        """
        if not self.rcon:
            return {"ok": False, "valid": False, "error": "No RCON connection available"}

        if isinstance(commands, str):
            commands = commands.splitlines()
        commands = commands or []

        errors = []
        passed = 0

        for line_no, raw in enumerate(commands, start=1):
            cmd = str(raw).strip()
            if not cmd or cmd.startswith("#"):
                passed += 1
                continue

            if cmd.startswith("/"):
                cmd = cmd[1:]

            # Relative coords can't be validated without context, but the
            # command name can still be checked.
            if "~" in cmd or "^" in cmd:
                parts = cmd.split()
                base_cmd = parts[0] if parts else ""
                if base_cmd in _KNOWN_COMMANDS:
                    passed += 1
                else:
                    errors.append({"line": line_no, "command": cmd,
                                   "error": f"Unknown command: {base_cmd}"})
                continue

            try:
                result = self.rcon(cmd)
                if any(marker in result for marker in _VALIDATE_ERROR_MARKERS):
                    errors.append({"line": line_no, "command": cmd, "error": result[:200]})
                else:
                    passed += 1
            except Exception as e:
                # Transport failures are reported per line, not raised.
                errors.append({"line": line_no, "command": cmd, "error": str(e)})

        return {
            "valid": len(errors) == 0,
            "total": len(commands),
            "passed": passed,
            "errors": errors,
        }

    # ── Execute ────────────────────────────────────────────────────────────

    def execute(self, name: str, as_player: Optional[str] = None) -> Dict[str, Any]:
        """Execute a mcfunction via RCON.

        When *as_player* is given the function runs as and at that target;
        it must be a plain player name/UUID or a single target selector,
        otherwise {ok: False, error} is returned and nothing is sent.
        Returns {ok, result} with the (truncated) server reply, or
        {ok: False, error}.
        """
        name = self._sanitize_name(name)
        path = self._func_path(name)
        if not path.exists():
            return {"ok": False, "error": f"Script '{name}' not found"}

        if not self.rcon:
            return {"ok": False, "error": "No RCON connection available"}

        if as_player:
            # as_player is interpolated into a command string: refuse
            # anything that could append further commands.
            if not isinstance(as_player, str) or not _TARGET_RE.fullmatch(as_player):
                return {"ok": False, "error": f"Invalid player target: {as_player!r}"}
            cmd = f"execute as {as_player} at @s run function {NAMESPACE}:{name}"
        else:
            cmd = f"function {NAMESPACE}:{name}"

        # Reload datapack to pick up any changes. Deliberately best-effort:
        # a failed reload should not prevent running the existing function.
        try:
            self.rcon("reload")
        except Exception:
            pass

        try:
            result = self.rcon(cmd)
            is_error = any(marker in result for marker in _EXECUTE_ERROR_MARKERS)
            return {"ok": not is_error, "result": result[:500]}
        except Exception as e:
            return {"ok": False, "error": str(e)}

    # ── Read ───────────────────────────────────────────────────────────────

    def read(self, name: str) -> Dict[str, Any]:
        """Read script contents.

        Returns {ok, commands, lines, raw}: "commands" excludes comments and
        blank lines, "lines" is its length, "raw" is every line of the file.
        """
        name = self._sanitize_name(name)
        path = self._func_path(name)
        try:
            with open(path, encoding="utf-8") as f:
                raw_lines = f.read().strip().split("\n")
        except FileNotFoundError:
            return {"ok": False, "error": f"Script '{name}' not found"}

        # Filter out comments and blanks for the command list
        commands = [ln for ln in raw_lines if ln.strip() and not ln.strip().startswith("#")]
        return {"ok": True, "commands": commands, "lines": len(commands), "raw": raw_lines}

    # ── List ───────────────────────────────────────────────────────────────

    def list_scripts(self) -> Dict[str, Any]:
        """List all scripts in the datapack.

        Returns {"scripts": [{name, lines, scheduled}, ...]} where "lines"
        counts non-blank, non-comment lines and "scheduled" is "tick",
        "load" or "none".
        """
        # Read tick/load schedules.
        # NOTE: a function present in both tags is reported as "load",
        # because later tags overwrite earlier ones.
        prefix = f"{NAMESPACE}:"
        scheduled = {}
        for tag_name in _TAG_NAMES:
            for entry in self._read_tag(tag_name)["values"]:
                ident = self._tag_entry_id(entry)
                if not ident:
                    continue
                if ident.startswith(prefix):
                    ident = ident[len(prefix):]
                scheduled[ident] = tag_name

        scripts = []
        for path in sorted(self.func_dir.glob("**/*.mcfunction")):
            name = path.relative_to(self.func_dir).with_suffix("").as_posix()
            with open(path, encoding="utf-8") as f:
                line_count = sum(1 for ln in f if ln.strip() and not ln.strip().startswith("#"))
            scripts.append({
                "name": name,
                "lines": line_count,
                "scheduled": scheduled.get(name, "none"),
            })

        return {"scripts": scripts}

    # ── Context (for prompt injection) ─────────────────────────────────────

    def format_script_context(self) -> str:
        """Format available scripts for LLM context injection.

        Injected into the system prompt so the model knows what scripts
        exist without needing to call script.list every time. Returns ""
        when there are no scripts.
        """
        scripts = self.list_scripts().get("scripts", [])
        if not scripts:
            return ""

        lines = ["=== AVAILABLE SCRIPTS ==="]
        for s in scripts:
            sched = f", {s['scheduled']}" if s["scheduled"] != "none" else ""
            # Try to read the description comment from the file: write()
            # puts it on the first line, before the "Generated by" marker.
            desc = ""
            path = self._func_path(s["name"])
            if path.exists():
                with open(path, encoding="utf-8") as f:
                    first_line = f.readline().strip()
                if first_line.startswith("# ") and "Generated by" not in first_line:
                    desc = f" — {first_line[2:]}"
            lines.append(f" {s['name']} ({s['lines']} lines{sched}){desc}")

        lines.append("Use script.execute to run, script.read to inspect, script.write to create new.")
        return "\n".join(lines)

    # ── Delete ─────────────────────────────────────────────────────────────

    def delete(self, name: str) -> Dict[str, Any]:
        """Delete a script and remove from schedules.

        Returns {ok: True}, or {ok: False, error} if the script is missing.
        """
        name = self._sanitize_name(name)
        path = self._func_path(name)
        full_name = f"{NAMESPACE}:{name}"

        with _lock:
            # Unlink inside the lock and treat "already gone" as not found,
            # rather than checking existence beforehand (racy).
            try:
                path.unlink()
            except FileNotFoundError:
                return {"ok": False, "error": f"Script '{name}' not found"}

            # Remove from tick/load if scheduled (string or object entries)
            for tag_name in _TAG_NAMES:
                tag_data = self._read_tag(tag_name)
                kept = [e for e in tag_data["values"] if self._tag_entry_id(e) != full_name]
                if len(kept) != len(tag_data["values"]):
                    tag_data["values"] = kept
                    self._write_json(self._tag_path(tag_name), tag_data)

        return {"ok": True}

    # ── Schedule ───────────────────────────────────────────────────────────

    def schedule(self, name: str, schedule_type: str) -> Dict[str, Any]:
        """Add a script to tick.json or load.json.

        Idempotent: an already-scheduled script is not added twice. Issues
        a best-effort "reload" over RCON afterwards when RCON is available.
        Returns {ok: True} or {ok: False, error}.
        """
        name = self._sanitize_name(name)
        path = self._func_path(name)
        if not path.exists():
            return {"ok": False, "error": f"Script '{name}' not found"}

        if schedule_type not in _TAG_NAMES:
            return {"ok": False, "error": f"Invalid schedule type: {schedule_type}. Use 'tick' or 'load'."}

        full_name = f"{NAMESPACE}:{name}"

        with _lock:
            tag_data = self._read_tag(schedule_type)
            already = any(self._tag_entry_id(e) == full_name for e in tag_data["values"])
            if not already:
                tag_data["values"].append(full_name)
                self._write_json(self._tag_path(schedule_type), tag_data)

        # Reload to activate. Best-effort: the tag file is already written.
        if self.rcon:
            try:
                self.rcon("reload")
            except Exception:
                pass

        return {"ok": True}


# ── Tool handler dispatch ──────────────────────────────────────────────────

def handle_script_tool(config: dict, tool_name: str, arguments: dict) -> Dict[str, Any]:
    """
    Dispatch a script.* tool call to the ScriptManager.

    Called from the inference loop when the model emits a script tool call.
    The action is the part of *tool_name* after the last "."; missing or
    null arguments fall back to empty defaults. Unknown actions return
    {ok: False, error}.
    """
    mgr = ScriptManager(config)
    arguments = arguments or {}
    action = tool_name.rsplit(".", 1)[-1]

    if action == "write":
        return mgr.write(
            name=arguments.get("name") or "unnamed",
            commands=arguments.get("commands") or [],
            description=arguments.get("description") or "",
        )
    if action == "validate":
        return mgr.validate(arguments.get("commands") or [])
    if action == "execute":
        return mgr.execute(
            name=arguments.get("name") or "",
            as_player=arguments.get("as_player"),
        )
    if action == "read":
        return mgr.read(arguments.get("name") or "")
    if action == "list":
        return mgr.list_scripts()
    if action == "delete":
        return mgr.delete(arguments.get("name") or "")
    if action == "schedule":
        return mgr.schedule(
            name=arguments.get("name") or "",
            schedule_type=arguments.get("type") or "",
        )
    return {"ok": False, "error": f"Unknown script action: {action}"}