271 lines
7.8 KiB
Python
271 lines
7.8 KiB
Python
"""
|
|
piNail2 Configuration Management
|
|
|
|
JSON-based config with named fields, load/save, defaults, and hot-reload support.
|
|
Replaces the old positional P_I_D_values.txt format.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import logging
|
|
import copy
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
DEFAULT_CONFIG = {
|
|
"pid": {
|
|
"kP": 113.1768,
|
|
"kI": 3.5335,
|
|
"kD": 500.0,
|
|
"proportional_on_measurement": False
|
|
},
|
|
"control": {
|
|
"setpoint": 530,
|
|
"loop_size_ms": 1800,
|
|
"sleep_time": 0.2,
|
|
"enabled": False
|
|
},
|
|
"flight": {
|
|
"mode": "grounded",
|
|
"takeoff_seconds": 300,
|
|
"descent_seconds": 300,
|
|
"turbo": False,
|
|
"descent_target_f": 120,
|
|
"ambient_temp_f": 75
|
|
},
|
|
"scheduler": {
|
|
"enabled": True,
|
|
"cutoff_times": [
|
|
"23:00"
|
|
]
|
|
},
|
|
"safety": {
|
|
"max_temp_f": 800,
|
|
"spike_threshold_f": 50.0,
|
|
"idle_shutoff_minutes": 30,
|
|
"watchdog_timeout_s": 10,
|
|
"min_temp_f": 0,
|
|
"sensor_stale_seconds": 8,
|
|
"sensor_stale_delta_f": 0.8,
|
|
"stale_output_ratio": 0.65
|
|
},
|
|
"gpio": {
|
|
"relay_pin": 2,
|
|
"clk": 3,
|
|
"cs": 14,
|
|
"do": 4
|
|
},
|
|
"logging": {
|
|
"log_resolution": 1,
|
|
"log_directory": "./logs",
|
|
"max_log_lines": 10000
|
|
},
|
|
"presets": {
|
|
"Low Temp": 450,
|
|
"Medium": 530,
|
|
"High": 650
|
|
},
|
|
"web": {
|
|
"host": "0.0.0.0",
|
|
"port": 5000,
|
|
"update_interval_ms": 500
|
|
},
|
|
"autotune": {
|
|
"hysteresis_f": 8.0,
|
|
"cycles": 4
|
|
},
|
|
"nails": {
|
|
"nail1": {
|
|
"pid": {
|
|
"kP": 113.1768,
|
|
"kI": 3.5335,
|
|
"kD": 500.0,
|
|
"proportional_on_measurement": False
|
|
},
|
|
"control": {
|
|
"setpoint": 530,
|
|
"loop_size_ms": 1800,
|
|
"sleep_time": 0.2,
|
|
"enabled": False
|
|
},
|
|
"flight": {
|
|
"mode": "grounded",
|
|
"takeoff_seconds": 300,
|
|
"descent_seconds": 300,
|
|
"turbo": False,
|
|
"descent_target_f": 120,
|
|
"ambient_temp_f": 75
|
|
},
|
|
"scheduler": {
|
|
"enabled": True,
|
|
"cutoff_times": [
|
|
"23:00"
|
|
]
|
|
},
|
|
"safety": {
|
|
"max_temp_f": 800,
|
|
"spike_threshold_f": 50.0,
|
|
"idle_shutoff_minutes": 30,
|
|
"watchdog_timeout_s": 10,
|
|
"min_temp_f": 0,
|
|
"sensor_stale_seconds": 8,
|
|
"sensor_stale_delta_f": 0.8,
|
|
"stale_output_ratio": 0.65
|
|
},
|
|
"gpio": {
|
|
"relay_pin": 2,
|
|
"clk": 3,
|
|
"cs": 14,
|
|
"do": 4
|
|
},
|
|
"logging": {
|
|
"log_resolution": 1,
|
|
"log_directory": "./logs/nail1",
|
|
"max_log_lines": 10000
|
|
},
|
|
"autotune": {
|
|
"hysteresis_f": 8.0,
|
|
"cycles": 4
|
|
}
|
|
},
|
|
"nail2": {
|
|
"pid": {
|
|
"kP": 113.1768,
|
|
"kI": 3.5335,
|
|
"kD": 500.0,
|
|
"proportional_on_measurement": False
|
|
},
|
|
"control": {
|
|
"setpoint": 530,
|
|
"loop_size_ms": 1800,
|
|
"sleep_time": 0.2,
|
|
"enabled": False
|
|
},
|
|
"flight": {
|
|
"mode": "grounded",
|
|
"takeoff_seconds": 300,
|
|
"descent_seconds": 300,
|
|
"turbo": False,
|
|
"descent_target_f": 120,
|
|
"ambient_temp_f": 75
|
|
},
|
|
"scheduler": {
|
|
"enabled": True,
|
|
"cutoff_times": [
|
|
"23:00"
|
|
]
|
|
},
|
|
"safety": {
|
|
"max_temp_f": 800,
|
|
"spike_threshold_f": 50.0,
|
|
"idle_shutoff_minutes": 30,
|
|
"watchdog_timeout_s": 10,
|
|
"min_temp_f": 0,
|
|
"sensor_stale_seconds": 8,
|
|
"sensor_stale_delta_f": 0.8,
|
|
"stale_output_ratio": 0.65
|
|
},
|
|
"gpio": {
|
|
"relay_pin": 22,
|
|
"clk": 27,
|
|
"cs": 18,
|
|
"do": 17
|
|
},
|
|
"logging": {
|
|
"log_resolution": 1,
|
|
"log_directory": "./logs/nail2",
|
|
"max_log_lines": 10000
|
|
},
|
|
"autotune": {
|
|
"hysteresis_f": 8.0,
|
|
"cycles": 4
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
class Config:
|
|
"""Thread-safe configuration manager with file persistence."""
|
|
|
|
def __init__(self, config_path="config.json"):
|
|
self._path = config_path
|
|
self._data = copy.deepcopy(DEFAULT_CONFIG)
|
|
self._mtime = 0
|
|
self.load()
|
|
|
|
def load(self):
|
|
"""Load config from disk, merging with defaults for any missing keys."""
|
|
if not os.path.exists(self._path):
|
|
log.info("No config file found at %s, creating with defaults", self._path)
|
|
self.save()
|
|
return
|
|
|
|
try:
|
|
with open(self._path, 'r') as f:
|
|
user_config = json.load(f)
|
|
self._data = self._deep_merge(copy.deepcopy(DEFAULT_CONFIG), user_config)
|
|
self._mtime = os.path.getmtime(self._path)
|
|
log.info("Config loaded from %s", self._path)
|
|
except (json.JSONDecodeError, IOError) as e:
|
|
log.error("Failed to load config from %s: %s. Using defaults.", self._path, e)
|
|
self._data = copy.deepcopy(DEFAULT_CONFIG)
|
|
|
|
def save(self):
|
|
"""Write current config to disk."""
|
|
try:
|
|
with open(self._path, 'w') as f:
|
|
json.dump(self._data, f, indent=2)
|
|
self._mtime = os.path.getmtime(self._path)
|
|
log.info("Config saved to %s", self._path)
|
|
except IOError as e:
|
|
log.error("Failed to save config to %s: %s", self._path, e)
|
|
|
|
def reload_if_changed(self):
|
|
"""Reload config from disk if the file has been modified externally."""
|
|
try:
|
|
current_mtime = os.path.getmtime(self._path)
|
|
if current_mtime > self._mtime:
|
|
log.info("Config file changed on disk, reloading")
|
|
self.load()
|
|
return True
|
|
except OSError:
|
|
pass
|
|
return False
|
|
|
|
def get(self, section, key=None):
|
|
"""Get a config value. If key is None, returns the entire section."""
|
|
if key is None:
|
|
return copy.deepcopy(self._data.get(section, {}))
|
|
return self._data.get(section, {}).get(key)
|
|
|
|
def set(self, section, key, value):
|
|
"""Set a config value and save to disk."""
|
|
if section not in self._data:
|
|
self._data[section] = {}
|
|
self._data[section][key] = value
|
|
self.save()
|
|
|
|
def update_section(self, section, values):
|
|
"""Update multiple keys in a section and save to disk."""
|
|
if section not in self._data:
|
|
self._data[section] = {}
|
|
self._data[section].update(values)
|
|
self.save()
|
|
|
|
@property
|
|
def data(self):
|
|
"""Return a deep copy of the full config dict."""
|
|
return copy.deepcopy(self._data)
|
|
|
|
@staticmethod
|
|
def _deep_merge(base, override):
|
|
"""Recursively merge override into base. Override values win."""
|
|
result = base.copy()
|
|
for key, value in override.items():
|
|
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
|
|
result[key] = Config._deep_merge(result[key], value)
|
|
else:
|
|
result[key] = value
|
|
return result
|