Files
Mortdecai/data/scrape_server_logs.py
T
Seth 0473eb0b50 Minecraft knowledge corpus, recipe trees, GitHub scraper, 644 examples
Knowledge corpus (knowledge/mc-data/):
- 1505 items, 886 crafting recipes, 1166 blocks from minecraft-data 1.21.11
- Recipe dependency tree builder (knowledge/build_recipe_tree.py)
- Crafting chain training: "give me everything to make X from scratch"
- Smelting recipes, version awareness examples

Training data (644 examples total):
- 107 command syntax reference examples (every command + common errors)
- 176 recipe/crafting chain examples (63 crafting, 103 material-giving, 11 smelting)
- 344 Claude-distilled examples (222 sudo + 122 god via Haiku)
- Live bot audit data ingested (128 examples from dev server)

Swarm bots:
- Swimming/water escape logic
- Door opening
- Context-aware prayers (inventory, health, time, depth)
- Prefix enforcement on all Gemini/Dolphin prompts

GitHub log scraper (data/scrape_server_logs.py):
- Searches GitHub for Minecraft server logs with commands
- Strict 1.20.5+ version filter
- Extracts command pairs, converts to training format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 20:33:09 -04:00

711 lines
23 KiB
Python

#!/usr/bin/env python3
"""
Scrape Minecraft server logs from GitHub to extract command examples for training.
Searches public repos for server log files, filters for 1.20.5+ versions,
extracts player/console/RCON commands, and converts them to the project's
JSONL training schema.
Usage:
python3 data/scrape_server_logs.py
python3 data/scrape_server_logs.py --dry-run --max-repos 10
python3 data/scrape_server_logs.py --output-dir /tmp/scraped
"""
import argparse
import json
import os
import re
import subprocess
import sys
import time
import uuid
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# GitHub code-search queries, issued one after another by the search phase.
SEARCH_QUERIES = [
    'minecraft server.log "issued server command"',
    'minecraft latest.log "issued server command"',
    'minecraft latest.log rcon',
    'paper server "executed command"',
    'worldedit "//set"',
    'minecraft "gamemode" "give" server command',
]

# Oldest accepted server version; anything comparing lower is rejected
# (so 1.20.5, 1.20.6, 1.21 and 1.21.x pass).
MIN_VERSION = (1, 20, 5)

# Case-insensitive patterns whose group 1 is the dotted version number.
# They are tried in this order; the first one that matches wins.
VERSION_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        # "Starting minecraft server version 1.21.1"
        r"Starting minecraft server version\s+([\d.]+)",
        # "This server is running Paper version 1.21.1-..."
        r"This server is running\s+\S+\s+version\s+([\d.]+)",
        # "Paper version git-Paper-123 (MC: 1.21.1)"
        r"\(MC:\s*([\d.]+)\)",
        # "Server version: 1.21.1"
        r"Server version[:\s]+([\d.]+)",
        # Spigot / Purpur / Folia variants
        r"Implementing API version\s+([\d.]+)",
    )
]

# --- Command extraction patterns -------------------------------------------

# "<name> issued server command: /cmd"
# group 1 = player name, group 2 = command including its leading "/".
CMD_ISSUED = re.compile(
    r"(\w{3,16})\s+issued server command:\s+(/.+)",
    re.IGNORECASE,
)

# Same message with the player name wrapped in <...> or [...].
CMD_ISSUED_ALT = re.compile(
    r"[<\[](\w{3,16})[>:\]]\s+issued server command:\s+(/.+)",
    re.IGNORECASE,
)

# "RCON executing: /command" or "Rcon Executing console command: /cmd"
# group 1 = command starting at the first "/".
RCON_CMD = re.compile(
    r"RCON\s+(?:executing|Executing)[^/]*(/.+)",
    re.IGNORECASE,
)

# "[Server] /command" typed on the console; group 1 = command.
CONSOLE_CMD = re.compile(
    r"\[Server\]\s+(/.+)",
    re.IGNORECASE,
)

# "<player> used //set stone"; group 1 = player, group 2 = WorldEdit command.
WORLDEDIT_CMD = re.compile(
    r"(\w+)\s+used\s+(//\w+.+)",
    re.IGNORECASE,
)

# A known WorldEdit command appearing anywhere in the text (case-sensitive);
# group 1 = the command together with its arguments.
WORLDEDIT_INLINE = re.compile(
    r"(//(?:set|replace|copy|paste|cut|move|stack|undo|redo|fill|walls|"
    r"outline|sphere|cyl|hcyl|hsphere|drain|fixwater|snow|thaw|green|"
    r"regen|overlay|naturalize|deform|hollow|center|pos1|pos2|wand|"
    r"expand|contract|shift|sel|count|distr)\b\S*(?:\s+\S+)*)"
)

# Files larger than this many bytes are never downloaded.
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB

# Natural-language templates keyed by command name, intended for
# user_message synthesis.
# NOTE(review): "kill" uses a {target} placeholder while every other entry
# uses {player}/{args} - confirm against whatever code formats these.
COMMAND_NL_TEMPLATES = {
    "give": "give {player} {args}",
    "gamemode": "switch {player} to {args} mode",
    "tp": "teleport {player} {args}",
    "teleport": "teleport {player} {args}",
    "time": "set the time to {args}",
    "weather": "change the weather to {args}",
    "effect": "apply effect {args} to {player}",
    "kill": "kill {target}",
    "summon": "summon {args}",
    "setblock": "place {args}",
    "fill": "fill area with {args}",
    "enchant": "enchant {args}",
    "clear": "clear {player}'s inventory",
    "xp": "give xp {args}",
    "experience": "give experience {args}",
    "ban": "ban {player}",
    "kick": "kick {player}",
    "op": "make {player} an operator",
    "deop": "remove operator from {player}",
    "msg": "message {args}",
    "tell": "message {args}",
    "say": "announce {args}",
    "difficulty": "set difficulty to {args}",
    "spawnpoint": "set spawn point {args}",
    "setworldspawn": "set world spawn {args}",
    "gamerule": "set gamerule {args}",
    "particle": "create particle {args}",
    "playsound": "play sound {args}",
    "title": "show title {args}",
    "scoreboard": "scoreboard {args}",
    "execute": "execute {args}",
    "data": "modify data {args}",
    "attribute": "modify attribute {args}",
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def run_gh(args: list[str], timeout: int = 30) -> Optional[str]:
    """Invoke the ``gh`` CLI with *args* and return its stdout.

    Returns None when the command exits non-zero, times out, or cannot be
    started. If stderr mentions "rate limit" or contains "403", the call
    sleeps 60 seconds and is retried exactly once.
    """
    cmd = ["gh"] + args

    def invoke():
        # Each attempt gets the full timeout budget.
        return subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout
        )

    try:
        proc = invoke()
        if proc.returncode == 0:
            return proc.stdout

        err_text = proc.stderr.strip()
        throttled = "rate limit" in err_text.lower() or "403" in err_text
        if not throttled:
            return None

        print(" [rate-limit] Sleeping 60s ...")
        time.sleep(60)
        proc = invoke()
        return proc.stdout if proc.returncode == 0 else None
    except subprocess.TimeoutExpired:
        # Only the first few argv entries are echoed to keep the line short.
        print(f" [timeout] gh command timed out: {' '.join(cmd[:6])}")
        return None
    except Exception as e:
        # Covers e.g. a missing gh binary; callers only see None.
        print(f" [error] gh command failed: {e}")
        return None
def parse_version(version_str: str) -> Optional[tuple]:
    """Parse a dotted version such as '1.21.1' into a 3-tuple like (1, 21, 1).

    Surrounding whitespace and stray leading/trailing dots are ignored, so a
    version captured together with sentence punctuation ('1.21.1.') still
    parses. Fewer than three components are padded with zeros; extra
    components are dropped.

    Returns None when the string is empty or any component is not an integer.
    """
    # A "[\d.]+"-style capture can swallow a trailing full stop; without this
    # strip the empty last component would make the whole version unparseable.
    cleaned = version_str.strip().strip(".")
    if not cleaned:
        return None
    try:
        nums = tuple(int(part) for part in cleaned.split("."))
    except ValueError:
        return None
    # Normalise to exactly three components so tuples compare consistently.
    nums = nums[:3]
    return nums + (0,) * (3 - len(nums))
def version_acceptable(version_str: str) -> bool:
    """Tell whether *version_str* is at least MIN_VERSION (1.20.5).

    Versions that cannot be parsed are treated as not acceptable.
    """
    parsed = parse_version(version_str)
    return parsed is not None and parsed >= MIN_VERSION
def detect_version(text: str) -> Optional[str]:
    """Find the Minecraft server version mentioned in log *text*.

    VERSION_PATTERNS are tried in list order; group 1 of the first pattern
    that matches anywhere in the text is returned. Returns None when no
    pattern matches.
    """
    # Lazy generator: later patterns are not searched once one has matched.
    attempts = (pattern.search(text) for pattern in VERSION_PATTERNS)
    first_hit = next((match for match in attempts if match), None)
    return first_hit.group(1) if first_hit else None
def extract_commands(text: str) -> list[dict]:
    """
    Scan log text line by line and collect the commands it records.

    Every line is checked, in this order, against the player
    "issued server command" patterns, the RCON pattern, the console pattern
    and the WorldEdit "used //..." pattern. Only the first kind that matches
    a line is recorded for that line.

    Returns a list of dicts with keys: player, command, source_type, response.
    ``response`` is the stripped text of the following log line, or None when
    that line is blank or missing; WorldEdit entries always carry None.
    """
    log_lines = text.splitlines()
    last_index = len(log_lines) - 1
    found = []

    for idx, line in enumerate(log_lines):
        # The line right after a command is kept as its presumed response.
        following = log_lines[idx + 1].strip() if idx < last_index else ""
        reply = following or None

        issued = CMD_ISSUED.search(line) or CMD_ISSUED_ALT.search(line)
        if issued:
            entry = {
                "player": issued.group(1),
                "command": issued.group(2).strip(),
                "source_type": "player_command",
                "response": reply,
            }
        elif rcon := RCON_CMD.search(line):
            entry = {
                "player": "RCON",
                "command": rcon.group(1).strip(),
                "source_type": "rcon",
                "response": reply,
            }
        elif console := CONSOLE_CMD.search(line):
            entry = {
                "player": "Console",
                "command": console.group(1).strip(),
                "source_type": "console",
                "response": reply,
            }
        elif edit := WORLDEDIT_CMD.search(line):
            entry = {
                "player": edit.group(1),
                "command": edit.group(2).strip(),
                "source_type": "worldedit",
                "response": None,
            }
        else:
            continue
        found.append(entry)

    return found
def synthesize_user_message(command: str, player: str) -> str:
    """
    Turn a raw command into a rough natural-language request.

    Example: '/give player diamond_sword 1' becomes 'give me a diamond sword'.
    WorldEdit commands (two leading slashes, e.g. '//set stone') become
    'worldedit set stone'. Commands without a dedicated phrasing are returned
    unchanged apart from the removed leading slash(es).

    Args:
        command: Command text as logged, normally starting with '/' or '//'.
            Surrounding whitespace is ignored.
        player: Name of the issuing player. Currently unused; kept so the
            signature stays stable for callers.

    Returns:
        The synthesized request, or an empty string when *command* holds
        nothing but slashes/whitespace.
    """
    text = command.strip()
    # WorldEdit must be detected BEFORE the slashes are stripped: afterwards
    # '//fill' is indistinguishable from the vanilla '/fill'.
    is_worldedit = text.startswith("//")

    cmd = text.lstrip("/")
    parts = cmd.split(None, 1)
    if not parts:
        return cmd
    cmd_name = parts[0].lower()
    args_str = parts[1] if len(parts) > 1 else ""

    if is_worldedit:
        return f"worldedit {cmd_name} {args_str}".strip()

    if cmd_name == "give" and args_str:
        # /give <player> <item> [count]
        give_parts = args_str.split()
        if len(give_parts) >= 2:
            item = give_parts[1].replace("minecraft:", "").replace("_", " ")
            count = give_parts[2] if len(give_parts) > 2 else "1"
            if count == "1":
                return f"give me a {item}"
            return f"give me {count} {item}"

    if cmd_name == "gamemode" and args_str:
        gm_parts = args_str.split()
        mode = gm_parts[0] if gm_parts else args_str
        return f"put me in {mode} mode"

    if cmd_name in ("tp", "teleport") and args_str:
        return f"teleport to {args_str}"

    if cmd_name == "time" and args_str:
        return f"set the time to {args_str.replace('set ', '')}"

    if cmd_name == "weather" and args_str:
        return f"make the weather {args_str}"

    if cmd_name == "effect" and args_str:
        effect_parts = args_str.split()
        # /effect give <player> <effect_name> ...
        if len(effect_parts) >= 3 and effect_parts[0] == "give":
            eff_name = effect_parts[2].replace("minecraft:", "").replace("_", " ")
            return f"give me {eff_name} effect"
        return f"apply effect {args_str}"

    if cmd_name == "kill":
        return f"kill {args_str if args_str else 'me'}"

    if cmd_name == "summon" and args_str:
        entity = args_str.split()[0].replace("minecraft:", "").replace("_", " ")
        return f"summon a {entity}"

    if cmd_name in ("setblock", "fill") and args_str:
        # Only namespaced ids are recognised as the block name.
        block_match = re.search(r"minecraft:(\w+)", args_str)
        if block_match:
            block = block_match.group(1).replace("_", " ")
            if cmd_name == "fill":
                return f"fill the area with {block}"
            return f"place a {block} block"
        return f"{cmd_name} {args_str}"

    if cmd_name == "difficulty" and args_str:
        return f"set difficulty to {args_str}"

    if cmd_name == "gamerule" and args_str:
        return f"set gamerule {args_str}"

    # Fallback: the command text itself, minus the leading slash.
    return cmd
# Blocklist of command names that belong to common plugins or are purely
# informational; everything NOT listed here is kept. Built once at import
# time instead of on every call.
_SKIP_PREFIXES = frozenset({
    "pl", "plugins", "ver", "version", "about", "help", "?",
    "tps", "spark", "perm", "lp", "luckperms", "essentials",
    "eco", "economy", "vault", "cmi", "nucleus", "chat",
    "party", "guild", "clan", "faction", "f", "home", "sethome",
    "warp", "setwarp", "spawn", "hub", "lobby", "menu",
    "shop", "ah", "auction", "buy", "sell", "pay", "bal",
    "balance", "money", "trade", "market", "store",
    "rank", "rankup", "prestige", "level", "stats",
    "vote", "reward", "crate", "key", "kit",
    "fly", "god", "vanish", "nick", "nickname",
    "dynmap", "map", "bluemap",
    "worldguard", "wg", "region",
    "towny", "town", "nation", "plot", "resident",
    "mcmmo", "mining", "excavation", "repair",
    "jobs", "quests", "quest",
    "discord", "link",
})


def command_to_training_example(
    cmd_info: dict,
    version: str,
    repo_name: str,
    existing_commands: set,
) -> Optional[dict]:
    """Convert one extracted command into a training-example dict.

    Args:
        cmd_info: Extracted command; the "command" and "player" keys are read.
        version: Server version string stored in the example's server context.
        repo_name: Repository the log came from, recorded as provenance.
        existing_commands: Commands already known; a command found here is
            skipped. This set is only read - adding new commands is left to
            the caller.

    Returns:
        The example dict, or None when the command is too short, is on the
        plugin blocklist, or is a duplicate.
    """
    command = cmd_info["command"]
    player = cmd_info["player"]

    # Bare name/arguments with every leading slash removed; used only for
    # the length and blocklist checks.
    bare_cmd = command.lstrip("/")
    # Stored form: drop just the ONE slash that marks "this is a command".
    # WorldEdit's '//set stone' therefore stays '/set stone' rather than
    # collapsing into a non-existent 'set stone'.
    raw_cmd = command[1:] if command.startswith("/") else command

    # Skip empty or very short commands
    if len(bare_cmd) < 2:
        return None

    words = bare_cmd.split()
    first_word = words[0].lower() if words else ""
    if first_word in _SKIP_PREFIXES:
        return None

    # Deduplicate
    if raw_cmd in existing_commands:
        return None

    user_msg = synthesize_user_message(command, player)
    return {
        # Random suffix: ids differ between runs.
        "id": f"scraped-{uuid.uuid4().hex[:12]}",
        "source": "scraped_github",
        "category": "command_gen",
        "input": {
            "user_message": user_msg,
            "server_context": {
                "server_type": "paper",
                "version": version,
            },
        },
        "output": {
            "reasoning": f"Extracted from GitHub repo {repo_name} server log.",
            "commands": [raw_cmd],
            "safety_flags": [],
        },
        "metadata": {
            "difficulty": "easy",
            "validated": False,
            "extracted_from": f"github:{repo_name}",
            "risk_level": 3,
        },
    }
def load_existing_commands(seed_path: str) -> set:
    """Collect the commands already present in a JSONL dataset, for dedup.

    Each non-blank line is parsed as JSON and the strings found under
    ``output.commands`` are gathered. Lines that are not valid JSON, or whose
    structure differs from that shape, are skipped rather than aborting the
    load.

    Args:
        seed_path: Path of the JSONL file. A missing file is not an error.

    Returns:
        Set of command strings (empty when the file does not exist).
    """
    commands: set = set()
    try:
        # Explicit UTF-8 so the result does not depend on the platform locale.
        with open(seed_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Valid JSON is not necessarily the expected shape: a line
                # may be a list, or carry null for "output" / "commands".
                output = obj.get("output") if isinstance(obj, dict) else None
                cmd_list = output.get("commands") if isinstance(output, dict) else None
                if not isinstance(cmd_list, list):
                    continue
                commands.update(cmd for cmd in cmd_list if isinstance(cmd, str))
    except FileNotFoundError:
        pass
    return commands
# ---------------------------------------------------------------------------
# GitHub search
# ---------------------------------------------------------------------------
def search_github_code(query: str, max_results: int = 100) -> list[dict]:
    """
    Search GitHub code via ``gh api`` and return a list of file info dicts.

    Args:
        query: Search expression. It is URL-encoded here, so it may contain
            spaces, quotes, slashes or '&'.
        max_results: Upper bound on the number of entries returned. At most
            five result pages are requested regardless of this value.

    Returns:
        List of dicts with keys: repo, path, sha, html_url. The list is empty
        when max_results is not positive or the first request fails; a later
        failure returns what was collected so far.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlencode

    if max_results <= 0:
        return []

    # Page size chosen by the original implementation.
    # NOTE(review): the GitHub REST docs describe per_page values up to 100;
    # confirm before raising this.
    per_page = min(max_results, 30)
    max_pages = 5  # Safety cap: 5 pages max

    results: list[dict] = []
    page = 1
    while len(results) < max_results:
        # Without encoding, queries such as 'worldedit "//set"' or
        # 'minecraft latest.log rcon' would put raw spaces and quotes in
        # the URL.
        params = urlencode({"q": query, "per_page": per_page, "page": page})
        raw = run_gh(["api", f"/search/code?{params}"], timeout=30)
        if raw is None:
            break
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            break
        items = data.get("items") if isinstance(data, dict) else None
        if not items:
            break

        for item in items:
            # "repository" may be absent or null; treat both as "no repo".
            repository = item.get("repository") or {}
            results.append({
                "repo": repository.get("full_name", ""),
                "path": item.get("path", ""),
                "sha": item.get("sha", ""),
                "html_url": item.get("html_url", ""),
            })

        if len(items) < per_page:
            break  # short page: nothing further to fetch
        page += 1
        if page > max_pages:
            break
        # Rate limit politeness - only when another request will follow.
        time.sleep(2)

    return results[:max_results]
def download_file_content(repo: str, path: str) -> Optional[str]:
    """Download one file from a GitHub repo and return it as text.

    The contents API is queried first to learn the file size and download
    URL. Files above MAX_FILE_SIZE are skipped. When no download URL is
    offered, the blob is fetched through ``gh api`` by SHA instead;
    otherwise the URL is fetched with curl.

    Args:
        repo: Repository in "owner/name" form.
        path: File path inside the repository (URL-quoted before use).

    Returns:
        The file text, with undecodable bytes replaced on the curl path, or
        None if the file is too large, is not a regular file, or any request
        fails.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    # First check size via the API. Quote the path so spaces, '#' or '?'
    # inside file names cannot corrupt the URL ('/' is left untouched).
    api_path = f"/repos/{repo}/contents/{quote(path)}"
    raw = run_gh(["api", api_path], timeout=30)
    if raw is None:
        return None
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        # Not a single-file object (for example a listing returned as a
        # JSON array), so there is no file to download.
        return None

    size = data.get("size") or 0
    if size > MAX_FILE_SIZE:
        print(f" [skip] {repo}/{path}: too large ({size / 1024 / 1024:.1f} MB)")
        return None

    download_url = data.get("download_url")
    if not download_url:
        # Try to get via git blob
        sha = data.get("sha", "")
        if not sha:
            return None
        return run_gh(
            ["api", f"/repos/{repo}/git/blobs/{sha}",
             "-H", "Accept: application/vnd.github.raw"],
            timeout=30,
        )

    # Download via curl. --fail makes HTTP errors exit non-zero instead of
    # handing back the error page as if it were the log file.
    try:
        result = subprocess.run(
            ["curl", "-fsL", "--max-filesize", str(MAX_FILE_SIZE), download_url],
            capture_output=True,
            text=True,
            # Server logs may hold bytes that are not valid UTF-8; replace
            # them rather than discarding the whole file on a decode error.
            encoding="utf-8",
            errors="replace",
            timeout=30,
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        print(f" [error] curl failed for {repo}/{path}: {exc}")
        return None
    if result.returncode != 0:
        return None
    return result.stdout
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def main():
    """Run the scraper: search GitHub, download logs, extract, save, report.

    Command-line flags: --dry-run (list candidate files and stop),
    --max-repos (cap on files processed, default 50) and --output-dir
    (default: <project>/data/raw). Two JSONL files are written to the output
    directory: the raw extracted commands and the derived training examples.
    """
    parser = argparse.ArgumentParser(
        description="Scrape Minecraft server logs from GitHub for training data."
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Search but don't download files.",
    )
    parser.add_argument(
        "--max-repos",
        type=int,
        default=50,
        help="Maximum number of repos/files to check (default: 50).",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default=None,
        help="Output directory (default: data/raw/).",
    )
    args = parser.parse_args()

    # Resolve paths relative to the project root (the parent directory when
    # this script lives in a folder named "data").
    here = Path(__file__).resolve().parent
    project_root = here.parent if here.name == "data" else here
    output_dir = (
        Path(args.output_dir) if args.output_dir else project_root / "data" / "raw"
    )
    output_dir.mkdir(parents=True, exist_ok=True)
    raw_output = output_dir / "scraped_github.jsonl"
    training_output = output_dir / "scraped_training.jsonl"
    seed_path = project_root / "data" / "processed" / "seed_dataset.jsonl"

    # Commands already in the seed dataset are never emitted again.
    existing_commands = load_existing_commands(str(seed_path))
    print(f"Loaded {len(existing_commands)} existing commands for dedup.")

    # ------------------------------------------------------------------
    # Phase 1: Search GitHub
    # ------------------------------------------------------------------
    print("\n=== Phase 1: Searching GitHub ===")
    all_files: dict[str, dict] = {}  # keyed by repo/path; first hit wins
    for query in SEARCH_QUERIES:
        print(f"\n Query: {query}")
        hits = search_github_code(query, max_results=100)
        print(f" Found {len(hits)} results.")
        for hit in hits:
            all_files.setdefault(f"{hit['repo']}/{hit['path']}", hit)
        time.sleep(2)  # Politeness between queries
    print(f"\nTotal unique files found: {len(all_files)}")

    file_list = list(all_files.values())[:args.max_repos]

    if args.dry_run:
        print("\n[DRY RUN] Listing files that would be downloaded:")
        for info in file_list:
            print(f" {info['repo']}/{info['path']}")
        print(f"\nWould check up to {min(len(all_files), args.max_repos)} files.")
        return

    # ------------------------------------------------------------------
    # Phase 2: Download and process files
    # ------------------------------------------------------------------
    print("\n=== Phase 2: Downloading and processing ===")
    stats = {
        "files_checked": 0,
        "files_with_version": 0,
        "files_accepted": 0,
        "files_rejected_version": 0,
        "files_no_version": 0,
        "total_commands": 0,
        "training_examples": 0,
        "version_distribution": {},
    }
    version_counts = stats["version_distribution"]
    raw_commands: list[dict] = []
    training_examples: list[dict] = []
    seen_commands: set = set(existing_commands)
    total_files = len(file_list)

    for position, file_info in enumerate(file_list, start=1):
        repo, path = file_info["repo"], file_info["path"]
        print(f"\n[{position}/{total_files}] {repo}/{path}")
        stats["files_checked"] += 1

        content = download_file_content(repo, path)
        if content is None:
            print(" [skip] Could not download.")
            time.sleep(2)
            continue

        # Files without a recognisable version are skipped entirely.
        version = detect_version(content)
        if not version:
            print(" [skip] No version detected in log.")
            stats["files_no_version"] += 1
            time.sleep(2)
            continue

        stats["files_with_version"] += 1
        version_counts[version] = version_counts.get(version, 0) + 1
        if not version_acceptable(version):
            print(f" [reject] Version {version} is too old (need >= 1.20.5).")
            stats["files_rejected_version"] += 1
            time.sleep(2)
            continue
        print(f" [ok] Version {version}")
        stats["files_accepted"] += 1

        commands = extract_commands(content)
        print(f" Extracted {len(commands)} commands.")
        stats["total_commands"] += len(commands)

        for cmd_info in commands:
            # Every extracted command is kept in the raw dump ...
            raw_commands.append(
                {"repo": repo, "path": path, "version": version, **cmd_info}
            )
            # ... but only new, non-skipped ones become training examples.
            example = command_to_training_example(
                cmd_info, version, repo, seen_commands
            )
            if not example:
                continue
            training_examples.append(example)
            seen_commands.update(example["output"]["commands"])
            stats["training_examples"] += 1

        time.sleep(2)  # Politeness

    # ------------------------------------------------------------------
    # Phase 3: Save results
    # ------------------------------------------------------------------
    print("\n=== Phase 3: Saving results ===")
    outputs = (
        ("Raw commands", raw_output, raw_commands),
        ("Training examples", training_output, training_examples),
    )
    for label, target, rows in outputs:
        with open(target, "w") as f:
            f.writelines(json.dumps(row) + "\n" for row in rows)
        print(f" {label}: {target} ({len(rows)} entries)")

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------
    rule = "=" * 60
    print("\n" + rule)
    print("SUMMARY")
    print(rule)
    summary_rows = (
        ("Files found (unique)", len(all_files)),
        ("Files checked", stats["files_checked"]),
        ("Files with version detected", stats["files_with_version"]),
        ("Files accepted (>= 1.20.5)", stats["files_accepted"]),
        ("Files rejected (old version)", stats["files_rejected_version"]),
        ("Files skipped (no version)", stats["files_no_version"]),
        ("Total commands extracted", stats["total_commands"]),
        ("Training examples generated", stats["training_examples"]),
    )
    for label, value in summary_rows:
        print(f" {label}: {value}")
    print("\n Version distribution:")
    for ver, count in sorted(version_counts.items()):
        verdict = "ok" if version_acceptable(ver) else "REJECTED"
        print(f" {ver}: {count} files [{verdict}]")
    print(rule)


if __name__ == "__main__":
    main()