diff --git a/server/escalation.py b/server/escalation.py new file mode 100644 index 0000000..1e81f05 --- /dev/null +++ b/server/escalation.py @@ -0,0 +1,121 @@ +"""Escalation engine — intensity curve, phase params, stochastic timing.""" + +import math +import random +import time + +from server.config import config + + +class EscalationEngine: + """Computes intensity and rendering parameters from elapsed time. + + Intensity follows a logarithmic curve: intensity = log(1 + elapsed * rate). + Rendering parameters are derived from intensity and clamped to [0, 1]. + Timing intervals are randomized within ranges that shrink with intensity. + """ + + PALETTES = [ + "void_black", "crimson_void", "deep_rot", "sickly_green", + "bruise_purple", "ash_grey", "bile_yellow", "blood_orange", + ] + + def __init__(self, rate: float | None = None): + self.rate = rate if rate is not None else config.escalation.rate + self.session_start: float | None = None + + def start_session(self) -> None: + """Begin a new escalation session.""" + self.session_start = time.monotonic() + + def reset(self) -> None: + """Restart the session from zero.""" + self.session_start = time.monotonic() + + def get_elapsed(self) -> float: + """Seconds since session start.""" + if self.session_start is None: + return 0.0 + return time.monotonic() - self.session_start + + def get_intensity(self, elapsed: float | None = None) -> float: + """Compute intensity from elapsed seconds. Logarithmic, never peaks.""" + if elapsed is None: + elapsed = self.get_elapsed() + return math.log(1 + elapsed * self.rate) + + def get_phase_params(self, intensity: float | None = None) -> dict: + """Derive rendering parameters from current intensity. + + All values clamped to [0, 1]. Higher intensity = more severe effects. + """ + if intensity is None: + intensity = self.get_intensity() + + def _sigmoid(x: float, midpoint: float = 2.0, steepness: float = 1.5) -> float: + """Smooth 0→1 mapping centered at midpoint.""" + return 1.0 / (1.0 + math.exp(-steepness * (x - midpoint))) + + morph_speed = _sigmoid(intensity, midpoint=1.5, steepness=1.2) + shader_severity = _sigmoid(intensity, midpoint=2.0, steepness=1.0) + voice_frequency = _sigmoid(intensity, midpoint=2.5, steepness=0.8) + noise_level = _sigmoid(intensity, midpoint=3.0, steepness=1.0) + surprise_chance = min(1.0, _sigmoid(intensity, midpoint=3.5, steepness=0.6)) + + # Palette shifts to more aggressive colors at higher intensity + palette_index = min(int(intensity), len(self.PALETTES) - 1) + + return { + "morph_speed": round(morph_speed, 3), + "shader_severity": round(shader_severity, 3), + "voice_frequency": round(voice_frequency, 3), + "noise_level": round(noise_level, 3), + "surprise_chance": round(surprise_chance, 3), + "palette": self.PALETTES[palette_index], + } + + def get_asset_swap_interval(self, intensity: float | None = None) -> float: + """Random interval until next asset swap. Shrinks with intensity.""" + if intensity is None: + intensity = self.get_intensity() + cfg = config.escalation + # Lerp from max to min as intensity increases, with noise + t = min(1.0, intensity / 5.0) + base = cfg.asset_swap_max - t * (cfg.asset_swap_max - cfg.asset_swap_min) + jitter = random.uniform(-base * 0.3, base * 0.3) + return max(cfg.asset_swap_min, base + jitter) + + def get_voice_interval(self, intensity: float | None = None) -> float: + """Poisson-distributed interval until next voice clip. Mean decreases with intensity.""" + if intensity is None: + intensity = self.get_intensity() + cfg = config.escalation + # Mean interval decreases from voice_mean_interval toward 3s + mean = max(3.0, cfg.voice_mean_interval / (1 + intensity)) + return random.expovariate(1.0 / mean) + + def should_fake_calm(self) -> bool: + """Roll for a fake calm period.""" + return random.random() < config.escalation.fake_calm_chance + + def get_fake_calm_duration(self) -> float: + """Duration of a fake calm period.""" + cfg = config.escalation + return random.uniform(cfg.fake_calm_duration_min, cfg.fake_calm_duration_max) + + def should_cluster_burst(self) -> bool: + """Roll for a cluster burst (rapid-fire events).""" + return random.random() < config.escalation.cluster_burst_chance + + def get_cluster_burst_count(self) -> int: + """Number of events in a cluster burst.""" + cfg = config.escalation + return random.randint(cfg.cluster_burst_count_min, cfg.cluster_burst_count_max) + + def select_severity(self, intensity: float | None = None) -> float: + """Pick a target severity for the next asset. Biased toward current intensity.""" + if intensity is None: + intensity = self.get_intensity() + # Normal distribution centered on intensity, clipped to [0, intensity+1] + severity = random.gauss(intensity, 0.5) + return max(0.0, min(severity, intensity + 1.0)) diff --git a/tests/test_escalation.py b/tests/test_escalation.py new file mode 100644 index 0000000..4e30602 --- /dev/null +++ b/tests/test_escalation.py @@ -0,0 +1,116 @@ +import math +import time + +from server.escalation import EscalationEngine + + +class TestIntensityCurve: + def test_intensity_at_zero(self): + """Intensity is 0 at start.""" + engine = EscalationEngine(rate=0.05) + assert engine.get_intensity(elapsed=0.0) == 0.0 + + def test_intensity_at_40s(self): + """Intensity ~1.0 at 40s with default rate.""" + engine = EscalationEngine(rate=0.05) + intensity = engine.get_intensity(elapsed=40.0) + assert abs(intensity - math.log(1 + 40 * 0.05)) < 0.01 + + def test_intensity_monotonic(self): + """Intensity always increases.""" + engine = EscalationEngine(rate=0.05) + prev = 0.0 + for t in [10, 30, 60, 120, 300, 600, 1800]: + val = engine.get_intensity(elapsed=float(t)) + assert val > prev + prev = val + + def test_intensity_never_peaks(self): + """Even at 1 hour, intensity is still rising.""" + engine = EscalationEngine(rate=0.05) + at_30m = engine.get_intensity(elapsed=1800.0) + at_60m = engine.get_intensity(elapsed=3600.0) + assert at_60m > at_30m + + +class TestPhaseParams: + def test_low_intensity_params(self): + """Low intensity produces slow, subtle params.""" + engine = EscalationEngine(rate=0.05) + params = engine.get_phase_params(intensity=0.5) + assert params["morph_speed"] < 0.3 + assert params["shader_severity"] < 0.3 + + def test_high_intensity_params(self): + """High intensity produces fast, severe params.""" + engine = EscalationEngine(rate=0.05) + params = engine.get_phase_params(intensity=4.0) + assert params["morph_speed"] > 0.6 + assert params["shader_severity"] > 0.7 + + def test_params_are_clamped(self): + """Params stay in 0-1 range even at extreme intensity.""" + engine = EscalationEngine(rate=0.05) + params = engine.get_phase_params(intensity=100.0) + for key in ["morph_speed", "shader_severity", "voice_frequency", "noise_level"]: + assert 0.0 <= params[key] <= 1.0 + + +class TestAssetSwapInterval: + def test_low_intensity_slow_swaps(self): + """Low intensity means long intervals between swaps.""" + engine = EscalationEngine(rate=0.05) + interval = engine.get_asset_swap_interval(intensity=0.5) + assert interval >= 5.0 + + def test_high_intensity_fast_swaps(self): + """High intensity means short intervals.""" + engine = EscalationEngine(rate=0.05) + interval = engine.get_asset_swap_interval(intensity=5.0) + assert interval <= 4.0 + + def test_interval_has_randomness(self): + """Consecutive calls produce different intervals.""" + engine = EscalationEngine(rate=0.05) + intervals = [engine.get_asset_swap_interval(intensity=2.0) for _ in range(20)] + assert len(set(round(i, 2) for i in intervals)) > 1 + + +class TestVoiceInterval: + def test_low_intensity_rare_voices(self): + """Low intensity = voices are rare.""" + engine = EscalationEngine(rate=0.05) + # Run many trials since exponential distribution is stochastic + intervals = [engine.get_voice_interval(intensity=0.5) for _ in range(50)] + mean = sum(intervals) / len(intervals) + assert mean > 20.0 # Mean should be around 40 + + def test_high_intensity_frequent_voices(self): + """High intensity = voices are frequent.""" + engine = EscalationEngine(rate=0.05) + intervals = [engine.get_voice_interval(intensity=5.0) for _ in range(50)] + mean = sum(intervals) / len(intervals) + assert mean < 20.0 + + +class TestSessionTiming: + def test_start_sets_time(self): + """Starting a session records the start time.""" + engine = EscalationEngine(rate=0.05) + engine.start_session() + assert engine.session_start is not None + + def test_elapsed_time(self): + """Elapsed time increases after start.""" + engine = EscalationEngine(rate=0.05) + engine.start_session() + elapsed = engine.get_elapsed() + assert elapsed >= 0.0 + + def test_reset_clears_session(self): + """Reset restarts the session.""" + engine = EscalationEngine(rate=0.05) + engine.start_session() + time.sleep(0.01) + engine.reset() + assert engine.get_elapsed() < 0.1