Persistent RCON — single connection per server, auto-reconnect

Replaces socket-per-command pattern that crashed the dev server.
Connection pool keyed by host:port. Thread-safe with lock.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude Code
2026-03-20 18:25:05 -04:00
parent 618c98cc4e
commit f10e901fe0
+432 -24
View File
@@ -53,6 +53,10 @@ BUG_LOG_PATTERNS = [
re.compile(r'\[.*?\]: (?:\[Not Secure\] )?<(\w+)> [Bb]ug[_ ]?[Ll]og(?:\s+(.+))?\s*$'),
]
STATUS_PATTERNS = [
re.compile(r'\[.*?\]: (?:\[Not Secure\] )?<(\w+)> [Ss]tatus\s*$'),
]
JOIN_PATTERN = re.compile(r'\[.*?\]: (\w+) joined the game')
LEAVE_PATTERN = re.compile(r'\[.*?\]: (\w+) left the game')
@@ -1334,25 +1338,63 @@ def _infer_category(mode: str, user_message: str, commands_executed: list) -> st
# RCON
# ---------------------------------------------------------------------------
def rcon(cmd, host='127.0.0.1', port=25575, password='REDACTED_RCON'):
# --- Persistent RCON connection pool ---
_rcon_pool = {}
_rcon_pool_lock = threading.Lock()
class _PersistentRCON:
def __init__(self, host, port, password):
self.host, self.port, self.password = host, port, password
self._sock = None
self._lock = threading.Lock()
self._connected = False
self._req_id = 0
def _connect(self):
if self._sock:
try: self._sock.close()
except: pass
self._sock = socket.socket()
self._sock.settimeout(10)
self._sock.connect((self.host, self.port))
data = self.password.encode() + b'\x00\x00'
self._sock.sendall(struct.pack('<iii', len(data)+8, 1, 3) + data)
time.sleep(0.1)
self._sock.recv(4096)
self._connected = True
def command(self, cmd):
with self._lock:
for attempt in range(2):
try:
s = socket.socket()
s.settimeout(5)
s.connect((host, port))
def pkt(i, t, p):
p = p.encode() + b'\x00\x00'
return struct.pack('<iii', len(p) + 8, i, t) + p
s.sendall(pkt(1, 3, password))
time.sleep(0.2)
s.recv(4096)
s.sendall(pkt(2, 2, cmd))
time.sleep(0.2)
r = s.recv(4096)
s.close()
return r[12:-2].decode(errors='replace')
except Exception as e:
if not self._connected or not self._sock:
self._connect()
self._req_id += 1
data = cmd.encode() + b'\x00\x00'
self._sock.sendall(struct.pack('<iii', len(data)+8, self._req_id, 2) + data)
time.sleep(0.05)
raw = self._sock.recv(4096)
if len(raw) < 14:
self._connected = False
if attempt == 0: continue
return ''
return raw[12:-2].decode(errors='replace')
except (OSError, socket.timeout, ConnectionResetError, BrokenPipeError) as e:
self._connected = False
self._sock = None
if attempt == 0:
time.sleep(0.3)
continue
log.error(f"RCON error executing '{cmd}': {e}")
return ''
return ''
def rcon(cmd, host='127.0.0.1', port=25575, password='REDACTED_RCON'):
key = f"{host}:{port}"
with _rcon_pool_lock:
if key not in _rcon_pool:
_rcon_pool[key] = _PersistentRCON(host, port, password)
return _rcon_pool[key].command(cmd)
# ---------------------------------------------------------------------------
# Server context
@@ -1593,17 +1635,24 @@ SERVER_CAPABILITIES = {
"safe_prefixes": [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ',
'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ', 'gamemode ',
'gamerule ', 'particle ', 'playsound ', 'title ', 'scoreboard ',
'team ', 'bossbar ', 'locate ', 'difficulty ', 'spawnpoint ',
'clear ',
],
"sudo_whitelist_note": "give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder, gamemode",
"sudo_whitelist_note": "give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder, gamemode, gamerule, particle, playsound, title, scoreboard, team, bossbar, locate, difficulty, clear",
"template_build": False,
},
"paper": {
"safe_prefixes": [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ',
'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ',
'fill ', 'setblock ', 'clone ', 'gamemode ',
'fill ', 'setblock ', 'clone ', 'gamemode ', 'gamerule ',
'particle ', 'playsound ', 'title ', 'scoreboard ', 'team ',
'bossbar ', 'locate ', 'spreadplayers ', 'ride ', 'damage ',
'difficulty ', 'spawnpoint ', 'setworldspawn ', 'forceload ',
'attribute ', 'data ', 'seed', 'clear ',
],
"sudo_whitelist_note": "give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder, fill, setblock, clone, gamemode",
"sudo_whitelist_note": "give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder, fill, setblock, clone, gamemode, gamerule, particle, playsound, title, scoreboard, team, bossbar, locate, difficulty, clear",
"template_build": True,
},
}
@@ -2043,6 +2092,7 @@ def _llm_call(model: str, system: str, user: str, config: dict,
return r.json()["message"]["content"]
# Default: Ollama (prod servers use this path)
t0 = time.time()
payload = {
"model": model,
"messages": [
@@ -2059,8 +2109,24 @@ def _llm_call(model: str, system: str, user: str, config: dict,
payload["format"] = fmt
r = requests.post(f"{config['ollama_url']}/api/chat", json=payload, timeout=timeout)
r.raise_for_status()
content = r.json()["message"]["content"]
# Strip <think>...</think> blocks from Qwen3 models
data = r.json()
content = data["message"]["content"]
elapsed = time.time() - t0
# Stats
eval_count = data.get("eval_count", 0)
prompt_eval_count = data.get("prompt_eval_count", 0)
eval_duration = data.get("eval_duration", 0) / 1e9 if data.get("eval_duration") else 0
tokens_per_sec = eval_count / eval_duration if eval_duration > 0 else 0
log.info(f"LLM call: model={model} prompt_tokens={prompt_eval_count} output_tokens={eval_count} "
f"speed={tokens_per_sec:.1f}tok/s elapsed={elapsed:.1f}s temp={temperature}")
# Strip <think>...</think> blocks
think_match = re.search(r'<think>([\s\S]*?)</think>', content)
if think_match:
think_len = len(think_match.group(1))
log.info(f" Stripped {think_len} chars of thinking tokens")
content = re.sub(r'<think>[\s\S]*?</think>\s*', '', content)
return content
@@ -2128,6 +2194,9 @@ def _gemini_call(system: str, user: str, config: dict,
"gemini-2.5-flash": (0.15, 0.60),
"gemini-2.0-flash": (0.10, 0.40),
"gemini-2.5-pro": (1.25, 10.00),
"gemini-3-flash": (0.30, 2.50),
"gemini-3.1-flash-lite": (0.25, 1.50),
"gemini-3.1-pro": (2.00, 12.00),
}
in_rate, out_rate = pricing.get(model, (0.15, 0.60))
cost = (input_tokens / 1_000_000) * in_rate + (output_tokens / 1_000_000) * out_rate
@@ -2139,6 +2208,48 @@ def _gemini_call(system: str, user: str, config: dict,
curr_dollar = int(_gemini_total_cost)
if curr_dollar > prev_dollar:
log.info(f"Gemini cost milestone: ${_gemini_total_cost:.4f} / ${budget:.2f}")
try:
import socket as _sock
import subprocess as _sp
from escpos.printer import Dummy as _Dummy
_p = _Dummy(profile="default")
_cols = 57
_p.set(font='b', align='center', bold=True, height=2)
_p.text("MC AI TRAINING\n")
_p.set(font='b', align='center', bold=True, height=1)
_p.text("GEMINI STATUS\n")
_p.set(font='b', align='center', bold=False)
_p.text(time.strftime("%Y-%m-%d %H:%M") + "\n")
_p.text("=" * _cols + "\n")
_p.set(font='b', align='left', bold=True)
_p.text(f"GEMINI {model.upper()}\n")
_p.set(font='b', align='left', bold=True)
_p.text(f" Spent: ${_gemini_total_cost:.4f}\n")
_p.set(font='b', align='left', bold=False)
_p.text(f" Budget: ${budget:.2f}\n")
_p.text(f" Remaining: ${budget - _gemini_total_cost:.4f}\n")
_p.text("-" * _cols + "\n")
try:
def _wc(path):
try:
with open(path) as _f:
return sum(1 for _ in _f)
except: return 0
_dev = _wc("/var/log/mc_training_audit_dev.jsonl")
_p.set(font='b', align='left', bold=True)
_p.text("TRAINING DATA\n")
_p.set(font='b', align='left', bold=False)
_p.text(f" Dev audit: {_dev}\n")
_p.text("-" * _cols + "\n")
except: pass
_p.set(font='b', align='center', bold=False)
_p.text(f"${curr_dollar} Gemini milestone\n")
_p.text("=" * _cols + "\n")
_p.cut()
with _sock.create_connection(("192.168.0.137", 9100), timeout=5) as _s:
_s.sendall(_p.output)
except Exception as _pe:
log.warning(f"Gemini POS print failed: {_pe}")
return text
@@ -3205,6 +3316,74 @@ def validate_command(cmd, online_players, fallback_player, config=None):
with _validator_stats_lock:
if _validator_stats["total"] % 50 == 0:
_save_validator_stats(_validator_stats)
# Safety: tp fall death prevention
# For relative Y teleports (~ ~100 ~), check if the fall would be lethal
tp_abs = re.match(r'^tp\s+(\S+)\s+(-?\d+)\s+(-?\d+)\s+(-?\d+)', resolved)
tp_rel = re.match(r'^tp\s+(\S+)\s+~(-?\d*)\s+~(-?\d+)\s+~(-?\d*)', resolved)
if (tp_abs or tp_rel) and config:
try:
tp_player = (tp_abs or tp_rel).group(1)
if tp_rel:
rel_y = int(tp_rel.group(3)) if tp_rel.group(3) else 0
else:
rel_y = 0
# Only check upward teleports (positive Y offset) — those cause falls
if rel_y > 22 or (tp_abs and int(tp_abs.group(3)) > 200):
# Get player health
health_raw = rcon(f"data get entity {tp_player} Health",
config["rcon_host"], config["rcon_port"], config["rcon_password"])
health_m = re.search(r'(-?[\d.]+)f', health_raw or "")
health = float(health_m.group(1)) if health_m else 20.0
fall_distance = rel_y if tp_rel else 50 # estimate for absolute coords
fall_damage = max(0, fall_distance - 3)
lethal = fall_damage >= health
if lethal:
# Store fall info for logging/training
log.info(f"TP fall check: {tp_player} +{fall_distance}Y, {fall_damage} damage, {health} HP, lethal={lethal}")
_track_fix("tp_fall_detected", str(fall_distance), str(lethal))
# We allow the tp but the execute_response caller can add slow_falling
# by checking config._pending_fall_protection
config["_pending_fall_protection"] = {
"player": tp_player,
"fall_distance": fall_distance,
"lethal": lethal,
}
except Exception as e:
log.debug(f"Fall check failed: {e}")
# Safety: cap dangerous effect durations
dangerous_effect_caps = {
"levitation": 15,
"wither": 30,
"poison": 60,
"nausea": 30,
}
eff_m = re.match(r'^effect\s+give\s+\S+\s+minecraft:(\w+)\s+(\d+)', resolved)
if eff_m:
effect_name = eff_m.group(1)
duration = int(eff_m.group(2))
cap = dangerous_effect_caps.get(effect_name)
if cap and duration > cap:
old = resolved
resolved = resolved.replace(f" {duration}", f" {cap}", 1)
log.warning(f"Capped dangerous effect duration: {effect_name} {duration}s -> {cap}s")
_track_fix("dangerous_effect_cap", old, resolved)
# Safety: block kill @a (kills all players) — only allow kill @e (entities)
if re.match(r'^kill\s+@a\b', resolved):
log.warning(f"Command blocked (kill @a targets players): {resolved}")
_track_fix("blocked_kill_all_players", resolved, "")
return resolved, False
# Safety: fix tp to invalid targets like "tp player minecraft:spawn"
tp_spawn = re.match(r'^tp\s+(\S+)\s+minecraft:spawn\b', resolved)
if tp_spawn:
resolved = f"tp {tp_spawn.group(1)} 0 64 0"
_track_fix("tp_spawn_fix", tp_spawn.group(0), resolved)
caps = get_server_capabilities(config) if config else SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE]
prefixes = caps["safe_prefixes"]
if not any(resolved.startswith(p) for p in prefixes):
@@ -3538,7 +3717,175 @@ def _attempt_error_correction(failed_cmd: str, error_msg: str, config: dict) ->
return ""
def execute_response(response, context, config, praying_player=None):
def send_status(player, config):
"""Send server AI status to a player."""
model = config.get("model", "unknown")
command_model = config.get("command_model", model)
provider = config.get("llm_provider", "ollama")
gateway = config.get("use_langgraph_gateway", False)
single_call = config.get("single_call", False)
error_corr = config.get("error_correction", True)
async_proc = config.get("async_processing", False)
mode_parts = []
if provider == "anthropic":
mode_parts.append("API cascade (Haiku->Gemini->local)")
elif provider == "ollama":
mode_parts.append("Local Ollama")
if gateway:
mode_parts.append("LangGraph")
if single_call:
mode_parts.append("single-call")
else:
mode_parts.append("two-call")
if error_corr:
mode_parts.append("error-correction")
if async_proc:
mode_parts.append("async")
lines = [
("=== MORTDECAI STATUS ===", "gold", True),
(f"Model: {command_model}", "aqua", False),
(f"Mode: {', '.join(mode_parts)}", "aqua", False),
]
# Validator stats
try:
with _validator_stats_lock:
total = _validator_stats.get("total", 0)
fixes = _validator_stats.get("fixes", {})
if total > 0:
fix_count = sum(fixes.values())
pct = fix_count / total * 100 if total > 0 else 0
lines.append((f"Validator: {total} cmds, {fix_count} fixes ({pct:.1f}%)", "gray", False))
except:
pass
lines.append(("Learn more: mortdec.ai", "yellow", False))
lines.append(("========================", "gold", True))
for text, color, bold in lines:
safe = text.replace("\\", "\\\\").replace('"', '\\"')
bold_str = "true" if bold else "false"
rcon(
f'tellraw {player} {{"text":"{safe}","color":"{color}","bold":{bold_str}}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
time.sleep(0.1)
# --- Gamerule revert timer ---
# When a gamerule is changed, schedule automatic revert unless admin says "permanently"
_GAMERULE_DEFAULT_REVERT = {
# gamerule: (default_value, revert_seconds)
"doMobSpawning": ("true", 300),
"mobGriefing": ("true", 300),
"doDaylightCycle": ("true", 600),
"doWeatherCycle": ("true", 600),
"doFireTick": ("true", 300),
"doInsomnia": ("true", 600),
"naturalRegeneration": ("true", 600),
"keepInventory": ("false", None), # None = don't auto-revert (player expects this to persist)
"pvp": ("true", 300),
"doImmediateRespawn": ("false", 300),
"tntExplodes": ("true", 300),
"fallDamage": ("true", 300),
"fireDamage": ("true", 300),
"drowningDamage": ("true", 300),
"freezeDamage": ("true", 300),
}
_revert_timers = {} # gamerule_name -> threading.Timer
_revert_lock = threading.Lock()
def _schedule_gamerule_revert(gamerule_name, default_value, seconds, config):
"""Schedule a gamerule to revert to its default value after N seconds."""
with _revert_lock:
# Cancel existing timer for this gamerule
existing = _revert_timers.get(gamerule_name)
if existing:
existing.cancel()
def _do_revert():
log.info(f"Reverting gamerule {gamerule_name} -> {default_value} (timer expired)")
rcon(
f"gamerule {gamerule_name} {default_value}",
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
rcon(
f'tellraw @a {{"text":"[MORTDECAI] Gamerule {gamerule_name} reverted to {default_value}","color":"gray","italic":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
with _revert_lock:
_revert_timers.pop(gamerule_name, None)
timer = threading.Timer(seconds, _do_revert)
timer.daemon = True
timer.start()
_revert_timers[gamerule_name] = timer
log.info(f"Scheduled revert: gamerule {gamerule_name} -> {default_value} in {seconds}s")
# Notify players
mins = seconds // 60
rcon(
f'tellraw @a {{"text":"[MORTDECAI] {gamerule_name} will revert in {mins} minutes","color":"gray","italic":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
def _check_gamerule_revert(cmd, user_message, config):
"""After executing a gamerule command, check if we should schedule a revert."""
m = re.match(r'^gamerule\s+(\w+)\s+(\w+)', cmd)
if not m:
return
rule_name = m.group(1)
new_value = m.group(2).lower()
revert_info = _GAMERULE_DEFAULT_REVERT.get(rule_name)
if not revert_info:
return
default_value, default_seconds = revert_info
if default_seconds is None:
return # Don't auto-revert this gamerule
# Don't revert if setting back to default
if new_value == default_value.lower():
# Cancel any pending revert
with _revert_lock:
existing = _revert_timers.get(rule_name)
if existing:
existing.cancel()
_revert_timers.pop(rule_name, None)
log.info(f"Cancelled revert for {rule_name} (set back to default)")
return
# Check if user said "permanently" or "forever"
msg_lower = (user_message or "").lower()
if any(w in msg_lower for w in ("permanent", "forever", "always", "never revert")):
log.info(f"Skipping revert for {rule_name} — user requested permanent")
return
# Check if user specified a duration
time_match = re.search(r'(\d+)\s*(min|minute|m|hour|h|sec|second|s)\b', msg_lower)
if time_match:
amount = int(time_match.group(1))
unit = time_match.group(2)
if unit.startswith("h"):
seconds = amount * 3600
elif unit.startswith("m"):
seconds = amount * 60
else:
seconds = amount
else:
seconds = default_seconds
_schedule_gamerule_revert(rule_name, default_value, seconds, config)
def execute_response(response, context, config, praying_player=None, user_message=None):
message = response.get("message") or ""
commands = response.get("commands") or []
@@ -3626,6 +3973,18 @@ def execute_response(response, context, config, praying_player=None):
log.info(f"RCON result: {sresult!r}")
time.sleep(0.15)
# Fall protection: if tp would be lethal and user didn't intend to drop
fall_info = config.pop("_pending_fall_protection", None)
if fall_info and fall_info.get("lethal"):
msg_lower = (user_message or "").lower()
drop_intent = any(w in msg_lower for w in ("drop", "fall", "kill me", "smite", "launch", "yeet", "throw", "high up"))
if not drop_intent:
log.info(f"Adding slow_falling to prevent lethal fall ({fall_info['fall_distance']} blocks)")
rcon(
f"effect give {fall_info['player']} minecraft:slow_falling 30 0",
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
log.info(f"Executing RCON: {resolved}")
result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"])
log.info(f"RCON result: {result!r}")
@@ -3647,6 +4006,11 @@ def execute_response(response, context, config, praying_player=None):
if "thunder" in resolved: config["_weather_state"] = "thunderstorm"
elif "rain" in resolved: config["_weather_state"] = "rain"
elif "clear" in resolved: config["_weather_state"] = "clear"
# Schedule gamerule revert if applicable
if resolved.startswith("gamerule "):
_check_gamerule_revert(resolved, user_message, config)
time.sleep(0.3)
@@ -3777,6 +4141,7 @@ def process_first_login_benevolence(player, config):
log.info(f"First-login benevolence executed for {player}: {commands[:max_cmds]}")
def process_sudo(player, prompt, config):
_pipeline_start = time.time()
"""
sudo translator mode:
- no God persona
@@ -4090,7 +4455,7 @@ def process_sudo(player, prompt, config):
except Exception as e:
log.warning(f"SUDO template planner failed: {e}")
max_cmds = int(config.get("sudo_max_commands", 3))
max_cmds = int(config.get("sudo_max_commands", 8))
low_prompt = prompt.lower().strip()
if any(low_prompt.startswith(x) for x in ("build ", "make ", "create ")):
max_cmds = max(max_cmds, int(config.get("sudo_build_max_commands", 6)))
@@ -4142,10 +4507,28 @@ def process_sudo(player, prompt, config):
continue
log.info(f"SUDO execute: {resolved}")
_sudo_trace(player, f"[SUDO TRY] {resolved}", config)
# Show command in chat
ollama_url = config.get("ollama_url", "")
gpu_label = "RTX4000" if "179" in ollama_url else "3090Ti" if "11434" in ollama_url else "2080Ti"
safe_cmd = resolved[:80].replace("\\", "\\\\").replace('"', '\\"')
rcon(
f'tellraw {player} {{"text":"[MORTDECAI:{gpu_label}] {safe_cmd}","color":"dark_aqua","italic":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"])
log.info(f"SUDO result: {result!r}")
_sudo_trace(player, f"[SUDO RES] {str(result or '')[:180]}", config)
# Show result in chat
safe_result = (result or "OK")[:60].replace("\\", "\\\\").replace('"', '\\"')
result_color = "green" if result and "Gave" in result or "Set" in result or "Applied" in result or "Summoned" in result else "gray"
rcon(
f'tellraw {player} {{"text":" > {safe_result}","color":"{result_color}","italic":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
# Error correction for sudo
if config.get("error_correction", True) and result and _is_rcon_error(result):
log.info(f"SUDO error detected, attempting correction: {result}")
@@ -4159,6 +4542,10 @@ def process_sudo(player, prompt, config):
log.info(f"SUDO retry result: {result!r}")
resolved = corrected_resolved
# Schedule gamerule revert if applicable
if resolved.startswith("gamerule "):
_check_gamerule_revert(resolved, prompt, config)
executed.append(resolved)
results_seen.append((resolved, str(result or "")))
time.sleep(0.2)
@@ -4236,6 +4623,16 @@ def process_sudo(player, prompt, config):
add_sudo_history(player, prompt, commands, executed)
# Pipeline summary
_pipeline_elapsed = time.time() - _pipeline_start
n_generated = len(commands)
n_executed = len(executed)
n_blocked = n_generated - n_executed
effective = sum(1 for _, res in results_seen if _sudo_result_is_effective(res))
log.info(f"SUDO PIPELINE: player={player} prompt='{prompt[:50]}' "
f"generated={n_generated} executed={n_executed} blocked={n_blocked} "
f"effective={effective}/{n_executed} total={_pipeline_elapsed:.1f}s")
# Training audit: log the full sudo interaction
write_training_audit(
player=player,
@@ -4449,7 +4846,7 @@ def process_prayer(player, prayer, config, cooldowns):
log.error(f"LLM error: {e}")
return
execute_response(response, context, config, praying_player=player)
execute_response(response, context, config, praying_player=player, user_message=prayer)
# Store in prayer memory so God remembers this exchange
god_msg = response.get("message") or ""
@@ -4645,6 +5042,17 @@ def main():
log.error(f"Error sending bible to {player}: {e}", exc_info=True)
break
# /status
for pat in STATUS_PATTERNS:
m = pat.search(line)
if m:
player = m.group(1)
try:
send_status(player, config)
except Exception as e:
log.error(f"Error sending status to {player}: {e}", exc_info=True)
break
# login notice
m = JOIN_PATTERN.search(line)
if m: