"""
piNail2 Configuration Management

JSON-based config with named fields, load/save, defaults, and hot-reload support.
Replaces the old positional P_I_D_values.txt format.
"""

import copy
import json
import logging
import os
import threading

log = logging.getLogger(__name__)

# Built-in defaults. Any key missing from the on-disk config is filled in from
# here by Config.load(). The per-nail entries under "nails" repeat the
# top-level sections (minus "presets" and "web") with their own GPIO pins and
# log directories.
DEFAULT_CONFIG = {
    "pid": {
        "kP": 113.1768,
        "kI": 3.5335,
        "kD": 500.0,
        "proportional_on_measurement": False,
    },
    "control": {
        "setpoint": 530,
        "loop_size_ms": 1800,
        "sleep_time": 0.2,
        "enabled": False,
    },
    "flight": {
        "mode": "grounded",
        "takeoff_seconds": 300,
        "descent_seconds": 300,
        "turbo": False,
        "descent_target_f": 120,
        "ambient_temp_f": 75,
    },
    "scheduler": {
        "enabled": True,
        "cutoff_times": [
            "23:00",
        ],
    },
    "safety": {
        "max_temp_f": 800,
        "spike_threshold_f": 50.0,
        "idle_shutoff_minutes": 30,
        "watchdog_timeout_s": 10,
        "min_temp_f": 0,
        "sensor_stale_seconds": 8,
        "sensor_stale_delta_f": 0.8,
        "stale_output_ratio": 0.65,
    },
    "gpio": {
        "relay_pin": 2,
        "clk": 3,
        "cs": 14,
        "do": 4,
    },
    "logging": {
        "log_resolution": 1,
        "log_directory": "./logs",
        "max_log_lines": 10000,
    },
    "presets": {
        "Low Temp": 450,
        "Medium": 530,
        "High": 650,
    },
    "web": {
        "host": "0.0.0.0",
        "port": 5000,
        "update_interval_ms": 500,
    },
    "autotune": {
        "hysteresis_f": 8.0,
        "cycles": 4,
    },
    "nails": {
        "nail1": {
            "pid": {
                "kP": 113.1768,
                "kI": 3.5335,
                "kD": 500.0,
                "proportional_on_measurement": False,
            },
            "control": {
                "setpoint": 530,
                "loop_size_ms": 1800,
                "sleep_time": 0.2,
                "enabled": False,
            },
            "flight": {
                "mode": "grounded",
                "takeoff_seconds": 300,
                "descent_seconds": 300,
                "turbo": False,
                "descent_target_f": 120,
                "ambient_temp_f": 75,
            },
            "scheduler": {
                "enabled": True,
                "cutoff_times": [
                    "23:00",
                ],
            },
            "safety": {
                "max_temp_f": 800,
                "spike_threshold_f": 50.0,
                "idle_shutoff_minutes": 30,
                "watchdog_timeout_s": 10,
                "min_temp_f": 0,
                "sensor_stale_seconds": 8,
                "sensor_stale_delta_f": 0.8,
                "stale_output_ratio": 0.65,
            },
            "gpio": {
                "relay_pin": 2,
                "clk": 3,
                "cs": 14,
                "do": 4,
            },
            "logging": {
                "log_resolution": 1,
                "log_directory": "./logs/nail1",
                "max_log_lines": 10000,
            },
            "autotune": {
                "hysteresis_f": 8.0,
                "cycles": 4,
            },
        },
        "nail2": {
            "pid": {
                "kP": 113.1768,
                "kI": 3.5335,
                "kD": 500.0,
                "proportional_on_measurement": False,
            },
            "control": {
                "setpoint": 530,
                "loop_size_ms": 1800,
                "sleep_time": 0.2,
                "enabled": False,
            },
            "flight": {
                "mode": "grounded",
                "takeoff_seconds": 300,
                "descent_seconds": 300,
                "turbo": False,
                "descent_target_f": 120,
                "ambient_temp_f": 75,
            },
            "scheduler": {
                "enabled": True,
                "cutoff_times": [
                    "23:00",
                ],
            },
            "safety": {
                "max_temp_f": 800,
                "spike_threshold_f": 50.0,
                "idle_shutoff_minutes": 30,
                "watchdog_timeout_s": 10,
                "min_temp_f": 0,
                "sensor_stale_seconds": 8,
                "sensor_stale_delta_f": 0.8,
                "stale_output_ratio": 0.65,
            },
            "gpio": {
                "relay_pin": 22,
                "clk": 27,
                "cs": 18,
                "do": 17,
            },
            "logging": {
                "log_resolution": 1,
                "log_directory": "./logs/nail2",
                "max_log_lines": 10000,
            },
            "autotune": {
                "hysteresis_f": 8.0,
                "cycles": 4,
            },
        },
    },
}


class Config:
    """Thread-safe configuration manager with file persistence.

    The configuration is a two-level mapping (section -> key -> value) kept
    in memory and mirrored to a JSON file. Every public method holds an
    internal re-entrant lock for its whole duration, so instances may be
    shared between threads of one process.

    Values handed out by ``get`` and ``data`` are deep copies; mutating them
    never changes the stored configuration. Use ``set`` / ``update_section``
    to change values.
    """

    def __init__(self, config_path="config.json"):
        """Create the manager and immediately load (or create) the file.

        Args:
            config_path: Path of the JSON file backing this configuration.
        """
        self._path = config_path
        self._data = copy.deepcopy(DEFAULT_CONFIG)
        # Modification time of the file as of the last load/save; compared
        # against the live mtime by reload_if_changed().
        self._mtime = 0
        # Re-entrant because public methods call each other while holding it
        # (set -> save, reload_if_changed -> load -> save).
        self._lock = threading.RLock()
        self.load()

    def load(self):
        """Load config from disk, merging with defaults for any missing keys.

        If the file does not exist, the current in-memory config is written
        out instead. If the file cannot be read, is not valid JSON, or its
        top-level value is not a JSON object, the error is logged and the
        in-memory config is reset to ``DEFAULT_CONFIG``.
        """
        with self._lock:
            if not os.path.exists(self._path):
                log.info("No config file found at %s, creating with defaults", self._path)
                self.save()
                return

            mtime = None
            try:
                # Sample the mtime *before* reading: if the file changes while
                # we read it, the next reload_if_changed() still notices.
                mtime = os.path.getmtime(self._path)
                with open(self._path, "r", encoding="utf-8") as f:
                    user_config = json.load(f)
                if not isinstance(user_config, dict):
                    raise ValueError(
                        "top-level JSON value must be an object, got %s"
                        % type(user_config).__name__
                    )
            except (ValueError, OSError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                log.error("Failed to load config from %s: %s. Using defaults.", self._path, e)
                self._data = copy.deepcopy(DEFAULT_CONFIG)
                if mtime is not None:
                    # Remember this broken version so hot-reload polling does
                    # not re-parse (and re-log) it until the file changes.
                    self._mtime = mtime
                return

            self._data = self._deep_merge(copy.deepcopy(DEFAULT_CONFIG), user_config)
            self._mtime = mtime
            log.info("Config loaded from %s", self._path)

    def save(self):
        """Write current config to disk.

        The JSON is written to ``<path>.tmp`` and then moved over the real
        file with ``os.replace``, so the real file is never left half-written.
        I/O errors are logged, not raised.

        Raises:
            TypeError: If the config holds a value ``json`` cannot serialise.
                Raised before anything on disk is touched.
        """
        with self._lock:
            # Serialise first: a non-serialisable value must not clobber the
            # existing file.
            payload = json.dumps(self._data, indent=2)
            tmp_path = self._path + ".tmp"
            try:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    f.write(payload)
                os.replace(tmp_path, self._path)
                self._mtime = os.path.getmtime(self._path)
                log.info("Config saved to %s", self._path)
            except OSError as e:
                log.error("Failed to save config to %s: %s", self._path, e)
                # Best-effort cleanup of a partially written temp file.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass

    def reload_if_changed(self):
        """Reload config from disk if the file has been modified externally.

        Returns:
            True if the file's modification time differs from the one seen at
            the last load/save and a reload was performed, otherwise False
            (including when the file is missing or cannot be stat'ed).
        """
        with self._lock:
            try:
                current_mtime = os.path.getmtime(self._path)
            except OSError:
                return False
            # "!=" rather than ">": a file swapped for one with an older
            # timestamp (restored backup, `cp -p`) is still a change.
            if current_mtime == self._mtime:
                return False
            log.info("Config file changed on disk, reloading")
            self.load()
            return True

    def get(self, section, key=None):
        """Get a config value. If key is None, returns the entire section.

        Args:
            section: Top-level section name.
            key: Key inside the section, or None for the whole section.

        Returns:
            A deep copy of the requested value. A missing section yields
            ``{}`` when ``key`` is None; a missing key, or a section that is
            not a mapping, yields None.
        """
        with self._lock:
            section_data = self._data.get(section, {})
            if key is None:
                return copy.deepcopy(section_data)
            if not isinstance(section_data, dict):
                # A user config may have replaced the section with a scalar.
                return None
            return copy.deepcopy(section_data.get(key))

    def set(self, section, key, value):
        """Set a config value and save to disk.

        The section is created if it does not exist yet.
        """
        with self._lock:
            self._data.setdefault(section, {})[key] = value
            self.save()

    def update_section(self, section, values):
        """Update multiple keys in a section and save to disk.

        Args:
            section: Top-level section name; created if missing.
            values: Mapping of key -> new value merged into the section.
        """
        with self._lock:
            self._data.setdefault(section, {}).update(values)
            self.save()

    @property
    def data(self):
        """Return a deep copy of the full config dict."""
        with self._lock:
            return copy.deepcopy(self._data)

    @staticmethod
    def _deep_merge(base, override):
        """Recursively merge override into base. Override values win.

        Where both sides hold a dict under the same key the dicts are merged
        key by key; in every other case the override value replaces the base
        value. Neither argument is modified; a new dict is returned.
        """
        result = dict(base)
        for key, value in override.items():
            current = result.get(key)
            if isinstance(current, dict) and isinstance(value, dict):
                result[key] = Config._deep_merge(current, value)
            else:
                result[key] = value
        return result