e00d454b19
- agent/serve.py: CLI assistant with interactive, single-query, and eval modes (Ollama + qwen3-coder) - agent/tools/rcon_tool.py: RCON execute, server status, player info - agent/tools/knowledge_tool.py: TF-IDF RAG search, command reference lookup, server context - agent/guardrails/command_filter.py: 14-prefix allowlist, execute-tail bypass detection, destructive flags, 1.21 syntax warnings, audit log - agent/prompts/system_prompts.py: sudo (pure commands), god (persona), intervention (benign) system prompts - Guardrails tested: 10/10 allowlist, 5/6 syntax warnings pass
376 lines
13 KiB
Python
376 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Minecraft AI Ops Assistant -- Baseline (No Fine-Tuning)
|
|
|
|
Prompt-only assistant using qwen3-coder via Ollama with tool calling.
|
|
This is the Phase 1.4 baseline to measure against future fine-tuned models.
|
|
|
|
Usage:
|
|
# Interactive CLI mode
|
|
python3 agent/serve.py --mode sudo --player slingshooter08
|
|
|
|
# Single query mode
|
|
python3 agent/serve.py --mode sudo --player slingshooter08 --query "give me diamond armor"
|
|
|
|
# Evaluate against dataset
|
|
python3 agent/serve.py --eval data/processed/seed_dataset.jsonl
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
import requests
|
|
|
|
# Add project root to path
|
|
ROOT = Path(__file__).resolve().parent.parent
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from agent.tools.rcon_tool import RconTool
|
|
from agent.tools.knowledge_tool import search_knowledge, get_command_reference, get_server_context
|
|
from agent.guardrails.command_filter import validate_command, filter_commands, audit_log
|
|
from agent.prompts.system_prompts import get_prompt
|
|
|
|
|
|
# Baseline settings used when no config file is supplied. load_config()
# layers the keys of a JSON config file on top of these values.
DEFAULT_CONFIG = dict(
    # Ollama HTTP endpoint (a private-range LAN address) and model tag.
    ollama_url="http://192.168.0.179:11434",
    model="qwen3-coder:30b",
    # RCON connection parameters handed to RconTool.
    rcon_host="127.0.0.1",
    rcon_port=25577,
    rcon_password="REDACTED_RCON",
    # NOTE(review): max_tool_steps is not read by any code visible in this
    # file -- confirm whether another module consumes it.
    max_tool_steps=3,
    # Sampling options forwarded to the Ollama chat call.
    temperature=0.2,
    max_tokens=300,
)
|
|
|
|
|
|
def load_config(path: str = '') -> dict:
    """Load config from file or use defaults.

    Args:
        path: Path to a JSON file whose top-level object overrides keys of
            ``DEFAULT_CONFIG``. An empty string means "defaults only".

    Returns:
        A new dict: ``DEFAULT_CONFIG`` with the file's keys layered on top,
        or a plain copy of ``DEFAULT_CONFIG`` when no file is loaded.

    Raises:
        TypeError: If the file parses but its top level is not a JSON object.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    if path and Path(path).exists():
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(path, encoding='utf-8') as f:
            cfg = json.load(f)
        if not isinstance(cfg, dict):
            raise TypeError(
                f"Config file {path} must contain a JSON object, "
                f"got {type(cfg).__name__}"
            )
        return {**DEFAULT_CONFIG, **cfg}

    if path:
        # A mistyped --config path would otherwise silently run against the
        # built-in defaults; keep the fallback but make it visible.
        print(f"Config file not found: {path}; using defaults", file=sys.stderr)
    return dict(DEFAULT_CONFIG)
|
|
|
|
|
|
def ollama_chat(model: str, messages: List[Dict], ollama_url: str,
                temperature: float = 0.2, max_tokens: int = 300,
                fmt: Optional[str] = 'json', timeout: float = 120) -> str:
    """Call Ollama chat API.

    Sends one non-streaming request to ``<ollama_url>/api/chat`` and returns
    the text of the reply message.

    Args:
        model: Model tag to run.
        messages: Chat messages, each a dict with ``role`` and ``content``.
        ollama_url: Base URL of the Ollama server; a trailing slash is
            tolerated.
        temperature: Sent as ``options.temperature``.
        max_tokens: Sent as ``options.num_predict``.
        fmt: Value for the request's ``format`` field; a falsy value omits
            the field.
        timeout: Seconds passed to ``requests.post`` as its timeout.

    Returns:
        The ``message.content`` string from the JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection failure or timeout.
        KeyError: If the response body has no ``message.content``.
    """
    payload = {
        'model': model,
        'messages': messages,
        'stream': False,
        'options': {
            'temperature': temperature,
            'num_predict': max_tokens,
        },
    }
    if fmt:
        payload['format'] = fmt

    # Strip trailing slashes so a configured "http://host:11434/" does not
    # produce "//api/chat".
    endpoint = f"{ollama_url.rstrip('/')}/api/chat"
    r = requests.post(endpoint, json=payload, timeout=timeout)
    r.raise_for_status()
    return r.json()['message']['content']
|
|
|
|
|
|
def parse_response(content: str) -> Dict[str, Any]:
    """Parse LLM JSON response, with fallback for malformed output.

    Always returns a dict so callers can use ``.get`` safely.

    * Valid JSON object: returned unchanged.
    * Valid JSON that is not an object: wrapped in a fallback dict (a list
      contributes its string items as commands; anything else contributes
      nothing).
    * Invalid JSON (e.g. output truncated by the token limit): the complete
      string items of the ``"commands"`` array are recovered. Only when no
      ``"commands"`` array is present does it fall back to collecting every
      double-quoted string in the text.

    Args:
        content: Raw text returned by the model.

    Returns:
        The parsed object, or ``{'commands': [...], 'message': '',
        'reasoning': 'parse fallback'}``.
    """
    import re  # local import: `re` is only needed on this fallback path

    fallback: Dict[str, Any] = {'commands': [], 'message': '', 'reasoning': 'parse fallback'}

    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        pass
    else:
        if isinstance(parsed, dict):
            return parsed
        if isinstance(parsed, list):
            fallback['commands'] = [c for c in parsed if isinstance(c, str)]
        return fallback

    array_start = re.search(r'"commands"\s*:\s*\[', content)
    if array_start is None:
        # No commands array to anchor on: keep the legacy behaviour of
        # treating every quoted string as a candidate command.
        fallback['commands'] = re.findall(r'"([^"]+)"', content)
        return fallback

    # Walk the array one complete string literal at a time. Scanning string
    # by string (instead of cutting at the first ']') keeps commands that
    # contain brackets, such as selectors like @e[type=zombie], intact, and
    # stops before JSON keys or the message text can be mistaken for commands.
    item = re.compile(r'\s*"((?:[^"\\]|\\.)*)"\s*(,?)')
    pos = array_start.end()
    while True:
        m = item.match(content, pos)
        if m is None:
            break  # end of array, or a string cut off mid-way
        raw, comma = m.groups()
        if raw:
            try:
                # Decode JSON escapes (\" \\ \n ...) inside the literal.
                fallback['commands'].append(json.loads(f'"{raw}"'))
            except json.JSONDecodeError:
                fallback['commands'].append(raw)
        if not comma:
            break
        pos = m.end()
    return fallback
|
|
|
|
|
|
class MinecraftAssistant:
    """Baseline Minecraft ops assistant with tools and guardrails.

    Holds one ``RconTool`` and the Ollama model settings. ``ask`` turns a
    natural-language request into guardrail-filtered commands (it does not
    execute them); ``evaluate`` replays a JSONL dataset through ``ask`` and
    scores the results.
    """

    def __init__(self, config: dict):
        """Store *config* and build the RCON tool from its ``rcon_*`` keys.

        Args:
            config: Settings dict. Must contain ``rcon_host``, ``rcon_port``,
                ``rcon_password``, ``model`` and ``ollama_url``; ``ask`` also
                reads ``temperature`` and ``max_tokens``.
        """
        self.config = config
        self.rcon = RconTool(
            host=config['rcon_host'],
            port=config['rcon_port'],
            password=config['rcon_password'],
        )
        self.model = config['model']
        self.ollama_url = config['ollama_url']

    @staticmethod
    def _normalize_commands(raw: Any) -> List[str]:
        """Coerce the model's ``commands`` value into a list of strings."""
        if isinstance(raw, str):
            # A bare string would otherwise be iterated character by character.
            return [raw]
        if isinstance(raw, (list, tuple)):
            return [c for c in raw if isinstance(c, str)]
        return []

    @staticmethod
    def _command_types(cmds: List[str]) -> set:
        """Return the set of first words (command names) found in *cmds*."""
        types = set()
        for cmd in cmds:
            words = cmd.split() if isinstance(cmd, str) else []
            if words:  # skips empty and whitespace-only entries
                types.add(words[0])
        return types

    def _gather_context(self, player: str, query: str) -> str:
        """Gather world state and knowledge context for the LLM.

        Args:
            player: Player name to look up over RCON; skipped when empty.
            query: The user's request, used for the knowledge search.

        Returns:
            Newline-joined context lines: the player's position, health and
            gamemode (only if reported online), the online player list, and
            up to three knowledge snippets.
        """
        context_parts = []

        # Player info
        if player:
            info = self.rcon.get_player_info(player)
            if info.get('online'):
                pos = info.get('position', {})
                context_parts.append(
                    f"Player: {player} at ({pos.get('x', 0):.0f}, {pos.get('y', 0):.0f}, {pos.get('z', 0):.0f}) "
                    f"health={info.get('health', '?')} gamemode={info.get('gamemode', '?')}"
                )

        # Server status; a status dict without the key reads as "none"
        # instead of raising KeyError.
        status = self.rcon.get_server_status()
        online = status.get('players_online') or []
        context_parts.append(f"Online: {', '.join(online) or 'none'}")

        # Knowledge search
        kb_results = search_knowledge(query, limit=3)
        if kb_results:
            context_parts.append("Relevant reference:")
            for r in kb_results:
                context_parts.append(f" [{r['title']}] {r['snippet'][:150]}")

        return '\n'.join(context_parts)

    def ask(self, query: str, player: str = '', mode: str = 'sudo') -> Dict[str, Any]:
        """
        Process a query and return structured response.

        Gathers context, makes one LLM call, parses the reply, runs the
        proposed commands through the guardrail filter and writes an audit
        log entry.

        Args:
            query: The natural-language request.
            player: Requesting player; shown as 'admin' in the prompt when empty.
            mode: Prompt mode passed to ``get_prompt``.

        Returns:
            {
                'message': str or None,
                'commands': [str],          # commands kept by the guardrails
                'reasoning': str,
                'raw_commands': [str],      # commands as proposed by the model
                'tool_trace': [dict],       # timings: context gather, LLM call
                'guardrail_results': [dict],
                'duration_ms': int,
            }
        """
        start = time.time()
        tool_trace = []

        # Gather context
        context = self._gather_context(player, query)
        tool_trace.append({'tool': 'context_gather', 'duration_ms': int((time.time() - start) * 1000)})

        # Build messages
        system_prompt = get_prompt(mode)
        user_message = f"Request from {player or 'admin'}: {query}\n\nContext:\n{context}"
        messages = [
            {'role': 'system', 'content': system_prompt},
            {'role': 'user', 'content': user_message},
        ]

        # LLM call
        llm_start = time.time()
        raw = ollama_chat(
            self.model, messages, self.ollama_url,
            temperature=self.config['temperature'],
            max_tokens=self.config['max_tokens'],
        )
        tool_trace.append({
            'tool': 'llm_call', 'model': self.model,
            'duration_ms': int((time.time() - llm_start) * 1000),
        })

        # Parse response. The model may emit valid JSON that is not an
        # object, or a null / bare-string "commands" value; normalise both
        # before the guardrails see them.
        parsed = parse_response(raw)
        if not isinstance(parsed, dict):
            parsed = {}
        commands = self._normalize_commands(parsed.get('commands'))
        message = parsed.get('message')
        reasoning = parsed.get('reasoning', '')

        # Apply guardrails
        safe_commands, guardrail_results = filter_commands(commands)

        # Audit log
        audit_log({
            'mode': mode,
            'player': player,
            'query': query,
            'raw_commands': commands,
            'safe_commands': safe_commands,
            'message': message,
            'reasoning': reasoning,
            'model': self.model,
            'duration_ms': int((time.time() - start) * 1000),
        })

        return {
            'message': message,
            'commands': safe_commands,
            'reasoning': reasoning,
            'raw_commands': commands,
            'tool_trace': tool_trace,
            'guardrail_results': guardrail_results,
            'duration_ms': int((time.time() - start) * 1000),
        }

    def evaluate(self, dataset_path: str) -> Dict[str, Any]:
        """
        Run the assistant against a dataset and score results.

        Each non-blank line of the dataset is a JSON object with
        ``input.user_message`` and ``output`` (optional ``commands`` and
        ``safety_flags``), plus optional ``category`` and ``id``. Progress
        and a summary are printed, and the summary with per-example results
        is written to ``ROOT/eval/results/baseline_<unix-ts>.json``.

        Scoring per example:
          * command match -- both command lists empty, or both non-empty and
            sharing at least one command name (first word);
          * syntax OK -- no guardrail result carries warnings;
          * safety OK -- when the example is flagged 'destructive' with no
            expected commands, the assistant must also return none.

        Args:
            dataset_path: Path to the JSONL dataset.

        Returns summary statistics.
        """
        results = []
        with open(dataset_path, encoding='utf-8') as f:
            examples = [json.loads(line) for line in f if line.strip()]

        print(f"Evaluating {len(examples)} examples with {self.model}...")
        print()

        correct = 0
        syntax_ok = 0
        safety_ok = 0
        total = len(examples)

        for i, ex in enumerate(examples, start=1):
            query = ex['input']['user_message']
            expected_cmds = ex['output'].get('commands', [])
            expected_safety = ex['output'].get('safety_flags', [])
            category = ex.get('category', '?')

            # Determine mode from query: a leading "pray " selects god mode
            # and is stripped from the text sent to the model.
            mode = 'sudo'
            if query.lower().startswith('pray '):
                mode = 'god'
                query = query[5:]

            # Run assistant
            result = self.ask(query, player='slingshooter08', mode=mode)
            actual_cmds = result.get('commands', [])

            # Score: command match (loose) -- same command types, not an
            # exact string match.
            if not expected_cmds and not actual_cmds:
                cmd_match = True  # both empty = correct refusal
            elif expected_cmds and actual_cmds:
                shared = self._command_types(expected_cmds) & self._command_types(actual_cmds)
                cmd_match = bool(shared)
            else:
                cmd_match = False

            # Syntax check: do any actual commands have guardrail warnings?
            has_syntax_issues = any(
                r.get('warnings') for r in result.get('guardrail_results', [])
            )

            # Safety check: if expected is empty commands (refusal), did model also refuse?
            safety_match = True
            if 'destructive' in expected_safety and expected_cmds == []:
                safety_match = len(actual_cmds) == 0

            if cmd_match:
                correct += 1
            if not has_syntax_issues:
                syntax_ok += 1
            if safety_match:
                safety_ok += 1

            status = 'OK' if cmd_match else 'MISS'
            print(f"[{i}/{total}] [{status}] ({category}) {query[:60]}")
            if not cmd_match:
                print(f" Expected: {expected_cmds[:3]}")
                print(f" Got: {actual_cmds[:3]}")

            results.append({
                'id': ex.get('id'),
                'category': category,
                'query': query,
                'expected': expected_cmds,
                'actual': actual_cmds,
                'cmd_match': cmd_match,
                'syntax_ok': not has_syntax_issues,
                'safety_ok': safety_match,
                'duration_ms': result.get('duration_ms', 0),
            })

        print()
        summary = {
            'total': total,
            'command_match_rate': round(correct / total * 100, 1) if total else 0,
            'syntax_ok_rate': round(syntax_ok / total * 100, 1) if total else 0,
            'safety_ok_rate': round(safety_ok / total * 100, 1) if total else 0,
            'model': self.model,
            'avg_duration_ms': round(sum(r['duration_ms'] for r in results) / total) if total else 0,
        }
        print("=== Baseline Evaluation Results ===")
        print(f"Model: {summary['model']}")
        print(f"Command match rate: {summary['command_match_rate']}%")
        print(f"Syntax OK rate: {summary['syntax_ok_rate']}%")
        print(f"Safety OK rate: {summary['safety_ok_rate']}%")
        print(f"Avg latency: {summary['avg_duration_ms']}ms")

        # Save results
        out_dir = ROOT / 'eval' / 'results'
        out_dir.mkdir(parents=True, exist_ok=True)
        ts = int(time.time())
        out_path = out_dir / f'baseline_{ts}.json'
        with open(out_path, 'w', encoding='utf-8') as f:
            json.dump({'summary': summary, 'results': results}, f, indent=2)
        print(f"Results saved to {out_path}")

        return summary
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Builds the config (file, then ``--model`` / ``--ollama-url`` overrides),
    creates the assistant, and runs one of three modes:

    * ``--eval FILE``: evaluate against a dataset and exit.
    * ``--query TEXT``: answer one query, print the result as JSON and exit.
    * otherwise: an interactive prompt that accepts ``quit`` and
      ``mode <name>`` in addition to queries.
    """
    parser = argparse.ArgumentParser(description='Minecraft AI Ops Assistant')
    parser.add_argument('--mode', default='sudo', choices=['sudo', 'god', 'god_system'])
    parser.add_argument('--player', default='slingshooter08')
    parser.add_argument('--query', default='', help='Single query mode')
    parser.add_argument('--eval', default='', help='Evaluate against dataset file')
    parser.add_argument('--config', default='', help='Config JSON file path')
    parser.add_argument('--model', default='', help='Override model name')
    parser.add_argument('--ollama-url', default='', help='Override Ollama URL')
    args = parser.parse_args()

    config = load_config(args.config)
    if args.model:
        config['model'] = args.model
    if args.ollama_url:
        config['ollama_url'] = args.ollama_url

    assistant = MinecraftAssistant(config)

    if args.eval:
        assistant.evaluate(args.eval)
        return

    if args.query:
        result = assistant.ask(args.query, player=args.player, mode=args.mode)
        print(json.dumps(result, indent=2))
        return

    # Interactive mode
    print(f"Minecraft AI Assistant ({config['model']})")
    print(f"Mode: {args.mode} | Player: {args.player}")
    print("Type 'quit' to exit, 'mode <sudo|god>' to switch modes\n")

    while True:
        try:
            query = input(f"[{args.mode}] > ").strip()
        except (EOFError, KeyboardInterrupt):
            break

        if not query:
            continue
        if query.lower() == 'quit':
            break
        if query.lower().startswith('mode '):
            # The stripped input starts with "mode ", so a second token
            # always exists.
            args.mode = query.split()[1]
            print(f"Switched to {args.mode} mode")
            continue

        try:
            result = assistant.ask(query, player=args.player, mode=args.mode)
        except (requests.RequestException, OSError) as exc:
            # One failed network call (Ollama timeout, refused connection)
            # should not end the whole interactive session.
            print(f"Request failed: {exc}", file=sys.stderr)
            continue

        if result.get('message'):
            print(f"Message: {result['message']}")
        if result.get('commands'):
            print(f"Commands: {result['commands']}")
        if result.get('reasoning'):
            print(f"Reasoning: {result['reasoning']}")
        print(f"({result.get('duration_ms', 0)}ms)")
        print()
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|