""" piNail2 PID Controller Runs the PID control loop in a background thread. Controls a relay via GPIO using software PWM (duty cycle within a time window) to maintain a target temperature on an e-nail heating coil. Safety features: - Hard max temperature cutoff - Thermocouple disconnect detection -> relay OFF - Idle auto-shutoff after configurable timeout - Watchdog: detects if control loop stalls - Proper GPIO cleanup on shutdown """ import time import threading import logging import os import csv import math from datetime import datetime, timedelta log = logging.getLogger(__name__) class PIDController: """ Threaded PID temperature controller for e-nail heating. The control loop runs in a background daemon thread. It reads temperature from a Thermocouple object, computes PID output, and drives a relay GPIO pin using software PWM (time-proportional control within each loop cycle). Thread-safe properties expose current state to the Flask web server. """ def __init__(self, config, thermocouple): """ Args: config: Config instance thermocouple: Thermocouple instance """ self._config = config self._tc = thermocouple self._lock = threading.Lock() # State self._enabled = False self._temp = 0.0 self._setpoint = config.get("control", "setpoint") self._output = 0.0 self._relay_on = False self._loop_count = 0 self._start_time = None self._last_loop_time = None self._idle_since = None # timestamp when temp first reached setpoint vicinity self._safety_tripped = False self._safety_reason = "" self._thread = None self._stop_event = threading.Event() # Autotune state self._autotune_active = False self._autotune_target = self._setpoint self._autotune_hysteresis = config.get("autotune", "hysteresis_f") self._autotune_cycles = config.get("autotune", "cycles") self._autotune_heating = False self._autotune_phase_started = None self._autotune_phase_extreme = None self._autotune_high_peaks = [] self._autotune_low_peaks = [] self._autotune_last_result = None self._autotune_message = "" # Sensor plausibility state self._stale_reference_temp = None self._stale_reference_time = None # Flight mode state machine self._mode = "grounded" self._target_setpoint = self._setpoint self._mode_started_at = None self._mode_from_temp = None self._takeoff_effective_seconds = None self._scheduler_last_trigger = None # PID instance from simple_pid import PID pid_cfg = config.get("pid") self._pid = PID( pid_cfg["kP"], pid_cfg["kI"], pid_cfg["kD"], setpoint=self._setpoint ) self._pid.proportional_on_measurement = pid_cfg.get("proportional_on_measurement", True) loop_size = config.get("control", "loop_size_ms") self._pid.output_limits = (0, loop_size) # GPIO setup self._relay_pin = config.get("gpio", "relay_pin") self._gpio = None try: import RPi.GPIO as GPIO self._gpio = GPIO GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.setup(self._relay_pin, GPIO.OUT, initial=GPIO.LOW) log.info("GPIO initialized, relay pin %d set LOW", self._relay_pin) except Exception as e: log.error("Failed to initialize GPIO: %s", e) # Logging setup self._log_dir = config.get("logging", "log_directory") os.makedirs(self._log_dir, exist_ok=True) self._log_file = None self._log_writer = None self._log_counter = 0 self._history = [] # Recent data points for the web UI self._history_max = 1000 self._init_log_file() def _init_log_file(self): """Create a new CSV log file with a timestamp in the filename.""" try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") path = os.path.join(self._log_dir, "pinail_{}.csv".format(timestamp)) self._log_file = open(path, 'w', newline='') self._log_writer = csv.writer(self._log_file) self._log_writer.writerow([ "timestamp", "temp_f", "setpoint_f", "output", "relay", "flight_setpoint_f", "mode", "kP", "kI", "kD", "loop_size_ms" ]) log.info("Log file created: %s", path) except Exception as e: log.error("Failed to create log file: %s", e) def start(self): """Enable power/PID control and ensure background monitor loop is running.""" self.start_cruise() log.info("PID power enabled (setpoint=%.0fF)", self._setpoint) def start_monitoring(self): """Start background temperature monitoring loop with power disabled.""" self._start_thread_if_needed() with self._lock: self._enabled = False self._enter_mode("grounded", self._temp) log.info("Background monitoring started (power disabled)") def set_power(self, enabled): """Toggle power/PID control while keeping monitoring loop running.""" if enabled: self.start_cruise() log.info("PID power enabled") return self._start_thread_if_needed() with self._lock: self._enabled = False self._enter_mode("grounded", self._temp) log.info("PID power disabled") self._relay_off() def _start_thread_if_needed(self): """Start background control thread once, if not already running.""" with self._lock: if self._thread is not None and self._thread.is_alive(): return self._stop_event.clear() self._thread = threading.Thread(target=self._control_loop, daemon=True, name="pid-loop") self._thread.start() log.info("Control loop thread started") def stop(self): """Stop the monitoring/controller thread and turn off the relay.""" log.info("Stopping PID controller thread") self._stop_event.set() with self._lock: self._enabled = False self._autotune_active = False self._enter_mode("grounded", self._temp) self._relay_off() if self._thread is not None: self._thread.join(timeout=5) log.info("PID controller thread stopped") def _enter_mode(self, mode, now_temp=None): self._mode = mode self._mode_started_at = time.monotonic() if now_temp is not None: self._mode_from_temp = float(now_temp) else: self._mode_from_temp = self._temp if mode != "takeoff": self._takeoff_effective_seconds = None self._config.update_section("flight", {"mode": mode}) log.info("Flight mode -> %s", mode) def _compute_takeoff_duration(self): """Scale takeoff duration by how hot the coil already is.""" flight = self._config.get("flight") base_takeoff = max(5.0, float(flight.get("takeoff_seconds", 300))) ambient = float(flight.get("ambient_temp_f", 75)) start_temp = self._mode_from_temp if self._mode_from_temp is not None else self._temp full_delta = self._target_setpoint - ambient remaining_delta = self._target_setpoint - start_temp if full_delta <= 1.0: return base_takeoff if remaining_delta <= 0: return 0.0 ratio = max(0.0, min(1.0, remaining_delta / full_delta)) return max(5.0, base_takeoff * ratio) def _compute_flight_setpoint(self, temp): flight = self._config.get("flight") takeoff_s = max(1.0, float(flight.get("takeoff_seconds", 90))) descent_s = max(1.0, float(flight.get("descent_seconds", 90))) descent_target = float(flight.get("descent_target_f", 120)) turbo = bool(flight.get("turbo", False)) if self._mode == "grounded": self._enabled = False return self._setpoint if self._mode == "takeoff": if turbo: self._enter_mode("cruise", temp) return self._target_setpoint start_temp = self._mode_from_temp if self._mode_from_temp is not None else temp effective_takeoff = self._takeoff_effective_seconds if effective_takeoff is None: effective_takeoff = self._compute_takeoff_duration() self._takeoff_effective_seconds = effective_takeoff if effective_takeoff <= 0: self._enter_mode("cruise", temp) return self._target_setpoint elapsed = max(0.0, time.monotonic() - (self._mode_started_at or time.monotonic())) p = min(1.0, elapsed / effective_takeoff) ramp = start_temp + (self._target_setpoint - start_temp) * p if p >= 1.0: self._enter_mode("cruise", temp) return self._target_setpoint return ramp if self._mode == "descent": if turbo: self._enter_mode("grounded", temp) self._enabled = False return descent_target start_sp = self._mode_from_temp if self._mode_from_temp is not None else self._setpoint elapsed = max(0.0, time.monotonic() - (self._mode_started_at or time.monotonic())) p = min(1.0, elapsed / descent_s) ramp = start_sp + (descent_target - start_sp) * p if p >= 1.0: self._enabled = False self._enter_mode("grounded", temp) return ramp # cruise return self._target_setpoint def _scheduler_check(self): sched = self._config.get("scheduler") if not sched.get("enabled", False): return times = sched.get("cutoff_times", []) if not isinstance(times, list): return now = datetime.now() key = now.strftime("%Y-%m-%d %H:%M") hhmm = now.strftime("%H:%M") if key == self._scheduler_last_trigger: return if hhmm in times and self._mode not in ("grounded", "descent"): self._scheduler_last_trigger = key self.start_descent() log.info("Scheduler triggered descent at %s", hhmm) def _mode_eta_seconds(self): if self._mode not in ("takeoff", "descent"): return None started = self._mode_started_at if started is None: return None flight = self._config.get("flight") if self._mode == "takeoff": total = self._takeoff_effective_seconds if total is None: total = float(flight.get("takeoff_seconds", 300)) else: total = float(flight.get("descent_seconds", 300)) remaining = max(0.0, total - (time.monotonic() - started)) return round(remaining, 1) def _next_cutoff_seconds(self): sched = self._config.get("scheduler") if not sched.get("enabled", False): return None times = sched.get("cutoff_times", []) if not isinstance(times, list) or not times: return None now = datetime.now() best_seconds = None for t in times: try: hh, mm = t.split(":", 1) hhv = int(hh) mmv = int(mm) except Exception: continue target = now.replace(hour=hhv, minute=mmv, second=0, microsecond=0) if target <= now: target = target + timedelta(days=1) delta_sec = (target - now).total_seconds() if best_seconds is None or delta_sec < best_seconds: best_seconds = delta_sec if best_seconds is None: return None return round(best_seconds, 1) def start_takeoff(self): self._start_thread_if_needed() with self._lock: if self._autotune_active: self._autotune_active = False self._autotune_message = "Autotune stopped: flight mode takeoff" self._enabled = True self._safety_tripped = False self._safety_reason = "" self._start_time = time.monotonic() self._idle_since = None self._pid.reset() self._target_setpoint = self._setpoint self._enter_mode("takeoff", self._temp) self._takeoff_effective_seconds = self._compute_takeoff_duration() log.info( "Takeoff effective duration %.1fs (base %.1fs)", self._takeoff_effective_seconds, float(self._config.get("flight").get("takeoff_seconds", 300)), ) def start_descent(self): with self._lock: if self._autotune_active: self._autotune_active = False self._autotune_message = "Autotune stopped: flight mode descent" if self._mode == "grounded": return self._enter_mode("descent", self._setpoint) self._takeoff_effective_seconds = None def start_cruise(self): self._start_thread_if_needed() with self._lock: self._enabled = True self._safety_tripped = False self._safety_reason = "" self._start_time = time.monotonic() self._idle_since = None self._pid.reset() self._target_setpoint = self._setpoint self._enter_mode("cruise", self._temp) self._takeoff_effective_seconds = None def set_flight_config(self, takeoff_seconds=None, descent_seconds=None, turbo=None, descent_target_f=None): flight = self._config.get("flight") updates = {} if takeoff_seconds is not None: updates["takeoff_seconds"] = float(takeoff_seconds) if descent_seconds is not None: updates["descent_seconds"] = float(descent_seconds) if turbo is not None: updates["turbo"] = bool(turbo) if descent_target_f is not None: updates["descent_target_f"] = float(descent_target_f) if updates: flight.update(updates) self._config.update_section("flight", flight) def set_scheduler(self, enabled=None, cutoff_times=None): sched = self._config.get("scheduler") if enabled is not None: sched["enabled"] = bool(enabled) if cutoff_times is not None: sched["cutoff_times"] = list(cutoff_times) self._config.update_section("scheduler", sched) def _relay_off(self): """Ensure relay is OFF.""" if self._gpio is not None: try: self._gpio.output(self._relay_pin, self._gpio.LOW) except Exception as e: log.error("Failed to turn relay off: %s", e) self._relay_on = False def _relay_set(self, on): """Set relay state.""" if self._gpio is not None: try: self._gpio.output( self._relay_pin, self._gpio.HIGH if on else self._gpio.LOW ) except Exception as e: log.error("Failed to set relay: %s", e) self._relay_on = on def _reset_stale_tracker(self, temp=None): self._stale_reference_temp = temp self._stale_reference_time = time.monotonic() def _check_sensor_stale(self, temp, output, loop_size, safety): """Trip safety if sensor appears stuck while heater drive is high.""" if temp is None: return False delta_limit = float(safety.get("sensor_stale_delta_f", 0.8)) stale_seconds = float(safety.get("sensor_stale_seconds", 8.0)) high_ratio = float(safety.get("stale_output_ratio", 0.65)) if self._stale_reference_time is None or self._stale_reference_temp is None: self._reset_stale_tracker(temp) return False # Only enforce stale detection while requesting strong heat output. if output < (high_ratio * loop_size): self._reset_stale_tracker(temp) return False delta = abs(temp - self._stale_reference_temp) if delta >= delta_limit: self._reset_stale_tracker(temp) return False elapsed = time.monotonic() - self._stale_reference_time if elapsed >= stale_seconds: self._trip_safety("Sensor stale while high heater demand") return True return False def _reset_autotune_state(self): self._autotune_heating = False self._autotune_phase_started = None self._autotune_phase_extreme = None self._autotune_high_peaks = [] self._autotune_low_peaks = [] def start_autotune(self, setpoint=None, hysteresis=None, cycles=None): with self._lock: if self._mode in ("takeoff", "descent"): self._autotune_active = False self._autotune_message = "Autotune disabled during {}".format(self._mode) log.warning(self._autotune_message) return if setpoint is not None: self._setpoint = float(setpoint) self._config.set("control", "setpoint", float(setpoint)) self._autotune_target = self._setpoint if hysteresis is not None: self._autotune_hysteresis = float(hysteresis) if cycles is not None: self._autotune_cycles = int(cycles) self._autotune_active = True self._autotune_message = "Autotune running" self._autotune_last_result = None self._reset_autotune_state() self._pid.reset() log.info( "Autotune started target=%.1fF hysteresis=%.1f cycles=%d", self._autotune_target, self._autotune_hysteresis, self._autotune_cycles, ) def stop_autotune(self, message="Autotune stopped"): with self._lock: self._autotune_active = False self._autotune_message = message self._relay_off() def _update_autotune(self, temp, loop_size): target = self._autotune_target h = self._autotune_hysteresis now = time.monotonic() if self._autotune_phase_started is None: self._autotune_phase_started = now self._autotune_phase_extreme = temp self._autotune_heating = temp < target if self._autotune_heating: if self._autotune_phase_extreme is None or temp > self._autotune_phase_extreme: self._autotune_phase_extreme = temp if temp >= target + h: self._autotune_high_peaks.append((now, self._autotune_phase_extreme)) self._autotune_heating = False self._autotune_phase_started = now self._autotune_phase_extreme = temp else: if self._autotune_phase_extreme is None or temp < self._autotune_phase_extreme: self._autotune_phase_extreme = temp if temp <= target - h: self._autotune_low_peaks.append((now, self._autotune_phase_extreme)) self._autotune_heating = True self._autotune_phase_started = now self._autotune_phase_extreme = temp highs = len(self._autotune_high_peaks) lows = len(self._autotune_low_peaks) if min(highs, lows) >= self._autotune_cycles and highs >= 2: high_vals = [v for _, v in self._autotune_high_peaks[-self._autotune_cycles:]] low_vals = [v for _, v in self._autotune_low_peaks[-self._autotune_cycles:]] amp = (sum(high_vals) / float(len(high_vals)) - sum(low_vals) / float(len(low_vals))) / 2.0 if amp <= 0: self.stop_autotune("Autotune failed: invalid oscillation amplitude") return 0.0 high_times = [t for t, _ in self._autotune_high_peaks[-self._autotune_cycles:]] periods = [] for i in range(1, len(high_times)): periods.append(high_times[i] - high_times[i - 1]) if not periods: self.stop_autotune("Autotune failed: not enough periods") return 0.0 pu = sum(periods) / float(len(periods)) if pu <= 0: self.stop_autotune("Autotune failed: invalid period") return 0.0 d = loop_size / 2.0 ku = (4.0 * d) / (math.pi * amp) kp = 0.6 * ku ki = (1.2 * ku) / pu kd = 0.075 * ku * pu # Safety clamp for aggressive/unstable autotune outputs. kp = max(0.0, min(kp, 300.0)) ki = max(0.0, min(ki, 50.0)) kd = max(0.0, min(kd, 500.0)) self._pid.tunings = (kp, ki, kd) self._pid.reset() self._config.update_section("pid", { "kP": round(kp, 4), "kI": round(ki, 4), "kD": round(kd, 4), }) self._autotune_last_result = { "kP": round(kp, 4), "kI": round(ki, 4), "kD": round(kd, 4), "Ku": round(ku, 4), "Pu": round(pu, 4), "amplitude": round(amp, 4), } self.stop_autotune("Autotune complete") log.info("Autotune complete: %s", self._autotune_last_result) return 0.0 return float(loop_size if self._autotune_heating else 0.0) def _control_loop(self): """ Main PID control loop. Runs in a background thread. Each iteration is one "loop cycle" of loop_size_ms milliseconds. Within each cycle, the relay is ON for a proportion of time equal to the PID output, implementing time-proportional software PWM. """ try: while not self._stop_event.is_set(): # Reload config if file changed self._config.reload_if_changed() self._scheduler_check() # Apply current PID tuning pid_cfg = self._config.get("pid") self._pid.tunings = (pid_cfg["kP"], pid_cfg["kI"], pid_cfg["kD"]) self._pid.proportional_on_measurement = pid_cfg.get( "proportional_on_measurement", True ) control_cfg = self._config.get("control") loop_size = control_cfg["loop_size_ms"] sleep_time = control_cfg["sleep_time"] log_resolution = self._config.get("logging", "log_resolution") self._pid.output_limits = (0, loop_size) # Read temperature temp = self._tc.read() if temp is None: log.error("Thermocouple read returned None, disabling power for safety") self._trip_safety("Thermocouple disconnected") time.sleep(0.2) continue with self._lock: self._temp = temp flight_setpoint = self._compute_flight_setpoint(temp) with self._lock: self._pid.setpoint = flight_setpoint # Safety checks safety = self._config.get("safety") if temp > safety["max_temp_f"]: self._trip_safety("Temperature {:.0f}F exceeds max {}F".format(temp, safety['max_temp_f'])) time.sleep(0.2) continue if not self._tc.is_connected: self._trip_safety("Thermocouple disconnected") time.sleep(0.2) continue if not self._enabled: self._relay_off() self._reset_stale_tracker(temp) with self._lock: self._output = 0.0 self._last_loop_time = time.monotonic() self._loop_count += 1 self._log_counter += 1 if self._log_counter >= log_resolution: self._log_data_point(temp, 0.0) self._log_counter = 0 time.sleep(max(0.2, sleep_time)) continue # Idle shutoff check idle_minutes = safety.get("idle_shutoff_minutes", 0) if idle_minutes > 0: near_setpoint = abs(temp - self._setpoint) < 20 if near_setpoint: if self._idle_since is None: self._idle_since = time.monotonic() elif (time.monotonic() - self._idle_since) > (idle_minutes * 60): self._trip_safety( "Idle shutoff: at setpoint for {} minutes".format(idle_minutes) ) break else: self._idle_since = None # PID compute and software PWM cycle start_ms = time.monotonic() * 1000 end_ms = start_ms + loop_size while time.monotonic() * 1000 < end_ms: if self._stop_event.is_set(): break temp = self._tc.read() if temp is not None: with self._lock: self._temp = temp if temp > safety["max_temp_f"]: self._trip_safety("Temperature {:.0f}F exceeds max {}F".format(temp, safety['max_temp_f'])) break if not self._tc.is_connected: self._trip_safety("Thermocouple disconnected") break if self._autotune_active: output = self._update_autotune(temp, loop_size) else: output = self._pid(temp) with self._lock: self._output = output if self._check_sensor_stale(temp, output, loop_size, safety): break # Software PWM: relay ON for first `output` ms of the cycle elapsed_ms = time.monotonic() * 1000 - start_ms should_be_on = elapsed_ms < output self._relay_set(should_be_on) # Logging self._log_counter += 1 if self._log_counter >= log_resolution: self._log_data_point(temp, output) self._log_counter = 0 with self._lock: self._last_loop_time = time.monotonic() self._loop_count += 1 time.sleep(sleep_time) except Exception as e: log.exception("Control loop crashed: %s", e) self._trip_safety("Control loop error: {}".format(e)) finally: self._relay_off() log.info("Control loop thread exiting, relay OFF") def _trip_safety(self, reason): """Trip safety shutdown.""" if self._safety_tripped and self._safety_reason == reason: self._enabled = False self._relay_off() return log.warning("SAFETY TRIP: %s", reason) with self._lock: self._safety_tripped = True self._safety_reason = reason self._enabled = False self._enter_mode("grounded", self._temp) self._relay_off() def _log_data_point(self, temp, output): """Write a data point to the CSV log and in-memory history.""" now = time.time() flight_setpoint = self._pid.setpoint row = [ "{:.3f}".format(now), "{:.2f}".format(temp), "{:.2f}".format(self._setpoint), "{:.2f}".format(output), 1 if self._relay_on else 0, "{:.2f}".format(flight_setpoint), self._mode, "{:.4f}".format(self._pid.Kp), "{:.4f}".format(self._pid.Ki), "{:.4f}".format(self._pid.Kd), self._config.get("control", "loop_size_ms") ] # CSV file if self._log_writer: try: self._log_writer.writerow(row) self._log_file.flush() except Exception as e: log.error("Failed to write log: %s", e) # In-memory history for web UI data_point = { "timestamp": now, "temp": round(temp, 2), "setpoint": round(self._setpoint, 2), "flight_setpoint": round(flight_setpoint, 2), "mode": self._mode, "output": round(output, 2), "relay": self._relay_on } with self._lock: self._history.append(data_point) if len(self._history) > self._history_max: self._history = self._history[-self._history_max:] # --- Public API (thread-safe) --- def set_setpoint(self, value): """Change the target temperature.""" with self._lock: old_setpoint = self._setpoint self._setpoint = float(value) self._target_setpoint = float(value) self._idle_since = None # Reset idle timer on setpoint change if abs(self._setpoint - old_setpoint) >= 10: self._pid.reset() self._config.set("control", "setpoint", float(value)) log.info("Setpoint changed to %.0fF", value) def set_pid_tuning(self, kp, ki, kd, proportional_on_measurement=None, proportional_mode=None): """Update PID gains.""" section = {"kP": kp, "kI": ki, "kD": kd} if proportional_mode in ("error", "measurement"): proportional_on_measurement = (proportional_mode == "measurement") if proportional_on_measurement is not None: section["proportional_on_measurement"] = bool(proportional_on_measurement) self._pid.proportional_on_measurement = bool(proportional_on_measurement) self._config.update_section("pid", section) self._pid.tunings = (kp, ki, kd) self._pid.reset() log.info("PID tuning updated: kP=%.4f, kI=%.4f, kD=%.4f", kp, ki, kd) @property def status(self): """Return a dict of current controller state (thread-safe snapshot).""" with self._lock: uptime = None if self._start_time is not None and self._enabled: uptime = round(time.monotonic() - self._start_time, 1) return { "enabled": self._enabled, "mode": self._mode, "temp": round(self._temp, 2), "setpoint": round(self._setpoint, 2), "effective_setpoint": round(self._pid.setpoint, 2), "output": round(self._output, 2), "relay_on": self._relay_on, "loop_count": self._loop_count, "uptime_seconds": uptime, "safety_tripped": self._safety_tripped, "safety_reason": self._safety_reason, "thermocouple_connected": self._tc.is_connected, "pid": { "kP": self._pid.Kp, "kI": self._pid.Ki, "kD": self._pid.Kd, "proportional_on_measurement": self._pid.proportional_on_measurement, "proportional_mode": "measurement" if self._pid.proportional_on_measurement else "error", }, "autotune": { "active": self._autotune_active, "target": round(self._autotune_target, 2), "hysteresis": round(self._autotune_hysteresis, 2), "cycles": self._autotune_cycles, "high_peaks": len(self._autotune_high_peaks), "low_peaks": len(self._autotune_low_peaks), "phase": "heating" if self._autotune_heating else "cooling", "message": self._autotune_message, "last_result": self._autotune_last_result, }, "flight": self._config.get("flight"), "scheduler": self._config.get("scheduler"), "mode_eta_seconds": self._mode_eta_seconds(), "next_cutoff_seconds": self._next_cutoff_seconds(), } @property def autotune_status(self): with self._lock: return { "active": self._autotune_active, "target": round(self._autotune_target, 2), "hysteresis": round(self._autotune_hysteresis, 2), "cycles": self._autotune_cycles, "high_peaks": len(self._autotune_high_peaks), "low_peaks": len(self._autotune_low_peaks), "phase": "heating" if self._autotune_heating else "cooling", "message": self._autotune_message, "last_result": self._autotune_last_result, } @property def history(self): """Return recent data points for charting.""" with self._lock: return list(self._history) def get_history_since(self, since_timestamp): """Return data points newer than the given timestamp.""" with self._lock: return [p for p in self._history if p["timestamp"] > since_timestamp] @property def is_alive(self): """Check if the control loop thread is alive.""" return self._thread is not None and self._thread.is_alive() @property def watchdog_ok(self): """Check if the control loop has updated recently.""" if not self._enabled: return True with self._lock: if self._last_loop_time is None: return True timeout = self._config.get("safety", "watchdog_timeout_s") return (time.monotonic() - self._last_loop_time) < timeout def cleanup(self): """Clean up GPIO and close log file. Call on application shutdown.""" self.stop() if self._log_file: try: self._log_file.close() except Exception: pass if self._gpio is not None: try: self._gpio.cleanup() log.info("GPIO cleaned up") except Exception as e: log.error("GPIO cleanup error: %s", e)