feat: add asset pool with severity tagging and rotation
This commit is contained in:
@@ -0,0 +1,130 @@
|
||||
"""Asset pool — manages generated images and audio on disk with severity tagging."""
|
||||
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from threading import Lock
|
||||
|
||||
from server.config import config
|
||||
|
||||
|
||||
@dataclass
|
||||
class Asset:
|
||||
"""A generated asset with metadata."""
|
||||
filename: str
|
||||
severity: float
|
||||
created_at: float
|
||||
asset_type: str # "image" or "audio"
|
||||
|
||||
@property
|
||||
def url(self) -> str:
|
||||
subdir = "img" if self.asset_type == "image" else "audio"
|
||||
return f"/assets/{subdir}/{self.filename}"
|
||||
|
||||
|
||||
class AssetPool:
|
||||
"""Thread-safe pool of generated assets with severity-based selection and rotation."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_dir: str | None = None,
|
||||
max_images: int | None = None,
|
||||
max_audio: int | None = None,
|
||||
):
|
||||
self.base_dir = Path(base_dir or config.assets_dir)
|
||||
self.max_images = max_images if max_images is not None else config.escalation.max_images
|
||||
self.max_audio = max_audio if max_audio is not None else config.escalation.max_audio_clips
|
||||
self._images: list[Asset] = []
|
||||
self._audio: list[Asset] = []
|
||||
self._lock = Lock()
|
||||
|
||||
(self.base_dir / "img").mkdir(parents=True, exist_ok=True)
|
||||
(self.base_dir / "audio").mkdir(parents=True, exist_ok=True)
|
||||
|
||||
@property
|
||||
def image_count(self) -> int:
|
||||
with self._lock:
|
||||
return len(self._images)
|
||||
|
||||
@property
|
||||
def audio_count(self) -> int:
|
||||
with self._lock:
|
||||
return len(self._audio)
|
||||
|
||||
def add_image(self, data: bytes, severity: float) -> str:
|
||||
"""Save image data to disk and add to pool. Returns URL path."""
|
||||
filename = f"{uuid.uuid4().hex[:12]}.png"
|
||||
path = self.base_dir / "img" / filename
|
||||
path.write_bytes(data)
|
||||
|
||||
asset = Asset(
|
||||
filename=filename,
|
||||
severity=severity,
|
||||
created_at=time.monotonic(),
|
||||
asset_type="image",
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
self._images.append(asset)
|
||||
self._rotate(self._images, self.max_images, "img")
|
||||
|
||||
return asset.url
|
||||
|
||||
def add_audio(self, data: bytes, severity: float) -> str:
|
||||
"""Save audio data to disk and add to pool. Returns URL path."""
|
||||
filename = f"{uuid.uuid4().hex[:12]}.wav"
|
||||
path = self.base_dir / "audio" / filename
|
||||
path.write_bytes(data)
|
||||
|
||||
asset = Asset(
|
||||
filename=filename,
|
||||
severity=severity,
|
||||
created_at=time.monotonic(),
|
||||
asset_type="audio",
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
self._audio.append(asset)
|
||||
self._rotate(self._audio, self.max_audio, "audio")
|
||||
|
||||
return asset.url
|
||||
|
||||
def select_image(self, target_severity: float) -> str | None:
|
||||
"""Select an image near the target severity. Weighted random, biased toward close matches."""
|
||||
with self._lock:
|
||||
return self._select(self._images, target_severity)
|
||||
|
||||
def select_audio(self, target_severity: float) -> str | None:
|
||||
"""Select an audio clip near the target severity."""
|
||||
with self._lock:
|
||||
return self._select(self._audio, target_severity)
|
||||
|
||||
def _select(self, assets: list[Asset], target: float) -> str | None:
|
||||
"""Weighted selection: closer severity = higher weight."""
|
||||
if not assets:
|
||||
return None
|
||||
weights = []
|
||||
for a in assets:
|
||||
distance = abs(a.severity - target)
|
||||
weights.append(1.0 / (1.0 + distance))
|
||||
chosen = random.choices(assets, weights=weights, k=1)[0]
|
||||
return chosen.url
|
||||
|
||||
def _rotate(self, assets: list[Asset], max_count: int, subdir: str) -> None:
|
||||
"""Remove oldest assets when pool exceeds capacity. Must hold lock."""
|
||||
while len(assets) > max_count:
|
||||
old = assets.pop(0)
|
||||
path = self.base_dir / subdir / old.filename
|
||||
try:
|
||||
path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def get_status(self) -> dict:
|
||||
with self._lock:
|
||||
return {
|
||||
"image_pool_size": len(self._images),
|
||||
"audio_pool_size": len(self._audio),
|
||||
}
|
||||
Reference in New Issue
Block a user