131 lines
4.1 KiB
Python
131 lines
4.1 KiB
Python
"""Asset pool — manages generated images and audio on disk with severity tagging."""
|
|
|
|
import random
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from threading import Lock
|
|
|
|
from server.config import config
|
|
|
|
|
|
@dataclass
|
|
class Asset:
|
|
"""A generated asset with metadata."""
|
|
filename: str
|
|
severity: float
|
|
created_at: float
|
|
asset_type: str # "image" or "audio"
|
|
|
|
@property
|
|
def url(self) -> str:
|
|
subdir = "img" if self.asset_type == "image" else "audio"
|
|
return f"/assets/{subdir}/{self.filename}"
|
|
|
|
|
|
class AssetPool:
|
|
"""Thread-safe pool of generated assets with severity-based selection and rotation."""
|
|
|
|
def __init__(
|
|
self,
|
|
base_dir: str | None = None,
|
|
max_images: int | None = None,
|
|
max_audio: int | None = None,
|
|
):
|
|
self.base_dir = Path(base_dir or config.assets_dir)
|
|
self.max_images = max_images if max_images is not None else config.escalation.max_images
|
|
self.max_audio = max_audio if max_audio is not None else config.escalation.max_audio_clips
|
|
self._images: list[Asset] = []
|
|
self._audio: list[Asset] = []
|
|
self._lock = Lock()
|
|
|
|
(self.base_dir / "img").mkdir(parents=True, exist_ok=True)
|
|
(self.base_dir / "audio").mkdir(parents=True, exist_ok=True)
|
|
|
|
@property
|
|
def image_count(self) -> int:
|
|
with self._lock:
|
|
return len(self._images)
|
|
|
|
@property
|
|
def audio_count(self) -> int:
|
|
with self._lock:
|
|
return len(self._audio)
|
|
|
|
def add_image(self, data: bytes, severity: float) -> str:
|
|
"""Save image data to disk and add to pool. Returns URL path."""
|
|
filename = f"{uuid.uuid4().hex[:12]}.png"
|
|
path = self.base_dir / "img" / filename
|
|
path.write_bytes(data)
|
|
|
|
asset = Asset(
|
|
filename=filename,
|
|
severity=severity,
|
|
created_at=time.monotonic(),
|
|
asset_type="image",
|
|
)
|
|
|
|
with self._lock:
|
|
self._images.append(asset)
|
|
self._rotate(self._images, self.max_images, "img")
|
|
|
|
return asset.url
|
|
|
|
def add_audio(self, data: bytes, severity: float) -> str:
|
|
"""Save audio data to disk and add to pool. Returns URL path."""
|
|
filename = f"{uuid.uuid4().hex[:12]}.wav"
|
|
path = self.base_dir / "audio" / filename
|
|
path.write_bytes(data)
|
|
|
|
asset = Asset(
|
|
filename=filename,
|
|
severity=severity,
|
|
created_at=time.monotonic(),
|
|
asset_type="audio",
|
|
)
|
|
|
|
with self._lock:
|
|
self._audio.append(asset)
|
|
self._rotate(self._audio, self.max_audio, "audio")
|
|
|
|
return asset.url
|
|
|
|
def select_image(self, target_severity: float) -> str | None:
|
|
"""Select an image near the target severity. Weighted random, biased toward close matches."""
|
|
with self._lock:
|
|
return self._select(self._images, target_severity)
|
|
|
|
def select_audio(self, target_severity: float) -> str | None:
|
|
"""Select an audio clip near the target severity."""
|
|
with self._lock:
|
|
return self._select(self._audio, target_severity)
|
|
|
|
def _select(self, assets: list[Asset], target: float) -> str | None:
|
|
"""Weighted selection: closer severity = higher weight."""
|
|
if not assets:
|
|
return None
|
|
weights = []
|
|
for a in assets:
|
|
distance = abs(a.severity - target)
|
|
weights.append(1.0 / (1.0 + distance))
|
|
chosen = random.choices(assets, weights=weights, k=1)[0]
|
|
return chosen.url
|
|
|
|
def _rotate(self, assets: list[Asset], max_count: int, subdir: str) -> None:
|
|
"""Remove oldest assets when pool exceeds capacity. Must hold lock."""
|
|
while len(assets) > max_count:
|
|
old = assets.pop(0)
|
|
path = self.base_dir / subdir / old.filename
|
|
try:
|
|
path.unlink(missing_ok=True)
|
|
except OSError:
|
|
pass
|
|
|
|
def get_status(self) -> dict:
|
|
with self._lock:
|
|
return {
|
|
"image_pool_size": len(self._images),
|
|
"audio_pool_size": len(self._audio),
|
|
}
|