"""
Log Query — focused queries against the server event log.

Replaces the 200-line log dump with specific, targeted queries.
Reads from the existing recent_log buffer in mc_aigod.

Query types:
    chat    — recent chat messages (optionally filtered by player)
    deaths  — recent death events
    joins   — recent join/leave events
    actions — recent commands/interactions
    all     — recent events of any type

Usage:
    from agent.tools.log_query import handle_log_query
    result = handle_log_query(recent_log_buffer, {
        "type": "chat",
        "player": "TheBigBoss",
        "limit": 5,
    })
"""

import re
import time
from typing import Any, Dict, Iterable, List, Optional, Tuple
from collections import deque

# Patterns for classifying log events
CHAT_PATTERN = re.compile(r'<(\w+)>\s*(.+)')
DEATH_PATTERNS = [
    re.compile(
        r'(\w+) ('
        r'fell from a high place|hit the ground too hard|'
        r'was slain by \w+|was shot by \w+|drowned|'
        r'tried to swim in lava|burned to death|went up in flames|'
        r'blew up|was blown up by \w+|suffocated|starved to death|'
        r'was killed by \w+|was pricked to death|withered away|'
        r'fell out of the world)'
    ),
]
JOIN_PATTERN = re.compile(r'(\w+) joined the game')
LEAVE_PATTERN = re.compile(r'(\w+) left the game')
ADVANCEMENT_PATTERN = re.compile(r'(\w+) has made the advancement \[(.+?)\]')
COMMAND_PATTERN = re.compile(r'(\w+) issued server command: /(.+)')

# Section-sign colour/format codes: the sign plus the one character after it.
_COLOR_CODE_PATTERN = re.compile(r'\xa7.')
# Everything after the first "INFO]: " marker is the message proper.
_LOG_PREFIX_PATTERN = re.compile(r'INFO\]: (.+)$')

# Maps a public query type to the set of event types it selects.
_QUERY_TYPES = {
    "chat": {"chat"},
    "deaths": {"death"},
    "joins": {"join", "leave"},
    "actions": {"command", "advancement"},
    "all": {"chat", "death", "join", "leave", "command", "advancement", "other"},
}

_DEFAULT_LIMIT = 5


def classify_event(text: str) -> Tuple[str, str, str]:
    """Classify a log line into ``(type, player, detail)``.

    ``type`` is one of ``chat``, ``command``, ``death``, ``join``, ``leave``,
    ``advancement`` or ``other``.  For ``other`` the player is an empty
    string and the detail is the cleaned line.
    """
    # Strip colour codes, then the timestamp/thread prefix if one is present.
    clean = _COLOR_CODE_PATTERN.sub('', text)
    prefix = _LOG_PREFIX_PATTERN.search(clean)
    if prefix:
        clean = prefix.group(1).strip()

    # Chat is anchored at the start of the message, so it is tested first:
    # whatever a player types after "<name>" must never be reclassified.
    chat = CHAT_PATTERN.match(clean)
    if chat:
        return ("chat", chat.group(1), chat.group(2))

    # Commands carry free-form arguments, and the remaining patterns are
    # unanchored searches.  Testing commands before them keeps a line such
    # as "Steve issued server command: /say Bob joined the game" from being
    # reported as a join by Bob.
    command = COMMAND_PATTERN.search(clean)
    if command:
        return (
            "command",
            command.group(1),
            f"{command.group(1)}: /{command.group(2)}",
        )

    for death_pattern in DEATH_PATTERNS:
        death = death_pattern.search(clean)
        if death:
            return ("death", death.group(1), death.group(0))

    joined = JOIN_PATTERN.search(clean)
    if joined:
        return ("join", joined.group(1), f"{joined.group(1)} joined")

    left = LEAVE_PATTERN.search(clean)
    if left:
        return ("leave", left.group(1), f"{left.group(1)} left")

    advancement = ADVANCEMENT_PATTERN.search(clean)
    if advancement:
        return (
            "advancement",
            advancement.group(1),
            f"{advancement.group(1)} earned [{advancement.group(2)}]",
        )

    return ("other", "", clean)


def _format_age(seconds: int) -> str:
    """Render a non-negative age in whole seconds as ``Ns``/``Nm``/``Nh ago``."""
    if seconds < 60:
        return f"{seconds}s ago"
    if seconds < 3600:
        return f"{seconds // 60}m ago"
    return f"{seconds // 3600}h ago"


def query_log(recent_log: Iterable[Tuple[float, str]], query_type: str = "all",
              player: Optional[str] = None, limit: int = 5) -> Dict[str, Any]:
    """
    Query the log buffer for specific events, newest first.

    Args:
        recent_log: iterable of (timestamp_float, log_line_str) pairs, oldest
            first.  Two-element tuples and lists are accepted; any other
            entry is skipped.
        query_type: chat, deaths, joins, actions, all.  An unrecognised value
            is treated as "all".
        player: optional player name filter (case-insensitive).
        limit: max results (default 5).  Zero or a negative value yields no
            results.

    Returns:
        {ok, results: [{type, player, detail, age}], count, query}
        where ``age`` is a human-readable string such as "42s ago",
        "3m ago" or "2h ago".
    """
    allowed_types = _QUERY_TYPES.get(query_type, _QUERY_TYPES["all"])
    wanted_player = player.lower() if player else None

    results: List[Dict[str, Any]] = []
    now = time.time()

    # Copy into a list so any iterable can be walked in reverse; skip the
    # scan entirely when the caller asked for nothing.
    entries = list(recent_log) if limit > 0 else []
    for entry in reversed(entries):
        if not (isinstance(entry, (tuple, list)) and len(entry) == 2):
            continue
        ts, line = entry

        event_type, event_player, detail = classify_event(line)
        if event_type not in allowed_types:
            continue
        if wanted_player is not None and event_player.lower() != wanted_player:
            continue

        # Clamp so a timestamp ahead of the local clock reads "0s ago"
        # instead of a negative age.
        age = int(max(0.0, now - ts))
        results.append({
            "type": event_type,
            "player": event_player,
            "detail": detail,
            "age": _format_age(age),
        })
        if len(results) >= limit:
            break

    return {
        "ok": True,
        "results": results,
        "count": len(results),
        "query": {"type": query_type, "player": player, "limit": limit},
    }


def _coerce_limit(raw: Any) -> int:
    """Convert a tool-call ``limit`` to int, defaulting on None or bad input."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return _DEFAULT_LIMIT


def handle_log_query(recent_log, arguments: dict) -> Dict[str, Any]:
    """Tool handler for log.query calls.

    Reads ``type``, ``player`` and ``limit`` from ``arguments`` and forwards
    them to :func:`query_log`.  A missing, null or non-numeric ``limit``
    falls back to the default of 5 rather than raising.
    """
    return query_log(
        recent_log=recent_log,
        query_type=arguments.get("type", "all"),
        player=arguments.get("player"),
        limit=_coerce_limit(arguments.get("limit", _DEFAULT_LIMIT)),
    )