#!/usr/bin/env python3
"""
ingest_audit.py — Pull training audit logs from CT 644, filter, and merge into dataset.

Filters (applied in this order):
  - Skips player feedback entries (source == "player_feedback")
  - Drops duplicate user_messages already in the dataset or earlier in the batch
  - Drops empty responses (no commands AND no message)
  - Drops examples where output language doesn't match input language
  - Drops responses that look like a leaked system prompt
  - Keeps multilingual examples where input/output languages match
  - Tags all as validated=false, needs human review

Usage:
    python3 data/ingest_audit.py                    # pull, filter, merge
    python3 data/ingest_audit.py --dry-run          # show what would be added
    python3 data/ingest_audit.py --source dev       # only dev server
    python3 data/ingest_audit.py --source prod      # only prod server
"""

import argparse
import json
import re
import subprocess
import sys
from pathlib import Path
from typing import Optional

ROOT = Path(__file__).resolve().parent.parent
DATASET = ROOT / "data" / "processed" / "seed_dataset.jsonl"
RAW_DIR = ROOT / "data" / "raw"

# Remote endpoints used by pull_audit_log().
PVE_HOST = "pve112"
CONTAINER_ID = 644
# Timeout applied to EACH remote step (ssh, then scp), in seconds.
PULL_TIMEOUT_SECONDS = 30

# Modes whose free-text message is kept and scored softly.
GOD_MODES = ("god", "god_system")


# --- Language detection (simple, no dependencies) ---

# Compiled once; shared by the has_*() predicates and detect_script().
_LATIN_RE = re.compile(r'[a-zA-Z]')
_CJK_RE = re.compile(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]')
_CYRILLIC_RE = re.compile(r'[\u0400-\u04ff]')
_ARABIC_RE = re.compile(r'[\u0600-\u06ff]')

# A non-Latin script wins once it exceeds this share of the counted letters.
_SCRIPT_THRESHOLD = 0.3


def has_cjk(text: str) -> bool:
    """Check if text contains CJK (Chinese/Japanese/Korean) characters."""
    return bool(_CJK_RE.search(text))


def has_cyrillic(text: str) -> bool:
    """Check if text contains Cyrillic characters."""
    return bool(_CYRILLIC_RE.search(text))


def has_arabic(text: str) -> bool:
    """Check if text contains Arabic characters."""
    return bool(_ARABIC_RE.search(text))


def detect_script(text: str) -> str:
    """Detect the dominant script of a text.

    Returns one of 'latin', 'cjk', 'cyrillic' or 'arabic'. Scripts are tested
    in that non-Latin order and the first one above the 30% threshold wins;
    'latin' is the fallback, including for empty text and text without any
    recognised letters (digits, punctuation, ...).

    Very short texts are classified like any other: a two-character CJK or
    Cyrillic word is a complete message and must not be reported as Latin.
    """
    if not text or not text.strip():
        return "latin"

    # Count character types
    latin = len(_LATIN_RE.findall(text))
    cjk = len(_CJK_RE.findall(text))
    cyrillic = len(_CYRILLIC_RE.findall(text))
    arabic = len(_ARABIC_RE.findall(text))
    total = latin + cjk + cyrillic + arabic

    if total == 0:
        return "latin"

    for name, count in (("cjk", cjk), ("cyrillic", cyrillic), ("arabic", arabic)):
        if count / total > _SCRIPT_THRESHOLD:
            return name
    return "latin"


def languages_match(input_text: str, output_message: str) -> bool:
    """Check if input and output are in the same script family.

    Commands are always English/latin, so we only check the message field.
    A missing or very short (< 3 characters) message is treated as a match
    because there is nothing meaningful to compare.
    """
    if not output_message or len(output_message.strip()) < 3:
        return True  # no message to check
    return detect_script(input_text) == detect_script(output_message)


def is_system_prompt_leak(message: str) -> bool:
    """Detect if the model leaked its system prompt as the God message.

    Returns True when at least two known prompt fragments occur in the
    message (case-insensitive); a single hit is not considered a leak.
    """
    if not message:
        return False
    leak_patterns = [
        "you are a Minecraft",
        "command translator",
        "Return ONLY JSON",
        "SYNTAX RULES",
        "RISK GRADIENT",
        "permission level",
        "CRITICAL RULES",
    ]
    lower = message.lower()
    return sum(1 for p in leak_patterns if p.lower() in lower) >= 2


# --- Remote data pulling ---

def pull_audit_log(source: str) -> list:
    """Pull training audit log from CT 644.

    Runs ``pct pull`` over ssh on the host to copy the log to a staging path
    under /tmp there, then ``scp``s that file into RAW_DIR and parses it as
    JSON Lines.

    Args:
        source: "dev" selects the dev audit log; any other value selects the
            default (prod) log.

    Returns:
        The parsed JSON objects, one per valid line. Blank lines, malformed
        lines and non-object values are skipped. Returns [] (after printing a
        warning) when the pull fails or the local copy cannot be read.
    """
    if source == "dev":
        remote_path = "/var/log/mc_training_audit_dev.jsonl"
    else:
        remote_path = "/var/log/mc_training_audit.jsonl"

    staging_path = f"/tmp/audit_{source}.jsonl"
    local_path = RAW_DIR / f"training_audit_{source}_latest.jsonl"

    # Argument lists instead of a shell string: local_path may contain spaces
    # or shell metacharacters, and no local shell features are needed.
    steps = [
        ["ssh", PVE_HOST, f"pct pull {CONTAINER_ID} {remote_path} {staging_path}"],
        ["scp", f"{PVE_HOST}:{staging_path}", str(local_path)],
    ]

    try:
        RAW_DIR.mkdir(parents=True, exist_ok=True)
        for cmd in steps:
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=PULL_TIMEOUT_SECONDS
            )
            if result.returncode != 0:
                print(f"Warning: could not pull {source} audit log: {result.stderr[:200]}")
                return []
    except (subprocess.SubprocessError, OSError) as e:
        # SubprocessError covers the timeout; OSError covers a missing
        # ssh/scp binary or an uncreatable RAW_DIR.
        print(f"Warning: pull failed for {source}: {e}")
        return []

    entries = []
    try:
        with open(local_path, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Downstream code calls .get() on every entry.
                if isinstance(record, dict):
                    entries.append(record)
    except OSError as e:
        print(f"Warning: pull failed for {source}: {e}")
        return []
    return entries


# --- Conversion ---

def _example_id(source_tag: str, idx: int) -> str:
    """Build the dataset id for the idx-th audit example of a source."""
    return f"audit-{source_tag}-{idx:04d}"


def _extract_commands(out: dict) -> list:
    """Return executed commands, falling back to generated ones, never None."""
    return out.get("commands_executed") or out.get("commands_generated") or []


def convert_entry(entry: dict, idx: int, source_tag: str) -> dict:
    """Convert a training audit entry to dataset schema format.

    Args:
        entry: One parsed audit log record.
        idx: Sequence number used to build the example id.
        source_tag: Origin of the record (e.g. "dev" or "prod"); embedded in
            the id, the reasoning text and the metadata.

    Returns:
        A dataset example dict, always tagged ``validated: False``. The
        message is kept only for god modes; other modes store None.
    """
    # "or {}" also covers keys that are present but null in the log.
    inp = entry.get("input") or {}
    out = entry.get("output") or {}
    mode = entry.get("mode", "sudo")
    player = entry.get("player", "unknown")
    is_god = mode in GOD_MODES

    return {
        "id": _example_id(source_tag, idx),
        "source": "sudo_log" if mode == "sudo" else "prayer_log",
        "category": entry.get("category", "command_gen"),
        "input": {
            "user_message": inp.get("user_message") or "",
            "server_context": inp.get("server_context", {}),
        },
        "output": {
            "reasoning": f"Live {source_tag} interaction from {player} via {mode} mode.",
            "commands": _extract_commands(out),
            "message": out.get("message", "") if is_god else None,
            "safety_flags": [],
        },
        "metadata": {
            "difficulty": "medium",
            "validated": False,
            "extracted_from": f"training audit log ({source_tag})",
            "scoring_mode": "soft" if is_god else "strict",
        },
    }


# --- Main ---

def _load_existing(dataset: Path) -> tuple:
    """Return (user_messages, ids) already present in the dataset file.

    Both are sets; a missing dataset file yields two empty sets.
    """
    messages = set()
    ids = set()
    if dataset.exists():
        with open(dataset, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                ex = json.loads(line)
                messages.add((ex.get("input") or {}).get("user_message", ""))
                if ex.get("id") is not None:
                    ids.add(ex["id"])
    return messages, ids


def _drop_reason(entry: dict, existing_messages: set) -> Optional[str]:
    """Return the name of the filter that rejects the entry, or None to keep it."""
    # Skip feedback entries (bug_log)
    if entry.get("source") == "player_feedback":
        return "feedback"

    inp = entry.get("input") or {}
    out = entry.get("output") or {}
    user_msg = inp.get("user_message") or ""
    message = out.get("message", "")

    if user_msg in existing_messages:
        return "duplicate"
    # Empty means no commands AND no message.
    if not _extract_commands(out) and not message:
        return "empty"
    # Language mismatch (Chinese output for English input, etc.)
    if not languages_match(user_msg, message):
        return "lang_mismatch"
    if is_system_prompt_leak(message):
        return "prompt_leak"
    return None


def main():
    """Pull the audit logs, filter them and append the survivors to DATASET."""
    parser = argparse.ArgumentParser(description="Ingest training audit logs")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--source", choices=["dev", "prod", "both"], default="both")
    args = parser.parse_args()

    # Load existing messages (to deduplicate) and ids (to avoid collisions)
    existing_messages, used_ids = _load_existing(DATASET)

    # Pull data
    sources = ["dev", "prod"] if args.source == "both" else [args.source]
    all_entries = []
    for src in sources:
        entries = pull_audit_log(src)
        print(f"Pulled {len(entries)} entries from {src}")
        all_entries.extend((e, src) for e in entries)

    # Filter and convert
    kept = []
    dropped = {"duplicate": 0, "empty": 0, "lang_mismatch": 0, "prompt_leak": 0, "feedback": 0}
    next_idx = 0

    for entry, src in all_entries:
        reason = _drop_reason(entry, existing_messages)
        if reason is not None:
            dropped[reason] += 1
            continue

        # Numbering restarts at 0 on every run, so skip ids that an earlier
        # run already wrote to the dataset.
        while _example_id(src, next_idx) in used_ids:
            next_idx += 1
        example = convert_entry(entry, next_idx, src)
        used_ids.add(example["id"])
        next_idx += 1

        kept.append(example)
        existing_messages.add(example["input"]["user_message"])  # prevent within-batch dupes

    print("\nResults:")
    print(f"  Total entries: {len(all_entries)}")
    print(f"  Kept: {len(kept)}")
    print("  Dropped:")
    for reason, count in sorted(dropped.items()):
        if count > 0:
            print(f"    {reason:20} {count}")

    if args.dry_run:
        print(f"\n[DRY RUN] Would append {len(kept)} examples to {DATASET}")
        for ex in kept[:5]:
            msg = ex["input"]["user_message"][:60]
            cmds = len(ex["output"]["commands"])
            print(f"  {ex['id']}: {msg} [{cmds} cmds]")
        return

    if not kept:
        print("Nothing to add.")
        return

    # Append to dataset. UTF-8 is explicit because ensure_ascii=False writes
    # non-ASCII text verbatim.
    DATASET.parent.mkdir(parents=True, exist_ok=True)
    with open(DATASET, "a", encoding="utf-8") as f:
        f.writelines(json.dumps(ex, ensure_ascii=False) + "\n" for ex in kept)
    print(f"\nAppended {len(kept)} examples to {DATASET}")

    # Validate
    result = subprocess.run(
        [sys.executable, str(ROOT / "data" / "validate_dataset.py"), str(DATASET)],
        capture_output=True, text=True
    )
    print(result.stdout)
    if result.returncode != 0:
        print("WARNING: Validation failed!")
        # stdout was already printed above; the failure reason is on stderr.
        print(result.stderr)


if __name__ == "__main__":
    main()