diff --git a/server/streaming.py b/server/streaming.py new file mode 100644 index 0000000..43b4503 --- /dev/null +++ b/server/streaming.py @@ -0,0 +1,90 @@ +"""WebSocket broadcast manager for streaming horror to connected clients.""" + +import json +import random + +from fastapi import WebSocket + + +class StreamManager: + """Manages WebSocket clients and broadcasts horror events.""" + + TRANSITIONS = ["crossfade", "dissolve", "glitch_cut", "melt_morph"] + + def __init__(self): + self._clients: set[WebSocket] = set() + + @property + def client_count(self) -> int: + return len(self._clients) + + def add_client(self, ws: WebSocket) -> None: + self._clients.add(ws) + + def remove_client(self, ws: WebSocket) -> None: + self._clients.discard(ws) + + async def _broadcast(self, message: str) -> None: + """Send to all clients, remove dead ones.""" + dead: list[WebSocket] = [] + for ws in self._clients: + try: + await ws.send_text(message) + except Exception: + dead.append(ws) + for ws in dead: + self._clients.discard(ws) + + async def broadcast_phase(self, intensity: float, params: dict) -> None: + """Push a phase update with current intensity and rendering params.""" + msg = json.dumps({ + "type": "phase", + "intensity": round(intensity, 2), + "params": params, + }) + await self._broadcast(msg) + + async def broadcast_asset( + self, url: str, severity: float, transition: str | None = None, + ) -> None: + """Push a new image asset reference.""" + if transition is None: + transition = random.choice(self.TRANSITIONS) + msg = json.dumps({ + "type": "asset", + "url": url, + "severity": round(severity, 2), + "transition": transition, + }) + await self._broadcast(msg) + + async def broadcast_whisper( + self, url: str, pan: float, volume: float, reverb: float, + ) -> None: + """Push a whisper audio clip reference.""" + msg = json.dumps({ + "type": "whisper", + "url": url, + "pan": round(pan, 2), + "volume": round(volume, 2), + "reverb": round(reverb, 2), + }) + await self._broadcast(msg) + + async def broadcast_address(self, audio_b64: str, text: str) -> None: + """Push a direct address audio clip (base64-encoded WAV).""" + msg = json.dumps({ + "type": "address", + "audio": audio_b64, + "text": text, + }) + await self._broadcast(msg) + + async def broadcast_scare(self, effect: str, duration_ms: int) -> None: + """Push a scare event (face flash, white-out, inversion, etc.).""" + msg = json.dumps({ + "type": "scare", + "effect": effect, + "duration_ms": duration_ms, + }) + await self._broadcast(msg) diff --git a/tests/test_streaming.py b/tests/test_streaming.py new file mode 100644 index 0000000..ee66d0c --- /dev/null +++ b/tests/test_streaming.py @@ -0,0 +1,99 @@ +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from server.streaming import StreamManager + + +@pytest.fixture +def manager(): + return StreamManager() + + +@pytest.fixture +def mock_ws(): + ws = AsyncMock() + ws.send_text = AsyncMock() + return ws + + +class TestClientManagement: + def test_add_client(self, manager, mock_ws): + """Adding a client increases count.""" + manager.add_client(mock_ws) + assert manager.client_count == 1 + + def test_remove_client(self, manager, mock_ws): + """Removing a client decreases count.""" + manager.add_client(mock_ws) + manager.remove_client(mock_ws) + assert manager.client_count == 0 + + def test_remove_missing_client(self, manager, mock_ws): + """Removing a non-existent client doesn't error.""" + manager.remove_client(mock_ws) + assert manager.client_count == 0 + + +class TestBroadcast: + @pytest.mark.asyncio + async def test_broadcast_phase(self, manager, mock_ws): + """Phase update is broadcast to all clients.""" + manager.add_client(mock_ws) + await manager.broadcast_phase( + intensity=2.4, + params={"morph_speed": 0.35, "shader_severity": 0.6, "palette": "crimson_void"}, + ) + mock_ws.send_text.assert_called_once() + msg = json.loads(mock_ws.send_text.call_args[0][0]) + assert msg["type"] == "phase" + assert msg["intensity"] == 2.4 + assert msg["params"]["morph_speed"] == 0.35 + + @pytest.mark.asyncio + async def test_broadcast_asset(self, manager, mock_ws): + """Asset notification is broadcast.""" + manager.add_client(mock_ws) + await manager.broadcast_asset( + url="/assets/img/abc.png", + severity=1.8, + transition="melt", + ) + msg = json.loads(mock_ws.send_text.call_args[0][0]) + assert msg["type"] == "asset" + assert msg["url"] == "/assets/img/abc.png" + + @pytest.mark.asyncio + async def test_broadcast_whisper(self, manager, mock_ws): + """Whisper notification is broadcast.""" + manager.add_client(mock_ws) + await manager.broadcast_whisper( + url="/assets/audio/w01.wav", + pan=-0.3, + volume=0.4, + reverb=0.7, + ) + msg = json.loads(mock_ws.send_text.call_args[0][0]) + assert msg["type"] == "whisper" + + @pytest.mark.asyncio + async def test_broadcast_scare(self, manager, mock_ws): + """Scare event is broadcast.""" + manager.add_client(mock_ws) + await manager.broadcast_scare(effect="face_flash", duration_ms=150) + msg = json.loads(mock_ws.send_text.call_args[0][0]) + assert msg["type"] == "scare" + assert msg["effect"] == "face_flash" + + @pytest.mark.asyncio + async def test_dead_client_cleanup(self, manager): + """Dead clients are removed on broadcast.""" + dead_ws = AsyncMock() + dead_ws.send_text = AsyncMock(side_effect=Exception("connection closed")) + manager.add_client(dead_ws) + assert manager.client_count == 1 + + await manager.broadcast_phase(intensity=1.0, params={}) + assert manager.client_count == 0