#!/usr/bin/env python3
"""
Prompt Pipeline — 1660 generates prompts, bigger GPUs process them through Mortdecai.

Architecture:
    1660 Super (qwen3.5:0.8b) → generates diverse edge-case prompts
        → writes to shared queue file
    2080 Ti / RTX 4000 / 3090 Ti (mortdecai) → reads queue, processes, validates via RCON

Usage:
    # Terminal 1: prompt generator (runs on 1660)
    python3 prompt_pipeline.py --mode generate --ollama-url http://192.168.0.235:11434

    # Terminal 2+: processors (one per GPU)
    python3 prompt_pipeline.py --mode process --ollama-url http://192.168.0.141:11434
    python3 prompt_pipeline.py --mode process --ollama-url http://192.168.0.179:11434

    # Or run everything:
    python3 prompt_pipeline.py --mode all
"""

import argparse
import contextlib
import json
import os
import re
import random
import sys
import time
import threading
import fcntl
from pathlib import Path

import requests

# This script lives in training/scripts/, so three parents up is the repo root.
ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(ROOT))
from agent.tools.persistent_rcon import get_rcon

QUEUE_FILE = ROOT / "data" / "processed" / "prompt_queue.jsonl"
OUTPUT_FILE = ROOT / "data" / "processed" / "pipeline_output.jsonl"
LOCK_FILE = str(QUEUE_FILE) + ".lock"

# Message prefixes that ensure_prefix() accepts without modification.
_KNOWN_PREFIXES = ("pray ", "sudo ", "bug_log")

# Matches a <think>...</think> reasoning block (plus trailing whitespace) that
# the model may emit before its actual answer.
_THINK_RE = re.compile(r"<think>[\s\S]*?</think>\s*")

# Outermost [...] span in a model reply; greedy so nested brackets stay inside.
_JSON_ARRAY_RE = re.compile(r"\[[\s\S]*\]")

_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Categories the model needs more training on (based on validator hit rates and bake-off failures)
GENERATION_PROMPTS = {
    "fill_syntax": """Generate 5 Minecraft player chat messages that involve filling areas with blocks.
Include: walls, floors, replacing blocks, clearing areas, building structures.
The AI often gets fill syntax wrong — adding a trailing count number or wrong coordinates.
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",

    "enchantments": """Generate 5 Minecraft player chat messages requesting enchanted items.
Include: multiple enchantments, max level enchants, mutually exclusive combinations, items that cant be enchanted.
The AI sometimes uses old NBT syntax instead of 1.21 component syntax.
Every message must start with "sudo " or "pray ". Return ONLY a JSON array of strings.""",

    "execute_chains": """Generate 5 Minecraft player chat messages requiring execute command chains.
Include: execute as/at/if/positioned/in, targeting by gamemode/team, dimension switching, conditional blocks.
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",

    "entity_targeting": """Generate 5 Minecraft player chat messages about killing or targeting specific entities.
Include: "kill THE zombie" vs "kill ALL zombies", distance-based targeting, entity types, nearby vs far.
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",

    "gamerules_timed": """Generate 5 Minecraft player chat messages requesting temporary gamerule changes.
Include: specific durations like "for 5 minutes", "for an hour", "briefly", "permanently".
The AI needs to learn when to set revert timers vs permanent changes.
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",

    "memory_commands": """Generate 5 Minecraft player chat messages about remembering or recalling locations and facts.
Include: "remember this as home", "tp me home", "what do you know about me", "forget my base".
Every message must start with "sudo " or "pray ". Return ONLY a JSON array of strings.""",

    "creative_prayers": """Generate 5 creative Minecraft prayers to an AI God character.
Include: humble requests, greedy demands, blasphemy, roleplay, emotional appeals, sarcasm.
Be creative and diverse. Every message must start with "pray ". Return ONLY a JSON array of strings.""",

    "edge_items": """Generate 5 Minecraft player requests for obscure or easily-confused items.
Include: items with color variants (wool, concrete, glass), items that changed names between versions,
items people misspell (steak vs cooked_beef, log vs oak_log, bed vs white_bed).
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",

    "multicommand": """Generate 5 Minecraft player requests that need multiple commands to fulfill.
Include: "gear me up for the nether", "build a house and tp me inside", "create a pvp arena",
"prepare me for a boss fight". Complex multi-step requests.
Every message must start with "sudo " or "pray ". Return ONLY a JSON array of strings.""",

    "natural_language": """Generate 5 Minecraft chat messages phrased in unusual natural language.
Include: typos, slang, abbreviations, run-on sentences, vague requests, sarcasm, mixed languages.
The AI should still figure out what the player wants.
Every message must start with "sudo " or "pray ". Return ONLY a JSON array of strings.""",
}

_GENERATOR_SYSTEM = (
    "You generate Minecraft chat messages for AI training. "
    "Return ONLY a JSON array of strings."
)

_GOD_SYSTEM = (
    'You are God in a Minecraft server. Return JSON: '
    '{"message": "...", "commands": ["cmd1"], "reasoning": "why"}'
)

_SUDO_SYSTEM = (
    'You are a Minecraft 1.21 command translator. Return JSON: {"commands": ["cmd1"], "reasoning": "why"}\n'
    'IMPORTANT: Each command is ONE COMPLETE STRING. Use minecraft: prefix. '
    'Enchantments: item[enchantments={name:level}].'
)


def _utc_timestamp():
    """Return the current time as an ISO-8601 string that really is UTC.

    The format ends in ``Z``, so the time must be taken from ``gmtime()``;
    ``strftime`` without an explicit struct would format local time.
    """
    return time.strftime(_TIMESTAMP_FORMAT, time.gmtime())


def _strip_think(text):
    """Remove any ``<think>...</think>`` blocks from a model reply."""
    return _THINK_RE.sub("", text)


def _extract_string_array(text):
    """Pull the non-empty strings out of the first JSON array found in *text*.

    Markdown code fences are dropped first. Returns an empty list when no
    ``[...]`` span is present; raises ``ValueError`` (``json.JSONDecodeError``)
    when the span is not valid JSON.
    """
    cleaned = text.replace("```json", "").replace("```", "").strip()
    match = _JSON_ARRAY_RE.search(cleaned)
    if match is None:
        return []
    items = json.loads(match.group())
    return [item.strip() for item in items if isinstance(item, str) and item.strip()]


def _count_lines(path):
    """Return the number of lines in *path*, or 0 when the file is missing."""
    try:
        # Binary mode: only newlines are counted, so decoding is unnecessary.
        with open(path, "rb") as f:
            return sum(1 for _ in f)
    except FileNotFoundError:
        return 0


@contextlib.contextmanager
def _queue_lock():
    """Hold an exclusive ``flock`` on LOCK_FILE for the duration of the block."""
    with open(LOCK_FILE, "w") as lf:
        fcntl.flock(lf, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(lf, fcntl.LOCK_UN)


def ensure_prefix(msg):
    """Return *msg* with a ``pray ``/``sudo `` prefix if it has no known one.

    Messages already starting (case-insensitively, ignoring surrounding
    whitespace) with ``pray ``, ``sudo `` or ``bug_log`` are returned as-is.
    Otherwise ``pray `` is prepended with probability 0.4, else ``sudo ``.
    """
    if msg.lower().strip().startswith(_KNOWN_PREFIXES):
        return msg
    return ("pray " if random.random() < 0.4 else "sudo ") + msg


def generate_prompts(ollama_url, model="qwen3.5:0.8b", count=50):
    """Use a small model to generate diverse prompts.

    Categories from GENERATION_PROMPTS are visited in random order, one chat
    request each, until at least *count* prompts have been collected or the
    categories run out. *count* is a soft cap: it is only checked between
    categories, so the result may hold a few more entries.

    Returns a list of dicts with keys ``prompt``, ``category`` and
    ``generated_at``. A failure in one category is printed and skipped.
    """
    prompts = []
    categories = list(GENERATION_PROMPTS)
    random.shuffle(categories)

    for cat in categories:
        if len(prompts) >= count:
            break
        try:
            r = requests.post(f"{ollama_url}/api/chat", json={
                "model": model,
                "messages": [
                    {"role": "system", "content": _GENERATOR_SYSTEM},
                    {"role": "user", "content": GENERATION_PROMPTS[cat]},
                ],
                "stream": False,
                "options": {"temperature": 1.1, "num_predict": 400},
            }, timeout=60)
            r.raise_for_status()
            content = _strip_think(r.json()["message"]["content"])
            stamp = _utc_timestamp()
            added = [
                {
                    "prompt": ensure_prefix(item),
                    "category": cat,
                    "generated_at": stamp,
                }
                for item in _extract_string_array(content)
            ]
            prompts.extend(added)
            print(f" [{cat}] +{len(added)} prompts")
        except Exception as e:
            # Best effort: one bad reply must not stop the other categories.
            print(f" [{cat}] ERROR: {e}")
        time.sleep(0.5)

    return prompts


def write_queue(prompts):
    """Append prompts to the queue file with file locking.

    Each prompt is written as one JSON object per line. The parent directory
    is created if needed.
    """
    os.makedirs(QUEUE_FILE.parent, exist_ok=True)
    with _queue_lock():
        with open(QUEUE_FILE, "a", encoding="utf-8") as f:
            f.writelines(json.dumps(p) + "\n" for p in prompts)
    print(f" Queued {len(prompts)} prompts")


def read_queue(batch_size=5):
    """Read and remove prompts from the queue.

    Takes up to *batch_size* lines from the head of the queue file under the
    queue lock and returns the decoded records. Blank lines, lines that are
    not valid JSON, and records without a ``prompt`` key are dropped (with a
    message) instead of aborting the whole batch.
    """
    if not QUEUE_FILE.exists():
        return []

    with _queue_lock():
        try:
            with open(QUEUE_FILE, encoding="utf-8") as f:
                lines = f.readlines()
        except FileNotFoundError:
            # Removed between the exists() check and taking the lock.
            return []
        taken, remaining = lines[:batch_size], lines[batch_size:]
        if taken:
            # Write to a temp file and swap it in, so a crash mid-write
            # cannot truncate the queue.
            tmp_path = str(QUEUE_FILE) + ".tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.writelines(remaining)
            os.replace(tmp_path, QUEUE_FILE)

    batch = []
    for line in taken:
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError as e:
            print(f" Skipping corrupt queue line: {e}")
            continue
        if isinstance(record, dict) and "prompt" in record:
            batch.append(record)
        else:
            print(" Skipping queue record without a prompt")
    return batch


def process_prompt(prompt_data, ollama_url, model, rcon_host, rcon_port, rcon_pass):
    """Process a single prompt through Mortdecai and validate via RCON.

    The prompt is sent to the model in "god" mode when it starts with
    ``pray ``, otherwise in "sudo" mode. At most the first six returned
    commands are executed over RCON; a command counts as failed when its
    response contains ``<--[HERE]``, ``Unknown`` or ``Incorrect``, or when the
    RCON call raises.

    Always returns a dict containing ``prompt``, ``category`` and ``success``.
    Failures additionally carry ``error`` or ``reason``; completed runs carry
    the commands, per-command RCON results and model metadata.
    """
    prompt = prompt_data["prompt"]
    category = prompt_data.get("category", "unknown")
    mode = "god" if prompt.lower().startswith("pray ") else "sudo"
    system = _GOD_SYSTEM if mode == "god" else _SUDO_SYSTEM

    try:
        r = requests.post(f"{ollama_url}/api/chat", json={
            "model": model,
            "messages": [
                {"role": "system", "content": "/no_think\n" + system},
                {"role": "user", "content": f"Player slingshooter08: {prompt}"},
            ],
            "stream": False, "format": "json",
            "options": {"temperature": 0.3, "num_predict": 500},
        }, timeout=120)
        r.raise_for_status()
        content = _strip_think(r.json()["message"]["content"])
        parsed = json.loads(content)
        if not isinstance(parsed, dict):
            raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
    except Exception as e:
        return {"prompt": prompt, "category": category, "error": str(e), "success": False}

    commands = parsed.get("commands") or []
    # A command without a space is treated as a sign the model split one
    # command across several array entries.
    if (not isinstance(commands, list) or not commands
            or not all(isinstance(c, str) and " " in c for c in commands)):
        return {"prompt": prompt, "category": category, "commands": commands,
                "success": False, "reason": "empty_or_split"}

    # Validate via RCON
    try:
        rcon = get_rcon(rcon_host, rcon_port, rcon_pass)
    except Exception as e:
        return {"prompt": prompt, "category": category, "commands": commands,
                "error": f"RCON unavailable: {e}", "success": False}

    results = []
    all_ok = True
    for cmd in commands[:6]:
        try:
            result = rcon.command(cmd)
            is_error = "<--[HERE]" in result or "Unknown" in result or "Incorrect" in result
            results.append({"cmd": cmd, "result": result[:150], "ok": not is_error})
        except Exception:
            # Not a bare except: Ctrl-C / SystemExit must still propagate.
            is_error = True
            results.append({"cmd": cmd, "result": "RCON error", "ok": False})
        if is_error:
            all_ok = False

    return {
        "prompt": prompt,
        "category": category,
        "mode": mode,
        "commands": commands,
        "rcon_results": results,
        "success": all_ok,
        "message": parsed.get("message", ""),
        "reasoning": parsed.get("reasoning", ""),
        "processed_at": _utc_timestamp(),
        "ollama_url": ollama_url,
    }


def save_result(result):
    """Save a processed result.

    Appends *result* as one JSON line to OUTPUT_FILE. The file is opened as
    UTF-8 because the JSON is written with ``ensure_ascii=False``.
    """
    os.makedirs(OUTPUT_FILE.parent, exist_ok=True)
    with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(result, ensure_ascii=False) + "\n")


def run_generator(ollama_url, model, interval=120):
    """Continuously generate prompts.

    Loops forever: generates a batch (soft cap 30), appends it to the queue,
    prints the queue size, then sleeps *interval* seconds.
    """
    print("Prompt Generator started")
    print(f" Model: {model} on {ollama_url}")
    print(f" Interval: {interval}s between batches")

    while True:
        print(f"\nGenerating batch at {time.strftime('%H:%M:%S')}...")
        prompts = generate_prompts(ollama_url, model, count=30)
        if prompts:
            write_queue(prompts)
        print(f" Queue size: {_count_lines(QUEUE_FILE)}")
        time.sleep(interval)


def run_processor(ollama_url, model, rcon_host, rcon_port, rcon_pass):
    """Continuously process prompts from the queue.

    Loops forever: takes up to three prompts from the queue, processes and
    saves each one, and prints a running success rate every 20 prompts.
    Sleeps 5 seconds whenever the queue is empty.
    """
    print("Processor started")
    print(f" Model: {model} on {ollama_url}")
    print(f" RCON: {rcon_host}:{rcon_port}")

    stats = {"processed": 0, "success": 0, "failed": 0}

    while True:
        batch = read_queue(3)
        if not batch:
            time.sleep(5)
            continue

        for prompt_data in batch:
            result = process_prompt(prompt_data, ollama_url, model, rcon_host, rcon_port, rcon_pass)
            save_result(result)
            stats["processed"] += 1
            if result.get("success"):
                stats["success"] += 1
                label = "OK"
            else:
                stats["failed"] += 1
                label = "FAIL"
            print(f" {label} [{result['category'][:12]}] {result['prompt'][:50]}")

            if stats["processed"] % 20 == 0:
                rate = stats["success"] / max(stats["processed"], 1) * 100
                print(f" Stats: {stats['processed']} processed, {rate:.0f}% success")

        time.sleep(0.2)


def main():
    """Parse command-line arguments and run the selected pipeline mode.

    ``generate`` runs the prompt generator, ``process`` runs one processor,
    and ``all`` runs the generator plus one processor per URL in daemon
    threads while the main thread prints queue/output sizes every minute.
    """
    parser = argparse.ArgumentParser(description="Prompt generation pipeline")
    parser.add_argument("--mode", choices=["generate", "process", "all"], required=True)
    parser.add_argument("--ollama-url", default=None,
                        help="Single Ollama URL for --mode generate or --mode process "
                             "(overrides --gen-url / --proc-urls)")
    parser.add_argument("--gen-url", default="http://192.168.0.235:11434", help="Generator Ollama URL (small model)")
    parser.add_argument("--gen-model", default="qwen3.5:0.8b", help="Generator model")
    parser.add_argument("--proc-urls", default="http://192.168.0.141:11434,http://192.168.0.179:11434",
                        help="Processor Ollama URLs (comma-separated)")
    parser.add_argument("--proc-model", default="mortdecai:0.4.0", help="Processor model")
    parser.add_argument("--rcon-host", default="192.168.0.244")
    parser.add_argument("--rcon-port", type=int, default=25578)
    parser.add_argument("--rcon-pass", default="REDACTED_RCON")
    parser.add_argument("--interval", type=int, default=120, help="Generator batch interval (seconds)")
    args = parser.parse_args()

    # Tolerate stray spaces and trailing commas in the URL list.
    urls = [u.strip() for u in args.proc_urls.split(",") if u.strip()]

    if args.mode == "generate":
        run_generator(args.ollama_url or args.gen_url, args.gen_model, args.interval)

    elif args.mode == "process":
        url = args.ollama_url or (urls[0] if urls else None)
        if not url:
            parser.error("no processor URL given (use --ollama-url or --proc-urls)")
        run_processor(url, args.proc_model, args.rcon_host, args.rcon_port, args.rcon_pass)

    elif args.mode == "all":
        if not urls:
            parser.error("--proc-urls must contain at least one URL")

        # Start generator in background
        gen_thread = threading.Thread(target=run_generator,
                                      args=(args.gen_url, args.gen_model, args.interval), daemon=True)
        gen_thread.start()

        # Start a processor per URL
        for url in urls:
            proc_thread = threading.Thread(target=run_processor,
                                           args=(url, args.proc_model, args.rcon_host, args.rcon_port, args.rcon_pass),
                                           daemon=True)
            proc_thread.start()
            print(f"Processor started on {url}")

        # Keep main alive
        while True:
            time.sleep(60)
            queue_size = _count_lines(QUEUE_FILE)
            output_size = _count_lines(OUTPUT_FILE)
            print(f"[Pipeline] Queue: {queue_size} | Output: {output_size}")


if __name__ == "__main__":
    main()