""" piNail2 Thermocouple Driver Wraps the MAX6675 thermocouple reader with: - Celsius to Fahrenheit conversion - Spike/outlier filtering (median of recent readings) - Open thermocouple detection - Error handling """ import logging import collections import math import importlib import types log = logging.getLogger(__name__) def c_to_f(c): """Convert Celsius to Fahrenheit.""" return c * 9.0 / 5.0 + 32.0 class Thermocouple: """ MAX6675 thermocouple reader with spike filtering. The MAX6675 occasionally returns spurious readings (we've seen 869F spikes in otherwise stable ~680F data). This class maintains a sliding window of recent readings and rejects outliers. """ def __init__(self, clk, cs, do, spike_threshold=50.0, window_size=5): """ Args: clk: GPIO pin for SPI clock cs: GPIO pin for chip select do: GPIO pin for data out (MISO) spike_threshold: Maximum allowed jump between filtered readings (in F) window_size: Number of recent readings to keep for median filtering """ self._spike_threshold = spike_threshold self._window_size = window_size self._readings = collections.deque(maxlen=window_size) self._last_good_temp = None self._raw_temp = 0.0 self._filtered_temp = 0.0 self._is_connected = False self._spike_count = 0 self._total_reads = 0 try: sensor_ctor = None module_candidates = [ ("MAX6675.MAX6675", "MAX6675"), ("MAX6675.MAX6675.MAX6675", "MAX6675"), ("MAX6675.MAX6675", None), ("MAX6675.MAX6675.MAX6675", None), ] for module_name, attr_name in module_candidates: try: module = importlib.import_module(module_name) except Exception: continue obj = getattr(module, attr_name, None) if attr_name else module if isinstance(obj, types.ModuleType): obj = getattr(obj, "MAX6675", None) if callable(obj): sensor_ctor = obj break if not callable(sensor_ctor): raise TypeError("MAX6675 constructor unavailable") self._sensor = sensor_ctor(clk, cs, do) self._is_connected = True log.info("MAX6675 thermocouple initialized (CLK=%d, CS=%d, DO=%d)", clk, cs, do) except Exception as e: log.error("Failed to initialize MAX6675: %s", e) self._sensor = None self._is_connected = False def read(self): """ Read temperature from the thermocouple. Returns: float: Filtered temperature in Fahrenheit, or None if sensor is disconnected. """ if self._sensor is None: return None self._total_reads += 1 try: raw_c = self._sensor.readTempC() except Exception as e: log.error("Thermocouple read error: %s", e) self._is_connected = False return self._last_good_temp # Check for open thermocouple / invalid frame. # MAX6675 open probe often reports NaN; guard all non-finite values. if raw_c is None: log.warning("Thermocouple appears disconnected (raw_c=%s)", raw_c) self._is_connected = False return self._last_good_temp try: raw_c = float(raw_c) except (TypeError, ValueError): log.warning("Thermocouple appears disconnected (raw_c=%s)", raw_c) self._is_connected = False return self._last_good_temp if (not math.isfinite(raw_c)) or raw_c > 1023: log.warning("Thermocouple appears disconnected (raw_c=%s)", raw_c) self._is_connected = False return self._last_good_temp self._is_connected = True raw_f = c_to_f(raw_c) self._raw_temp = raw_f # Spike detection: if we have a previous good reading, check for unreasonable jumps if self._last_good_temp is not None: delta = abs(raw_f - self._last_good_temp) if delta > self._spike_threshold: self._spike_count += 1 log.warning( "Spike detected: %.1fF -> %.1fF (delta=%.1fF, count=%d). Using last good value.", self._last_good_temp, raw_f, delta, self._spike_count ) # Still add to window but return last good — if multiple consecutive # readings are in the "spike" range, they'll become the new normal # via the median filter self._readings.append(raw_f) # If we've had many consecutive "spikes", the temperature probably # genuinely changed fast (e.g., touching the nail with concentrate) if self._spike_count >= self._window_size: log.info("Spike count exceeded window size, accepting new baseline %.1fF", raw_f) self._spike_count = 0 self._last_good_temp = raw_f self._filtered_temp = raw_f return raw_f self._filtered_temp = self._last_good_temp return self._last_good_temp # Normal reading — reset spike counter, add to window self._spike_count = 0 self._readings.append(raw_f) # Median filter if len(self._readings) >= 3: sorted_readings = sorted(self._readings) median = sorted_readings[len(sorted_readings) // 2] self._filtered_temp = median self._last_good_temp = median else: self._filtered_temp = raw_f self._last_good_temp = raw_f return self._filtered_temp @property def raw_temp(self): """Last raw (unfiltered) temperature reading in Fahrenheit.""" return self._raw_temp @property def filtered_temp(self): """Last filtered temperature reading in Fahrenheit.""" return self._filtered_temp @property def is_connected(self): """Whether the thermocouple appears to be connected.""" return self._is_connected @property def spike_count(self): """Number of spike events detected since last normal reading.""" return self._spike_count @property def stats(self): """Return diagnostic stats.""" def safe(value): if isinstance(value, (int, float)) and math.isfinite(float(value)): return round(float(value), 2) return None return { "raw_temp": safe(self._raw_temp), "filtered_temp": safe(self._filtered_temp), "is_connected": self._is_connected, "total_reads": self._total_reads, "recent_readings": [safe(r) for r in self._readings], }