""" piNail2 PID Controller Runs the PID control loop in a background thread. Controls a relay via GPIO using software PWM (duty cycle within a time window) to maintain a target temperature on an e-nail heating coil. Safety features: - Hard max temperature cutoff - Thermocouple disconnect detection -> relay OFF - Idle auto-shutoff after configurable timeout - Watchdog: detects if control loop stalls - Proper GPIO cleanup on shutdown """ import time import threading import logging import os import csv import math from datetime import datetime log = logging.getLogger(__name__) class PIDController: """ Threaded PID temperature controller for e-nail heating. The control loop runs in a background daemon thread. It reads temperature from a Thermocouple object, computes PID output, and drives a relay GPIO pin using software PWM (time-proportional control within each loop cycle). Thread-safe properties expose current state to the Flask web server. """ def __init__(self, config, thermocouple): """ Args: config: Config instance thermocouple: Thermocouple instance """ self._config = config self._tc = thermocouple self._lock = threading.Lock() # State self._enabled = False self._temp = 0.0 self._setpoint = config.get("control", "setpoint") self._output = 0.0 self._relay_on = False self._loop_count = 0 self._start_time = None self._last_loop_time = None self._idle_since = None # timestamp when temp first reached setpoint vicinity self._safety_tripped = False self._safety_reason = "" self._thread = None self._stop_event = threading.Event() # Autotune state self._autotune_active = False self._autotune_target = self._setpoint self._autotune_hysteresis = config.get("autotune", "hysteresis_f") self._autotune_cycles = config.get("autotune", "cycles") self._autotune_heating = False self._autotune_phase_started = None self._autotune_phase_extreme = None self._autotune_high_peaks = [] self._autotune_low_peaks = [] self._autotune_last_result = None self._autotune_message = "" # PID instance from simple_pid import PID pid_cfg = config.get("pid") self._pid = PID( pid_cfg["kP"], pid_cfg["kI"], pid_cfg["kD"], setpoint=self._setpoint ) self._pid.proportional_on_measurement = pid_cfg.get("proportional_on_measurement", True) loop_size = config.get("control", "loop_size_ms") self._pid.output_limits = (0, loop_size) # GPIO setup self._relay_pin = config.get("gpio", "relay_pin") self._gpio = None try: import RPi.GPIO as GPIO self._gpio = GPIO GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.setup(self._relay_pin, GPIO.OUT, initial=GPIO.LOW) log.info("GPIO initialized, relay pin %d set LOW", self._relay_pin) except Exception as e: log.error("Failed to initialize GPIO: %s", e) # Logging setup self._log_dir = config.get("logging", "log_directory") os.makedirs(self._log_dir, exist_ok=True) self._log_file = None self._log_writer = None self._log_counter = 0 self._history = [] # Recent data points for the web UI self._history_max = 1000 self._init_log_file() def _init_log_file(self): """Create a new CSV log file with a timestamp in the filename.""" try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") path = os.path.join(self._log_dir, "pinail_{}.csv".format(timestamp)) self._log_file = open(path, 'w', newline='') self._log_writer = csv.writer(self._log_file) self._log_writer.writerow([ "timestamp", "temp_f", "setpoint_f", "output", "relay", "kP", "kI", "kD", "loop_size_ms" ]) log.info("Log file created: %s", path) except Exception as e: log.error("Failed to create log file: %s", e) def start(self): """Enable the controller and start the background control loop.""" with self._lock: if self._thread is not None and self._thread.is_alive(): log.warning("Controller is already running") return self._enabled = True self._safety_tripped = False self._safety_reason = "" self._start_time = time.monotonic() self._idle_since = None self._stop_event.clear() self._pid.reset() self._thread = threading.Thread(target=self._control_loop, daemon=True, name="pid-loop") self._thread.start() log.info("PID controller started (setpoint=%.0fF)", self._setpoint) def stop(self): """Stop the controller and turn off the relay.""" log.info("Stopping PID controller") self._stop_event.set() with self._lock: self._enabled = False self._autotune_active = False self._relay_off() if self._thread is not None: self._thread.join(timeout=5) log.info("PID controller stopped") def _relay_off(self): """Ensure relay is OFF.""" if self._gpio is not None: try: self._gpio.output(self._relay_pin, self._gpio.LOW) except Exception as e: log.error("Failed to turn relay off: %s", e) self._relay_on = False def _relay_set(self, on): """Set relay state.""" if self._gpio is not None: try: self._gpio.output( self._relay_pin, self._gpio.HIGH if on else self._gpio.LOW ) except Exception as e: log.error("Failed to set relay: %s", e) self._relay_on = on def _reset_autotune_state(self): self._autotune_heating = False self._autotune_phase_started = None self._autotune_phase_extreme = None self._autotune_high_peaks = [] self._autotune_low_peaks = [] def start_autotune(self, setpoint=None, hysteresis=None, cycles=None): with self._lock: if setpoint is not None: self._setpoint = float(setpoint) self._config.set("control", "setpoint", float(setpoint)) self._autotune_target = self._setpoint if hysteresis is not None: self._autotune_hysteresis = float(hysteresis) if cycles is not None: self._autotune_cycles = int(cycles) self._autotune_active = True self._autotune_message = "Autotune running" self._autotune_last_result = None self._reset_autotune_state() self._pid.reset() log.info( "Autotune started target=%.1fF hysteresis=%.1f cycles=%d", self._autotune_target, self._autotune_hysteresis, self._autotune_cycles, ) def stop_autotune(self, message="Autotune stopped"): with self._lock: self._autotune_active = False self._autotune_message = message self._relay_off() def _update_autotune(self, temp, loop_size): target = self._autotune_target h = self._autotune_hysteresis now = time.monotonic() if self._autotune_phase_started is None: self._autotune_phase_started = now self._autotune_phase_extreme = temp self._autotune_heating = temp < target if self._autotune_heating: if self._autotune_phase_extreme is None or temp > self._autotune_phase_extreme: self._autotune_phase_extreme = temp if temp >= target + h: self._autotune_high_peaks.append((now, self._autotune_phase_extreme)) self._autotune_heating = False self._autotune_phase_started = now self._autotune_phase_extreme = temp else: if self._autotune_phase_extreme is None or temp < self._autotune_phase_extreme: self._autotune_phase_extreme = temp if temp <= target - h: self._autotune_low_peaks.append((now, self._autotune_phase_extreme)) self._autotune_heating = True self._autotune_phase_started = now self._autotune_phase_extreme = temp highs = len(self._autotune_high_peaks) lows = len(self._autotune_low_peaks) if min(highs, lows) >= self._autotune_cycles and highs >= 2: high_vals = [v for _, v in self._autotune_high_peaks[-self._autotune_cycles:]] low_vals = [v for _, v in self._autotune_low_peaks[-self._autotune_cycles:]] amp = (sum(high_vals) / float(len(high_vals)) - sum(low_vals) / float(len(low_vals))) / 2.0 if amp <= 0: self.stop_autotune("Autotune failed: invalid oscillation amplitude") return 0.0 high_times = [t for t, _ in self._autotune_high_peaks[-self._autotune_cycles:]] periods = [] for i in range(1, len(high_times)): periods.append(high_times[i] - high_times[i - 1]) if not periods: self.stop_autotune("Autotune failed: not enough periods") return 0.0 pu = sum(periods) / float(len(periods)) if pu <= 0: self.stop_autotune("Autotune failed: invalid period") return 0.0 d = loop_size / 2.0 ku = (4.0 * d) / (math.pi * amp) kp = 0.6 * ku ki = (1.2 * ku) / pu kd = 0.075 * ku * pu # Safety clamp for aggressive/unstable autotune outputs. kp = max(0.0, min(kp, 300.0)) ki = max(0.0, min(ki, 50.0)) kd = max(0.0, min(kd, 500.0)) self._pid.tunings = (kp, ki, kd) self._pid.reset() self._config.update_section("pid", { "kP": round(kp, 4), "kI": round(ki, 4), "kD": round(kd, 4), }) self._autotune_last_result = { "kP": round(kp, 4), "kI": round(ki, 4), "kD": round(kd, 4), "Ku": round(ku, 4), "Pu": round(pu, 4), "amplitude": round(amp, 4), } self.stop_autotune("Autotune complete") log.info("Autotune complete: %s", self._autotune_last_result) return 0.0 return float(loop_size if self._autotune_heating else 0.0) def _control_loop(self): """ Main PID control loop. Runs in a background thread. Each iteration is one "loop cycle" of loop_size_ms milliseconds. Within each cycle, the relay is ON for a proportion of time equal to the PID output, implementing time-proportional software PWM. """ log.info("Control loop thread started") try: while not self._stop_event.is_set(): # Reload config if file changed self._config.reload_if_changed() # Apply current PID tuning pid_cfg = self._config.get("pid") self._pid.tunings = (pid_cfg["kP"], pid_cfg["kI"], pid_cfg["kD"]) self._pid.proportional_on_measurement = pid_cfg.get( "proportional_on_measurement", True ) control_cfg = self._config.get("control") loop_size = control_cfg["loop_size_ms"] sleep_time = control_cfg["sleep_time"] log_resolution = self._config.get("logging", "log_resolution") self._pid.output_limits = (0, loop_size) with self._lock: self._pid.setpoint = self._setpoint # Read temperature temp = self._tc.read() if temp is None: log.error("Thermocouple read returned None, disabling for safety") self._trip_safety("Thermocouple disconnected") break with self._lock: self._temp = temp # Safety checks safety = self._config.get("safety") if temp > safety["max_temp_f"]: self._trip_safety("Temperature {:.0f}F exceeds max {}F".format(temp, safety['max_temp_f'])) break if not self._tc.is_connected: self._trip_safety("Thermocouple disconnected") break # Idle shutoff check idle_minutes = safety.get("idle_shutoff_minutes", 0) if idle_minutes > 0: near_setpoint = abs(temp - self._setpoint) < 20 if near_setpoint: if self._idle_since is None: self._idle_since = time.monotonic() elif (time.monotonic() - self._idle_since) > (idle_minutes * 60): self._trip_safety( "Idle shutoff: at setpoint for {} minutes".format(idle_minutes) ) break else: self._idle_since = None # PID compute and software PWM cycle start_ms = time.monotonic() * 1000 end_ms = start_ms + loop_size while time.monotonic() * 1000 < end_ms: if self._stop_event.is_set(): break temp = self._tc.read() if temp is not None: with self._lock: self._temp = temp if self._autotune_active: output = self._update_autotune(temp, loop_size) else: output = self._pid(temp) with self._lock: self._output = output # Software PWM: relay ON for first `output` ms of the cycle elapsed_ms = time.monotonic() * 1000 - start_ms should_be_on = elapsed_ms < output self._relay_set(should_be_on) # Logging self._log_counter += 1 if self._log_counter >= log_resolution: self._log_data_point(temp, output) self._log_counter = 0 with self._lock: self._last_loop_time = time.monotonic() self._loop_count += 1 time.sleep(sleep_time) except Exception as e: log.exception("Control loop crashed: %s", e) self._trip_safety("Control loop error: {}".format(e)) finally: self._relay_off() log.info("Control loop thread exiting, relay OFF") def _trip_safety(self, reason): """Trip safety shutdown.""" log.warning("SAFETY TRIP: %s", reason) with self._lock: self._safety_tripped = True self._safety_reason = reason self._enabled = False self._relay_off() def _log_data_point(self, temp, output): """Write a data point to the CSV log and in-memory history.""" now = time.time() row = [ "{:.3f}".format(now), "{:.2f}".format(temp), "{:.2f}".format(self._setpoint), "{:.2f}".format(output), 1 if self._relay_on else 0, "{:.4f}".format(self._pid.Kp), "{:.4f}".format(self._pid.Ki), "{:.4f}".format(self._pid.Kd), self._config.get("control", "loop_size_ms") ] # CSV file if self._log_writer: try: self._log_writer.writerow(row) self._log_file.flush() except Exception as e: log.error("Failed to write log: %s", e) # In-memory history for web UI data_point = { "timestamp": now, "temp": round(temp, 2), "setpoint": round(self._setpoint, 2), "output": round(output, 2), "relay": self._relay_on } with self._lock: self._history.append(data_point) if len(self._history) > self._history_max: self._history = self._history[-self._history_max:] # --- Public API (thread-safe) --- def set_setpoint(self, value): """Change the target temperature.""" with self._lock: old_setpoint = self._setpoint self._setpoint = float(value) self._idle_since = None # Reset idle timer on setpoint change if abs(self._setpoint - old_setpoint) >= 10: self._pid.reset() self._config.set("control", "setpoint", float(value)) log.info("Setpoint changed to %.0fF", value) def set_pid_tuning(self, kp, ki, kd, proportional_on_measurement=None, proportional_mode=None): """Update PID gains.""" section = {"kP": kp, "kI": ki, "kD": kd} if proportional_mode in ("error", "measurement"): proportional_on_measurement = (proportional_mode == "measurement") if proportional_on_measurement is not None: section["proportional_on_measurement"] = bool(proportional_on_measurement) self._pid.proportional_on_measurement = bool(proportional_on_measurement) self._config.update_section("pid", section) self._pid.tunings = (kp, ki, kd) self._pid.reset() log.info("PID tuning updated: kP=%.4f, kI=%.4f, kD=%.4f", kp, ki, kd) @property def status(self): """Return a dict of current controller state (thread-safe snapshot).""" with self._lock: uptime = None if self._start_time is not None and self._enabled: uptime = round(time.monotonic() - self._start_time, 1) return { "enabled": self._enabled, "temp": round(self._temp, 2), "setpoint": round(self._setpoint, 2), "output": round(self._output, 2), "relay_on": self._relay_on, "loop_count": self._loop_count, "uptime_seconds": uptime, "safety_tripped": self._safety_tripped, "safety_reason": self._safety_reason, "thermocouple_connected": self._tc.is_connected, "pid": { "kP": self._pid.Kp, "kI": self._pid.Ki, "kD": self._pid.Kd, "proportional_on_measurement": self._pid.proportional_on_measurement, "proportional_mode": "measurement" if self._pid.proportional_on_measurement else "error", }, "autotune": { "active": self._autotune_active, "target": round(self._autotune_target, 2), "hysteresis": round(self._autotune_hysteresis, 2), "cycles": self._autotune_cycles, "high_peaks": len(self._autotune_high_peaks), "low_peaks": len(self._autotune_low_peaks), "phase": "heating" if self._autotune_heating else "cooling", "message": self._autotune_message, "last_result": self._autotune_last_result, }, } @property def autotune_status(self): with self._lock: return { "active": self._autotune_active, "target": round(self._autotune_target, 2), "hysteresis": round(self._autotune_hysteresis, 2), "cycles": self._autotune_cycles, "high_peaks": len(self._autotune_high_peaks), "low_peaks": len(self._autotune_low_peaks), "phase": "heating" if self._autotune_heating else "cooling", "message": self._autotune_message, "last_result": self._autotune_last_result, } @property def history(self): """Return recent data points for charting.""" with self._lock: return list(self._history) def get_history_since(self, since_timestamp): """Return data points newer than the given timestamp.""" with self._lock: return [p for p in self._history if p["timestamp"] > since_timestamp] @property def is_alive(self): """Check if the control loop thread is alive.""" return self._thread is not None and self._thread.is_alive() @property def watchdog_ok(self): """Check if the control loop has updated recently.""" if not self._enabled: return True with self._lock: if self._last_loop_time is None: return True timeout = self._config.get("safety", "watchdog_timeout_s") return (time.monotonic() - self._last_loop_time) < timeout def cleanup(self): """Clean up GPIO and close log file. Call on application shutdown.""" self.stop() if self._log_file: try: self._log_file.close() except Exception: pass if self._gpio is not None: try: self._gpio.cleanup() log.info("GPIO cleaned up") except Exception as e: log.error("GPIO cleanup error: %s", e)