"""Asset pool — manages generated images and audio on disk with severity tagging."""

import logging
import random
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from threading import Lock

from server.config import config

logger = logging.getLogger(__name__)


@dataclass
class Asset:
    """A generated asset with metadata.

    Attributes:
        filename: Bare file name (no directory) of the asset on disk.
        severity: Severity tag used for weighted selection.
        created_at: Creation timestamp. ``AssetPool`` fills this with
            ``time.monotonic()``, so it is only meaningful for ordering
            within one process, not as wall-clock time.
        asset_type: Either ``"image"`` or ``"audio"``.
    """

    filename: str
    severity: float
    created_at: float
    asset_type: str  # "image" or "audio"

    @property
    def url(self) -> str:
        """Return the URL path the asset is served under.

        Anything other than ``"image"`` is routed to the ``audio`` subdirectory.
        """
        subdir = "img" if self.asset_type == "image" else "audio"
        return f"/assets/{subdir}/{self.filename}"


class AssetPool:
    """Thread-safe pool of generated assets with severity-based selection and rotation.

    Images are stored under ``<base_dir>/img`` and audio clips under
    ``<base_dir>/audio``. Each pool is a list ordered oldest-first; when a pool
    grows past its capacity the oldest entries are dropped and their files are
    deleted. Every read or mutation of the in-memory lists happens while holding
    ``self._lock``.
    """

    def __init__(
        self,
        base_dir: str | None = None,
        max_images: int | None = None,
        max_audio: int | None = None,
    ):
        """Create the pool and make sure its storage directories exist.

        Args:
            base_dir: Root directory for assets. Falls back to
                ``config.assets_dir`` when omitted (or empty).
            max_images: Image pool capacity. Falls back to
                ``config.escalation.max_images`` when ``None``.
            max_audio: Audio pool capacity. Falls back to
                ``config.escalation.max_audio_clips`` when ``None``.
        """
        self.base_dir = Path(base_dir or config.assets_dir)
        self.max_images = max_images if max_images is not None else config.escalation.max_images
        self.max_audio = max_audio if max_audio is not None else config.escalation.max_audio_clips
        self._images: list[Asset] = []
        self._audio: list[Asset] = []
        self._lock = Lock()

        (self.base_dir / "img").mkdir(parents=True, exist_ok=True)
        (self.base_dir / "audio").mkdir(parents=True, exist_ok=True)

    @property
    def image_count(self) -> int:
        """Number of images currently in the pool."""
        with self._lock:
            return len(self._images)

    @property
    def audio_count(self) -> int:
        """Number of audio clips currently in the pool."""
        with self._lock:
            return len(self._audio)

    def add_image(self, data: bytes, severity: float) -> str:
        """Save image data to disk and add to pool. Returns URL path."""
        return self._add(
            data,
            severity,
            asset_type="image",
            subdir="img",
            extension="png",
            pool=self._images,
            max_count=self.max_images,
        )

    def add_audio(self, data: bytes, severity: float) -> str:
        """Save audio data to disk and add to pool. Returns URL path."""
        return self._add(
            data,
            severity,
            asset_type="audio",
            subdir="audio",
            extension="wav",
            pool=self._audio,
            max_count=self.max_audio,
        )

    def select_image(self, target_severity: float) -> str | None:
        """Select an image near the target severity.

        Weighted random, biased toward close matches. Returns ``None`` when
        the image pool is empty.
        """
        with self._lock:
            return self._select(self._images, target_severity)

    def select_audio(self, target_severity: float) -> str | None:
        """Select an audio clip near the target severity.

        Returns ``None`` when the audio pool is empty.
        """
        with self._lock:
            return self._select(self._audio, target_severity)

    def _add(
        self,
        data: bytes,
        severity: float,
        *,
        asset_type: str,
        subdir: str,
        extension: str,
        pool: list[Asset],
        max_count: int,
    ) -> str:
        """Write ``data`` to a new file, register it in ``pool``, and rotate.

        Shared implementation behind ``add_image`` and ``add_audio``.

        Args:
            data: Raw file contents.
            severity: Severity tag stored on the asset.
            asset_type: Value for ``Asset.asset_type`` (``"image"``/``"audio"``).
            subdir: Directory under ``base_dir`` that receives the file.
            extension: File extension without the leading dot.
            pool: The in-memory list the asset is appended to.
            max_count: Capacity enforced on ``pool`` after appending.

        Returns:
            The URL path of the new asset. If the capacity is zero or less the
            asset is rotated out immediately, so the URL may already be stale.
        """
        filename = f"{uuid.uuid4().hex[:12]}.{extension}"
        # The file is written before taking the lock so slow disk I/O does not
        # block concurrent selections.
        (self.base_dir / subdir / filename).write_bytes(data)
        asset = Asset(
            filename=filename,
            severity=severity,
            created_at=time.monotonic(),
            asset_type=asset_type,
        )
        with self._lock:
            pool.append(asset)
            self._rotate(pool, max_count, subdir)
        return asset.url

    def _select(self, assets: list[Asset], target: float) -> str | None:
        """Weighted selection: closer severity = higher weight.

        Each asset gets weight ``1 / (1 + |severity - target|)``, so an exact
        match has weight 1 and the weight decays with distance but never
        reaches zero. The caller is expected to hold the lock.

        Returns:
            The URL of the chosen asset, or ``None`` if ``assets`` is empty.
        """
        if not assets:
            return None
        weights = [1.0 / (1.0 + abs(a.severity - target)) for a in assets]
        return random.choices(assets, weights=weights, k=1)[0].url

    def _rotate(self, assets: list[Asset], max_count: int, subdir: str) -> None:
        """Remove oldest assets when pool exceeds capacity. Must hold lock.

        Args:
            assets: Pool list, ordered oldest-first; trimmed in place.
            max_count: Maximum number of assets to keep. Values below zero are
                treated as zero instead of popping from an empty list.
            subdir: Directory under ``base_dir`` holding the pool's files.
        """
        overflow = len(assets) - max(max_count, 0)
        if overflow <= 0:
            return

        # Detach all evicted entries with one slice deletion rather than
        # repeated pop(0), which shifts the whole list on every call.
        evicted = assets[:overflow]
        del assets[:overflow]

        for old in evicted:
            path = self.base_dir / subdir / old.filename
            try:
                path.unlink(missing_ok=True)
            except OSError as exc:
                # Deletion is best-effort: the asset is already out of the
                # pool, so a leftover file is only logged, never raised.
                logger.warning("Could not delete rotated asset %s: %s", path, exc)

    def get_status(self) -> dict:
        """Return a snapshot of both pool sizes, taken under the lock."""
        with self._lock:
            return {
                "image_pool_size": len(self._images),
                "audio_pool_size": len(self._audio),
            }