Add audit log ingestion pipeline with language/leak filtering
data/ingest_audit.py: - Pulls training audit logs from CT 644 (dev + prod) - Filters: language mismatch (Chinese output for English input), system prompt leaks, empty responses, duplicates - Keeps multilingual examples where input/output languages match - Converts to dataset schema, appends to seed_dataset.jsonl - --dry-run to preview, --source dev/prod/both Tested: 237 entries → 112 kept (16 lang mismatch, 10 prompt leak, 86 dupe, 13 empty dropped) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,272 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ingest_audit.py — Pull training audit logs from CT 644, filter, and merge into dataset.
|
||||
|
||||
Filters:
|
||||
- Drops examples where output language doesn't match input language
|
||||
- Drops empty responses (no commands AND no message)
|
||||
- Drops duplicate user_messages already in the dataset
|
||||
- Keeps multilingual examples where input/output languages match
|
||||
- Tags all as validated=false, needs human review
|
||||
|
||||
Usage:
|
||||
python3 data/ingest_audit.py # pull, filter, merge
|
||||
python3 data/ingest_audit.py --dry-run # show what would be added
|
||||
python3 data/ingest_audit.py --source dev # only dev server
|
||||
python3 data/ingest_audit.py --source prod # only prod server
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
DATASET = ROOT / "data" / "processed" / "seed_dataset.jsonl"
|
||||
RAW_DIR = ROOT / "data" / "raw"
|
||||
|
||||
|
||||
# --- Language detection (simple, no dependencies) ---
|
||||
|
||||
def has_cjk(text: str) -> bool:
|
||||
"""Check if text contains CJK (Chinese/Japanese/Korean) characters."""
|
||||
return bool(re.search(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]', text))
|
||||
|
||||
|
||||
def has_cyrillic(text: str) -> bool:
|
||||
return bool(re.search(r'[\u0400-\u04ff]', text))
|
||||
|
||||
|
||||
def has_arabic(text: str) -> bool:
|
||||
return bool(re.search(r'[\u0600-\u06ff]', text))
|
||||
|
||||
|
||||
def detect_script(text: str) -> str:
|
||||
"""Detect the dominant script of a text. Returns 'latin', 'cjk', 'cyrillic', 'arabic', or 'mixed'."""
|
||||
if not text or len(text.strip()) < 3:
|
||||
return "latin"
|
||||
# Count character types
|
||||
latin = len(re.findall(r'[a-zA-Z]', text))
|
||||
cjk = len(re.findall(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]', text))
|
||||
cyrillic = len(re.findall(r'[\u0400-\u04ff]', text))
|
||||
arabic = len(re.findall(r'[\u0600-\u06ff]', text))
|
||||
|
||||
total = latin + cjk + cyrillic + arabic
|
||||
if total == 0:
|
||||
return "latin"
|
||||
|
||||
if cjk / total > 0.3:
|
||||
return "cjk"
|
||||
if cyrillic / total > 0.3:
|
||||
return "cyrillic"
|
||||
if arabic / total > 0.3:
|
||||
return "arabic"
|
||||
return "latin"
|
||||
|
||||
|
||||
def languages_match(input_text: str, output_message: str) -> bool:
|
||||
"""Check if input and output are in the same script family.
|
||||
Commands are always English/latin, so we only check the message field."""
|
||||
if not output_message or len(output_message.strip()) < 3:
|
||||
return True # no message to check
|
||||
input_script = detect_script(input_text)
|
||||
output_script = detect_script(output_message)
|
||||
return input_script == output_script
|
||||
|
||||
|
||||
def is_system_prompt_leak(message: str) -> bool:
|
||||
"""Detect if the model leaked its system prompt as the God message."""
|
||||
if not message:
|
||||
return False
|
||||
leak_patterns = [
|
||||
"you are a Minecraft",
|
||||
"command translator",
|
||||
"Return ONLY JSON",
|
||||
"SYNTAX RULES",
|
||||
"RISK GRADIENT",
|
||||
"permission level",
|
||||
"CRITICAL RULES",
|
||||
]
|
||||
lower = message.lower()
|
||||
return sum(1 for p in leak_patterns if p.lower() in lower) >= 2
|
||||
|
||||
|
||||
# --- Remote data pulling ---
|
||||
|
||||
def pull_audit_log(source: str) -> list:
|
||||
"""Pull training audit log from CT 644."""
|
||||
if source == "dev":
|
||||
remote_path = "/var/log/mc_training_audit_dev.jsonl"
|
||||
else:
|
||||
remote_path = "/var/log/mc_training_audit.jsonl"
|
||||
|
||||
local_path = RAW_DIR / f"training_audit_{source}_latest.jsonl"
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
f'ssh pve112 "pct pull 644 {remote_path} /tmp/audit_{source}.jsonl" && '
|
||||
f'scp pve112:/tmp/audit_{source}.jsonl {local_path}',
|
||||
shell=True, capture_output=True, text=True, timeout=30
|
||||
)
|
||||
if result.returncode != 0:
|
||||
print(f"Warning: could not pull {source} audit log: {result.stderr[:200]}")
|
||||
return []
|
||||
except Exception as e:
|
||||
print(f"Warning: pull failed for {source}: {e}")
|
||||
return []
|
||||
|
||||
entries = []
|
||||
with open(local_path) as f:
|
||||
for line in f:
|
||||
if line.strip():
|
||||
try:
|
||||
entries.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
return entries
|
||||
|
||||
|
||||
# --- Conversion ---
|
||||
|
||||
def convert_entry(entry: dict, idx: int, source_tag: str) -> dict:
|
||||
"""Convert a training audit entry to dataset schema format."""
|
||||
inp = entry.get("input", {})
|
||||
out = entry.get("output", {})
|
||||
mode = entry.get("mode", "sudo")
|
||||
player = entry.get("player", "unknown")
|
||||
|
||||
commands = out.get("commands_executed", []) or out.get("commands_generated", [])
|
||||
message = out.get("message", "")
|
||||
|
||||
example = {
|
||||
"id": f"audit-{source_tag}-{idx:04d}",
|
||||
"source": "sudo_log" if mode == "sudo" else "prayer_log",
|
||||
"category": entry.get("category", "command_gen"),
|
||||
"input": {
|
||||
"user_message": inp.get("user_message", ""),
|
||||
"server_context": inp.get("server_context", {}),
|
||||
},
|
||||
"output": {
|
||||
"reasoning": f"Live {source_tag} interaction from {player} via {mode} mode.",
|
||||
"commands": commands,
|
||||
"message": message if mode in ("god", "god_system") else None,
|
||||
"safety_flags": [],
|
||||
},
|
||||
"metadata": {
|
||||
"difficulty": "medium",
|
||||
"validated": False,
|
||||
"extracted_from": f"training audit log ({source_tag})",
|
||||
"scoring_mode": "soft" if mode in ("god", "god_system") else "strict",
|
||||
},
|
||||
}
|
||||
|
||||
return example
|
||||
|
||||
|
||||
# --- Main ---
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Ingest training audit logs")
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--source", choices=["dev", "prod", "both"], default="both")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load existing messages to deduplicate
|
||||
existing_messages = set()
|
||||
if DATASET.exists():
|
||||
with open(DATASET) as f:
|
||||
for line in f:
|
||||
if line.strip():
|
||||
ex = json.loads(line)
|
||||
existing_messages.add(ex.get("input", {}).get("user_message", ""))
|
||||
|
||||
# Pull data
|
||||
sources = ["dev", "prod"] if args.source == "both" else [args.source]
|
||||
all_entries = []
|
||||
for src in sources:
|
||||
entries = pull_audit_log(src)
|
||||
print(f"Pulled {len(entries)} entries from {src}")
|
||||
all_entries.extend([(e, src) for e in entries])
|
||||
|
||||
# Filter and convert
|
||||
kept = []
|
||||
dropped = {"duplicate": 0, "empty": 0, "lang_mismatch": 0, "prompt_leak": 0, "feedback": 0}
|
||||
|
||||
for entry, src in all_entries:
|
||||
# Skip feedback entries (bug_log)
|
||||
if entry.get("source") == "player_feedback":
|
||||
dropped["feedback"] += 1
|
||||
continue
|
||||
|
||||
inp = entry.get("input", {})
|
||||
out = entry.get("output", {})
|
||||
user_msg = inp.get("user_message", "")
|
||||
message = out.get("message", "")
|
||||
commands = out.get("commands_executed", []) or out.get("commands_generated", [])
|
||||
|
||||
# Skip duplicates
|
||||
if user_msg in existing_messages:
|
||||
dropped["duplicate"] += 1
|
||||
continue
|
||||
|
||||
# Skip empty (no commands AND no message)
|
||||
if not commands and not message:
|
||||
dropped["empty"] += 1
|
||||
continue
|
||||
|
||||
# Skip language mismatch (Chinese output for English input, etc.)
|
||||
if not languages_match(user_msg, message):
|
||||
dropped["lang_mismatch"] += 1
|
||||
continue
|
||||
|
||||
# Skip system prompt leaks
|
||||
if is_system_prompt_leak(message):
|
||||
dropped["prompt_leak"] += 1
|
||||
continue
|
||||
|
||||
example = convert_entry(entry, len(kept), src)
|
||||
kept.append(example)
|
||||
existing_messages.add(user_msg) # prevent within-batch dupes
|
||||
|
||||
print(f"\nResults:")
|
||||
print(f" Total entries: {len(all_entries)}")
|
||||
print(f" Kept: {len(kept)}")
|
||||
print(f" Dropped:")
|
||||
for reason, count in sorted(dropped.items()):
|
||||
if count > 0:
|
||||
print(f" {reason:20} {count}")
|
||||
|
||||
if args.dry_run:
|
||||
print(f"\n[DRY RUN] Would append {len(kept)} examples to {DATASET}")
|
||||
for ex in kept[:5]:
|
||||
msg = ex["input"]["user_message"][:60]
|
||||
cmds = len(ex["output"]["commands"])
|
||||
print(f" {ex['id']}: {msg} [{cmds} cmds]")
|
||||
return
|
||||
|
||||
if not kept:
|
||||
print("Nothing to add.")
|
||||
return
|
||||
|
||||
# Append to dataset
|
||||
with open(DATASET, "a") as f:
|
||||
for ex in kept:
|
||||
f.write(json.dumps(ex, ensure_ascii=False) + "\n")
|
||||
|
||||
print(f"\nAppended {len(kept)} examples to {DATASET}")
|
||||
|
||||
# Validate
|
||||
result = subprocess.run(
|
||||
[sys.executable, str(ROOT / "data" / "validate_dataset.py"), str(DATASET)],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
print(result.stdout)
|
||||
if result.returncode != 0:
|
||||
print("WARNING: Validation failed!")
|
||||
print(result.stdout)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user