diff --git a/mc_aigod_paper.py b/mc_aigod_paper.py index ce092ee..9a8a435 100644 --- a/mc_aigod_paper.py +++ b/mc_aigod_paper.py @@ -7,6 +7,7 @@ Config: /etc/mc_aigod.json """ import json, os, random, re, socket, struct, threading, time, logging +from concurrent.futures import ThreadPoolExecutor from collections import deque from datetime import datetime from typing import Any, Dict @@ -24,6 +25,16 @@ logging.basicConfig( ) log = logging.getLogger(__name__) +# --- Async request processing pool --- +_request_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="aigod") + +def _safe_call(func, *args, **kwargs): + """Wrapper for thread pool calls with error logging.""" + try: + func(*args, **kwargs) + except Exception as e: + log.error(f"Async {func.__name__} failed: {e}", exc_info=True) + CONFIG_PATH = os.environ.get('MC_AIGOD_CONFIG', '/etc/mc_aigod_paper.json') PRAY_PATTERNS = [ @@ -1202,6 +1213,10 @@ def write_training_audit(player: str, mode: str, user_message: str, "server_type": config.get("server_type", "paper"), "version": "1.21.x", "online_players": context.get("online_players", []), + "player_details": context.get("player_details", {}), + "time_of_day": context.get("time_of_day", "unknown"), + "weather": context.get("weather", "unknown"), + "world_border": context.get("world_border"), } # Add player position if available try: @@ -2034,7 +2049,10 @@ def _llm_call(model: str, system: str, user: str, config: dict, payload["format"] = fmt r = requests.post(f"{config['ollama_url']}/api/chat", json=payload, timeout=timeout) r.raise_for_status() - return r.json()["message"]["content"] + content = r.json()["message"]["content"] + # Strip ... blocks from Qwen3 models + content = re.sub(r'[\s\S]*?\s*', '', content) + return content # --- Gemini API cost tracking and call --- @@ -2260,6 +2278,23 @@ def _gateway_actor(player: str, mode: str, config) -> str: def _gateway_start_session(player: str, mode: str, config) -> str: actor = _gateway_actor(player, mode, config) key = _gateway_key(actor, mode) + + # Fresh session mode: always start new session, no history carryover + if config.get("gateway_fresh_session", False): + url = config.get("langgraph_gateway_url", "http://127.0.0.1:8091") + timeout = int(config.get("langgraph_gateway_timeout", 45)) + r = requests.post( + f"{url}/v1/session/start", + json={"player": actor, "mode": mode}, + timeout=timeout, + ) + r.raise_for_status() + sid = r.json()["session_id"] + with _gateway_lock: + _gateway_sessions[key] = sid + return sid + + # Default: reuse existing session (carries history) with _gateway_lock: sid = _gateway_sessions.get(key) if sid: @@ -2370,7 +2405,42 @@ def ask_god(player, prayer, context, config): log.info(f"Gateway god tool_trace={out.get('tool_trace', [])}") return {"message": out.get("message"), "commands": out.get("commands") or []} - # --- Call 1: commands --- + # --- Single-call mode: one LLM call returns both commands and message --- + if config.get("single_call", False): + log.info(f"Single call ({command_model}) — {player}: {prayer[:60]} (history={len(history)//2})") + try: + content = _llm_call( + model=command_model, + system=GOD_SYSTEM_PROMPT, + user=user_msg, + config=config, + fmt="json", + temperature=0.5, + max_tokens=400, + ) + result = _parse_llm_json(content) + commands = result.get("commands") or [] + message = result.get("message") or "" + log.info(f"Commands decided: {commands}") + log.info(f"Message: {message[:200]}") + except Exception as e: + log.error(f"Single call failed: {e}") + commands = [] + message = "" + + try: + with open('/var/log/mc_aigod_paper_responses.log', 'a') as rf: + rf.write( + f"\n--- {time.strftime('%Y-%m-%d %H:%M:%S')} prayer:{player} [single] ---\n" + f"COMMANDS: {commands}\n" + f"MESSAGE: {message}\n" + ) + except Exception: + pass + + return {"message": message, "commands": commands} + + # --- Two-call mode (default): separate command and message calls --- log.info(f"Commands call ({command_model}) — {player}: {prayer[:60]} (history={len(history)//2})") try: cmd_content = _llm_call( @@ -2390,7 +2460,6 @@ def ask_god(player, prayer, context, config): commands = [] # --- Call 2: message --- - # Tell the message model what was decided so it can write accordingly if commands: action_summary = f"You decided to execute these server commands: {commands}" else: @@ -2402,7 +2471,6 @@ def ask_god(player, prayer, context, config): f"Now write your spoken message to all players." ) - # Include prayer history so God's voice is consistent msg_messages = ( [{"role": "system", "content": build_message_system_prompt(config)}] + history @@ -2422,6 +2490,8 @@ def ask_god(player, prayer, context, config): r = requests.post(f"{config['ollama_url']}/api/chat", json=msg_payload, timeout=60) r.raise_for_status() message = r.json()["message"]["content"].strip() + # Strip think blocks + message = re.sub(r'[\s\S]*?\s*', '', message) log.info(f"Message: {message[:200]}") except Exception as e: log.error(f"Message call failed: {e}") @@ -3013,32 +3083,108 @@ def _repair_execute_tail(cmd: str, fallback_player: str) -> str: return rebuilt return raw +# --- Validator hit-rate tracking --- +_validator_stats_lock = threading.Lock() +_validator_stats_file = "/var/log/mc_validator_stats.json" + +def _load_validator_stats(): + try: + with open(_validator_stats_file) as f: + return json.load(f) + except: + return {"total": 0, "fixes": {}} + +def _save_validator_stats(stats): + try: + with open(_validator_stats_file, "w") as f: + json.dump(stats, f) + except: + pass + +_validator_stats = _load_validator_stats() + +def _track_fix(fix_name, before, after): + """Track when a validator fix actually changes a command.""" + if before == after: + return + with _validator_stats_lock: + _validator_stats["fixes"].setdefault(fix_name, 0) + _validator_stats["fixes"][fix_name] += 1 + # Save every 50 commands + if _validator_stats["total"] % 50 == 0: + _save_validator_stats(_validator_stats) + + def validate_command(cmd, online_players, fallback_player, config=None): """Replace placeholders, auto-fix common give syntax errors, check safe prefix.""" + with _validator_stats_lock: + _validator_stats["total"] += 1 + resolved = cmd.replace("{player}", fallback_player).replace("{target}", fallback_player) resolved = resolved.strip() if resolved.startswith("/"): resolved = resolved[1:] + # Fix 5: hallucinated commands (must run first — may drop or rewrite entire command) + before = resolved resolved = fix_hallucinated_command(resolved) + _track_fix("hallucinated_command", before, resolved) if not resolved: return "", False + # Fix 1: @s → player name (RCON has no executor entity) + before = resolved resolved = fix_at_s_selector(resolved, fallback_player) + _track_fix("at_s_selector", before, resolved) + # Fix 2: old NBT enchantment → 1.21 component syntax + before = resolved resolved = fix_nbt_enchantment_syntax(resolved) + _track_fix("nbt_enchantment", before, resolved) + # Fix 3: strip invalid item components (display, durability, enc, etc.) + before = resolved resolved = fix_invalid_item_components(resolved) + _track_fix("invalid_components", before, resolved) + # Fix 4: hallucinated effect names → closest valid effect + before = resolved resolved = fix_hallucinated_effect(resolved) + _track_fix("hallucinated_effect", before, resolved) + # Existing fixes + before = resolved resolved = fix_give_command(resolved) + _track_fix("give_syntax", before, resolved) + + before = resolved resolved = fix_effect_command(resolved) + _track_fix("effect_syntax", before, resolved) + + before = resolved resolved = fix_gamemode_command(resolved, fallback_player) + _track_fix("gamemode_syntax", before, resolved) + + before = resolved resolved = fix_weather_command(resolved) + _track_fix("weather_syntax", before, resolved) + + before = resolved resolved = fix_fill_fire_command(resolved) + _track_fix("fill_fire", before, resolved) + + before = resolved resolved = fix_bow_enchant_syntax(resolved) + _track_fix("bow_enchant", before, resolved) + + before = resolved resolved = _repair_execute_tail(resolved, fallback_player) + _track_fix("execute_tail", before, resolved) + + # Periodic save + with _validator_stats_lock: + if _validator_stats["total"] % 50 == 0: + _save_validator_stats(_validator_stats) caps = get_server_capabilities(config) if config else SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE] prefixes = caps["safe_prefixes"] if not any(resolved.startswith(p) for p in prefixes): @@ -4347,10 +4493,13 @@ def main(): player = m.group(1) prompt = m.group(2).strip() log.info(f"SUDO from {player}: {prompt}") - try: - process_sudo(player, prompt, config) - except Exception as e: - log.error(f"Error processing sudo: {e}", exc_info=True) + if config.get("async_processing", False): + _request_pool.submit(_safe_call, process_sudo, player, prompt, config) + else: + try: + process_sudo(player, prompt, config) + except Exception as e: + log.error(f"Error processing sudo: {e}", exc_info=True) matched = True break @@ -4365,10 +4514,13 @@ def main(): player = m.group(1) prayer = m.group(2).strip() log.info(f"Prayer from {player}: {prayer}") - try: - process_prayer(player, prayer, config, cooldowns) - except Exception as e: - log.error(f"Error processing prayer: {e}", exc_info=True) + if config.get("async_processing", False): + _request_pool.submit(_safe_call, process_prayer, player, prayer, config, cooldowns) + else: + try: + process_prayer(player, prayer, config, cooldowns) + except Exception as e: + log.error(f"Error processing prayer: {e}", exc_info=True) matched = True break