Async processing, single-call mode, validator tracking, think stripping

- ThreadPoolExecutor (3 workers) for concurrent prayer/sudo processing
- Single-call mode: one LLM call returns commands + message (config: single_call)
- Validator hit-rate tracking to /var/log/mc_validator_stats.json
- Strip <think> blocks from Qwen3 model output via regex
- Fresh LangGraph sessions (no history carryover)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Code
2026-03-19 19:03:41 -04:00
parent 6f585e0021
commit 4706952c52
+164 -12
View File
@@ -7,6 +7,7 @@ Config: /etc/mc_aigod.json
""" """
import json, os, random, re, socket, struct, threading, time, logging import json, os, random, re, socket, struct, threading, time, logging
from concurrent.futures import ThreadPoolExecutor
from collections import deque from collections import deque
from datetime import datetime from datetime import datetime
from typing import Any, Dict from typing import Any, Dict
@@ -24,6 +25,16 @@ logging.basicConfig(
) )
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# --- Async request processing pool ---
_request_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="aigod")
def _safe_call(func, *args, **kwargs):
"""Wrapper for thread pool calls with error logging."""
try:
func(*args, **kwargs)
except Exception as e:
log.error(f"Async {func.__name__} failed: {e}", exc_info=True)
CONFIG_PATH = os.environ.get('MC_AIGOD_CONFIG', '/etc/mc_aigod_paper.json') CONFIG_PATH = os.environ.get('MC_AIGOD_CONFIG', '/etc/mc_aigod_paper.json')
PRAY_PATTERNS = [ PRAY_PATTERNS = [
@@ -1202,6 +1213,10 @@ def write_training_audit(player: str, mode: str, user_message: str,
"server_type": config.get("server_type", "paper"), "server_type": config.get("server_type", "paper"),
"version": "1.21.x", "version": "1.21.x",
"online_players": context.get("online_players", []), "online_players": context.get("online_players", []),
"player_details": context.get("player_details", {}),
"time_of_day": context.get("time_of_day", "unknown"),
"weather": context.get("weather", "unknown"),
"world_border": context.get("world_border"),
} }
# Add player position if available # Add player position if available
try: try:
@@ -2034,7 +2049,10 @@ def _llm_call(model: str, system: str, user: str, config: dict,
payload["format"] = fmt payload["format"] = fmt
r = requests.post(f"{config['ollama_url']}/api/chat", json=payload, timeout=timeout) r = requests.post(f"{config['ollama_url']}/api/chat", json=payload, timeout=timeout)
r.raise_for_status() r.raise_for_status()
return r.json()["message"]["content"] content = r.json()["message"]["content"]
# Strip <think>...</think> blocks from Qwen3 models
content = re.sub(r'<think>[\s\S]*?</think>\s*', '', content)
return content
# --- Gemini API cost tracking and call --- # --- Gemini API cost tracking and call ---
@@ -2260,6 +2278,23 @@ def _gateway_actor(player: str, mode: str, config) -> str:
def _gateway_start_session(player: str, mode: str, config) -> str: def _gateway_start_session(player: str, mode: str, config) -> str:
actor = _gateway_actor(player, mode, config) actor = _gateway_actor(player, mode, config)
key = _gateway_key(actor, mode) key = _gateway_key(actor, mode)
# Fresh session mode: always start new session, no history carryover
if config.get("gateway_fresh_session", False):
url = config.get("langgraph_gateway_url", "http://127.0.0.1:8091")
timeout = int(config.get("langgraph_gateway_timeout", 45))
r = requests.post(
f"{url}/v1/session/start",
json={"player": actor, "mode": mode},
timeout=timeout,
)
r.raise_for_status()
sid = r.json()["session_id"]
with _gateway_lock:
_gateway_sessions[key] = sid
return sid
# Default: reuse existing session (carries history)
with _gateway_lock: with _gateway_lock:
sid = _gateway_sessions.get(key) sid = _gateway_sessions.get(key)
if sid: if sid:
@@ -2370,7 +2405,42 @@ def ask_god(player, prayer, context, config):
log.info(f"Gateway god tool_trace={out.get('tool_trace', [])}") log.info(f"Gateway god tool_trace={out.get('tool_trace', [])}")
return {"message": out.get("message"), "commands": out.get("commands") or []} return {"message": out.get("message"), "commands": out.get("commands") or []}
# --- Call 1: commands --- # --- Single-call mode: one LLM call returns both commands and message ---
if config.get("single_call", False):
log.info(f"Single call ({command_model}) — {player}: {prayer[:60]} (history={len(history)//2})")
try:
content = _llm_call(
model=command_model,
system=GOD_SYSTEM_PROMPT,
user=user_msg,
config=config,
fmt="json",
temperature=0.5,
max_tokens=400,
)
result = _parse_llm_json(content)
commands = result.get("commands") or []
message = result.get("message") or ""
log.info(f"Commands decided: {commands}")
log.info(f"Message: {message[:200]}")
except Exception as e:
log.error(f"Single call failed: {e}")
commands = []
message = ""
try:
with open('/var/log/mc_aigod_paper_responses.log', 'a') as rf:
rf.write(
f"\n--- {time.strftime('%Y-%m-%d %H:%M:%S')} prayer:{player} [single] ---\n"
f"COMMANDS: {commands}\n"
f"MESSAGE: {message}\n"
)
except Exception:
pass
return {"message": message, "commands": commands}
# --- Two-call mode (default): separate command and message calls ---
log.info(f"Commands call ({command_model}) — {player}: {prayer[:60]} (history={len(history)//2})") log.info(f"Commands call ({command_model}) — {player}: {prayer[:60]} (history={len(history)//2})")
try: try:
cmd_content = _llm_call( cmd_content = _llm_call(
@@ -2390,7 +2460,6 @@ def ask_god(player, prayer, context, config):
commands = [] commands = []
# --- Call 2: message --- # --- Call 2: message ---
# Tell the message model what was decided so it can write accordingly
if commands: if commands:
action_summary = f"You decided to execute these server commands: {commands}" action_summary = f"You decided to execute these server commands: {commands}"
else: else:
@@ -2402,7 +2471,6 @@ def ask_god(player, prayer, context, config):
f"Now write your spoken message to all players." f"Now write your spoken message to all players."
) )
# Include prayer history so God's voice is consistent
msg_messages = ( msg_messages = (
[{"role": "system", "content": build_message_system_prompt(config)}] [{"role": "system", "content": build_message_system_prompt(config)}]
+ history + history
@@ -2422,6 +2490,8 @@ def ask_god(player, prayer, context, config):
r = requests.post(f"{config['ollama_url']}/api/chat", json=msg_payload, timeout=60) r = requests.post(f"{config['ollama_url']}/api/chat", json=msg_payload, timeout=60)
r.raise_for_status() r.raise_for_status()
message = r.json()["message"]["content"].strip() message = r.json()["message"]["content"].strip()
# Strip think blocks
message = re.sub(r'<think>[\s\S]*?</think>\s*', '', message)
log.info(f"Message: {message[:200]}") log.info(f"Message: {message[:200]}")
except Exception as e: except Exception as e:
log.error(f"Message call failed: {e}") log.error(f"Message call failed: {e}")
@@ -3013,32 +3083,108 @@ def _repair_execute_tail(cmd: str, fallback_player: str) -> str:
return rebuilt return rebuilt
return raw return raw
# --- Validator hit-rate tracking ---
_validator_stats_lock = threading.Lock()
_validator_stats_file = "/var/log/mc_validator_stats.json"
def _load_validator_stats():
try:
with open(_validator_stats_file) as f:
return json.load(f)
except:
return {"total": 0, "fixes": {}}
def _save_validator_stats(stats):
try:
with open(_validator_stats_file, "w") as f:
json.dump(stats, f)
except:
pass
_validator_stats = _load_validator_stats()
def _track_fix(fix_name, before, after):
"""Track when a validator fix actually changes a command."""
if before == after:
return
with _validator_stats_lock:
_validator_stats["fixes"].setdefault(fix_name, 0)
_validator_stats["fixes"][fix_name] += 1
# Save every 50 commands
if _validator_stats["total"] % 50 == 0:
_save_validator_stats(_validator_stats)
def validate_command(cmd, online_players, fallback_player, config=None): def validate_command(cmd, online_players, fallback_player, config=None):
"""Replace placeholders, auto-fix common give syntax errors, check safe prefix.""" """Replace placeholders, auto-fix common give syntax errors, check safe prefix."""
with _validator_stats_lock:
_validator_stats["total"] += 1
resolved = cmd.replace("{player}", fallback_player).replace("{target}", fallback_player) resolved = cmd.replace("{player}", fallback_player).replace("{target}", fallback_player)
resolved = resolved.strip() resolved = resolved.strip()
if resolved.startswith("/"): if resolved.startswith("/"):
resolved = resolved[1:] resolved = resolved[1:]
# Fix 5: hallucinated commands (must run first — may drop or rewrite entire command) # Fix 5: hallucinated commands (must run first — may drop or rewrite entire command)
before = resolved
resolved = fix_hallucinated_command(resolved) resolved = fix_hallucinated_command(resolved)
_track_fix("hallucinated_command", before, resolved)
if not resolved: if not resolved:
return "", False return "", False
# Fix 1: @s → player name (RCON has no executor entity) # Fix 1: @s → player name (RCON has no executor entity)
before = resolved
resolved = fix_at_s_selector(resolved, fallback_player) resolved = fix_at_s_selector(resolved, fallback_player)
_track_fix("at_s_selector", before, resolved)
# Fix 2: old NBT enchantment → 1.21 component syntax # Fix 2: old NBT enchantment → 1.21 component syntax
before = resolved
resolved = fix_nbt_enchantment_syntax(resolved) resolved = fix_nbt_enchantment_syntax(resolved)
_track_fix("nbt_enchantment", before, resolved)
# Fix 3: strip invalid item components (display, durability, enc, etc.) # Fix 3: strip invalid item components (display, durability, enc, etc.)
before = resolved
resolved = fix_invalid_item_components(resolved) resolved = fix_invalid_item_components(resolved)
_track_fix("invalid_components", before, resolved)
# Fix 4: hallucinated effect names → closest valid effect # Fix 4: hallucinated effect names → closest valid effect
before = resolved
resolved = fix_hallucinated_effect(resolved) resolved = fix_hallucinated_effect(resolved)
_track_fix("hallucinated_effect", before, resolved)
# Existing fixes # Existing fixes
before = resolved
resolved = fix_give_command(resolved) resolved = fix_give_command(resolved)
_track_fix("give_syntax", before, resolved)
before = resolved
resolved = fix_effect_command(resolved) resolved = fix_effect_command(resolved)
_track_fix("effect_syntax", before, resolved)
before = resolved
resolved = fix_gamemode_command(resolved, fallback_player) resolved = fix_gamemode_command(resolved, fallback_player)
_track_fix("gamemode_syntax", before, resolved)
before = resolved
resolved = fix_weather_command(resolved) resolved = fix_weather_command(resolved)
_track_fix("weather_syntax", before, resolved)
before = resolved
resolved = fix_fill_fire_command(resolved) resolved = fix_fill_fire_command(resolved)
_track_fix("fill_fire", before, resolved)
before = resolved
resolved = fix_bow_enchant_syntax(resolved) resolved = fix_bow_enchant_syntax(resolved)
_track_fix("bow_enchant", before, resolved)
before = resolved
resolved = _repair_execute_tail(resolved, fallback_player) resolved = _repair_execute_tail(resolved, fallback_player)
_track_fix("execute_tail", before, resolved)
# Periodic save
with _validator_stats_lock:
if _validator_stats["total"] % 50 == 0:
_save_validator_stats(_validator_stats)
caps = get_server_capabilities(config) if config else SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE] caps = get_server_capabilities(config) if config else SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE]
prefixes = caps["safe_prefixes"] prefixes = caps["safe_prefixes"]
if not any(resolved.startswith(p) for p in prefixes): if not any(resolved.startswith(p) for p in prefixes):
@@ -4347,10 +4493,13 @@ def main():
player = m.group(1) player = m.group(1)
prompt = m.group(2).strip() prompt = m.group(2).strip()
log.info(f"SUDO from {player}: {prompt}") log.info(f"SUDO from {player}: {prompt}")
try: if config.get("async_processing", False):
process_sudo(player, prompt, config) _request_pool.submit(_safe_call, process_sudo, player, prompt, config)
except Exception as e: else:
log.error(f"Error processing sudo: {e}", exc_info=True) try:
process_sudo(player, prompt, config)
except Exception as e:
log.error(f"Error processing sudo: {e}", exc_info=True)
matched = True matched = True
break break
@@ -4365,10 +4514,13 @@ def main():
player = m.group(1) player = m.group(1)
prayer = m.group(2).strip() prayer = m.group(2).strip()
log.info(f"Prayer from {player}: {prayer}") log.info(f"Prayer from {player}: {prayer}")
try: if config.get("async_processing", False):
process_prayer(player, prayer, config, cooldowns) _request_pool.submit(_safe_call, process_prayer, player, prayer, config, cooldowns)
except Exception as e: else:
log.error(f"Error processing prayer: {e}", exc_info=True) try:
process_prayer(player, prayer, config, cooldowns)
except Exception as e:
log.error(f"Error processing prayer: {e}", exc_info=True)
matched = True matched = True
break break