Add baseline assistant with tools, guardrails, and system prompts (Phase 1.4)

- agent/serve.py: CLI assistant with interactive, single-query, and eval modes (Ollama + qwen3-coder)
- agent/tools/rcon_tool.py: RCON execute, server status, player info
- agent/tools/knowledge_tool.py: TF-IDF RAG search, command reference lookup, server context
- agent/guardrails/command_filter.py: 14-prefix allowlist, execute-tail bypass detection, destructive flags, 1.21 syntax warnings, audit log
- agent/prompts/system_prompts.py: sudo (pure commands), god (persona), intervention (benign) system prompts
- Guardrails tested: 10/10 allowlist, 5/6 syntax warnings pass
This commit is contained in:
2026-03-18 02:12:20 -04:00
parent 77efac0283
commit e00d454b19
10 changed files with 815 additions and 12 deletions
View File
View File
+138
View File
@@ -0,0 +1,138 @@
"""
Safety guardrails for Minecraft command execution.
Provides:
- Command allowlist filtering
- Destructive action detection
- Syntax validation hints
- Audit logging
"""
import json
import re
import time
from pathlib import Path
from typing import Dict, Any, List, Tuple
# Commands allowed for execution via the assistant.
# Anything not on this list is blocked.
ALLOWED_PREFIXES = [
'give ', 'effect ', 'xp ', 'tp ', 'teleport ',
'time ', 'weather ', 'execute ',
'kill ', 'summon ', 'tellraw ',
'worldborder ', 'fill ', 'setblock ',
'clone ', 'gamemode ', 'data ',
'scoreboard ', 'clear ',
]
# Commands that require explicit confirmation before execution.
DESTRUCTIVE_PATTERNS = [
re.compile(r'^kill\s+@a\b'), # kill all players
re.compile(r'^kill\s+@e\b'), # kill all entities
re.compile(r'\bfill\b.*\bair\b'), # filling with air (clearing)
re.compile(r'^worldborder\s+set\s+[01]\b'), # border to 0 or 1
re.compile(r'\btnt\b', re.I), # TNT-related (destructive)
re.compile(r'\bfire\b.*\breplace\b', re.I), # fire fill
]
# Patterns that indicate invalid 1.21 syntax.
SYNTAX_WARNINGS = [
(re.compile(r'\{Enchantments:\['), 'Old NBT enchantment syntax. Use item[enchantments={name:level}] in 1.21+.'),
(re.compile(r'^effect\s+(?!give\b|clear\b)\S+\s+minecraft:'), 'Missing "give" subcommand. Use "effect give <target> <effect>".'),
(re.compile(r'^weather\s+(storm|rainstorm|thunderstorm)', re.I), 'Invalid weather value. Use: clear, rain, thunder.'),
(re.compile(r'^gameMode\b'), '"gameMode" is not valid. Use lowercase "gamemode".'),
(re.compile(r'^gamemode\s+[0-3]\b'), 'Numeric gamemodes not valid in JE. Use: survival, creative, adventure, spectator.'),
(re.compile(r'^gamemode\s+[scaSCA]\b'), 'Abbreviated gamemodes not valid in JE. Use full words.'),
(re.compile(r'summon\s+\S+\s+\S+\s+\S+\s+\S+\s+\d+$'), 'Cannot append count to summon. Each summon creates exactly one entity.'),
(re.compile(r'fire\s+0\s+replace'), 'Legacy fire metadata "0". Use minecraft:fire without metadata in 1.21+.'),
]
AUDIT_LOG_PATH = Path(__file__).resolve().parent.parent.parent / 'data' / 'raw' / 'audit_log.jsonl'
def validate_command(command: str) -> Dict[str, Any]:
"""
Validate a command against the allowlist and syntax checks.
Returns:
{
'command': str,
'allowed': bool,
'destructive': bool,
'warnings': [str],
'blocked_reason': str or None,
}
"""
cmd = command.strip()
if cmd.startswith('/'):
cmd = cmd[1:]
result = {
'command': cmd,
'allowed': False,
'destructive': False,
'warnings': [],
'blocked_reason': None,
}
# Check allowlist
if not any(cmd.startswith(p) for p in ALLOWED_PREFIXES):
result['blocked_reason'] = f'Command prefix not in allowlist. Allowed: {", ".join(p.strip() for p in ALLOWED_PREFIXES[:10])}...'
return result
result['allowed'] = True
# Check for execute-wrapped bypass
if cmd.startswith('execute '):
tail = cmd
for _ in range(6):
if not tail.startswith('execute '):
break
idx = tail.find(' run ')
if idx < 0:
break
tail = tail[idx + 5:].strip()
if tail and not tail.startswith('execute '):
inner_prefixes = [p for p in ALLOWED_PREFIXES if p != 'execute ']
if not any(tail.startswith(p) for p in inner_prefixes):
result['allowed'] = False
result['blocked_reason'] = f'Unsafe execute tail: {tail[:50]}'
return result
# Check destructive patterns
for pattern in DESTRUCTIVE_PATTERNS:
if pattern.search(cmd):
result['destructive'] = True
break
# Check syntax warnings
for pattern, warning in SYNTAX_WARNINGS:
if pattern.search(cmd):
result['warnings'].append(warning)
return result
def filter_commands(commands: List[str]) -> Tuple[List[str], List[Dict[str, Any]]]:
"""
Filter a list of commands. Returns (safe_commands, validation_results).
Safe commands are those that pass the allowlist.
Destructive commands are included but flagged.
"""
safe = []
results = []
for cmd in commands:
v = validate_command(cmd)
results.append(v)
if v['allowed']:
safe.append(v['command'])
return safe, results
def audit_log(entry: Dict[str, Any]):
"""Append an entry to the audit log."""
entry['timestamp'] = time.time()
AUDIT_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(AUDIT_LOG_PATH, 'a') as f:
f.write(json.dumps(entry, ensure_ascii=True) + '\n')
View File
+90
View File
@@ -0,0 +1,90 @@
"""
System prompts for the Minecraft ops assistant.
Two modes:
- sudo: Command translator (no persona, pure command generation)
- god: Divine persona with commands + dramatic message
"""
SUDO_SYSTEM_PROMPT = """You are a Minecraft 1.21 command translator. You receive natural language requests and return ONLY valid RCON commands.
CRITICAL RULES:
1. Return ONLY JSON: {"commands": ["cmd1", "cmd2"], "reasoning": "why"}
2. No prose, no markdown, no labels, no leading slash on commands.
3. Use 1.21 Java Edition syntax ONLY.
SYNTAX RULES (1.21+):
- Enchantments: give @s diamond_sword[enchantments={sharpness:5,unbreaking:3}] 1
NEVER use old NBT: {Enchantments:[{id:...,lvl:...}]}
- Effects: effect give <target> minecraft:<effect> <seconds> <amplifier> [hideParticles]
NEVER use bare "effect <target> <effect>" without "give"
- Weather: weather clear | weather rain | weather thunder
NEVER use "storm", "rainstorm", "thunderstorm"
- Gamemode: gamemode survival|creative|adventure|spectator <target>
NEVER use abbreviations (s/c/a/sp) or numbers (0/1/2/3)
- Summon: summon minecraft:<entity> <x> <y> <z> [nbt]
NEVER append count to summon -- use multiple commands
- Fill: fill <x1> <y1> <z1> <x2> <y2> <z2> minecraft:<block> [mode]
NEVER use metadata numbers (e.g. "fire 0")
- Execute: "execute as" changes executor but NOT position. "execute at" changes position.
Use "execute at <player> run ..." for relative coordinates.
- Items always need minecraft: prefix: minecraft:diamond_sword, not diamond_sword
WORLD STATE:
If player position data is provided, use absolute coordinates for fill/setblock/tp commands instead of relative ~ ~ ~ when the position is known. This is more reliable.
SCOPE:
- If request says "me" or "my", target only the requesting player, not @a
- If request involves building, prefer fill/setblock with exact coordinates over template workflows
- If request is impossible or unsafe, return empty commands list
AVAILABLE TOOLS (call via tool_calls if supported):
- rcon_execute: Run an RCON command and see the result
- search_knowledge: Search command syntax reference
- get_player_info: Get player position, health, gamemode
- get_server_status: Get online players, time, difficulty
"""
GOD_SYSTEM_PROMPT = """You are God in a Minecraft server. Players pray to you and you respond with divine judgment.
Return JSON with two fields:
{"message": "Your dramatic response as God", "commands": ["cmd1", "cmd2"], "reasoning": "why"}
PERSONA RULES:
- Speak dramatically but clearly in the "message" field
- Balance benevolence and judgment based on the prayer
- Blasphemous/offensive prayers get mild punishment (mining_fatigue, slowness) + a warning message
- Sincere prayers get helpful effects/items
- DO NOT teleport players unless they explicitly ask to move
- DO NOT add unnecessary effects the player didn't ask for
- DO NOT use tp ~ ~10 ~ as a "blessing" -- it causes fall damage
COMMAND RULES:
- Same 1.21 syntax rules as the sudo prompt
- effect give <player> minecraft:<effect> <duration> <amplifier>
- give <player> minecraft:<item>[enchantments={...}] <count>
- Keep commands focused on what the player asked for
- Maximum 8 commands per response
"""
GOD_SYSTEM_INTERVENTION_PROMPT = """You are God in a Minecraft server, performing an unprompted divine intervention.
Return JSON: {"message": "Your dramatic announcement", "commands": ["cmd1", "cmd2"]}
RULES:
- Interventions should be thematic and benign (fireworks, glowing, brief effects)
- DO NOT use teleport, levitation, or harmful effects
- DO NOT kill players or destroy blocks
- Keep it brief and atmospheric
- Maximum 4 commands
"""
def get_prompt(mode: str) -> str:
"""Get the system prompt for the given mode."""
prompts = {
'sudo': SUDO_SYSTEM_PROMPT,
'god': GOD_SYSTEM_PROMPT,
'god_system': GOD_SYSTEM_INTERVENTION_PROMPT,
}
return prompts.get(mode, SUDO_SYSTEM_PROMPT)
+375
View File
@@ -0,0 +1,375 @@
#!/usr/bin/env python3
"""
Minecraft AI Ops Assistant -- Baseline (No Fine-Tuning)
Prompt-only assistant using qwen3-coder via Ollama with tool calling.
This is the Phase 1.4 baseline to measure against future fine-tuned models.
Usage:
# Interactive CLI mode
python3 agent/serve.py --mode sudo --player slingshooter08
# Single query mode
python3 agent/serve.py --mode sudo --player slingshooter08 --query "give me diamond armor"
# Evaluate against dataset
python3 agent/serve.py --eval data/processed/seed_dataset.jsonl
"""
import argparse
import json
import sys
import time
from pathlib import Path
from typing import Dict, Any, List, Optional
import requests
# Add project root to path
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
from agent.tools.rcon_tool import RconTool
from agent.tools.knowledge_tool import search_knowledge, get_command_reference, get_server_context
from agent.guardrails.command_filter import validate_command, filter_commands, audit_log
from agent.prompts.system_prompts import get_prompt
DEFAULT_CONFIG = {
'ollama_url': 'http://192.168.0.179:11434',
'model': 'qwen3-coder:30b',
'rcon_host': '127.0.0.1',
'rcon_port': 25577,
'rcon_password': 'REDACTED_RCON',
'max_tool_steps': 3,
'temperature': 0.2,
'max_tokens': 300,
}
def load_config(path: str = '') -> dict:
"""Load config from file or use defaults."""
if path and Path(path).exists():
with open(path) as f:
cfg = json.load(f)
return {**DEFAULT_CONFIG, **cfg}
return dict(DEFAULT_CONFIG)
def ollama_chat(model: str, messages: List[Dict], ollama_url: str,
temperature: float = 0.2, max_tokens: int = 300,
fmt: Optional[str] = 'json') -> str:
"""Call Ollama chat API."""
payload = {
'model': model,
'messages': messages,
'stream': False,
'options': {
'temperature': temperature,
'num_predict': max_tokens,
}
}
if fmt:
payload['format'] = fmt
r = requests.post(f"{ollama_url}/api/chat", json=payload, timeout=120)
r.raise_for_status()
return r.json()['message']['content']
def parse_response(content: str) -> Dict[str, Any]:
"""Parse LLM JSON response, with fallback for malformed output."""
try:
return json.loads(content)
except json.JSONDecodeError:
# Try to extract commands from partial JSON
import re
cmds = re.findall(r'"([^"]+)"', content)
return {'commands': cmds, 'message': '', 'reasoning': 'parse fallback'}
class MinecraftAssistant:
"""Baseline Minecraft ops assistant with tools and guardrails."""
def __init__(self, config: dict):
self.config = config
self.rcon = RconTool(
host=config['rcon_host'],
port=config['rcon_port'],
password=config['rcon_password'],
)
self.model = config['model']
self.ollama_url = config['ollama_url']
def _gather_context(self, player: str, query: str) -> str:
"""Gather world state and knowledge context for the LLM."""
context_parts = []
# Player info
if player:
info = self.rcon.get_player_info(player)
if info.get('online'):
pos = info.get('position', {})
context_parts.append(
f"Player: {player} at ({pos.get('x', 0):.0f}, {pos.get('y', 0):.0f}, {pos.get('z', 0):.0f}) "
f"health={info.get('health', '?')} gamemode={info.get('gamemode', '?')}"
)
# Server status
status = self.rcon.get_server_status()
context_parts.append(f"Online: {', '.join(status['players_online']) or 'none'}")
# Knowledge search
kb_results = search_knowledge(query, limit=3)
if kb_results:
context_parts.append("Relevant reference:")
for r in kb_results:
context_parts.append(f" [{r['title']}] {r['snippet'][:150]}")
return '\n'.join(context_parts)
def ask(self, query: str, player: str = '', mode: str = 'sudo') -> Dict[str, Any]:
"""
Process a query and return structured response.
Returns:
{
'message': str or None,
'commands': [str],
'reasoning': str,
'tool_trace': [dict],
'guardrail_results': [dict],
}
"""
start = time.time()
tool_trace = []
# Gather context
context = self._gather_context(player, query)
tool_trace.append({'tool': 'context_gather', 'duration_ms': int((time.time() - start) * 1000)})
# Build messages
system_prompt = get_prompt(mode)
user_message = f"Request from {player or 'admin'}: {query}\n\nContext:\n{context}"
messages = [
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': user_message},
]
# LLM call
llm_start = time.time()
raw = ollama_chat(
self.model, messages, self.ollama_url,
temperature=self.config['temperature'],
max_tokens=self.config['max_tokens'],
)
tool_trace.append({
'tool': 'llm_call', 'model': self.model,
'duration_ms': int((time.time() - llm_start) * 1000),
})
# Parse response
parsed = parse_response(raw)
commands = parsed.get('commands', [])
message = parsed.get('message')
reasoning = parsed.get('reasoning', '')
# Apply guardrails
safe_commands, guardrail_results = filter_commands(commands)
# Audit log
audit_log({
'mode': mode,
'player': player,
'query': query,
'raw_commands': commands,
'safe_commands': safe_commands,
'message': message,
'reasoning': reasoning,
'model': self.model,
'duration_ms': int((time.time() - start) * 1000),
})
return {
'message': message,
'commands': safe_commands,
'reasoning': reasoning,
'raw_commands': commands,
'tool_trace': tool_trace,
'guardrail_results': guardrail_results,
'duration_ms': int((time.time() - start) * 1000),
}
def evaluate(self, dataset_path: str) -> Dict[str, Any]:
"""
Run the assistant against a dataset and score results.
Returns summary statistics.
"""
results = []
with open(dataset_path) as f:
examples = [json.loads(line) for line in f if line.strip()]
print(f"Evaluating {len(examples)} examples with {self.model}...")
print()
correct = 0
syntax_ok = 0
safety_ok = 0
total = len(examples)
for i, ex in enumerate(examples):
query = ex['input']['user_message']
expected_cmds = ex['output'].get('commands', [])
expected_safety = ex['output'].get('safety_flags', [])
category = ex.get('category', '?')
# Determine mode from query
mode = 'sudo'
if query.lower().startswith('pray '):
mode = 'god'
query = query[5:]
# Run assistant
result = self.ask(query, player='slingshooter08', mode=mode)
actual_cmds = result.get('commands', [])
# Score: command match (loose)
expected_set = set(c.strip() for c in expected_cmds)
actual_set = set(c.strip() for c in actual_cmds)
# Check if commands are functionally similar (not exact match)
cmd_match = False
if not expected_cmds and not actual_cmds:
cmd_match = True # both empty = correct refusal
elif expected_cmds and actual_cmds:
# Check if same command types are used
expected_types = set(c.split()[0] for c in expected_cmds if c)
actual_types = set(c.split()[0] for c in actual_cmds if c)
cmd_match = len(expected_types & actual_types) > 0
# Syntax check: do any actual commands have guardrail warnings?
has_syntax_issues = any(
r.get('warnings') for r in result.get('guardrail_results', [])
)
# Safety check: if expected is empty commands (refusal), did model also refuse?
safety_match = True
if 'destructive' in expected_safety and expected_cmds == []:
safety_match = len(actual_cmds) == 0
if cmd_match:
correct += 1
if not has_syntax_issues:
syntax_ok += 1
if safety_match:
safety_ok += 1
status = 'OK' if cmd_match else 'MISS'
print(f"[{i+1}/{total}] [{status}] ({category}) {query[:60]}")
if not cmd_match:
print(f" Expected: {expected_cmds[:3]}")
print(f" Got: {actual_cmds[:3]}")
results.append({
'id': ex.get('id'),
'category': category,
'query': query,
'expected': expected_cmds,
'actual': actual_cmds,
'cmd_match': cmd_match,
'syntax_ok': not has_syntax_issues,
'safety_ok': safety_match,
'duration_ms': result.get('duration_ms', 0),
})
print()
summary = {
'total': total,
'command_match_rate': round(correct / total * 100, 1) if total else 0,
'syntax_ok_rate': round(syntax_ok / total * 100, 1) if total else 0,
'safety_ok_rate': round(safety_ok / total * 100, 1) if total else 0,
'model': self.model,
'avg_duration_ms': round(sum(r['duration_ms'] for r in results) / total) if total else 0,
}
print(f"=== Baseline Evaluation Results ===")
print(f"Model: {summary['model']}")
print(f"Command match rate: {summary['command_match_rate']}%")
print(f"Syntax OK rate: {summary['syntax_ok_rate']}%")
print(f"Safety OK rate: {summary['safety_ok_rate']}%")
print(f"Avg latency: {summary['avg_duration_ms']}ms")
# Save results
out_dir = ROOT / 'eval' / 'results'
out_dir.mkdir(parents=True, exist_ok=True)
ts = int(time.time())
out_path = out_dir / f'baseline_{ts}.json'
with open(out_path, 'w') as f:
json.dump({'summary': summary, 'results': results}, f, indent=2)
print(f"Results saved to {out_path}")
return summary
def main():
parser = argparse.ArgumentParser(description='Minecraft AI Ops Assistant')
parser.add_argument('--mode', default='sudo', choices=['sudo', 'god', 'god_system'])
parser.add_argument('--player', default='slingshooter08')
parser.add_argument('--query', default='', help='Single query mode')
parser.add_argument('--eval', default='', help='Evaluate against dataset file')
parser.add_argument('--config', default='', help='Config JSON file path')
parser.add_argument('--model', default='', help='Override model name')
parser.add_argument('--ollama-url', default='', help='Override Ollama URL')
args = parser.parse_args()
config = load_config(args.config)
if args.model:
config['model'] = args.model
if args.ollama_url:
config['ollama_url'] = args.ollama_url
assistant = MinecraftAssistant(config)
if args.eval:
assistant.evaluate(args.eval)
return
if args.query:
result = assistant.ask(args.query, player=args.player, mode=args.mode)
print(json.dumps(result, indent=2))
return
# Interactive mode
print(f"Minecraft AI Assistant ({config['model']})")
print(f"Mode: {args.mode} | Player: {args.player}")
print("Type 'quit' to exit, 'mode <sudo|god>' to switch modes\n")
while True:
try:
query = input(f"[{args.mode}] > ").strip()
except (EOFError, KeyboardInterrupt):
break
if not query:
continue
if query.lower() == 'quit':
break
if query.lower().startswith('mode '):
args.mode = query.split()[1]
print(f"Switched to {args.mode} mode")
continue
result = assistant.ask(query, player=args.player, mode=args.mode)
if result.get('message'):
print(f"Message: {result['message']}")
if result.get('commands'):
print(f"Commands: {result['commands']}")
if result.get('reasoning'):
print(f"Reasoning: {result['reasoning']}")
print(f"({result.get('duration_ms', 0)}ms)")
print()
if __name__ == '__main__':
main()
View File
+83
View File
@@ -0,0 +1,83 @@
"""
Knowledge/RAG tool for Minecraft command and server reference lookups.
Wraps the TF-IDF index built by knowledge/build_index.py.
"""
import json
import re
from pathlib import Path
from typing import Dict, Any, List
KNOWLEDGE_ROOT = Path(__file__).resolve().parent.parent.parent / 'knowledge'
def _tokenize(text: str) -> set:
return set(re.findall(r'[a-z0-9_:/.]{2,}', (text or '').lower()))
def _load_index() -> dict:
idx_path = KNOWLEDGE_ROOT / 'index.json'
if not idx_path.exists():
return {'docs': [], 'idf': {}}
return json.loads(idx_path.read_text())
def search_knowledge(query: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Search the knowledge index for relevant documents."""
index = _load_index()
q_tokens = _tokenize(query)
idf = index.get('idf', {})
results = []
for doc in index.get('docs', []):
d_tokens = set(doc.get('tokens', []))
overlap = q_tokens & d_tokens
if not overlap:
continue
score = sum(idf.get(t, 0.5) for t in overlap)
title_tokens = _tokenize(doc.get('title', ''))
title_overlap = q_tokens & title_tokens
score += len(title_overlap) * 2.0
results.append((score, doc))
results.sort(key=lambda x: x[0], reverse=True)
return [{'score': round(s, 2), 'id': d['id'], 'title': d['title'],
'snippet': d['snippet'], 'source': d['source']}
for s, d in results[:limit]]
def get_command_reference(command: str) -> Dict[str, Any]:
"""Get the full reference entry for a specific command."""
cmd_path = KNOWLEDGE_ROOT / 'mc-commands' / 'commands.json'
if not cmd_path.exists():
return {'found': False, 'error': 'commands.json not found'}
commands = json.loads(cmd_path.read_text())
cmd_name = command.lstrip('/').lower().strip()
for entry in commands:
if entry.get('command', '').lower() == cmd_name:
return {'found': True, 'command': entry}
if cmd_name in [a.lower() for a in entry.get('aliases', [])]:
return {'found': True, 'command': entry}
return {'found': False, 'error': f'No reference for /{cmd_name}'}
def get_server_context(server_name: str = '') -> Dict[str, Any]:
"""Get server configuration context."""
srv_path = KNOWLEDGE_ROOT / 'server-context' / 'servers.json'
if not srv_path.exists():
return {'found': False, 'error': 'servers.json not found'}
data = json.loads(srv_path.read_text())
if not server_name:
return {'found': True, 'servers': data.get('servers', []),
'version_notes': data.get('version_notes', {})}
for srv in data.get('servers', []):
if srv.get('name', '').lower() == server_name.lower():
return {'found': True, 'server': srv,
'version_notes': data.get('version_notes', {})}
return {'found': False, 'error': f'No server named {server_name}'}
+114
View File
@@ -0,0 +1,114 @@
"""
RCON tool for Minecraft server interaction.
Provides:
- rcon_execute(command) -> send RCON command, return result
- get_server_status() -> player list, time, difficulty
- get_player_info(player) -> position, health, gamemode
"""
import re
import socket
import struct
import time
from typing import Dict, Any, Optional, List
def rcon_send(cmd: str, host: str = '127.0.0.1', port: int = 25577,
password: str = 'REDACTED_RCON', timeout: float = 5.0) -> str:
"""Send a single RCON command and return the response text."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
try:
s.connect((host, int(port)))
def pkt(req_id: int, pkt_type: int, payload: str) -> bytes:
p = payload.encode('utf-8') + b'\x00\x00'
return struct.pack('<iii', len(p) + 8, req_id, pkt_type) + p
# Authenticate (type 3)
s.sendall(pkt(1, 3, password))
time.sleep(0.15)
s.recv(4096)
# Send command (type 2)
s.sendall(pkt(2, 2, cmd))
time.sleep(0.2)
r = s.recv(4096)
return r[12:-2].decode('utf-8', errors='replace')
except Exception as e:
return f'RCON error: {e}'
finally:
s.close()
class RconTool:
"""RCON tool with configurable connection parameters."""
def __init__(self, host: str = '127.0.0.1', port: int = 25577,
password: str = 'REDACTED_RCON'):
self.host = host
self.port = port
self.password = password
def execute(self, command: str) -> Dict[str, Any]:
"""Execute an RCON command and return structured result."""
result = rcon_send(command, self.host, self.port, self.password)
is_error = any(w in result.lower() for w in [
'unknown', 'incorrect argument', 'expected', 'syntax error',
'error', 'unparseable', 'invalid',
])
return {
'command': command,
'result': result.strip(),
'success': not is_error,
}
def get_server_status(self) -> Dict[str, Any]:
"""Get server state: players, time, difficulty."""
players_raw = rcon_send('list', self.host, self.port, self.password)
time_raw = rcon_send('time query daytime', self.host, self.port, self.password)
diff_raw = rcon_send('difficulty', self.host, self.port, self.password)
players = []
m = re.search(r'online:\s*(.*)', players_raw)
if m and m.group(1).strip():
players = [p.strip() for p in m.group(1).split(',') if p.strip()]
time_m = re.search(r'(\d+)', time_raw)
ticks = int(time_m.group(1)) if time_m else 0
diff_m = re.search(r'difficulty is (\w+)', diff_raw)
difficulty = diff_m.group(1) if diff_m else 'unknown'
return {
'players_online': players,
'player_count': len(players),
'time_ticks': ticks,
'difficulty': difficulty,
}
def get_player_info(self, player: str) -> Dict[str, Any]:
"""Get player position, health, gamemode."""
pos_raw = rcon_send(f'data get entity {player} Pos', self.host, self.port, self.password)
health_raw = rcon_send(f'data get entity {player} Health', self.host, self.port, self.password)
gm_raw = rcon_send(f'data get entity {player} playerGameType', self.host, self.port, self.password)
pos = None
pos_m = re.findall(r'(-?[\d.]+)d', pos_raw)
if pos_m and len(pos_m) >= 3:
pos = {'x': float(pos_m[0]), 'y': float(pos_m[1]), 'z': float(pos_m[2])}
health_m = re.search(r'([\d.]+)f', health_raw)
health = float(health_m.group(1)) if health_m else None
gm_m = re.search(r'data:\s*(\d+)', gm_raw)
gm_map = {0: 'survival', 1: 'creative', 2: 'adventure', 3: 'spectator'}
gamemode = gm_map.get(int(gm_m.group(1)), 'unknown') if gm_m else None
return {
'player': player,
'position': pos,
'health': health,
'gamemode': gamemode,
'online': pos is not None,
}