# Source: piNail/piNail2/pid_controller.py (Python, 900 lines, 34 KiB)
"""
piNail2 PID Controller
Runs the PID control loop in a background thread. Controls a relay via GPIO
using software PWM (duty cycle within a time window) to maintain a target
temperature on an e-nail heating coil.
Safety features:
- Hard max temperature cutoff
- Thermocouple disconnect detection -> relay OFF
- Idle auto-shutoff after configurable timeout
- Watchdog: detects if control loop stalls
- Proper GPIO cleanup on shutdown
"""
import time
import threading
import logging
import os
import csv
import math
from datetime import datetime, timedelta
log = logging.getLogger(__name__)
class PIDController:
    """
    Threaded PID temperature controller for e-nail heating.

    The control loop runs in a background daemon thread. It reads temperature
    from a Thermocouple object, computes PID output, and drives a relay GPIO
    pin using software PWM (time-proportional control within each loop cycle).

    Shared state is guarded by a single non-reentrant ``threading.Lock``;
    private helpers that do not acquire it themselves (``_enter_mode``,
    ``_compute_flight_setpoint``, ``_autotune_snapshot``...) may therefore be
    called while the lock is held.
    """

    # Fallback durations used when the "flight" config section omits a key.
    # Kept in one place so the setpoint ramps and the reported ETA agree.
    _DEFAULT_TAKEOFF_SECONDS = 300.0
    _DEFAULT_DESCENT_SECONDS = 90.0

    def __init__(self, config, thermocouple):
        """
        Args:
            config: Config instance (must provide get / set / update_section /
                reload_if_changed as used throughout this class)
            thermocouple: Thermocouple instance (must provide read() and
                is_connected)
        """
        self._config = config
        self._tc = thermocouple
        self._lock = threading.Lock()

        # State
        self._enabled = False
        self._temp = 0.0
        self._setpoint = config.get("control", "setpoint")
        self._output = 0.0
        self._relay_on = False
        self._loop_count = 0
        self._start_time = None
        self._last_loop_time = None
        self._idle_since = None  # timestamp when temp first reached setpoint vicinity
        self._safety_tripped = False
        self._safety_reason = ""
        self._thread = None
        self._stop_event = threading.Event()

        # Autotune state
        self._autotune_active = False
        self._autotune_target = self._setpoint
        self._autotune_hysteresis = config.get("autotune", "hysteresis_f")
        self._autotune_cycles = config.get("autotune", "cycles")
        self._autotune_heating = False
        self._autotune_phase_started = None
        self._autotune_phase_extreme = None
        self._autotune_high_peaks = []
        self._autotune_low_peaks = []
        self._autotune_last_result = None
        self._autotune_message = ""

        # Sensor plausibility state
        self._stale_reference_temp = None
        self._stale_reference_time = None

        # Flight mode state machine
        self._mode = "grounded"
        self._target_setpoint = self._setpoint
        self._mode_started_at = None
        self._mode_from_temp = None
        self._takeoff_effective_seconds = None
        self._scheduler_last_trigger = None

        # PID instance (imported lazily so the module can be imported without
        # simple_pid installed)
        from simple_pid import PID
        pid_cfg = config.get("pid")
        self._pid = PID(
            pid_cfg["kP"],
            pid_cfg["kI"],
            pid_cfg["kD"],
            setpoint=self._setpoint
        )
        self._pid.proportional_on_measurement = pid_cfg.get("proportional_on_measurement", True)
        loop_size = config.get("control", "loop_size_ms")
        # PID output is "milliseconds of relay ON time" within one loop cycle.
        self._pid.output_limits = (0, loop_size)

        # GPIO setup; on failure the controller keeps running with no relay
        # driver (self._gpio stays None and relay writes become no-ops).
        self._relay_pin = config.get("gpio", "relay_pin")
        self._gpio = None
        try:
            import RPi.GPIO as GPIO
            self._gpio = GPIO
            GPIO.setwarnings(False)
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self._relay_pin, GPIO.OUT, initial=GPIO.LOW)
            log.info("GPIO initialized, relay pin %d set LOW", self._relay_pin)
        except Exception as e:
            log.error("Failed to initialize GPIO: %s", e)

        # Logging setup
        self._log_dir = config.get("logging", "log_directory")
        os.makedirs(self._log_dir, exist_ok=True)
        self._log_file = None
        self._log_writer = None
        self._log_counter = 0
        self._history = []  # Recent data points for the web UI
        self._history_max = 1000
        self._init_log_file()

    def _init_log_file(self):
        """Create a new CSV log file with a timestamp in the filename."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            path = os.path.join(self._log_dir, "pinail_{}.csv".format(timestamp))
            self._log_file = open(path, 'w', newline='')
            self._log_writer = csv.writer(self._log_file)
            self._log_writer.writerow([
                "timestamp", "temp_f", "setpoint_f", "output", "relay",
                "flight_setpoint_f", "mode", "kP", "kI", "kD", "loop_size_ms"
            ])
            log.info("Log file created: %s", path)
        except Exception as e:
            log.error("Failed to create log file: %s", e)

    # --- Lifecycle ---

    def start(self):
        """Enable power/PID control and ensure background monitor loop is running."""
        self.start_cruise()
        log.info("PID power enabled (setpoint=%.0fF)", self._setpoint)

    def start_monitoring(self):
        """Start background temperature monitoring loop with power disabled."""
        self._start_thread_if_needed()
        with self._lock:
            self._enabled = False
            self._enter_mode("grounded", self._temp)
        log.info("Background monitoring started (power disabled)")

    def set_power(self, enabled):
        """Toggle power/PID control while keeping monitoring loop running."""
        if enabled:
            self.start_cruise()
            log.info("PID power enabled")
            return
        self._start_thread_if_needed()
        with self._lock:
            self._enabled = False
            self._enter_mode("grounded", self._temp)
        log.info("PID power disabled")
        self._relay_off()

    def _start_thread_if_needed(self):
        """Start background control thread once, if not already running."""
        with self._lock:
            if self._thread is not None and self._thread.is_alive():
                return
            self._stop_event.clear()
            self._thread = threading.Thread(target=self._control_loop, daemon=True, name="pid-loop")
            self._thread.start()
            log.info("Control loop thread started")

    def stop(self):
        """Stop the monitoring/controller thread and turn off the relay."""
        log.info("Stopping PID controller thread")
        self._stop_event.set()
        with self._lock:
            self._enabled = False
            self._autotune_active = False
            self._enter_mode("grounded", self._temp)
        self._relay_off()
        if self._thread is not None:
            self._thread.join(timeout=5)
        log.info("PID controller thread stopped")

    # --- Flight mode state machine ---

    def _enter_mode(self, mode, now_temp=None):
        """Switch the flight mode and persist it to the "flight" config section.

        Does not acquire the lock; callers that need atomicity hold it.

        Args:
            mode: New mode name ("grounded", "takeoff", "cruise", "descent").
            now_temp: Value recorded as the ramp starting point; falls back to
                the last measured temperature when None.
        """
        self._mode = mode
        self._mode_started_at = time.monotonic()
        if now_temp is not None:
            self._mode_from_temp = float(now_temp)
        else:
            self._mode_from_temp = self._temp
        if mode != "takeoff":
            self._takeoff_effective_seconds = None
        self._config.update_section("flight", {"mode": mode})
        log.info("Flight mode -> %s", mode)

    def _compute_takeoff_duration(self):
        """Scale takeoff duration by how hot the coil already is.

        Returns:
            Seconds for the takeoff ramp: the configured duration (at least
            5 s) scaled by the fraction of the ambient-to-target rise still
            remaining, or 0.0 when already at or above the target.
        """
        flight = self._config.get("flight")
        base_takeoff = max(5.0, float(flight.get("takeoff_seconds", self._DEFAULT_TAKEOFF_SECONDS)))
        ambient = float(flight.get("ambient_temp_f", 75))
        start_temp = self._mode_from_temp if self._mode_from_temp is not None else self._temp
        full_delta = self._target_setpoint - ambient
        remaining_delta = self._target_setpoint - start_temp
        if full_delta <= 1.0:
            # Target is at/below ambient: scaling is meaningless.
            return base_takeoff
        if remaining_delta <= 0:
            return 0.0
        ratio = max(0.0, min(1.0, remaining_delta / full_delta))
        return max(5.0, base_takeoff * ratio)

    def _compute_flight_setpoint(self, temp):
        """Return the setpoint the PID should track right now.

        Advances the flight state machine as a side effect: takeoff becomes
        cruise when its ramp completes, descent becomes grounded (and power is
        disabled) when its ramp completes. Because it mutates ``_enabled`` and
        the mode, the control loop calls it with the lock held.

        Args:
            temp: Latest measured temperature.
        """
        flight = self._config.get("flight")
        descent_s = max(1.0, float(flight.get("descent_seconds", self._DEFAULT_DESCENT_SECONDS)))
        descent_target = float(flight.get("descent_target_f", 120))
        turbo = bool(flight.get("turbo", False))

        if self._mode == "grounded":
            self._enabled = False
            return self._setpoint

        if self._mode == "takeoff":
            if turbo:
                # Turbo skips the ramp entirely.
                self._enter_mode("cruise", temp)
                return self._target_setpoint
            start_temp = self._mode_from_temp if self._mode_from_temp is not None else temp
            effective_takeoff = self._takeoff_effective_seconds
            if effective_takeoff is None:
                effective_takeoff = self._compute_takeoff_duration()
                self._takeoff_effective_seconds = effective_takeoff
            if effective_takeoff <= 0:
                self._enter_mode("cruise", temp)
                return self._target_setpoint
            elapsed = max(0.0, time.monotonic() - (self._mode_started_at or time.monotonic()))
            p = min(1.0, elapsed / effective_takeoff)
            ramp = start_temp + (self._target_setpoint - start_temp) * p
            if p >= 1.0:
                self._enter_mode("cruise", temp)
                return self._target_setpoint
            return ramp

        if self._mode == "descent":
            if turbo:
                self._enter_mode("grounded", temp)
                self._enabled = False
                return descent_target
            start_sp = self._mode_from_temp if self._mode_from_temp is not None else self._setpoint
            elapsed = max(0.0, time.monotonic() - (self._mode_started_at or time.monotonic()))
            p = min(1.0, elapsed / descent_s)
            ramp = start_sp + (descent_target - start_sp) * p
            if p >= 1.0:
                self._enabled = False
                self._enter_mode("grounded", temp)
            return ramp

        # cruise
        return self._target_setpoint

    def _scheduler_check(self):
        """Start a descent when the wall clock hits a configured cutoff time.

        Each calendar minute triggers at most once.
        """
        sched = self._config.get("scheduler")
        if not sched.get("enabled", False):
            return
        times = sched.get("cutoff_times", [])
        if not isinstance(times, list):
            return
        now = datetime.now()
        key = now.strftime("%Y-%m-%d %H:%M")
        hhmm = now.strftime("%H:%M")
        if key == self._scheduler_last_trigger:
            return
        if hhmm in times and self._mode not in ("grounded", "descent"):
            self._scheduler_last_trigger = key
            self.start_descent()
            log.info("Scheduler triggered descent at %s", hhmm)

    def _mode_eta_seconds(self):
        """Return seconds left in the current takeoff/descent ramp, else None."""
        if self._mode not in ("takeoff", "descent"):
            return None
        started = self._mode_started_at
        if started is None:
            return None
        flight = self._config.get("flight")
        if self._mode == "takeoff":
            total = self._takeoff_effective_seconds
            if total is None:
                total = float(flight.get("takeoff_seconds", self._DEFAULT_TAKEOFF_SECONDS))
        else:
            # Same default and clamp as the descent ramp itself, so the ETA
            # matches what _compute_flight_setpoint actually does.
            total = max(1.0, float(flight.get("descent_seconds", self._DEFAULT_DESCENT_SECONDS)))
        remaining = max(0.0, total - (time.monotonic() - started))
        return round(remaining, 1)

    def _next_cutoff_seconds(self):
        """Return seconds until the next scheduler cutoff, or None.

        None is returned when the scheduler is disabled, has no cutoff times,
        or none of the entries is a valid "HH:MM" value. Malformed or
        out-of-range entries are skipped.
        """
        sched = self._config.get("scheduler")
        if not sched.get("enabled", False):
            return None
        times = sched.get("cutoff_times", [])
        if not isinstance(times, list) or not times:
            return None
        now = datetime.now()
        best_seconds = None
        for entry in times:
            try:
                hh, mm = entry.split(":", 1)
                # replace() raises ValueError for e.g. hour 25 / minute 99, so
                # it must be inside the guarded section too.
                target = now.replace(hour=int(hh), minute=int(mm), second=0, microsecond=0)
            except (AttributeError, TypeError, ValueError):
                continue
            if target <= now:
                target = target + timedelta(days=1)
            delta_sec = (target - now).total_seconds()
            if best_seconds is None or delta_sec < best_seconds:
                best_seconds = delta_sec
        if best_seconds is None:
            return None
        return round(best_seconds, 1)

    def start_takeoff(self):
        """Enable power and ramp the setpoint from the current temperature."""
        self._start_thread_if_needed()
        with self._lock:
            if self._autotune_active:
                self._autotune_active = False
                self._autotune_message = "Autotune stopped: flight mode takeoff"
            self._enabled = True
            self._safety_tripped = False
            self._safety_reason = ""
            self._start_time = time.monotonic()
            self._idle_since = None
            self._pid.reset()
            self._target_setpoint = self._setpoint
            self._enter_mode("takeoff", self._temp)
            self._takeoff_effective_seconds = self._compute_takeoff_duration()
            log.info(
                "Takeoff effective duration %.1fs (base %.1fs)",
                self._takeoff_effective_seconds,
                float(self._config.get("flight").get("takeoff_seconds", self._DEFAULT_TAKEOFF_SECONDS)),
            )

    def start_descent(self):
        """Begin ramping the setpoint down; no-op when already grounded."""
        with self._lock:
            if self._autotune_active:
                self._autotune_active = False
                self._autotune_message = "Autotune stopped: flight mode descent"
            if self._mode == "grounded":
                return
            self._enter_mode("descent", self._setpoint)
            self._takeoff_effective_seconds = None

    def start_cruise(self):
        """Enable power and hold the configured setpoint immediately."""
        self._start_thread_if_needed()
        with self._lock:
            self._enabled = True
            self._safety_tripped = False
            self._safety_reason = ""
            self._start_time = time.monotonic()
            self._idle_since = None
            self._pid.reset()
            self._target_setpoint = self._setpoint
            self._enter_mode("cruise", self._temp)
            self._takeoff_effective_seconds = None

    def set_flight_config(self, takeoff_seconds=None, descent_seconds=None, turbo=None, descent_target_f=None):
        """Persist any of the given flight settings; None leaves a key untouched."""
        flight = self._config.get("flight")
        updates = {}
        if takeoff_seconds is not None:
            updates["takeoff_seconds"] = float(takeoff_seconds)
        if descent_seconds is not None:
            updates["descent_seconds"] = float(descent_seconds)
        if turbo is not None:
            updates["turbo"] = bool(turbo)
        if descent_target_f is not None:
            updates["descent_target_f"] = float(descent_target_f)
        if updates:
            flight.update(updates)
            self._config.update_section("flight", flight)

    def set_scheduler(self, enabled=None, cutoff_times=None):
        """Persist scheduler settings; None leaves a key untouched."""
        sched = self._config.get("scheduler")
        if enabled is not None:
            sched["enabled"] = bool(enabled)
        if cutoff_times is not None:
            sched["cutoff_times"] = list(cutoff_times)
        self._config.update_section("scheduler", sched)

    # --- Relay ---

    def _relay_off(self):
        """Ensure relay is OFF."""
        if self._gpio is not None:
            try:
                self._gpio.output(self._relay_pin, self._gpio.LOW)
            except Exception as e:
                log.error("Failed to turn relay off: %s", e)
        self._relay_on = False

    def _relay_set(self, on):
        """Set relay state."""
        if self._gpio is not None:
            try:
                self._gpio.output(
                    self._relay_pin,
                    self._gpio.HIGH if on else self._gpio.LOW
                )
            except Exception as e:
                log.error("Failed to set relay: %s", e)
        self._relay_on = on

    # --- Sensor plausibility ---

    def _reset_stale_tracker(self, temp=None):
        """Restart the stuck-sensor timer from ``temp`` at the current time."""
        self._stale_reference_temp = temp
        self._stale_reference_time = time.monotonic()

    def _check_sensor_stale(self, temp, output, loop_size, safety):
        """Trip safety if sensor appears stuck while heater drive is high.

        Args:
            temp: Latest reading (None is ignored).
            output: Current heater demand in ms of ON time per cycle.
            loop_size: Cycle length in ms (the maximum of ``output``).
            safety: The "safety" config section.

        Returns:
            True when the safety was tripped, False otherwise.
        """
        if temp is None:
            return False
        delta_limit = float(safety.get("sensor_stale_delta_f", 0.8))
        stale_seconds = float(safety.get("sensor_stale_seconds", 8.0))
        high_ratio = float(safety.get("stale_output_ratio", 0.65))
        if self._stale_reference_time is None or self._stale_reference_temp is None:
            self._reset_stale_tracker(temp)
            return False
        # Only enforce stale detection while requesting strong heat output.
        if output < (high_ratio * loop_size):
            self._reset_stale_tracker(temp)
            return False
        delta = abs(temp - self._stale_reference_temp)
        if delta >= delta_limit:
            self._reset_stale_tracker(temp)
            return False
        elapsed = time.monotonic() - self._stale_reference_time
        if elapsed >= stale_seconds:
            self._trip_safety("Sensor stale while high heater demand")
            return True
        return False

    # --- Autotune ---

    def _reset_autotune_state(self):
        """Forget all oscillation data collected by a previous autotune run."""
        self._autotune_heating = False
        self._autotune_phase_started = None
        self._autotune_phase_extreme = None
        self._autotune_high_peaks = []
        self._autotune_low_peaks = []

    def start_autotune(self, setpoint=None, hysteresis=None, cycles=None):
        """Arm relay autotune around the (optionally updated) setpoint.

        Refused while a takeoff or descent ramp is in progress.

        Args:
            setpoint: New target temperature; also persisted to config.
            hysteresis: Switching band around the target.
            cycles: Number of oscillation cycles to collect (at least 1).
        """
        with self._lock:
            if self._mode in ("takeoff", "descent"):
                self._autotune_active = False
                self._autotune_message = "Autotune disabled during {}".format(self._mode)
                log.warning(self._autotune_message)
                return
            if setpoint is not None:
                self._setpoint = float(setpoint)
                # Keep the cruise target in step, as set_setpoint() does, so
                # the PID resumes on the new target once autotune finishes.
                self._target_setpoint = self._setpoint
                self._config.set("control", "setpoint", float(setpoint))
            self._autotune_target = self._setpoint
            if hysteresis is not None:
                self._autotune_hysteresis = float(hysteresis)
            if cycles is not None:
                self._autotune_cycles = max(1, int(cycles))
            self._autotune_active = True
            self._autotune_message = "Autotune running"
            self._autotune_last_result = None
            self._reset_autotune_state()
            self._pid.reset()
            log.info(
                "Autotune started target=%.1fF hysteresis=%.1f cycles=%d",
                self._autotune_target,
                self._autotune_hysteresis,
                self._autotune_cycles,
            )

    def stop_autotune(self, message="Autotune stopped"):
        """Deactivate autotune, record ``message`` and switch the relay off."""
        with self._lock:
            self._autotune_active = False
            self._autotune_message = message
        self._relay_off()

    def _update_autotune(self, temp, loop_size):
        """Advance relay autotune by one sample and return the heater demand.

        Drives the heater fully on below ``target - hysteresis`` and fully off
        above ``target + hysteresis``. Once enough switch points are collected
        the gains are derived from the oscillation (Ku = 4d / (pi * a), then
        kP = 0.6 Ku, kI = 1.2 Ku / Pu, kD = 0.075 Ku Pu), clamped, applied to
        the PID and persisted to config.

        Must be called without the lock held (stop_autotune acquires it).

        Args:
            temp: Latest measured temperature (not None).
            loop_size: Cycle length in ms; full-on demand equals this value.

        Returns:
            ``float(loop_size)`` while heating, ``0.0`` otherwise.
        """
        target = self._autotune_target
        h = self._autotune_hysteresis
        now = time.monotonic()
        if self._autotune_phase_started is None:
            self._autotune_phase_started = now
            self._autotune_phase_extreme = temp
            self._autotune_heating = temp < target

        # NOTE(review): the extreme is sampled up to the moment the threshold
        # is crossed, so overshoot after the relay switches is not captured
        # and the amplitude stays close to the hysteresis - confirm intended.
        if self._autotune_heating:
            if self._autotune_phase_extreme is None or temp > self._autotune_phase_extreme:
                self._autotune_phase_extreme = temp
            if temp >= target + h:
                self._autotune_high_peaks.append((now, self._autotune_phase_extreme))
                self._autotune_heating = False
                self._autotune_phase_started = now
                self._autotune_phase_extreme = temp
        else:
            if self._autotune_phase_extreme is None or temp < self._autotune_phase_extreme:
                self._autotune_phase_extreme = temp
            if temp <= target - h:
                self._autotune_low_peaks.append((now, self._autotune_phase_extreme))
                self._autotune_heating = True
                self._autotune_phase_started = now
                self._autotune_phase_extreme = temp

        cycles = max(1, int(self._autotune_cycles))
        # One period needs two high peaks, so never look at fewer than two
        # (otherwise cycles=1 could never produce a period and always failed).
        window = max(2, cycles)
        highs = len(self._autotune_high_peaks)
        lows = len(self._autotune_low_peaks)
        if highs >= window and lows >= cycles:
            recent_highs = self._autotune_high_peaks[-window:]
            recent_lows = self._autotune_low_peaks[-cycles:]
            high_vals = [v for _, v in recent_highs]
            low_vals = [v for _, v in recent_lows]
            amp = (sum(high_vals) / float(len(high_vals)) - sum(low_vals) / float(len(low_vals))) / 2.0
            if amp <= 0:
                self.stop_autotune("Autotune failed: invalid oscillation amplitude")
                return 0.0
            high_times = [t for t, _ in recent_highs]
            periods = [later - earlier for earlier, later in zip(high_times, high_times[1:])]
            pu = sum(periods) / float(len(periods))
            if pu <= 0:
                self.stop_autotune("Autotune failed: invalid period")
                return 0.0
            d = loop_size / 2.0  # relay amplitude: half the output swing
            ku = (4.0 * d) / (math.pi * amp)
            kp = 0.6 * ku
            ki = (1.2 * ku) / pu
            kd = 0.075 * ku * pu
            # Safety clamp for aggressive/unstable autotune outputs.
            kp = max(0.0, min(kp, 300.0))
            ki = max(0.0, min(ki, 50.0))
            kd = max(0.0, min(kd, 500.0))
            self._pid.tunings = (kp, ki, kd)
            self._pid.reset()
            self._config.update_section("pid", {
                "kP": round(kp, 4),
                "kI": round(ki, 4),
                "kD": round(kd, 4),
            })
            self._autotune_last_result = {
                "kP": round(kp, 4),
                "kI": round(ki, 4),
                "kD": round(kd, 4),
                "Ku": round(ku, 4),
                "Pu": round(pu, 4),
                "amplitude": round(amp, 4),
            }
            self.stop_autotune("Autotune complete")
            log.info("Autotune complete: %s", self._autotune_last_result)
            return 0.0
        return float(loop_size if self._autotune_heating else 0.0)

    def _autotune_snapshot(self):
        """Return the autotune state as a dict; caller holds the lock."""
        return {
            "active": self._autotune_active,
            "target": round(self._autotune_target, 2),
            "hysteresis": round(self._autotune_hysteresis, 2),
            "cycles": self._autotune_cycles,
            "high_peaks": len(self._autotune_high_peaks),
            "low_peaks": len(self._autotune_low_peaks),
            "phase": "heating" if self._autotune_heating else "cooling",
            "message": self._autotune_message,
            "last_result": self._autotune_last_result,
        }

    # --- Control loop ---

    def _control_loop(self):
        """
        Main PID control loop. Runs in a background thread.

        Each iteration is one "loop cycle" of loop_size_ms milliseconds.
        Within each cycle, the relay is ON for a proportion of time equal to
        the PID output, implementing time-proportional software PWM.

        Safety trips disable power but keep the loop (and therefore
        temperature monitoring) running; only stop() or an unexpected
        exception ends the thread. The relay is always switched off on exit.
        """
        try:
            while not self._stop_event.is_set():
                # Reload config if file changed
                self._config.reload_if_changed()
                self._scheduler_check()

                # Apply current PID tuning
                pid_cfg = self._config.get("pid")
                self._pid.tunings = (pid_cfg["kP"], pid_cfg["kI"], pid_cfg["kD"])
                self._pid.proportional_on_measurement = pid_cfg.get(
                    "proportional_on_measurement", True
                )
                control_cfg = self._config.get("control")
                loop_size = control_cfg["loop_size_ms"]
                sleep_time = control_cfg["sleep_time"]
                log_resolution = self._config.get("logging", "log_resolution")
                self._pid.output_limits = (0, loop_size)

                # Read temperature
                temp = self._tc.read()
                if temp is None:
                    log.error("Thermocouple read returned None, disabling power for safety")
                    self._trip_safety("Thermocouple disconnected")
                    time.sleep(0.2)
                    continue

                # The flight state machine mutates _enabled/_mode, so run it
                # under the same lock the public start_*/set_* methods use.
                with self._lock:
                    self._temp = temp
                    self._pid.setpoint = self._compute_flight_setpoint(temp)

                # Safety checks
                safety = self._config.get("safety")
                if temp > safety["max_temp_f"]:
                    self._trip_safety("Temperature {:.0f}F exceeds max {}F".format(temp, safety['max_temp_f']))
                    time.sleep(0.2)
                    continue
                if not self._tc.is_connected:
                    self._trip_safety("Thermocouple disconnected")
                    time.sleep(0.2)
                    continue

                if not self._enabled:
                    # Monitoring only: keep the relay off, keep logging.
                    self._relay_off()
                    self._reset_stale_tracker(temp)
                    with self._lock:
                        self._output = 0.0
                        self._last_loop_time = time.monotonic()
                        self._loop_count += 1
                    self._log_counter += 1
                    if self._log_counter >= log_resolution:
                        self._log_data_point(temp, 0.0)
                        self._log_counter = 0
                    time.sleep(max(0.2, sleep_time))
                    continue

                # Idle shutoff check
                idle_minutes = safety.get("idle_shutoff_minutes", 0)
                if idle_minutes > 0:
                    near_setpoint = abs(temp - self._setpoint) < 20
                    if near_setpoint:
                        if self._idle_since is None:
                            self._idle_since = time.monotonic()
                        elif (time.monotonic() - self._idle_since) > (idle_minutes * 60):
                            self._trip_safety(
                                "Idle shutoff: at setpoint for {} minutes".format(idle_minutes)
                            )
                            # Power is now disabled; stay in the loop so
                            # monitoring continues, like every other trip.
                            continue
                    else:
                        self._idle_since = None

                self._run_pwm_cycle(loop_size, sleep_time, log_resolution, safety)
        except Exception as e:
            log.exception("Control loop crashed: %s", e)
            self._trip_safety("Control loop error: {}".format(e))
        finally:
            self._relay_off()
            log.info("Control loop thread exiting, relay OFF")

    def _run_pwm_cycle(self, loop_size, sleep_time, log_resolution, safety):
        """Run one software-PWM cycle of ``loop_size`` ms with power enabled.

        Re-reads the sensor on every pass, recomputes the demand (PID or
        autotune) and keeps the relay ON for the first ``output`` ms of the
        cycle. Returns early on stop request or any safety trip.
        """
        start_ms = time.monotonic() * 1000
        end_ms = start_ms + loop_size
        while time.monotonic() * 1000 < end_ms:
            if self._stop_event.is_set():
                break
            temp = self._tc.read()
            if temp is None:
                # A missing reading cannot be fed to the PID or the logger.
                log.error("Thermocouple read returned None, disabling power for safety")
                self._trip_safety("Thermocouple disconnected")
                break
            with self._lock:
                self._temp = temp
            if temp > safety["max_temp_f"]:
                self._trip_safety("Temperature {:.0f}F exceeds max {}F".format(temp, safety['max_temp_f']))
                break
            if not self._tc.is_connected:
                self._trip_safety("Thermocouple disconnected")
                break
            if self._autotune_active:
                output = self._update_autotune(temp, loop_size)
            else:
                output = self._pid(temp)
            with self._lock:
                self._output = output
            if self._check_sensor_stale(temp, output, loop_size, safety):
                break
            # Software PWM: relay ON for first `output` ms of the cycle
            elapsed_ms = time.monotonic() * 1000 - start_ms
            should_be_on = elapsed_ms < output
            self._relay_set(should_be_on)
            # Logging
            self._log_counter += 1
            if self._log_counter >= log_resolution:
                self._log_data_point(temp, output)
                self._log_counter = 0
            with self._lock:
                self._last_loop_time = time.monotonic()
                self._loop_count += 1
            time.sleep(sleep_time)

    def _trip_safety(self, reason):
        """Trip safety shutdown.

        Disables power, grounds the flight mode and switches the relay off.
        A repeat of the currently recorded reason only re-asserts the
        disabled state so the log is not flooded. Must be called without the
        lock held.
        """
        if self._safety_tripped and self._safety_reason == reason:
            self._enabled = False
            self._relay_off()
            return
        log.warning("SAFETY TRIP: %s", reason)
        with self._lock:
            self._safety_tripped = True
            self._safety_reason = reason
            self._enabled = False
            self._enter_mode("grounded", self._temp)
        self._relay_off()

    def _log_data_point(self, temp, output):
        """Write a data point to the CSV log and in-memory history."""
        now = time.time()
        flight_setpoint = self._pid.setpoint
        row = [
            "{:.3f}".format(now),
            "{:.2f}".format(temp),
            "{:.2f}".format(self._setpoint),
            "{:.2f}".format(output),
            1 if self._relay_on else 0,
            "{:.2f}".format(flight_setpoint),
            self._mode,
            "{:.4f}".format(self._pid.Kp),
            "{:.4f}".format(self._pid.Ki),
            "{:.4f}".format(self._pid.Kd),
            self._config.get("control", "loop_size_ms")
        ]
        # CSV file
        if self._log_writer:
            try:
                self._log_writer.writerow(row)
                self._log_file.flush()
            except Exception as e:
                log.error("Failed to write log: %s", e)
        # In-memory history for web UI
        data_point = {
            "timestamp": now,
            "temp": round(temp, 2),
            "setpoint": round(self._setpoint, 2),
            "flight_setpoint": round(flight_setpoint, 2),
            "mode": self._mode,
            "output": round(output, 2),
            "relay": self._relay_on
        }
        with self._lock:
            self._history.append(data_point)
            overflow = len(self._history) - self._history_max
            if overflow > 0:
                # Drop the oldest entries in place instead of copying the list.
                del self._history[:overflow]

    # --- Public API (thread-safe) ---

    def set_setpoint(self, value):
        """Change the target temperature."""
        with self._lock:
            old_setpoint = self._setpoint
            self._setpoint = float(value)
            self._target_setpoint = float(value)
            self._idle_since = None  # Reset idle timer on setpoint change
            if abs(self._setpoint - old_setpoint) >= 10:
                # Large jump: discard accumulated PID state.
                self._pid.reset()
        self._config.set("control", "setpoint", float(value))
        log.info("Setpoint changed to %.0fF", value)

    def set_pid_tuning(self, kp, ki, kd, proportional_on_measurement=None, proportional_mode=None):
        """Update PID gains.

        Args:
            kp, ki, kd: New gains, applied immediately and persisted.
            proportional_on_measurement: Optional bool for the PID option of
                the same name.
            proportional_mode: Optional "error" / "measurement"; when given
                it overrides ``proportional_on_measurement``.
        """
        section = {"kP": kp, "kI": ki, "kD": kd}
        if proportional_mode in ("error", "measurement"):
            proportional_on_measurement = (proportional_mode == "measurement")
        if proportional_on_measurement is not None:
            section["proportional_on_measurement"] = bool(proportional_on_measurement)
            self._pid.proportional_on_measurement = bool(proportional_on_measurement)
        self._config.update_section("pid", section)
        self._pid.tunings = (kp, ki, kd)
        self._pid.reset()
        log.info("PID tuning updated: kP=%.4f, kI=%.4f, kD=%.4f", kp, ki, kd)

    @property
    def status(self):
        """Return a dict of current controller state (thread-safe snapshot)."""
        with self._lock:
            uptime = None
            if self._start_time is not None and self._enabled:
                uptime = round(time.monotonic() - self._start_time, 1)
            return {
                "enabled": self._enabled,
                "mode": self._mode,
                "temp": round(self._temp, 2),
                "setpoint": round(self._setpoint, 2),
                "effective_setpoint": round(self._pid.setpoint, 2),
                "output": round(self._output, 2),
                "relay_on": self._relay_on,
                "loop_count": self._loop_count,
                "uptime_seconds": uptime,
                "safety_tripped": self._safety_tripped,
                "safety_reason": self._safety_reason,
                "thermocouple_connected": self._tc.is_connected,
                "pid": {
                    "kP": self._pid.Kp,
                    "kI": self._pid.Ki,
                    "kD": self._pid.Kd,
                    "proportional_on_measurement": self._pid.proportional_on_measurement,
                    "proportional_mode": "measurement" if self._pid.proportional_on_measurement else "error",
                },
                "autotune": self._autotune_snapshot(),
                "flight": self._config.get("flight"),
                "scheduler": self._config.get("scheduler"),
                "mode_eta_seconds": self._mode_eta_seconds(),
                "next_cutoff_seconds": self._next_cutoff_seconds(),
            }

    @property
    def autotune_status(self):
        """Return a dict describing autotune progress (thread-safe snapshot)."""
        with self._lock:
            return self._autotune_snapshot()

    @property
    def history(self):
        """Return recent data points for charting."""
        with self._lock:
            return list(self._history)

    def get_history_since(self, since_timestamp):
        """Return data points newer than the given timestamp."""
        with self._lock:
            return [p for p in self._history if p["timestamp"] > since_timestamp]

    @property
    def is_alive(self):
        """Check if the control loop thread is alive."""
        return self._thread is not None and self._thread.is_alive()

    @property
    def watchdog_ok(self):
        """Check if the control loop has updated recently.

        Always True while power is disabled or before the first loop pass.
        """
        if not self._enabled:
            return True
        with self._lock:
            if self._last_loop_time is None:
                return True
            timeout = self._config.get("safety", "watchdog_timeout_s")
            return (time.monotonic() - self._last_loop_time) < timeout

    def cleanup(self):
        """Clean up GPIO and close log file. Call on application shutdown."""
        self.stop()
        if self._log_file:
            try:
                self._log_file.close()
            except Exception as e:
                log.error("Failed to close log file: %s", e)
            # Prevent later writes to the closed file.
            self._log_writer = None
            self._log_file = None
        if self._gpio is not None:
            try:
                self._gpio.cleanup()
                log.info("GPIO cleaned up")
            except Exception as e:
                log.error("GPIO cleanup error: %s", e)