159 lines
5.4 KiB
Python
159 lines
5.4 KiB
Python
"""
piNail2 Thermocouple Driver

Wraps the MAX6675 thermocouple reader with:
- Celsius to Fahrenheit conversion
- Spike/outlier filtering (median of recent readings)
- Open thermocouple detection
- Error handling
"""
|
|
|
|
import logging
|
|
import collections
|
|
|
|
# Module-level logger named after this module; used for all driver diagnostics below.
log = logging.getLogger(__name__)
|
|
|
|
|
|
def c_to_f(c):
    """Convert a temperature from degrees Celsius to degrees Fahrenheit.

    Args:
        c: Temperature in degrees Celsius.

    Returns:
        The temperature in degrees Fahrenheit, computed as ``c * 9/5 + 32``.
    """
    return c * 9.0 / 5.0 + 32.0
|
|
|
|
|
|
class Thermocouple:
    """
    MAX6675 thermocouple reader with spike filtering.

    The MAX6675 occasionally returns spurious readings (we've seen 869F spikes
    in otherwise stable ~680F data). This class maintains a sliding window of
    recent readings and rejects outliers.
    """

    def __init__(self, clk, cs, do, spike_threshold=50.0, window_size=5):
        """
        Set up filter state and try to open the MAX6675 sensor.

        If the driver cannot be imported or constructed, the error is logged,
        the instance is left without a sensor, and read() returns None.

        Args:
            clk: GPIO pin for SPI clock
            cs: GPIO pin for chip select
            do: GPIO pin for data out (MISO)
            spike_threshold: Maximum allowed jump between filtered readings (in F)
            window_size: Number of recent readings to keep for median filtering
        """
        self._spike_threshold = spike_threshold
        self._window_size = window_size
        # Bounded deque: appending beyond window_size drops the oldest reading.
        self._readings = collections.deque(maxlen=window_size)
        self._last_good_temp = None
        self._raw_temp = 0.0
        self._filtered_temp = 0.0
        self._is_connected = False
        self._spike_count = 0
        self._total_reads = 0

        try:
            # Imported lazily so the module can still be loaded on machines
            # where the hardware driver is not installed.
            import MAX6675.MAX6675 as MAX6675

            self._sensor = MAX6675.MAX6675(clk, cs, do)
            self._is_connected = True
            log.info("MAX6675 thermocouple initialized (CLK=%d, CS=%d, DO=%d)", clk, cs, do)
        except Exception as e:
            log.error("Failed to initialize MAX6675: %s", e)
            self._sensor = None
            self._is_connected = False

    def read(self):
        """
        Read temperature from the thermocouple.

        Readings that fail, are None, are NaN, or exceed 1023 C are treated as
        a disconnected probe: the connection flag is cleared and the last good
        temperature is returned without touching the filter window.

        Returns:
            float: Filtered temperature in Fahrenheit, or None if no sensor
            was initialized. When a read fails or the probe looks
            disconnected, the last good temperature is returned instead
            (which is None if there has been no good reading yet).
        """
        # Local import keeps this fix self-contained within the class.
        import math

        if self._sensor is None:
            return None

        self._total_reads += 1

        try:
            raw_c = self._sensor.readTempC()
        except Exception as e:
            log.error("Thermocouple read error: %s", e)
            self._is_connected = False
            return self._last_good_temp

        # Check for open thermocouple (MAX6675 returns very high values or specific error codes).
        # NaN must be tested explicitly: every comparison with NaN is False, so
        # "raw_c > 1023" alone would let it through, and it would then poison
        # the window and _last_good_temp (all later deltas would be NaN too).
        # NOTE(review): some MAX6675 drivers are believed to return NaN for an
        # open probe -- confirm against the installed library.
        if raw_c is None or math.isnan(raw_c) or raw_c > 1023:
            log.warning("Thermocouple appears disconnected (raw_c=%s)", raw_c)
            self._is_connected = False
            return self._last_good_temp

        self._is_connected = True
        raw_f = c_to_f(raw_c)
        self._raw_temp = raw_f

        # Spike detection: if we have a previous good reading, check for unreasonable jumps
        if self._last_good_temp is not None:
            delta = abs(raw_f - self._last_good_temp)
            if delta > self._spike_threshold:
                self._spike_count += 1
                log.warning(
                    "Spike detected: %.1fF -> %.1fF (delta=%.1fF, count=%d). Using last good value.",
                    self._last_good_temp, raw_f, delta, self._spike_count
                )
                # Still add to window but return last good -- if multiple consecutive
                # readings are in the "spike" range, they'll become the new normal
                # via the median filter
                self._readings.append(raw_f)
                # If we've had many consecutive "spikes", the temperature probably
                # genuinely changed fast (e.g., touching the nail with concentrate)
                if self._spike_count >= self._window_size:
                    log.info("Spike count exceeded window size, accepting new baseline %.1fF", raw_f)
                    self._spike_count = 0
                    self._last_good_temp = raw_f
                    self._filtered_temp = raw_f
                    return raw_f
                self._filtered_temp = self._last_good_temp
                return self._last_good_temp

        # Normal reading -- reset spike counter, add to window
        self._spike_count = 0
        self._readings.append(raw_f)

        # Median filter: needs at least three samples to be meaningful; until
        # then the raw value is used directly.
        if len(self._readings) >= 3:
            sorted_readings = sorted(self._readings)
            # For an even count this picks the upper of the two middle values.
            median = sorted_readings[len(sorted_readings) // 2]
            self._filtered_temp = median
            self._last_good_temp = median
        else:
            self._filtered_temp = raw_f
            self._last_good_temp = raw_f

        return self._filtered_temp

    @property
    def raw_temp(self):
        """Last raw (unfiltered) temperature reading in Fahrenheit."""
        return self._raw_temp

    @property
    def filtered_temp(self):
        """Last filtered temperature reading in Fahrenheit."""
        return self._filtered_temp

    @property
    def is_connected(self):
        """Whether the thermocouple appears to be connected."""
        return self._is_connected

    @property
    def spike_count(self):
        """Number of spike events detected since last normal reading."""
        return self._spike_count

    @property
    def stats(self):
        """Return diagnostic stats as a dict (temperatures rounded to 2 decimals)."""
        return {
            "raw_temp": round(self._raw_temp, 2),
            "filtered_temp": round(self._filtered_temp, 2),
            "is_connected": self._is_connected,
            "total_reads": self._total_reads,
            "recent_readings": [round(r, 2) for r in self._readings],
        }
|