Add retrieval-grounded sudo flow and execution feedback loop

This commit is contained in:
Claude Code
2026-03-17 19:53:32 -04:00
parent f4ce19db6d
commit 40b4da345a
6 changed files with 705 additions and 15 deletions
+319 -5
View File
@@ -13,6 +13,7 @@ Execution safety remains in mc_aigod_paper.py.
"""
import json
import hashlib
import logging
import os
import re
@@ -20,6 +21,8 @@ import sqlite3
import threading
import time
import uuid
from pathlib import Path
from urllib.parse import urlparse
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@@ -76,12 +79,16 @@ class SessionState:
_sessions: Dict[str, SessionState] = {}
_sessions_lock = threading.Lock()
_kb_lock = threading.Lock()
_kb_index_cache: Dict[str, Any] = {'loaded_at': 0.0, 'docs': []}
_KB_ALLOWED_EXTS = {'.md', '.txt', '.json'}
COMMAND_PREFIXES_BY_MODE = {
'sudo': [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ',
'kill ', 'summon ', 'tellraw ', 'worldborder ', 'fill ', 'setblock ',
'clone ',
'clone ', 'gamemode ', 'template ',
],
'god': [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ',
@@ -106,6 +113,21 @@ def load_config() -> Dict[str, Any]:
'command_model': 'qwen3-coder:30b',
'tool_model': 'qwen2.5:1.5b',
'session_ttl_seconds': 21600,
'knowledge_base_dir': '/var/lib/mc-langgraph-gateway/knowledge',
'knowledge_index_file': '/var/lib/mc-langgraph-gateway/knowledge/index.json',
'knowledge_auto_index_on_start': True,
'knowledge_bootstrap_on_start': True,
'knowledge_bootstrap_urls': [
'https://minecraft.wiki/w/Commands/fill',
'https://minecraft.wiki/w/Commands/setblock',
'https://minecraft.wiki/w/Commands/clone',
'https://minecraft.wiki/w/Commands/summon',
'https://minecraft.wiki/w/Commands/execute',
'https://minecraft.wiki/w/TNT',
'https://minecraft.wiki/w/Explosion',
'https://minecraft.wiki/w/Tutorial:Worldedit',
],
'knowledge_max_doc_bytes': 200000,
}
@@ -114,6 +136,228 @@ DB_PATH = CFG.get('session_db_path', '/var/lib/mc-langgraph-gateway/sessions.db'
_db_lock = threading.Lock()
def _kb_root() -> str:
root = str(CFG.get('knowledge_base_dir', '/var/lib/mc-langgraph-gateway/knowledge')).strip()
return root or '/var/lib/mc-langgraph-gateway/knowledge'
def _kb_index_path() -> str:
path = str(CFG.get('knowledge_index_file', '')).strip()
if path:
return path
return os.path.join(_kb_root(), 'index.json')
def _kb_tokenize(text: str) -> List[str]:
toks = re.findall(r'[a-z0-9_]{2,}', (text or '').lower())
if not toks:
return []
out: List[str] = []
seen = set()
for t in toks:
if t in seen:
continue
seen.add(t)
out.append(t)
if len(out) >= 300:
break
return out
def _kb_html_to_text(html: str) -> str:
body = re.sub(r'(?is)<script.*?>.*?</script>', ' ', html or '')
body = re.sub(r'(?is)<style.*?>.*?</style>', ' ', body)
body = re.sub(r'(?is)<[^>]+>', ' ', body)
body = re.sub(r'\s+', ' ', body).strip()
return body
def _kb_slug(s: str) -> str:
n = re.sub(r'[^a-zA-Z0-9._-]+', '_', (s or '').strip())
n = n.strip('._-')
return (n[:80] or 'doc').lower()
def _kb_fetch_url(url: str) -> Dict[str, Any]:
max_bytes = int(CFG.get('knowledge_max_doc_bytes', 200000))
r = requests.get(url, timeout=25)
r.raise_for_status()
ct = (r.headers.get('content-type') or '').lower()
raw = r.content[:max_bytes]
if 'html' in ct:
text = _kb_html_to_text(raw.decode(errors='replace'))
else:
text = raw.decode(errors='replace')
title = ''
m = re.search(r'(?is)<title>(.*?)</title>', r.text if 'html' in ct else '')
if m:
title = re.sub(r'\s+', ' ', m.group(1)).strip()
return {'title': title, 'text': text}
def _kb_ingest_url(url: str) -> Dict[str, Any]:
parsed = urlparse(url)
host = (parsed.netloc or '').lower()
if host not in set(str(h).lower() for h in CFG.get('knowledge_allowed_hosts', [
'minecraft.wiki', 'www.minecraft.wiki', 'docs.papermc.io', 'intellectualsites.github.io', 'enginehub.org', 'worldedit.enginehub.org'
])):
return {'ok': False, 'error': f'host not allowed: {host}'}
try:
fetched = _kb_fetch_url(url)
text = (fetched.get('text') or '').strip()
if len(text) < 80:
return {'ok': False, 'error': 'document too short'}
title = fetched.get('title') or os.path.basename(parsed.path) or host
root = Path(_kb_root())
root.mkdir(parents=True, exist_ok=True)
digest = hashlib.sha1(url.encode()).hexdigest()[:12]
fname = f"{_kb_slug(title)}_{digest}.md"
out = root / fname
out.write_text(f"# {title}\n\nSource: {url}\n\n{text}\n", encoding='utf-8')
return {'ok': True, 'path': str(out), 'source': url, 'title': title}
except Exception as e:
return {'ok': False, 'error': str(e)}
def _kb_build_index() -> Dict[str, Any]:
root = Path(_kb_root())
root.mkdir(parents=True, exist_ok=True)
docs = []
for p in root.rglob('*'):
if not p.is_file() or p.suffix.lower() not in _KB_ALLOWED_EXTS:
continue
try:
text = p.read_text(encoding='utf-8', errors='replace')
except Exception:
continue
title = p.name
m = re.search(r'^#\s+(.+)$', text, re.MULTILINE)
if m:
title = m.group(1).strip()[:120]
snippet = re.sub(r'\s+', ' ', text[:800]).strip()
tokens = _kb_tokenize(text)
rel = str(p.relative_to(root))
doc_id = hashlib.sha1(rel.encode()).hexdigest()[:12]
docs.append({
'id': doc_id,
'path': rel,
'title': title,
'snippet': snippet[:260],
'tokens': tokens,
'mtime': p.stat().st_mtime,
})
out = {'generated_at': time.time(), 'docs': docs}
idx = Path(_kb_index_path())
idx.parent.mkdir(parents=True, exist_ok=True)
idx.write_text(json.dumps(out, ensure_ascii=True), encoding='utf-8')
with _kb_lock:
_kb_index_cache['loaded_at'] = time.time()
_kb_index_cache['docs'] = docs
return {'ok': True, 'count': len(docs), 'path': str(idx)}
def _kb_load_index(force: bool = False) -> List[Dict[str, Any]]:
with _kb_lock:
if _kb_index_cache.get('docs') and not force:
return list(_kb_index_cache['docs'])
idx = Path(_kb_index_path())
if not idx.exists():
_kb_build_index()
try:
data = json.loads(idx.read_text(encoding='utf-8'))
except Exception:
_kb_build_index()
data = json.loads(idx.read_text(encoding='utf-8'))
docs = data.get('docs') or []
with _kb_lock:
_kb_index_cache['loaded_at'] = time.time()
_kb_index_cache['docs'] = docs
return docs
def _kb_bootstrap_if_needed() -> None:
if not bool(CFG.get('knowledge_bootstrap_on_start', True)):
return
root = Path(_kb_root())
root.mkdir(parents=True, exist_ok=True)
existing = [p for p in root.rglob('*') if p.is_file() and p.suffix.lower() in _KB_ALLOWED_EXTS]
if existing:
return
urls = CFG.get('knowledge_bootstrap_urls', []) or []
if not urls:
return
ok = 0
for url in urls:
res = _kb_ingest_url(str(url))
if res.get('ok'):
ok += 1
log.info('knowledge bootstrap completed: %d/%d docs ingested', ok, len(urls))
def _kb_search(query: str, limit: int = 5) -> List[Dict[str, Any]]:
docs = _kb_load_index()
q_tokens = set(_kb_tokenize(query))
if not q_tokens:
return []
scored = []
q_lower = query.lower()
for d in docs:
tokens = set(d.get('tokens') or [])
overlap = len(q_tokens.intersection(tokens))
if overlap <= 0:
continue
score = overlap
if q_lower in (d.get('title', '').lower()):
score += 3
if q_lower in (d.get('snippet', '').lower()):
score += 1
scored.append((score, d))
scored.sort(key=lambda x: x[0], reverse=True)
out = []
for _, d in scored[:max(1, limit)]:
out.append({
'doc_id': d.get('id'),
'title': d.get('title'),
'path': d.get('path'),
'snippet': d.get('snippet'),
})
return out
def _kb_read(doc_id: str, query: str = '') -> Dict[str, Any]:
docs = _kb_load_index()
hit = None
for d in docs:
if d.get('id') == doc_id:
hit = d
break
if not hit:
return {'ok': False, 'error': 'doc_id not found', 'results': []}
full = Path(_kb_root()) / str(hit.get('path'))
if not full.exists():
return {'ok': False, 'error': 'file missing', 'results': []}
text = full.read_text(encoding='utf-8', errors='replace')
q = (query or '').strip().lower()
if q and q in text.lower():
idx = text.lower().find(q)
start = max(0, idx - 350)
end = min(len(text), idx + 650)
excerpt = text[start:end]
else:
excerpt = text[:1000]
return {
'ok': True,
'results': [{
'doc_id': doc_id,
'title': hit.get('title'),
'path': hit.get('path'),
'text': re.sub(r'\s+', ' ', excerpt).strip(),
}],
}
def _db_enabled() -> bool:
return bool(CFG.get('session_persistence_enabled', True))
@@ -407,13 +651,35 @@ def tool_wiki_lookup(query: str) -> Dict[str, Any]:
return {'ok': False, 'error': str(e), 'results': []}
def _tool_router(user_text: str, max_steps: int) -> List[Dict[str, Any]]:
def tool_local_search(query: str) -> Dict[str, Any]:
try:
rows = _kb_search(query, limit=5)
return {'ok': True, 'results': rows}
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
def tool_local_read(doc_id: str, query: str = '') -> Dict[str, Any]:
try:
return _kb_read(doc_id, query)
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
def _tool_router(user_text: str, max_steps: int, mode: str, context: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Very small bounded heuristic tool planner."""
text = user_text.lower()
calls: List[Dict[str, Any]] = []
if max_steps <= 0:
return calls
if mode == 'sudo':
q = user_text
req = str((context or {}).get('request') or '').strip()
if req:
q = req
calls.append({'tool': 'local.search', 'query': q})
if any(k in text for k in ['wiki', 'minecraft', 'item id', 'recipe', 'craft']):
calls.append({'tool': 'minecraft.wiki_lookup', 'query': user_text})
@@ -433,7 +699,15 @@ def _commands_prompt(mode: str) -> str:
'You are a Minecraft command translator. Return ONLY JSON: {"commands": ["..."]}.\n'
f'Allowed command prefixes: {allowed}.\n'
'Output must be command strings only, no prose, no markdown, no labels, no leading slash.\n'
'If unsafe/unknown, return empty commands.'
'Use TOOL results as your source of truth. Do not invent command syntax not supported by retrieved context.\n'
'Read context.sudo_failures and avoid repeating those exact failing patterns.\n'
'Never use old enchantment NBT {Enchantments:[...]} syntax; use item[enchantments={...}] format.\n'
'For TNT, never append a count to summon; use multiple summon commands instead.\n'
'Keep target scope narrow: if request is about "me/my", do not use @a unless explicitly requested.\n'
'You may output template workflow meta-commands: template search <query>, template pick <n> [name], template build <name>.\n'
'For build/make/create requests, prefer the template workflow instead of raw block-by-block commands.\n'
'If request is ambiguous or unsupported, choose a closest valid in-game workaround and keep scope bounded.\n'
'If still unsafe/unknown, return empty commands.'
)
if mode == 'god_system':
@@ -441,6 +715,7 @@ def _commands_prompt(mode: str) -> str:
'You are Minecraft divine system automation. Return ONLY JSON: {"commands": ["..."]}.\n'
f'Allowed command prefixes: {allowed}.\n'
'Output must be command strings only, no prose, no markdown, no labels, no leading slash.\n'
'Use valid 1.21 syntax: effect give <player> ..., and weather is clear/rain/thunder only.\n'
'This mode is for intervention/first-login events. Prefer benevolent or thematic world actions.\n'
'If you include kill commands, keep it to at most one player.'
)
@@ -449,6 +724,9 @@ def _commands_prompt(mode: str) -> str:
'You are Minecraft God command planner. Return ONLY JSON: {"commands": ["..."]}.\n'
f'Allowed command prefixes: {allowed}.\n'
'Output must be command strings only, no prose, no markdown, no labels, no leading slash.\n'
'Use valid 1.21 syntax: effect give <player> ..., and weather is clear/rain/thunder only.\n'
'Avoid accidental lethal vertical teleports in benevolent responses unless explicitly requested.\n'
'Do not use tp in helpful responses unless user explicitly asks for movement.\n'
'Balance benevolence and judgment based on context.\n'
'Use valid Minecraft command syntax only.'
)
@@ -524,25 +802,53 @@ def run_pipeline(session: SessionState, req: MessageRequest) -> MessageResponse:
user_blob = f"message: {user_text}\ncontext: {context_json}"
session.messages.append({'role': req.role, 'content': user_blob})
# Feedback-only messages update session state without running LLM/tools.
if bool((req.context or {}).get('feedback_only', False)):
session.messages.append({
'role': 'assistant',
'content': json.dumps({'message': '', 'commands': []}, ensure_ascii=True)
})
_db_upsert_session(session)
return MessageResponse(message=None, commands=[], tool_trace=[])
_db_upsert_session(session)
tool_trace: List[Dict[str, Any]] = []
tool_results_block = ''
if req.allow_tools:
calls = _tool_router(user_text, max(0, min(req.max_tool_steps, 6)))
calls = _tool_router(
user_text,
max(0, min(req.max_tool_steps, 6)),
session.mode,
req.context or {},
)
for c in calls:
tool = c['tool']
q = c['query']
q = c.get('query', '')
if tool == 'web.search':
out = tool_web_search(q)
elif tool == 'minecraft.wiki_lookup':
out = tool_wiki_lookup(q)
elif tool == 'local.search':
out = tool_local_search(q)
elif tool == 'local.read':
out = tool_local_read(str(c.get('doc_id', '')), q)
else:
out = {'ok': False, 'error': 'unknown tool', 'results': []}
tool_trace.append({'tool': tool, 'input': q, 'ok': out.get('ok', False), 'results_count': len(out.get('results', []))})
tool_results_block += f"\nTOOL {tool} query={q}\nRESULT={json.dumps(out, ensure_ascii=True)[:3000]}\n"
# localized retrieval hop: after index search, fetch one top document excerpt
if tool == 'local.search' and out.get('ok') and out.get('results') and len(tool_trace) < max(0, min(req.max_tool_steps, 6)):
top = out['results'][0]
doc_id = str(top.get('doc_id', ''))
if doc_id:
read_out = tool_local_read(doc_id, q)
tool_trace.append({'tool': 'local.read', 'input': doc_id, 'ok': read_out.get('ok', False), 'results_count': len(read_out.get('results', []))})
tool_results_block += f"\nTOOL local.read doc_id={doc_id}\nRESULT={json.dumps(read_out, ensure_ascii=True)[:3000]}\n"
# Commands call
cmd_messages = [
{'role': 'system', 'content': _commands_prompt(session.mode)},
@@ -618,4 +924,12 @@ def close_session(session_id: str):
return {'closed': existed}
try:
_kb_bootstrap_if_needed()
if bool(CFG.get('knowledge_auto_index_on_start', True)):
meta = _kb_build_index()
log.info('knowledge index ready: %s docs=%s', meta.get('path'), meta.get('count'))
except Exception as e:
log.warning('knowledge bootstrap/index failed: %s', e)
_db_init()