#!/usr/bin/env python3 """ Mortdecai GPU Scheduler — preset-based job scheduler with live GPU monitoring. Features: - GPU dashboard with live stats across the homelab - Configuration presets (GPU assignments, model selection, pipeline type) - Job scheduler with 3 trigger types: time, finish_training, cost - Model management: load/unload Ollama models per GPU - Training progress monitor with loss curves Usage: python3 gpu_scheduler.py --port 8098 Serve behind Caddy as gpu.sethpc.xyz with google_auth. """ import argparse import json import os import re import subprocess import threading import time import uuid from http.server import HTTPServer, BaseHTTPRequestHandler from pathlib import Path from urllib.parse import parse_qs, urlparse from datetime import datetime, timedelta PORT = 8098 DATA_DIR = Path(__file__).resolve().parent.parent / "data" / "scheduler" # ── GPU Inventory ────────────────────────────────────────────────────────── GPUS = [ { "id": "3090ti", "name": "RTX 3090 Ti", "vram_gb": 24, "vram_mb": 24564, "host": "seth@192.168.0.141", "gpu_index": 1, "ollama_port": 11434, "ollama_service": "ollama.service", "capabilities": ["training", "inference", "self-play", "pipeline"], "location": "steel141", }, { "id": "2080ti", "name": "RTX 2080 Ti", "vram_gb": 11, "vram_mb": 11264, "host": "seth@192.168.0.141", "gpu_index": 0, "ollama_port": 11435, "ollama_service": "ollama-gpu0.service", "capabilities": ["inference", "self-play", "pipeline", "generator"], "location": "steel141", }, { "id": "rtx4000", "name": "Quadro RTX 4000", "vram_gb": 8, "vram_mb": 8192, "host": "pve197", "gpu_index": 0, "pct_id": 105, "ollama_port": 11434, "ollama_service": "ollama.service", "capabilities": ["inference", "self-play", "pipeline", "prod"], "location": "pve197 → CT 105", }, { "id": "1660s", "name": "GTX 1660 Super", "vram_gb": 6, "vram_mb": 6144, "host": "root@192.168.0.235", "gpu_index": 0, "ollama_port": 11434, "ollama_service": "ollama.service", "capabilities": ["generator", "inference-small"], "location": "bedroom", "ssh_extra": "-o StrictHostKeyChecking=no", "ssh_pass": "REDACTED_PASSWORD", }, ] GPU_MAP = {g["id"]: g for g in GPUS} # ── Pipeline Definitions ────────────────────────────────────────────────── PIPELINE_TYPES = { "training": { "label": "Training (QLoRA)", "description": "Fine-tune model via Unsloth QLoRA", "gpu_req": ["training"], "params": ["base_model", "dataset", "output_name", "epochs", "lr", "batch_size", "grad_accum", "max_seq_len", "save_steps"], "defaults": { "base_model": "Qwen/Qwen3.5-9B", "dataset": "auto", "output_name": "mortdecai-0.5.0", "epochs": 1, "lr": 1e-4, "batch_size": 2, "grad_accum": 4, "max_seq_len": 2048, "save_steps": 50, }, }, "self_play": { "label": "Self-Play", "description": "Model generates edge cases and learns from failures", "gpu_req": ["inference"], "params": ["model", "tiers", "rounds_per_tier", "rcon_host", "rcon_port", "rcon_pass"], "defaults": { "model": "mortdecai:0.4.0", "tiers": "1,2,3", "rounds_per_tier": 50, "rcon_host": "192.168.0.244", "rcon_port": 25578, "rcon_pass": "REDACTED_RCON", }, }, "prompt_pipeline": { "label": "Prompt Pipeline", "description": "Small model generates prompts, big models process + RCON validate", "gpu_req": ["generator", "inference"], "params": ["gen_model", "proc_model", "batch_size", "interval"], "defaults": { "gen_model": "qwen3.5:0.8b", "proc_model": "mortdecai:0.4.0", "batch_size": 30, "interval": 120, }, }, "bakeoff": { "label": "Bake-off", "description": "Compare model versions on standard test prompts", "gpu_req": ["inference"], "params": ["models", "test_set", "rcon_host"], "defaults": { "models": "mortdecai:0.4.0,mortdecai:0.5.0", "test_set": "standard", "rcon_host": "192.168.0.244", }, }, "export_gguf": { "label": "Export GGUF", "description": "Convert LoRA adapter to GGUF for Ollama", "gpu_req": ["training"], "params": ["adapter_path", "output_name", "quant"], "defaults": { "adapter_path": "training/checkpoints/mortdecai-0.5.0", "output_name": "mortdecai:0.5.0", "quant": "q4_k_m", }, }, "tool_self_play": { "label": "Tool Self-Play", "description": "Exercise all 14 tools on the dev server — scripts, memory, entities, wiki", "gpu_req": ["inference"], "params": ["model", "rounds", "categories", "rcon_host", "rcon_port", "rcon_pass"], "defaults": { "model": "mortdecai:0.4.0", "rounds": 10, "categories": "all", "rcon_host": "192.168.0.112", "rcon_port": 25578, "rcon_pass": "REDACTED_RCON", }, }, "load_model": { "label": "Load Model", "description": "Load/switch Ollama model on a GPU", "gpu_req": ["inference"], "params": ["model"], "defaults": {"model": "mortdecai:0.4.0"}, }, } # ── State ────────────────────────────────────────────────────────────────── _lock = threading.Lock() _state = { "gpus": {}, "training": None, "last_refresh": None, } _presets = {} # id -> preset dict _jobs = [] # list of job dicts _schedule = [] # list of scheduled trigger dicts _cost_tracker = {"total_kwh": 0.0, "total_cost": 0.0, "electricity_rate": 0.12} TRAINING_LOG_PATTERN = "/home/seth/mc-ai-training/Minecraft-AI-model/training/train_run_*.log" TRAINING_HOST = "seth@192.168.0.141" # ── Persistence ──────────────────────────────────────────────────────────── def _ensure_data_dir(): DATA_DIR.mkdir(parents=True, exist_ok=True) def _save_presets(): _ensure_data_dir() with open(DATA_DIR / "presets.json", "w") as f: json.dump(_presets, f, indent=2) def _save_jobs(): _ensure_data_dir() with open(DATA_DIR / "jobs.json", "w") as f: json.dump(_jobs, f, indent=2, default=str) def _save_schedule(): _ensure_data_dir() with open(DATA_DIR / "schedule.json", "w") as f: json.dump(_schedule, f, indent=2, default=str) def _load_persisted(): global _presets, _jobs, _schedule _ensure_data_dir() for name, target in [("presets.json", "_presets"), ("jobs.json", "_jobs"), ("schedule.json", "_schedule")]: path = DATA_DIR / name if path.exists(): with open(path) as f: data = json.load(f) if target == "_presets": _presets = data elif target == "_jobs": _jobs = data elif target == "_schedule": _schedule = data # ── SSH Helpers ──────────────────────────────────────────────────────────── def _ssh_cmd(gpu_or_host, cmd, timeout=8): """Run a command over SSH. Accepts a GPU dict or host string.""" if isinstance(gpu_or_host, dict): gpu = gpu_or_host host = gpu["host"] extra = gpu.get("ssh_extra", "").split() if gpu.get("ssh_extra") else [] ssh_pass = gpu.get("ssh_pass") # If pct_id is set, wrap command through proxmox host if "pct_id" in gpu: cmd = f"pct exec {gpu['pct_id']} -- bash -c '{cmd}'" else: host = gpu_or_host extra = [] ssh_pass = None try: if ssh_pass: full_cmd = ["sshpass", "-p", ssh_pass, "ssh", "-o", "ConnectTimeout=4"] + extra + [host, cmd] else: full_cmd = ["ssh", "-o", "ConnectTimeout=4", "-o", "BatchMode=yes"] + extra + [host, cmd] r = subprocess.run(full_cmd, capture_output=True, text=True, timeout=timeout) return r.stdout.strip() if r.returncode == 0 else None except Exception: return None def _ollama_api(gpu, endpoint, method="GET", data=None): """Call Ollama API on a GPU via SSH curl.""" port = gpu["ollama_port"] if method == "GET": cmd = f"curl -s --connect-timeout 3 http://localhost:{port}{endpoint}" else: payload = json.dumps(data).replace("'", "'\\''") if data else "{}" cmd = f"curl -s --connect-timeout 3 -X POST http://localhost:{port}{endpoint} -d '{payload}'" raw = _ssh_cmd(gpu, cmd) if raw: try: return json.loads(raw) except json.JSONDecodeError: pass return None # ── GPU Monitoring ───────────────────────────────────────────────────────── def _fetch_gpu_stats(gpu): query = f"nvidia-smi --id={gpu['gpu_index']} --query-gpu=utilization.gpu,temperature.gpu,power.draw,memory.used,memory.total,fan.speed --format=csv,noheader,nounits" raw = _ssh_cmd(gpu, query) if not raw: return {"online": False, "id": gpu["id"], "name": gpu["name"]} parts = [p.strip() for p in raw.split(",")] try: return { "online": True, "id": gpu["id"], "name": gpu["name"], "vram_gb": gpu["vram_gb"], "location": gpu["location"], "capabilities": gpu["capabilities"], "utilization": int(parts[0]), "temperature": int(parts[1]), "power_watts": float(parts[2]), "vram_used_mb": int(parts[3]), "vram_total_mb": int(parts[4]), "fan_speed": int(parts[5]) if parts[5] not in ("[N/A]", "[Not Supported]") else None, "vram_pct": round(int(parts[3]) / int(parts[4]) * 100, 1), } except (ValueError, IndexError): return {"online": True, "id": gpu["id"], "name": gpu["name"], "error": raw} def _fetch_ollama_info(gpu): """Get running + available models from Ollama.""" ps = _ollama_api(gpu, "/api/ps") or {} tags = _ollama_api(gpu, "/api/tags") or {} running = [] for m in ps.get("models", []): running.append({ "name": m.get("name", "?"), "size_gb": round(m.get("size", 0) / 1e9, 1), "vram_gb": round(m.get("size_vram", 0) / 1e9, 1), }) available = [m.get("name", "?") for m in tags.get("models", [])] return {"running": running, "available": available} def _fetch_training_status(): # Find the most recently modified training log log_path = _ssh_cmd(TRAINING_HOST, f"ls -t {TRAINING_LOG_PATTERN} 2>/dev/null | head -1", timeout=5) if not log_path: return None raw = _ssh_cmd(TRAINING_HOST, f"tail -200 {log_path} 2>/dev/null", timeout=8) if not raw: return None status = {"active": False, "loss_history": []} progress_matches = re.findall(r'(\d+)%\|[^|]*\|\s*(\d+)/(\d+)\s*\[([^\]]+)\]', raw) if progress_matches: last = progress_matches[-1] status["pct"] = int(last[0]) status["current_step"] = int(last[1]) status["total_steps"] = int(last[2]) timing = last[3] eta_match = re.search(r'<([^,]+)', timing) elapsed_match = re.match(r'([^<]+)', timing) if eta_match: status["eta"] = eta_match.group(1).strip() if elapsed_match: status["elapsed"] = elapsed_match.group(1).strip() status["active"] = True if "OutOfMemoryError" in raw: status["active"] = False status["error"] = "OOM" elif "Error" in raw.split("\n")[-1] and "OutOfMemoryError" not in raw: status["active"] = False status["error"] = "crashed" loss_matches = re.findall(r"'loss':\s*'([^']+)'", raw) for lm in loss_matches: try: status["loss_history"].append(float(lm)) except ValueError: pass if status["loss_history"]: status["latest_loss"] = status["loss_history"][-1] lr_matches = re.findall(r"'learning_rate':\s*'([^']+)'", raw) if lr_matches: status["learning_rate"] = lr_matches[-1] return status def _fetch_processes(gpu): cmd = f"nvidia-smi --id={gpu['gpu_index']} --query-compute-apps=pid,name,used_memory --format=csv,noheader,nounits 2>/dev/null" raw = _ssh_cmd(gpu, cmd) if not raw: return [] procs = [] for line in raw.strip().split("\n"): if not line.strip(): continue parts = [p.strip() for p in line.split(",")] if len(parts) >= 3: procs.append({"pid": parts[0], "name": parts[1].split("/")[-1], "vram_mb": parts[2]}) return procs def refresh_state(): new_gpus = {} threads = [] def fetch_one(gpu): stats = _fetch_gpu_stats(gpu) stats["ollama"] = _fetch_ollama_info(gpu) stats["processes"] = _fetch_processes(gpu) # Check if any job is running on this GPU active_jobs = [j for j in _jobs if j.get("status") == "running" and gpu["id"] in j.get("gpus", [])] stats["active_job"] = active_jobs[0]["id"] if active_jobs else None new_gpus[gpu["id"]] = stats for gpu in GPUS: t = threading.Thread(target=fetch_one, args=(gpu,)) t.start() threads.append(t) for t in threads: t.join(timeout=12) with _lock: _state["gpus"] = new_gpus _state["training"] = _fetch_training_status() _state["last_refresh"] = time.strftime("%H:%M:%S") def _bg_refresh_loop(interval=10): while True: try: refresh_state() _check_triggers() except Exception as e: print(f"[scheduler] refresh error: {e}") time.sleep(interval) # ── Job Execution ────────────────────────────────────────────────────────── def _run_job_async(job): """Execute a job in a background thread.""" def _run(): job["status"] = "running" job["started_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ") _save_jobs() print(f"[scheduler] starting job {job['id']}: {job['pipeline']}") try: pipeline = job["pipeline"] params = job["params"] gpus = job["gpus"] if pipeline == "training": _exec_training(job, params) elif pipeline == "self_play": _exec_self_play(job, params, gpus) elif pipeline == "prompt_pipeline": _exec_prompt_pipeline(job, params, gpus) elif pipeline == "load_model": _exec_load_model(job, params, gpus) elif pipeline == "export_gguf": _exec_export_gguf(job, params) elif pipeline == "bakeoff": _exec_bakeoff(job, params, gpus) elif pipeline == "tool_self_play": _exec_tool_self_play(job, params, gpus) else: job["error"] = f"unknown pipeline: {pipeline}" job["status"] = "failed" except Exception as e: job["error"] = str(e) job["status"] = "failed" print(f"[scheduler] job {job['id']} failed: {e}") if job["status"] == "running": job["status"] = "completed" job["finished_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ") _save_jobs() print(f"[scheduler] job {job['id']} → {job['status']}") t = threading.Thread(target=_run, daemon=True) t.start() return job def _exec_training(job, params): """Launch training on the 3090 Ti via SSH.""" output_name = params.get('output_name', 'mortdecai-0.5.0') log_name = f"train_run_{output_name}.log" # Build the training command with conda environment activation train_cmd = ( f"source /home/seth/miniconda3/etc/profile.d/conda.sh && " f"conda activate mc-train && " f"cd /home/seth/mc-ai-training/Minecraft-AI-model && " f"TORCH_COMPILE_DISABLE=1 TORCHDYNAMO_DISABLE=1 CUDA_VISIBLE_DEVICES=0 " f"python3 training/scripts/train_lora.py " f"--model '{params.get('base_model', 'Qwen/Qwen3.5-9B')}' " f"--output 'training/checkpoints/{output_name}' " f"--lr {params.get('lr', 1e-4)} " f"--epochs {int(params.get('epochs', 1))} " f"--batch-size {int(params.get('batch_size', 2))} " f"--grad-accum {int(params.get('grad_accum', 4))} " f"--max-seq-len {int(params.get('max_seq_len', 2048))} " f"--save-steps {int(params.get('save_steps', 50))}" ) if params.get("resume"): train_cmd += " --resume" train_cmd += f" 2>&1 | tee training/{log_name}" # Cancel any running jobs on the 3090 Ti to free VRAM for j in _jobs: if j.get("status") == "running" and "3090ti" in j.get("gpus", []) and j["id"] != job["id"]: j["status"] = "cancelled" print(f"[training] cancelled conflicting job {j['id']} on 3090ti") _save_jobs() # Stop both Ollama services AND prevent auto-restart _ssh_cmd(TRAINING_HOST, "sudo systemctl stop ollama.service 2>/dev/null", timeout=10) _ssh_cmd(TRAINING_HOST, "sudo systemctl stop ollama-gpu0.service 2>/dev/null", timeout=10) time.sleep(2) # Kill any lingering ollama processes holding GPU 1 VRAM _ssh_cmd(TRAINING_HOST, "for pid in $(nvidia-smi --id=1 --query-compute-apps=pid --format=csv,noheader,nounits 2>/dev/null); do kill $pid 2>/dev/null; done", timeout=5) time.sleep(3) # Verify VRAM is free enough (need ~18GB free on 24GB card) vram_check = _ssh_cmd(TRAINING_HOST, "nvidia-smi --id=1 --query-gpu=memory.free --format=csv,noheader,nounits") if vram_check: try: free_mb = int(vram_check.strip()) except ValueError: free_mb = 0 print(f"[training] 3090 Ti free VRAM: {free_mb}MB") if free_mb < 18000: # Last resort: try harder to free VRAM _ssh_cmd(TRAINING_HOST, "sudo systemctl stop ollama.service; sudo systemctl stop ollama-gpu0.service", timeout=10) time.sleep(5) vram_check2 = _ssh_cmd(TRAINING_HOST, "nvidia-smi --id=1 --query-gpu=memory.free --format=csv,noheader,nounits") try: free_mb = int(vram_check2.strip()) if vram_check2 else 0 except ValueError: free_mb = 0 if free_mb < 18000: job["status"] = "failed" job["error"] = f"Not enough VRAM: {free_mb}MB free, need 18000MB" # Restart Ollama since we're not training _ssh_cmd(TRAINING_HOST, "sudo systemctl start ollama.service 2>/dev/null", timeout=10) _ssh_cmd(TRAINING_HOST, "sudo systemctl start ollama-gpu0.service 2>/dev/null", timeout=10) return # Launch training via nohup with bash -l for conda nohup_cmd = f"nohup bash -c '{train_cmd}' > /dev/null 2>&1 &" _ssh_cmd(TRAINING_HOST, nohup_cmd, timeout=10) job["log_path"] = f"training/{log_name}" print(f"[training] launched, logging to {log_name}") # Monitor until done while job["status"] == "running": time.sleep(30) status = _fetch_training_status() if status: job["progress"] = status if status.get("error"): job["status"] = "failed" job["error"] = status["error"] break if not status.get("active") and status.get("current_step", 0) == status.get("total_steps", 0) and status.get("total_steps", 0) > 0: job["status"] = "completed" break # Restart Ollama services after training _ssh_cmd(TRAINING_HOST, "sudo systemctl start ollama.service 2>/dev/null", timeout=10) _ssh_cmd(TRAINING_HOST, "sudo systemctl start ollama-gpu0.service 2>/dev/null", timeout=10) def _exec_self_play(job, params, gpus): resolved_gpus = [GPU_MAP[gid] for gid in gpus if gid in GPU_MAP] if not resolved_gpus: job["error"] = "no GPU assigned" job["status"] = "failed" return model = params.get("model", "mortdecai:0.4.0") tiers = [t.strip() for t in params.get("tiers", "1,2,3").split(",")] rounds = int(params.get("rounds_per_tier", 50)) rcon_host = params.get("rcon_host", "192.168.0.244") rcon_port = int(params.get("rcon_port", 25578)) rcon_pass = params.get("rcon_pass", "REDACTED_RCON") script_path = "/home/seth/mc-ai-training/Minecraft-AI-model/training/scripts/self_play.py" # Distribute tiers round-robin across GPUs, launch all in parallel gpu_assignments = {} # gpu_id -> list of tiers for i, tier in enumerate(tiers): gpu = resolved_gpus[i % len(resolved_gpus)] gpu_assignments.setdefault(gpu["id"], []).append(tier) job["gpu_assignments"] = {gid: ts for gid, ts in gpu_assignments.items()} # Launch all GPU workers in parallel threads errors = [] def run_on_gpu(gpu, assigned_tiers): port = gpu["ollama_port"] for tier in assigned_tiers: if job["status"] != "running": break log_file = f"/tmp/selfplay_{gpu['id']}_{tier}.log" cmd = (f"cd /home/seth/mc-ai-training/Minecraft-AI-model && " f"python3 {script_path} --tier {tier} --rounds {rounds} " f"--ollama-url http://localhost:{port} --model {model} " f"--rcon-host {rcon_host} --rcon-port {rcon_port} --rcon-pass {rcon_pass}") _ssh_cmd(gpu, f"nohup bash -c '{cmd}' > {log_file} 2>&1 &", timeout=10) print(f"[self-play] {gpu['name']}: {tier} x{rounds} started") # Wait for this tier to finish for _ in range(rounds * 3): time.sleep(10) log = _ssh_cmd(gpu, f"tail -5 {log_file} 2>/dev/null") if log and ("Complete" in log or "Error" in log or "Traceback" in log): if "Error" in log or "Traceback" in log: errors.append(f"{gpu['name']}/{tier}: {log[-200:]}") break if job["status"] != "running": break threads = [] for gid, assigned_tiers in gpu_assignments.items(): gpu = GPU_MAP[gid] t = threading.Thread(target=run_on_gpu, args=(gpu, assigned_tiers), daemon=True) t.start() threads.append(t) for t in threads: t.join() if errors: job["error"] = "; ".join(errors[:3]) def _exec_prompt_pipeline(job, params, gpus): gen_gpu = GPU_MAP.get(gpus[0]) if len(gpus) > 0 else None proc_gpu = GPU_MAP.get(gpus[1]) if len(gpus) > 1 else gen_gpu if not gen_gpu: job["error"] = "no GPUs assigned" job["status"] = "failed" return gen_port = gen_gpu["ollama_port"] proc_port = proc_gpu["ollama_port"] if proc_gpu else gen_port gen_host_ip = gen_gpu["host"].split("@")[-1] proc_host_ip = proc_gpu["host"].split("@")[-1] if proc_gpu else gen_host_ip cmd = (f"cd /home/seth/mc-ai-training/Minecraft-AI-model && " f"python3 training/scripts/prompt_pipeline.py --mode all " f"--gen-url http://{gen_host_ip}:{gen_port} " f"--gen-model {params.get('gen_model', 'qwen3.5:0.8b')} " f"--proc-urls http://{proc_host_ip}:{proc_port} " f"--proc-model {params.get('proc_model', 'mortdecai:0.4.0')} " f"--interval {params.get('interval', 120)}") _ssh_cmd(TRAINING_HOST, f"nohup bash -c '{cmd}' > /tmp/pipeline.log 2>&1 &", timeout=10) def _exec_load_model(job, params, gpus): for gid in gpus: gpu = GPU_MAP.get(gid) if not gpu: continue model = params.get("model", "mortdecai:0.4.0") result = _ollama_api(gpu, "/api/generate", method="POST", data={ "model": model, "prompt": "test", "stream": False, "options": {"num_predict": 1}, }) if result and "error" not in result: job["result"] = f"Loaded {model} on {gpu['name']}" else: job["error"] = f"Failed to load {model} on {gpu['name']}: {result}" job["status"] = "failed" def _exec_export_gguf(job, params): adapter = params.get("adapter_path", "training/checkpoints/mortdecai-0.5.0") quant = params.get("quant", "q4_k_m") cmd = (f"cd /home/seth/mc-ai-training/Minecraft-AI-model && " f"python3 -m unsloth.save --model {adapter} --output_type gguf --quantization {quant}") _ssh_cmd(TRAINING_HOST, f"nohup bash -c '{cmd}' > /tmp/export_gguf.log 2>&1 &", timeout=10) # Monitor for _ in range(120): time.sleep(15) log = _ssh_cmd(TRAINING_HOST, "tail -3 /tmp/export_gguf.log 2>/dev/null") if log and ("Saved" in log or "Error" in log or "error" in log): if "Error" in log or "error" in log: job["status"] = "failed" job["error"] = log break def _exec_bakeoff(job, params, gpus): gpu = GPU_MAP.get(gpus[0]) if gpus else None if not gpu: job["error"] = "no GPU assigned" job["status"] = "failed" return models = params.get("models", "mortdecai:0.4.0") cmd = (f"cd /home/seth/mc-ai-training/Minecraft-AI-model && " f"python3 training/scripts/bakeoff.py --models {models}") _ssh_cmd(TRAINING_HOST, f"nohup bash -c '{cmd}' > /tmp/bakeoff.log 2>&1 &", timeout=10) def _exec_tool_self_play(job, params, gpus): """Run tool-focused self-play on the dev server via the assigned GPU's Ollama.""" gpu = GPU_MAP.get(gpus[0]) if gpus else None if not gpu: job["error"] = "no GPU assigned" job["status"] = "failed" return host_ip = gpu["host"].split("@")[-1] if "@" in gpu["host"] else gpu["host"] # For pct-based GPUs, use the CT's external IP if "pct_id" in gpu: host_ip = "192.168.0.179" # CT 105 external IP port = gpu["ollama_port"] model = params.get("model", "mortdecai:0.4.0") rounds = int(params.get("rounds", 10)) categories = params.get("categories", "all") rcon_host = params.get("rcon_host", "192.168.0.112") rcon_port = int(params.get("rcon_port", 25578)) rcon_pass = params.get("rcon_pass", "REDACTED_RCON") script_path = "/home/seth/mc-ai-training/Minecraft-AI-model/training/scripts/tool_self_play.py" log_file = f"/tmp/tool_selfplay_{gpu['id']}.log" cmd = (f"cd /home/seth/mc-ai-training/Minecraft-AI-model && " f"python3 {script_path} " f"--ollama-url http://{host_ip}:{port} --model {model} " f"--rcon-host {rcon_host} --rcon-port {rcon_port} --rcon-pass {rcon_pass} " f"--rounds {rounds} --categories {categories}") _ssh_cmd(TRAINING_HOST, f"nohup bash -c '{cmd}' > {log_file} 2>&1 &", timeout=10) print(f"[tool-self-play] launched on {gpu['name']}, logging to {log_file}") # Monitor until done for _ in range(rounds * len(PIPELINE_TYPES) * 3): time.sleep(15) log = _ssh_cmd(TRAINING_HOST, f"tail -5 {log_file} 2>/dev/null") if log and ("Complete" in log or "Traceback" in log): if "Traceback" in log: job["error"] = log[-300:] job["status"] = "failed" break if job["status"] != "running": break # ── Trigger Engine ───────────────────────────────────────────────────────── def _check_triggers(): """Evaluate all scheduled triggers.""" now = datetime.now() for sched in _schedule: if sched.get("status") != "pending": continue trigger = sched["trigger"] fired = False if trigger["type"] == "time": target_str = trigger.get("at") if target_str: try: target = datetime.fromisoformat(target_str) if now >= target: fired = True except ValueError: pass duration_s = trigger.get("duration_seconds") created_str = sched.get("created_at") if duration_s and created_str: try: created = datetime.fromisoformat(created_str) if now >= created + timedelta(seconds=int(duration_s)): fired = True except ValueError: pass elif trigger["type"] == "finish_training": training = _state.get("training") if training: total = training.get("total_steps", 0) current = training.get("current_step", 0) if total > 0 and current >= total and not training.get("active"): fired = True elif trigger["type"] == "cost": threshold = float(trigger.get("threshold_usd", 999)) if _cost_tracker["total_cost"] >= threshold: fired = True if fired: sched["status"] = "fired" sched["fired_at"] = now.isoformat() _save_schedule() print(f"[scheduler] trigger fired: {sched['id']} → launching preset {sched['preset_id']}") _launch_preset(sched["preset_id"]) def _launch_preset(preset_id): """Create and start a job from a preset.""" preset = _presets.get(preset_id) if not preset: print(f"[scheduler] preset {preset_id} not found") return None job = { "id": str(uuid.uuid4())[:8], "preset_id": preset_id, "preset_name": preset.get("name", "?"), "pipeline": preset["pipeline"], "params": preset.get("params", {}), "gpus": preset.get("gpus", []), "status": "queued", "created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ"), } _jobs.append(job) _save_jobs() _run_job_async(job) return job # ── HTML Rendering ───────────────────────────────────────────────────────── def _render_page(): with _lock: state = dict(_state) gpu_cards = "" for gpu in GPUS: data = state["gpus"].get(gpu["id"], {"online": False, "id": gpu["id"], "name": gpu["name"]}) gpu_cards += _gpu_card_html(data) training_html = _training_card_html(state.get("training")) presets_list_html, presets_form_html = _presets_panel_html() schedule_html = _schedule_panel_html() jobs_html = _jobs_panel_html() last_refresh = state.get("last_refresh", "never") online_count = len([g for g in state["gpus"].values() if g.get("online")]) return f"""