"""Diagnostic variant: patch-mode harness with aggressively truncated tool responses. Identical to harness_patch.py except every tool result is truncated to 800 chars before being sent back to the model. Tests the hypothesis that 26B's silent-stop is triggered by accumulated large tool-response context (full pytest output is ~4-6KB; this caps it at 800 chars to match what the log stores). """ from __future__ import annotations import json import os import shutil import subprocess import sys import time from pathlib import Path from urllib import request as urlreq OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "http://127.0.0.1:11434") MAX_ITERATIONS = 15 BASH_TIMEOUT_S = 30 REQUEST_TIMEOUT_S = 540 TOOL_RESULT_CAP = int(os.environ.get("TOOL_RESULT_CAP", "800")) # override via env SYSTEM_PROMPT = """You are a terminal coding agent. ## What you do - Read source and test files to understand the code - Make targeted edits to fix bugs so the tests pass - Run pytest to verify your fix - Stop once all tests pass and reply with a one-sentence summary ## What you do NOT do - Never modify files under tests/ - Never disable, skip, or delete tests - Never write outside the working directory - Never call tools after all tests pass — just reply with the summary and stop ## Available tools - read_file(path): read a file relative to the working directory - apply_patch(path, old_text, new_text): replace an exact unique text span in a file - run_bash(command): run a shell command in the working directory ## Rules - Start by reading README.md - Prefer minimal edits. Do not refactor unrelated code. - Run the full test suite after each edit to verify. - apply_patch requires old_text to appear EXACTLY ONCE in the file; include enough surrounding context to make it unique. """ USER_PROMPT = "Make the failing tests pass. Begin." TOOLS = [ { "type": "function", "function": { "name": "read_file", "description": "Read a file. Path is relative to the working directory.", "parameters": { "type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "apply_patch", "description": ( "Replace a unique span of text in a file. old_text must appear exactly once. " "Include surrounding context if needed to make the match unique." ), "parameters": { "type": "object", "properties": { "path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}, }, "required": ["path", "old_text", "new_text"], }, }, }, { "type": "function", "function": { "name": "run_bash", "description": "Run a shell command in the working directory. Returns stdout, stderr, and exit code.", "parameters": { "type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"], }, }, }, ] def safe_path(workdir: Path, rel: str) -> Path: p = (workdir / rel).resolve() if not str(p).startswith(str(workdir.resolve())): raise ValueError(f"path escapes workdir: {rel}") return p def tool_read_file(workdir: Path, args: dict) -> str: p = safe_path(workdir, args["path"]) if not p.exists(): return f"ERROR: {args['path']} does not exist" return p.read_text() def tool_apply_patch(workdir: Path, args: dict) -> str: p = safe_path(workdir, args["path"]) if not p.exists(): return f"ERROR: {args['path']} does not exist" old = args["old_text"] new = args["new_text"] text = p.read_text() occurrences = text.count(old) if occurrences == 0: return ( f"ERROR: old_text not found in {args['path']}. " "Re-read the file and copy the exact text (whitespace matters)." ) if occurrences > 1: return ( f"ERROR: old_text appears {occurrences} times in {args['path']}. " "Include more surrounding context so it matches exactly once." ) p.write_text(text.replace(old, new, 1)) return f"patched {args['path']} (replaced {len(old)} chars with {len(new)} chars)" def tool_run_bash(workdir: Path, args: dict) -> str: try: r = subprocess.run( ["bash", "-c", args["command"]], cwd=workdir, capture_output=True, text=True, timeout=BASH_TIMEOUT_S, ) except subprocess.TimeoutExpired: return f"ERROR: command timed out after {BASH_TIMEOUT_S}s" head = ( f"exit={r.returncode}\n" f"--- stdout ---\n{r.stdout[-4000:]}\n" f"--- stderr ---\n{r.stderr[-2000:]}" ) return head TOOL_DISPATCH = { "read_file": tool_read_file, "apply_patch": tool_apply_patch, "run_bash": tool_run_bash, } def ollama_chat(model: str, messages: list) -> dict: payload = { "model": model, "messages": messages, "tools": TOOLS, "stream": False, "think": False, "keep_alive": "10m", "options": { "num_ctx": 32768, "num_predict": 4096, "temperature": 0.3, }, } data = json.dumps(payload).encode() req = urlreq.Request( f"{OLLAMA_HOST}/api/chat", data=data, headers={"Content-Type": "application/json"}, ) with urlreq.urlopen(req, timeout=REQUEST_TIMEOUT_S) as resp: return json.loads(resp.read()) def pytest_passes(workdir: Path) -> bool: r = subprocess.run( ["python3", "-m", "pytest", "tests/", "-q"], cwd=workdir, capture_output=True, text=True, timeout=60, ) return r.returncode == 0 def run_bakeoff(model: str, workdir: Path, log_path: Path) -> dict: log_path.parent.mkdir(parents=True, exist_ok=True) messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": USER_PROMPT}, ] trace = { "model": model, "edit_tool": "apply_patch", "tool_result_cap": TOOL_RESULT_CAP, "workdir": str(workdir), "started_at": time.time(), "turns": [], "final": None, } tool_call_counts = {"read_file": 0, "apply_patch": 0, "run_bash": 0} halt_reason = None for iteration in range(1, MAX_ITERATIONS + 1): turn_start = time.time() try: response = ollama_chat(model, messages) except Exception as e: halt_reason = f"chat_error: {e}" trace["turns"].append( {"iteration": iteration, "error": str(e), "elapsed_s": time.time() - turn_start} ) break assistant_msg = response.get("message", {}) content = assistant_msg.get("content", "") or "" tool_calls = assistant_msg.get("tool_calls", []) or [] turn = { "iteration": iteration, "elapsed_s": round(time.time() - turn_start, 2), "content": content, "tool_calls": [], "prompt_eval_count": response.get("prompt_eval_count"), "eval_count": response.get("eval_count"), } messages.append({"role": "assistant", "content": content, "tool_calls": tool_calls}) if not tool_calls: trace["turns"].append(turn) halt_reason = "no_tool_calls" break for tc in tool_calls: fn = tc.get("function", {}) name = fn.get("name") args = fn.get("arguments") or {} if isinstance(args, str): try: args = json.loads(args) except Exception: args = {"_raw": args} if name not in TOOL_DISPATCH: result = f"ERROR: unknown tool {name}" else: try: result = TOOL_DISPATCH[name](workdir, args) tool_call_counts[name] = tool_call_counts.get(name, 0) + 1 except Exception as e: result = f"ERROR: {e}" # THE HYPOTHESIS TEST: cap tool result before sending back to model result_sent = result[:TOOL_RESULT_CAP] turn["tool_calls"].append({"name": name, "arguments": args, "result": result_sent[:800]}) messages.append({"role": "tool", "content": result_sent}) trace["turns"].append(turn) if iteration == MAX_ITERATIONS: halt_reason = "iteration_cap" break final_pass = pytest_passes(workdir) trace["final"] = { "halt_reason": halt_reason, "tests_pass": final_pass, "iterations_used": len(trace["turns"]), "tool_call_counts": tool_call_counts, "wall_clock_s": round(time.time() - trace["started_at"], 2), } log_path.write_text(json.dumps(trace, indent=2, default=str)) return trace def main(): if len(sys.argv) != 4: print(__doc__, file=sys.stderr) sys.exit(2) model, workdir_s, log_s = sys.argv[1], sys.argv[2], sys.argv[3] workdir = Path(workdir_s).resolve() log_path = Path(log_s).resolve() seed = Path(__file__).parent / "task_seed" if workdir.exists(): shutil.rmtree(workdir) shutil.copytree(seed, workdir) result = run_bakeoff(model, workdir, log_path) final = result["final"] print( f"model={model} pass={final['tests_pass']} " f"iters={final['iterations_used']} " f"read={final['tool_call_counts'].get('read_file', 0)} " f"patch={final['tool_call_counts'].get('apply_patch', 0)} " f"bash={final['tool_call_counts'].get('run_bash', 0)} " f"halt={final['halt_reason']} " f"wall={final['wall_clock_s']}s" ) if __name__ == "__main__": main()