#!/usr/bin/env python3
"""
pretrain_policy.py — Give the RL policy a head start via behavioral cloning.

Generates synthetic expert demonstrations from our hand-coded survival rules,
then trains the policy network to imitate them. The resulting weights become
the starting point for PPO (instead of random initialization).

Usage:
    python3 training/rl/pretrain_policy.py
    # Then run train_combat.py — it will load the pretrained checkpoint
"""

from pathlib import Path

import numpy as np
import torch
import torch.nn as nn

# stable_baselines3 and gymnasium are imported lazily inside _build_model() so
# that expert_action() / generate_expert_data() stay importable without the RL
# stack installed (same pattern as the function-local gymnasium import this
# script already used).

ROOT = Path(__file__).resolve().parent.parent.parent
CKPT_DIR = ROOT / "training" / "rl" / "checkpoints"

# Actions: 0=forward, 1=fight, 2=flee, 3=eat, 4=sprint, 5=idle
ACTION_NAMES = ("forward", "fight", "flee", "eat", "sprint", "idle")
N_ACTIONS = len(ACTION_NAMES)

# Obs: [hp, food, nearest_mob_dist, nearest_mob_angle, mob_count,
#       has_sword, has_armor, has_food, is_day, on_water,
#       y_level_norm, damage_taken, is_fleeing]
OBS_DIM = 13


def expert_action(obs):
    """Return the hand-coded expert action for one observation.

    Rules are checked in priority order and the first match wins.

    Args:
        obs: Indexable sequence of ``OBS_DIM`` values laid out as in the
            "Obs" comment above. Only indices 0, 1, 2, 4, 5, 7 and 11 are read.

    Returns:
        int: Action index in ``0..5`` (see ``ACTION_NAMES``).
    """
    hp = obs[0]          # 0-1 (0=dead, 1=full)
    food = obs[1]        # 0-1
    mob_dist = obs[2]    # 0-1 (0=right here, 1=24+ blocks)
    mob_count = obs[4]   # 0-1 (0=none, 1=10+)
    has_sword = obs[5]   # 0 or 1
    has_food = obs[7]    # 0 or 1
    damage = obs[11]     # 0-1

    # PRIORITY 1: Flee if critical HP
    if hp < 0.25:  # < 5 HP
        if has_food and food < 0.7:
            return 3  # eat
        return 2  # flee

    # PRIORITY 2: Flee if overwhelmed (3+ mobs and not full HP)
    if mob_count > 0.3 and hp < 0.6:
        return 2  # flee

    # PRIORITY 3: Eat if hungry and have food
    if food < 0.7 and has_food and hp < 0.8:
        return 3  # eat

    # PRIORITY 4: Fight if mob nearby and have sword
    if mob_dist < 0.25 and has_sword:  # < 6 blocks
        return 1  # fight

    # PRIORITY 5: Approach mob if nearby but not in melee
    if mob_dist < 0.5 and has_sword:  # < 12 blocks
        return 0  # forward (approach)

    # PRIORITY 6: Sprint if taking damage (dodge)
    if damage > 0:
        return 4  # sprint

    # PRIORITY 7: Explore
    if mob_dist > 0.8:  # no mobs nearby
        return 0  # forward

    # Default: idle
    return 5


def generate_expert_data(n_samples=50000, seed=None):
    """Generate random observations labelled with the expert's action.

    Args:
        n_samples: Number of (observation, action) pairs to draw.
        seed: Optional seed for a private ``np.random.RandomState``. When
            ``None`` (default) samples are drawn from the global ``np.random``
            state, in the same order as before this parameter existed.

    Returns:
        tuple: ``(obs, actions)`` where ``obs`` is a float32 array of shape
        ``(n_samples, OBS_DIM)`` and ``actions`` is an int64 array of shape
        ``(n_samples,)``.

    Raises:
        ValueError: If ``n_samples`` is negative.
    """
    if n_samples < 0:
        raise ValueError(f"n_samples must be >= 0, got {n_samples}")

    # np.random (module) and RandomState expose the same sampling methods.
    rng = np.random if seed is None else np.random.RandomState(seed)

    # Preallocating keeps the (n, OBS_DIM) shape even when n_samples == 0.
    obs_data = np.zeros((n_samples, OBS_DIM), dtype=np.float32)
    act_data = np.zeros(n_samples, dtype=np.int64)

    # NOTE(review): damage (index 11) is drawn from a continuous Beta(1, 5), so
    # it is almost surely > 0 and expert_action() returns "sprint" before it can
    # reach the explore/idle rules — "idle" is effectively never generated.
    # Confirm whether damage should be exactly 0 for part of the samples.
    for i in range(n_samples):
        obs = obs_data[i]  # row view: writes land directly in obs_data
        obs[0] = rng.beta(2, 1)                # hp: skew toward higher
        obs[1] = rng.beta(2, 1)                # food: skew toward higher
        obs[2] = rng.uniform(0, 1)             # mob distance
        obs[3] = rng.uniform(0, 1)             # mob angle
        obs[4] = rng.beta(1, 3)                # mob count: skew toward fewer
        obs[5] = float(rng.random() > 0.3)     # has_sword: 70% chance
        obs[6] = float(rng.random() > 0.4)     # has_armor: 60% chance
        obs[7] = float(rng.random() > 0.3)     # has_food: 70% chance
        obs[8] = float(rng.random() > 0.4)     # is_day: 60% chance
        obs[9] = float(rng.random() > 0.85)    # on_water: 15% chance
        obs[10] = rng.uniform(0.15, 0.3)       # y_level: surface range
        obs[11] = rng.beta(1, 5)               # damage: skew toward low
        obs[12] = float(obs[0] < 0.25)         # is_fleeing
        act_data[i] = expert_action(obs)

    return obs_data, act_data


def _build_model():
    """Create an untrained PPO ``MlpPolicy`` model on a placeholder env.

    The env only declares the spaces (``OBS_DIM``-dim Box in [0, 1] and an
    ``N_ACTIONS``-way Discrete action) so SB3 can size the network; it is never
    stepped for real. Presumably these spaces mirror the real combat env —
    confirm against train_combat.py.
    """
    import gymnasium as gym
    from gymnasium import spaces
    from stable_baselines3 import PPO

    class DummyMCEnv(gym.Env):
        metadata = {"render_modes": []}

        def __init__(self):
            self.observation_space = spaces.Box(
                low=0, high=1, shape=(OBS_DIM,), dtype=np.float32
            )
            self.action_space = spaces.Discrete(N_ACTIONS)

        def reset(self, **kw):
            return np.zeros(OBS_DIM, dtype=np.float32), {}

        def step(self, a):
            return np.zeros(OBS_DIM, dtype=np.float32), 0, True, False, {}

    return PPO(
        "MlpPolicy",
        DummyMCEnv(),
        verbose=0,
        policy_kwargs={"net_arch": [64, 64]},
    )


def pretrain(n_samples=50000, n_epochs=20, batch_size=256, lr=1e-3):
    """Behavior-clone the expert into a PPO policy and save the checkpoint.

    Args:
        n_samples: Number of synthetic expert demonstrations to generate.
        n_epochs: Number of supervised passes over the demonstrations.
        batch_size: Mini-batch size for the supervised updates.
        lr: Adam learning rate.

    Returns:
        pathlib.Path: Location of the saved ``combat_ppo_pretrained.zip``.

    Raises:
        ValueError: If ``n_samples``, ``n_epochs`` or ``batch_size`` is not
            positive.
    """
    if n_samples <= 0 or n_epochs <= 0 or batch_size <= 0:
        raise ValueError("n_samples, n_epochs and batch_size must all be positive")

    print(f"Generating {n_samples:,} expert demonstrations...")
    obs_data, act_data = generate_expert_data(n_samples)

    # Show action distribution
    unique, counts = np.unique(act_data, return_counts=True)
    print("\nExpert action distribution:")
    for a, c in zip(unique, counts):
        print(f" {ACTION_NAMES[a]:10} {c:6} ({c/len(act_data)*100:.1f}%)")

    # Create a PPO model with the same architecture
    model = _build_model()

    # Extract the policy network and train via supervised learning
    policy = model.policy
    optimizer = torch.optim.Adam(policy.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # SB3 places the policy on model.device ("auto" picks CUDA when available);
    # the training tensors must live on the same device as the network.
    device = model.device
    obs_tensor = torch.as_tensor(obs_data, dtype=torch.float32, device=device)
    act_tensor = torch.as_tensor(act_data, dtype=torch.long, device=device)
    n_total = len(obs_tensor)

    print(f"\nPretraining policy ({sum(p.numel() for p in policy.parameters()):,} params)...")

    for epoch in range(n_epochs):
        # Shuffle
        perm = torch.randperm(n_total, device=device)
        total_loss = 0
        correct = 0
        n_batches = 0

        for i in range(0, n_total, batch_size):
            idx = perm[i:i + batch_size]
            batch_obs = obs_tensor[idx]
            batch_act = act_tensor[idx]

            # Forward through policy network (actor branch only)
            features = policy.extract_features(batch_obs, policy.pi_features_extractor)
            latent = policy.mlp_extractor.forward_actor(features)
            logits = policy.action_net(latent)

            loss = criterion(logits, batch_act)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            correct += (logits.argmax(dim=1) == batch_act).sum().item()
            n_batches += 1

        accuracy = correct / n_total * 100
        avg_loss = total_loss / n_batches
        print(f" Epoch {epoch+1:2d}/{n_epochs}: loss={avg_loss:.4f} accuracy={accuracy:.1f}%")

    # Save the pretrained model
    CKPT_DIR.mkdir(parents=True, exist_ok=True)
    save_path = CKPT_DIR / "combat_ppo_pretrained.zip"
    model.save(str(save_path))
    print(f"\nPretrained model saved to {save_path}")
    print("PPO will resume from this checkpoint and improve via RL.")
    return save_path


if __name__ == "__main__":
    pretrain()