Initialize piNail project with modern piNail2 web controller
This commit is contained in:
@@ -0,0 +1,158 @@
|
||||
"""
|
||||
piNail2 Thermocouple Driver
|
||||
|
||||
Wraps the MAX6675 thermocouple reader with:
|
||||
- Celsius to Fahrenheit conversion
|
||||
- Spike/outlier filtering (median of recent readings)
|
||||
- Open thermocouple detection
|
||||
- Error handling
|
||||
"""
|
||||
|
||||
import logging
|
||||
import collections
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def c_to_f(c):
|
||||
"""Convert Celsius to Fahrenheit."""
|
||||
return c * 9.0 / 5.0 + 32.0
|
||||
|
||||
|
||||
class Thermocouple:
|
||||
"""
|
||||
MAX6675 thermocouple reader with spike filtering.
|
||||
|
||||
The MAX6675 occasionally returns spurious readings (we've seen 869F spikes
|
||||
in otherwise stable ~680F data). This class maintains a sliding window of
|
||||
recent readings and rejects outliers.
|
||||
"""
|
||||
|
||||
def __init__(self, clk, cs, do, spike_threshold=50.0, window_size=5):
|
||||
"""
|
||||
Args:
|
||||
clk: GPIO pin for SPI clock
|
||||
cs: GPIO pin for chip select
|
||||
do: GPIO pin for data out (MISO)
|
||||
spike_threshold: Maximum allowed jump between filtered readings (in F)
|
||||
window_size: Number of recent readings to keep for median filtering
|
||||
"""
|
||||
self._spike_threshold = spike_threshold
|
||||
self._window_size = window_size
|
||||
self._readings = collections.deque(maxlen=window_size)
|
||||
self._last_good_temp = None
|
||||
self._raw_temp = 0.0
|
||||
self._filtered_temp = 0.0
|
||||
self._is_connected = False
|
||||
self._spike_count = 0
|
||||
self._total_reads = 0
|
||||
|
||||
try:
|
||||
import MAX6675.MAX6675 as MAX6675
|
||||
self._sensor = MAX6675.MAX6675(clk, cs, do)
|
||||
self._is_connected = True
|
||||
log.info("MAX6675 thermocouple initialized (CLK=%d, CS=%d, DO=%d)", clk, cs, do)
|
||||
except Exception as e:
|
||||
log.error("Failed to initialize MAX6675: %s", e)
|
||||
self._sensor = None
|
||||
self._is_connected = False
|
||||
|
||||
def read(self):
|
||||
"""
|
||||
Read temperature from the thermocouple.
|
||||
|
||||
Returns:
|
||||
float: Filtered temperature in Fahrenheit, or None if sensor is disconnected.
|
||||
"""
|
||||
if self._sensor is None:
|
||||
return None
|
||||
|
||||
self._total_reads += 1
|
||||
|
||||
try:
|
||||
raw_c = self._sensor.readTempC()
|
||||
except Exception as e:
|
||||
log.error("Thermocouple read error: %s", e)
|
||||
self._is_connected = False
|
||||
return self._last_good_temp
|
||||
|
||||
# Check for open thermocouple (MAX6675 returns very high values or specific error codes)
|
||||
if raw_c is None or raw_c > 1023:
|
||||
log.warning("Thermocouple appears disconnected (raw_c=%s)", raw_c)
|
||||
self._is_connected = False
|
||||
return self._last_good_temp
|
||||
|
||||
self._is_connected = True
|
||||
raw_f = c_to_f(raw_c)
|
||||
self._raw_temp = raw_f
|
||||
|
||||
# Spike detection: if we have a previous good reading, check for unreasonable jumps
|
||||
if self._last_good_temp is not None:
|
||||
delta = abs(raw_f - self._last_good_temp)
|
||||
if delta > self._spike_threshold:
|
||||
self._spike_count += 1
|
||||
log.warning(
|
||||
"Spike detected: %.1fF -> %.1fF (delta=%.1fF, count=%d). Using last good value.",
|
||||
self._last_good_temp, raw_f, delta, self._spike_count
|
||||
)
|
||||
# Still add to window but return last good — if multiple consecutive
|
||||
# readings are in the "spike" range, they'll become the new normal
|
||||
# via the median filter
|
||||
self._readings.append(raw_f)
|
||||
# If we've had many consecutive "spikes", the temperature probably
|
||||
# genuinely changed fast (e.g., touching the nail with concentrate)
|
||||
if self._spike_count >= self._window_size:
|
||||
log.info("Spike count exceeded window size, accepting new baseline %.1fF", raw_f)
|
||||
self._spike_count = 0
|
||||
self._last_good_temp = raw_f
|
||||
self._filtered_temp = raw_f
|
||||
return raw_f
|
||||
self._filtered_temp = self._last_good_temp
|
||||
return self._last_good_temp
|
||||
|
||||
# Normal reading — reset spike counter, add to window
|
||||
self._spike_count = 0
|
||||
self._readings.append(raw_f)
|
||||
|
||||
# Median filter
|
||||
if len(self._readings) >= 3:
|
||||
sorted_readings = sorted(self._readings)
|
||||
median = sorted_readings[len(sorted_readings) // 2]
|
||||
self._filtered_temp = median
|
||||
self._last_good_temp = median
|
||||
else:
|
||||
self._filtered_temp = raw_f
|
||||
self._last_good_temp = raw_f
|
||||
|
||||
return self._filtered_temp
|
||||
|
||||
@property
|
||||
def raw_temp(self):
|
||||
"""Last raw (unfiltered) temperature reading in Fahrenheit."""
|
||||
return self._raw_temp
|
||||
|
||||
@property
|
||||
def filtered_temp(self):
|
||||
"""Last filtered temperature reading in Fahrenheit."""
|
||||
return self._filtered_temp
|
||||
|
||||
@property
|
||||
def is_connected(self):
|
||||
"""Whether the thermocouple appears to be connected."""
|
||||
return self._is_connected
|
||||
|
||||
@property
|
||||
def spike_count(self):
|
||||
"""Number of spike events detected since last normal reading."""
|
||||
return self._spike_count
|
||||
|
||||
@property
|
||||
def stats(self):
|
||||
"""Return diagnostic stats."""
|
||||
return {
|
||||
"raw_temp": round(self._raw_temp, 2),
|
||||
"filtered_temp": round(self._filtered_temp, 2),
|
||||
"is_connected": self._is_connected,
|
||||
"total_reads": self._total_reads,
|
||||
"recent_readings": [round(r, 2) for r in self._readings],
|
||||
}
|
||||
Reference in New Issue
Block a user