# Source: piNail/piNail2/app.py (Python, 748 lines, 24 KiB)
"""
piNail2 — Flask Web Application

Main entry point. Runs the Flask web server and initializes the PID controller.
Provides REST API endpoints and serves the single-page dashboard.

Usage:
    python3 app.py
    python3 app.py --config /path/to/config.json
"""
import argparse
import atexit
import copy
import json
import logging
import math
import os
import signal
import sys
import threading
import time
from datetime import datetime

from flask import Flask, render_template, jsonify, request

from config import Config
from thermocouple import Thermocouple
from pid_controller import PIDController
# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
# Configure the root logger: INFO level, timestamped format, stdout only.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
]
)
# Application logger used by the startup and shutdown code in this module.
log = logging.getLogger("piNail2")
# Whole-second Unix timestamp captured when this module is imported. The
# status and heartbeat endpoints return it as "instance_id".
APP_INSTANCE_ID = str(int(time.time()))
# Version string passed to the dashboard template as app_version.
APP_VERSION = "v2.1.0"
# ---------------------------------------------------------------------------
# Flask app
# ---------------------------------------------------------------------------
app = Flask(__name__)
# These are set in main() before the app starts
config = None # type: Config
# Per-nail registries keyed by nail id ("nail1"/"nail2"); main() fills them
# with a PIDController, a Thermocouple and a ChannelConfigProxy respectively.
controllers = {}
thermocouples = {}
channel_configs = {}
# Nail ids that main() initializes and that /api/status/all iterates over.
NAIL_IDS = ("nail1", "nail2")
# Held by ChannelConfigProxy.set()/update_section() while they mutate the
# shared config data and save it.
CONFIG_WRITE_LOCK = threading.Lock()
# Inclusive bounds that api_control() enforces on the timing fields.
CONTROL_LIMITS = {
"loop_size_ms": {"min": 1500, "max": 5000},
"sleep_time": {"min": 0.15, "max": 0.6},
}
# Upper bounds for the PID gains, keyed by timing profile name. A profile
# with no entry here (e.g. "conservative") is not clamped by
# clamp_pid_for_profile().
PID_GUARDRAILS = {
"balanced": {"kP_max": 80.0, "kI_max": 4.0, "kD_max": 50.0},
"responsive": {"kP_max": 50.0, "kI_max": 2.5, "kD_max": 25.0},
"extreme": {"kP_max": 35.0, "kI_max": 1.8, "kD_max": 15.0},
}
def classify_timing_profile(loop_size_ms, sleep_time):
    """Name the timing profile for a loop size / sleep time pair.

    Tiers are checked from most to least aggressive. A tier matches when
    EITHER value is at or below that tier's ceiling; if no tier matches the
    profile is "conservative".

    Args:
        loop_size_ms: Control loop window in milliseconds.
        sleep_time: Loop sleep interval in seconds.

    Returns:
        One of "extreme", "responsive", "balanced" or "conservative".
    """
    # (profile name, loop_size_ms ceiling, sleep_time ceiling)
    tiers = (
        ("extreme", 1400, 0.14),
        ("responsive", 1800, 0.20),
        ("balanced", 2800, 0.32),
    )
    for label, loop_ceiling, sleep_ceiling in tiers:
        if loop_size_ms <= loop_ceiling or sleep_time <= sleep_ceiling:
            return label
    return "conservative"
def clamp_pid_for_profile(profile, pid_status):
    """Work out whether the PID gains must be lowered for a timing profile.

    Args:
        profile: Timing profile name, looked up in PID_GUARDRAILS.
        pid_status: Mapping with the current "kP", "kI" and "kD" gains; a
            missing gain is treated as 0.0.

    Returns:
        None when the profile has no guardrails or no gain exceeds its limit.
        Otherwise a dict with "before" (current gains), "after" (gains capped
        at the guardrail maxima) and "limits" (the guardrail entry itself).
    """
    limits = PID_GUARDRAILS.get(profile)
    if not limits:
        return None

    gain_names = ("kP", "kI", "kD")
    before = {gain: float(pid_status.get(gain, 0.0)) for gain in gain_names}
    after = {
        gain: min(before[gain], limits[gain + "_max"]) for gain in gain_names
    }

    # Tolerance comparison so float noise does not count as a change.
    exceeded = any(
        abs(before[gain] - after[gain]) > 1e-9 for gain in gain_names
    )
    if not exceeded:
        return None
    return {"before": before, "after": after, "limits": limits}
def normalize_nail_id(raw_nail):
    """Map a user-supplied nail selector to a canonical nail id.

    Args:
        raw_nail: Selector value; it is stringified, stripped and lowercased
            before matching. None selects the default nail.

    Returns:
        "nail1" for None, "1" or "nail1"; "nail2" for "2" or "nail2";
        None for anything else.
    """
    if raw_nail is None:
        return "nail1"
    aliases = {
        "1": "nail1",
        "nail1": "nail1",
        "2": "nail2",
        "nail2": "nail2",
    }
    return aliases.get(str(raw_nail).strip().lower())
def resolve_nail_id(body=None):
    """Pick the nail id addressed by the current Flask request.

    The "nail" query parameter is consulted first, then the "nail" field of
    the body. A selector that does not normalize to a known nail is skipped,
    and "nail1" is returned when neither source yields one.

    Args:
        body: Optional parsed request body.

    Returns:
        "nail1" or "nail2".
    """
    from_query = request.args.get("nail")
    resolved = None
    if from_query is not None:
        resolved = normalize_nail_id(from_query)
    if not resolved and body and "nail" in body:
        resolved = normalize_nail_id(body.get("nail"))
    return resolved or "nail1"
def get_nail_parts(nail_id):
    """Look up the controller, thermocouple and channel config of a nail.

    Args:
        nail_id: Key into the module-level registries.

    Returns:
        A (controller, thermocouple, channel_config) tuple, or
        (None, None, None) if any of the three is not registered.
    """
    parts = (
        controllers.get(nail_id),
        thermocouples.get(nail_id),
        channel_configs.get(nail_id),
    )
    if any(part is None for part in parts):
        return None, None, None
    return parts
def ensure_nails_config(base_config):
    """Make sure the config has a "nails" section with nail1 and nail2.

    Does nothing when both entries already exist. Otherwise the top-level
    per-channel sections are copied into nail1, the top-level "gpio" section
    (if present) becomes nail1's gpio, nail2's gpio is rebuilt from its
    existing values with fixed fallback pins, and the config is saved.

    Args:
        base_config: Root config object exposing get(), data, _data and
            save().
    """
    existing = base_config.get("nails")
    already_split = (
        isinstance(existing, dict)
        and "nail1" in existing
        and "nail2" in existing
    )
    if already_split:
        return

    root = base_config.data
    nails = copy.deepcopy(root.get("nails", {}))
    first = nails.get("nail1", {})
    second = nails.get("nail2", {})

    # Sections that nail1 inherits from the top level of the config.
    inherited = (
        "pid", "control", "flight", "scheduler", "safety", "logging",
        "autotune",
    )
    first.update(
        {name: copy.deepcopy(root[name]) for name in inherited if name in root}
    )

    if "gpio" in root:
        first["gpio"] = copy.deepcopy(root["gpio"])
        prior = copy.deepcopy(second.get("gpio", {}))
        # Pins used for nail2 when its config does not name them.
        fallback_pins = (("relay_pin", 9), ("clk", 17), ("cs", 18), ("do", 27))
        second["gpio"] = {
            pin: prior.get(pin, fallback) for pin, fallback in fallback_pins
        }

    nails["nail1"] = first
    nails["nail2"] = second
    base_config._data["nails"] = nails
    base_config.save()
class ChannelConfigProxy:
    """Per-nail view over the root config.

    Reads prefer the nail's own section under "nails" and fall back to the
    root config's section of the same name. Writes always go to the nail's
    own section and are saved through the root config.
    """

    def __init__(self, root_config, nail_id):
        """Bind the proxy to a root config object and a nail id."""
        self._root = root_config
        self._nail_id = nail_id

    def _channel_dict(self):
        """Return this nail's live config dict, creating it if absent."""
        all_nails = self._root._data.setdefault("nails", {})
        return all_nails.setdefault(self._nail_id, {})

    def get(self, section, key=None):
        """Return a deep copy of a section, or of one key within it.

        Args:
            section: Section name.
            key: Optional key inside the section.

        Returns:
            Without key: a copy of the section dict ({} if the section is
            not a dict). With key: a copy of that value, or None if absent.
        """
        per_nail = self._root._data.get("nails", {})
        override = per_nail.get(self._nail_id, {}).get(section)
        chosen = self._root.get(section) if override is None else override
        if key is not None:
            return copy.deepcopy((chosen or {}).get(key))
        if isinstance(chosen, dict):
            return copy.deepcopy(chosen)
        return {}

    def set(self, section, key, value):
        """Store one value in this nail's section and save the config."""
        with CONFIG_WRITE_LOCK:
            self._channel_dict().setdefault(section, {})[key] = value
            self._root.save()

    def update_section(self, section, values):
        """Merge values into this nail's section and save the config."""
        with CONFIG_WRITE_LOCK:
            target = self._channel_dict().setdefault(section, {})
            target.update(values)
            self._root.save()

    def reload_if_changed(self):
        """Delegate to the root config's reload_if_changed()."""
        return self._root.reload_if_changed()
# ---------------------------------------------------------------------------
# Routes — Pages
# ---------------------------------------------------------------------------
@app.route("/")
def index():
    """Render the dashboard page with the app version and current year."""
    context = {
        "app_version": APP_VERSION,
        "copyright_year": datetime.now().year,
    }
    return render_template("index.html", **context)
# ---------------------------------------------------------------------------
# Routes — API
# ---------------------------------------------------------------------------
@app.route("/api/status")
def api_status():
    """Return the state of one nail's controller as JSON.

    The controller's status dict is extended with the nail id, the app
    instance id, thermocouple stats, the preset table and the nail's timing
    and safety configuration. Responds 404 if the nail is not registered.
    """
    nail_id = resolve_nail_id()
    controller, tc, ch_cfg = get_nail_parts(nail_id)
    if controller is None:
        return jsonify({"error": "Unknown nail"}), 404

    snapshot = controller.status
    snapshot.update({
        "nail": nail_id,
        "instance_id": APP_INSTANCE_ID,
        "thermocouple": tc.stats,
        "presets": config.get("presets"),
        "config": {
            "loop_size_ms": ch_cfg.get("control", "loop_size_ms"),
            "sleep_time": ch_cfg.get("control", "sleep_time"),
            "safety": ch_cfg.get("safety"),
        },
    })
    return jsonify(snapshot)
@app.route("/api/status/all")
def api_status_all():
    """Return the state of every registered nail in one JSON payload.

    The payload carries the app instance id, the shared preset table and a
    "nails" mapping of nail id to that nail's status. Nails that are not
    registered are left out.
    """
    presets = config.get("presets")
    per_nail = {}
    for nail_id in NAIL_IDS:
        controller, tc, ch_cfg = get_nail_parts(nail_id)
        if controller is None:
            continue
        snapshot = controller.status
        snapshot.update({
            "nail": nail_id,
            "thermocouple": tc.stats,
            "config": {
                "loop_size_ms": ch_cfg.get("control", "loop_size_ms"),
                "sleep_time": ch_cfg.get("control", "sleep_time"),
                "safety": ch_cfg.get("safety"),
            },
        })
        per_nail[nail_id] = snapshot

    return jsonify({
        "instance_id": APP_INSTANCE_ID,
        "presets": presets,
        "nails": per_nail,
    })
@app.route("/api/heartbeat")
def api_heartbeat():
    """Lightweight health endpoint for frontend reconnect logic.

    Reports the app instance id, the current time and, per registered
    controller, its is_alive and watchdog_ok flags.
    """
    now = time.time()
    alive = {nail_id: ctl.is_alive for nail_id, ctl in controllers.items()}
    watchdog = {nail_id: ctl.watchdog_ok for nail_id, ctl in controllers.items()}
    return jsonify({
        "ok": True,
        "instance_id": APP_INSTANCE_ID,
        "ts": now,
        "controller_alive": alive,
        "watchdog_ok": watchdog,
    })
@app.route("/api/history")
def api_history():
    """Return a nail's temperature history for charting.

    With a positive "since" query parameter only the entries returned by
    controller.get_history_since(since) are sent; otherwise the controller's
    whole history is. Responds 404 if the nail is not registered.
    """
    nail_id = resolve_nail_id()
    controller, _, _ = get_nail_parts(nail_id)
    if controller is None:
        return jsonify({"error": "Unknown nail"}), 404

    since = request.args.get("since", 0, type=float)
    samples = (
        controller.get_history_since(since) if since > 0 else controller.history
    )
    return jsonify(samples)
@app.route("/api/power", methods=["POST"])
def api_power():
    """Switch a nail's controller on or off.

    The body's "enabled" field is passed to controller.set_power(). When the
    field is absent (or null) the current "enabled" status is inverted
    instead. Responds 404 if the nail is not registered.
    """
    payload = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(payload)
    controller, _, _ = get_nail_parts(nail_id)
    if controller is None:
        return jsonify({"error": "Unknown nail"}), 404

    requested = payload.get("enabled")
    # No explicit value means "toggle".
    target = (not controller.status["enabled"]) if requested is None else requested
    controller.set_power(target)
    return jsonify({"enabled": target, "ok": True, "nail": nail_id})
@app.route("/api/setpoint", methods=["POST"])
def api_setpoint():
    """Change a nail's target temperature.

    JSON body fields:
        setpoint: Required; must convert to a finite float within the nail's
            safety "min_temp_f".."max_temp_f" range.
        nail: Optional nail selector (also accepted as a query parameter).

    Returns:
        200 with the accepted setpoint, 404 for an unregistered nail, or 400
        when the setpoint is missing, not a finite number, or out of range.
    """
    body = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(body)
    controller, _, ch_cfg = get_nail_parts(nail_id)
    if controller is None:
        return jsonify({"error": "Unknown nail"}), 404

    value = body.get("setpoint")
    if value is None:
        return jsonify({"error": "Missing 'setpoint' field"}), 400
    try:
        value = float(value)
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid setpoint value"}), 400
    # float() accepts "nan"/"inf" and the JSON parser accepts NaN/Infinity
    # literals. NaN compares False against both limits below, so without
    # this check it would reach the controller unvalidated.
    if not math.isfinite(value):
        return jsonify({"error": "Invalid setpoint value"}), 400

    safety = ch_cfg.get("safety")
    if value > safety["max_temp_f"]:
        return jsonify({"error": "Setpoint {} exceeds max {}F".format(value, safety['max_temp_f'])}), 400
    if value < safety["min_temp_f"]:
        return jsonify({"error": "Setpoint {} below min {}F".format(value, safety['min_temp_f'])}), 400

    controller.set_setpoint(value)
    return jsonify({"setpoint": value, "ok": True, "nail": nail_id})
@app.route("/api/control", methods=["POST"])
def api_control():
    """Update loop timing controls (loop_size_ms and sleep_time).

    JSON body fields (at least one of the first two is required):
        loop_size_ms: Integer within CONTROL_LIMITS["loop_size_ms"].
        sleep_time: Float within CONTROL_LIMITS["sleep_time"].
        confirm_extreme: Must be truthy to accept an "extreme" profile.
        nail: Optional nail selector (also accepted as a query parameter).

    Accepted values are written to the nail's "control" config section. If
    the resulting timing profile has PID guardrails and the current gains
    exceed them, the gains are lowered through controller.set_pid_tuning().

    Returns:
        200 with the stored timing, the profile and any clamp details;
        404 for an unregistered nail; 400 for invalid, out-of-range, missing
        or unconfirmed-extreme input.
    """
    body = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(body)
    controller, _, ch_cfg = get_nail_parts(nail_id)
    if controller is None:
        return jsonify({"error": "Unknown nail"}), 404

    updates = {}
    confirm_extreme = bool(body.get("confirm_extreme", False))

    if "loop_size_ms" in body:
        loop_limits = CONTROL_LIMITS["loop_size_ms"]
        try:
            loop_size_ms = int(body.get("loop_size_ms"))
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int() of an infinite float (e.g. JSON 1e999).
            return jsonify({"error": "Invalid loop_size_ms"}), 400
        if not (loop_limits["min"] <= loop_size_ms <= loop_limits["max"]):
            return jsonify({
                "error": "loop_size_ms out of range",
                "min": loop_limits["min"],
                "max": loop_limits["max"],
            }), 400
        updates["loop_size_ms"] = loop_size_ms

    if "sleep_time" in body:
        sleep_limits = CONTROL_LIMITS["sleep_time"]
        try:
            sleep_time = float(body.get("sleep_time"))
        except (TypeError, ValueError):
            return jsonify({"error": "Invalid sleep_time"}), 400
        # Written as a positive containment test so that NaN, which compares
        # False against everything, is rejected instead of slipping through.
        if not (sleep_limits["min"] <= sleep_time <= sleep_limits["max"]):
            return jsonify({
                "error": "sleep_time out of range",
                "min": sleep_limits["min"],
                "max": sleep_limits["max"],
            }), 400
        updates["sleep_time"] = sleep_time

    if not updates:
        return jsonify({"error": "No control fields provided"}), 400

    # Classify the timing that would result, using the stored value for any
    # field the request did not supply.
    new_loop = updates.get("loop_size_ms", ch_cfg.get("control", "loop_size_ms"))
    new_sleep = updates.get("sleep_time", ch_cfg.get("control", "sleep_time"))
    profile = classify_timing_profile(new_loop, new_sleep)
    if profile == "extreme" and not confirm_extreme:
        return jsonify({
            "error": "Extreme timing requires confirm_extreme=true",
            "profile": profile,
            "control": {
                "loop_size_ms": new_loop,
                "sleep_time": new_sleep,
            },
        }), 400

    ch_cfg.update_section("control", updates)

    pid_status = controller.status.get("pid", {})
    clamp_info = clamp_pid_for_profile(profile, pid_status)
    if clamp_info:
        mode = pid_status.get("proportional_mode", "error")
        controller.set_pid_tuning(
            clamp_info["after"]["kP"],
            clamp_info["after"]["kI"],
            clamp_info["after"]["kD"],
            None,
            mode,
        )

    return jsonify({
        "ok": True,
        "control": {
            "loop_size_ms": ch_cfg.get("control", "loop_size_ms"),
            "sleep_time": ch_cfg.get("control", "sleep_time"),
        },
        "nail": nail_id,
        "profile": profile,
        "pid_clamped": bool(clamp_info),
        "pid_guardrail": clamp_info,
        "limits": CONTROL_LIMITS,
    })
@app.route("/api/flight", methods=["POST"])
def api_flight():
    """Update flight behavior settings and optionally switch flight mode.

    JSON body fields (all optional):
        takeoff_seconds: Number in the range 5-1800.
        descent_seconds: Number in the range 5-1800.
        descent_target_f: Finite number.
        turbo: Passed to controller.set_flight_config() unchanged.
        mode: "takeoff", "cruise", "descent" or "grounded"; any other value
            triggers no mode transition.
        nail: Optional nail selector (also accepted as a query parameter).

    Returns:
        200 with the controller status, 404 for an unregistered nail, or 400
        when a numeric field is invalid or out of range.
    """
    body = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(body)
    controller, _, _ = get_nail_parts(nail_id)
    if controller is None:
        return jsonify({"error": "Unknown nail"}), 404

    takeoff_seconds = body.get("takeoff_seconds")
    descent_seconds = body.get("descent_seconds")
    turbo = body.get("turbo")
    descent_target_f = body.get("descent_target_f")
    mode = body.get("mode")

    # The range checks are positive containment tests so that NaN (accepted
    # by float() and by the JSON parser) is rejected rather than passed on.
    if takeoff_seconds is not None:
        try:
            takeoff_seconds = float(takeoff_seconds)
        except (TypeError, ValueError):
            return jsonify({"error": "Invalid takeoff_seconds"}), 400
        if not (5 <= takeoff_seconds <= 1800):
            return jsonify({"error": "takeoff_seconds out of range (5-1800)"}), 400

    if descent_seconds is not None:
        try:
            descent_seconds = float(descent_seconds)
        except (TypeError, ValueError):
            return jsonify({"error": "Invalid descent_seconds"}), 400
        if not (5 <= descent_seconds <= 1800):
            return jsonify({"error": "descent_seconds out of range (5-1800)"}), 400

    if descent_target_f is not None:
        try:
            descent_target_f = float(descent_target_f)
        except (TypeError, ValueError):
            return jsonify({"error": "Invalid descent_target_f"}), 400
        # No range is defined for this field, but NaN/inf are never valid.
        if not math.isfinite(descent_target_f):
            return jsonify({"error": "Invalid descent_target_f"}), 400

    controller.set_flight_config(
        takeoff_seconds=takeoff_seconds,
        descent_seconds=descent_seconds,
        turbo=turbo,
        descent_target_f=descent_target_f,
    )

    if mode == "takeoff":
        controller.start_takeoff()
    elif mode == "cruise":
        controller.start_cruise()
    elif mode == "descent":
        controller.start_descent()
    elif mode == "grounded":
        controller.set_power(False)

    return jsonify({"ok": True, "status": controller.status, "nail": nail_id})
@app.route("/api/scheduler", methods=["POST"])
def api_scheduler():
    """Update daily cutoff scheduler settings.

    JSON body fields (all optional):
        enabled: Passed to controller.set_scheduler() unchanged.
        cutoff_times: List of "HH:MM" strings (two ASCII digits, a colon,
            two ASCII digits; 00:00-23:59). Duplicates are dropped and the
            list is sorted before it is handed to the controller.
        nail: Optional nail selector (also accepted as a query parameter).

    Returns:
        200 with the controller's scheduler status, 404 for an unregistered
        nail, or 400 when cutoff_times is not a list or holds a bad entry.
    """
    body = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(body)
    controller, _, _ = get_nail_parts(nail_id)
    if controller is None:
        return jsonify({"error": "Unknown nail"}), 404

    enabled = body.get("enabled")
    cutoff_times = body.get("cutoff_times")
    if cutoff_times is not None:
        if not isinstance(cutoff_times, list):
            return jsonify({"error": "cutoff_times must be a list of HH:MM strings"}), 400

        # str.isdigit() is true for characters such as superscript digits
        # that int() cannot parse, so test against ASCII digits explicitly.
        ascii_digits = "0123456789"
        unique_times = set()
        for t in cutoff_times:
            well_formed = (
                isinstance(t, str)
                and len(t) == 5
                and t[2] == ":"
                and all(ch in ascii_digits for ch in t[:2] + t[3:])
            )
            if not well_formed or int(t[:2]) > 23 or int(t[3:]) > 59:
                return jsonify({"error": "Invalid time '{}'; expected HH:MM".format(t)}), 400
            # A validated entry is already in zero-padded HH:MM form.
            unique_times.add(t)
        cutoff_times = sorted(unique_times)

    controller.set_scheduler(enabled=enabled, cutoff_times=cutoff_times)
    return jsonify({"ok": True, "scheduler": controller.status.get("scheduler"), "nail": nail_id})
@app.route("/api/pid", methods=["POST"])
def api_pid():
    """Update PID tuning parameters.

    JSON body fields:
        kP, kI, kD: Required; each must convert to a finite float.
        proportional_on_measurement, proportional_mode: Optional; passed to
            controller.set_pid_tuning() unchanged.
        nail: Optional nail selector (also accepted as a query parameter).

    Returns:
        200 with the submitted gains and the controller's resulting
        proportional mode, 404 for an unregistered nail, or 400 when a gain
        is missing or not a finite number.
    """
    body = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(body)
    controller, _, _ = get_nail_parts(nail_id)
    if controller is None:
        return jsonify({"error": "Unknown nail"}), 404

    kp = body.get("kP")
    ki = body.get("kI")
    kd = body.get("kD")
    p_on_m = body.get("proportional_on_measurement")
    p_mode = body.get("proportional_mode")
    if kp is None or ki is None or kd is None:
        return jsonify({"error": "Missing kP, kI, or kD"}), 400
    try:
        kp, ki, kd = float(kp), float(ki), float(kd)
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid PID values"}), 400
    # float() accepts "nan"/"inf" and the JSON parser accepts NaN/Infinity
    # literals; neither is a usable gain.
    if not all(math.isfinite(gain) for gain in (kp, ki, kd)):
        return jsonify({"error": "Invalid PID values"}), 400

    controller.set_pid_tuning(kp, ki, kd, p_on_m, p_mode)

    # Read the status once so both response fields describe the same state.
    p_on_m_now = controller.status["pid"]["proportional_on_measurement"]
    return jsonify({
        "kP": kp,
        "kI": ki,
        "kD": kd,
        "nail": nail_id,
        "proportional_on_measurement": p_on_m_now,
        "proportional_mode": "measurement" if p_on_m_now else "error",
        "ok": True,
    })
@app.route("/api/preset/<name>", methods=["POST"])
def api_preset(name):
    """Apply a named temperature preset to a nail.

    The preset's setpoint is checked against the nail's safety
    "min_temp_f".."max_temp_f" range, the same range /api/setpoint enforces,
    before it is handed to the controller.

    Args:
        name: Preset name taken from the URL.

    Returns:
        200 with the applied setpoint; 404 for an unregistered nail or an
        unknown preset; 400 when the preset lies outside the safety range.
    """
    body = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(body)
    controller, _, ch_cfg = get_nail_parts(nail_id)
    if controller is None:
        return jsonify({"error": "Unknown nail"}), 404

    presets = config.get("presets")
    if name not in presets:
        return jsonify({"error": "Unknown preset '{}'".format(name), "available": list(presets.keys())}), 404
    value = presets[name]

    # Presets are stored without range validation, so enforce the per-nail
    # limits here. The containment form also rejects NaN.
    safety = ch_cfg.get("safety")
    if not (safety["min_temp_f"] <= value <= safety["max_temp_f"]):
        return jsonify({
            "error": "Preset '{}' setpoint {} is outside the allowed range {}-{}F".format(
                name, value, safety["min_temp_f"], safety["max_temp_f"]
            )
        }), 400

    controller.set_setpoint(value)
    return jsonify({"preset": name, "setpoint": value, "ok": True, "nail": nail_id})
@app.route("/api/presets", methods=["GET"])
def api_presets():
    """Return the "presets" config section as JSON."""
    preset_table = config.get("presets")
    return jsonify(preset_table)
@app.route("/api/presets", methods=["POST"])
def api_presets_update():
    """Add or update a preset.

    JSON body fields:
        name: Required, non-empty string.
        setpoint: Required; must convert to a finite float.

    Returns:
        200 with the updated preset table, or 400 when a field is missing,
        the name is not a string, or the setpoint is not a finite number.
    """
    body = request.get_json(force=True, silent=True) or {}
    name = body.get("name")
    value = body.get("setpoint")
    if not name or value is None:
        return jsonify({"error": "Missing 'name' or 'setpoint'"}), 400
    # A non-string name (list, dict, number) cannot serve as a preset key.
    if not isinstance(name, str):
        return jsonify({"error": "Invalid preset name"}), 400
    try:
        value = float(value)
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid setpoint value"}), 400
    # float() accepts "nan"/"inf"; never store those as a target temperature.
    if not math.isfinite(value):
        return jsonify({"error": "Invalid setpoint value"}), 400

    # The server runs threaded; hold the config write lock across the whole
    # read-modify-write so concurrent preset edits cannot overwrite each other.
    with CONFIG_WRITE_LOCK:
        presets = config.get("presets")
        presets[name] = value
        config.update_section("presets", presets)
    return jsonify({"ok": True, "presets": presets})
@app.route("/api/presets/<name>", methods=["DELETE"])
def api_preset_delete(name):
    """Delete a preset.

    Args:
        name: Preset name taken from the URL.

    Returns:
        200 with the remaining presets, or 404 if the preset does not exist.
    """
    # The server runs threaded; hold the config write lock across the whole
    # read-modify-write so concurrent preset edits cannot overwrite each other.
    with CONFIG_WRITE_LOCK:
        presets = config.get("presets")
        if name not in presets:
            return jsonify({"error": "Unknown preset '{}'".format(name)}), 404
        del presets[name]
        # Overwrite the entire presets section with the reduced table.
        config._data["presets"] = presets
        config.save()
    return jsonify({"ok": True, "presets": presets})
@app.route("/api/config", methods=["GET"])
def api_config():
    """Return the full configuration (config.data) as JSON."""
    full_config = config.data
    return jsonify(full_config)
@app.route("/api/pid/reset", methods=["POST"])
def api_pid_reset():
    """Reset a nail's PID internals (clears integral windup).

    Calls reset() on the controller's private _pid object. Responds 404 if
    the nail is not registered.
    """
    payload = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(payload)
    ctl, _, _ = get_nail_parts(nail_id)
    if ctl is None:
        return jsonify({"error": "Unknown nail"}), 404

    ctl._pid.reset()
    return jsonify({"ok": True, "message": "PID reset", "nail": nail_id})
@app.route("/api/autotune", methods=["GET"])
def api_autotune_status():
    """Return a nail's autotune status, tagged with the nail id.

    Responds 404 if the nail is not registered.
    """
    nail_id = resolve_nail_id()
    ctl, _, _ = get_nail_parts(nail_id)
    if ctl is None:
        return jsonify({"error": "Unknown nail"}), 404

    report = ctl.autotune_status
    report["nail"] = nail_id
    return jsonify(report)
@app.route("/api/autotune/start", methods=["POST"])
def api_autotune_start():
    """Start relay-based PID autotune on a nail.

    If the controller is not enabled it is started first, followed by a
    short pause. The optional body fields "setpoint", "hysteresis" and
    "cycles" are forwarded to controller.start_autotune() as given.
    Responds 404 if the nail is not registered.
    """
    payload = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(payload)
    ctl, _, _ = get_nail_parts(nail_id)
    if ctl is None:
        return jsonify({"error": "Unknown nail"}), 404

    if not ctl.status.get("enabled"):
        ctl.start()
        time.sleep(0.2)

    tune_args = {
        field: payload.get(field)
        for field in ("setpoint", "hysteresis", "cycles")
    }
    ctl.start_autotune(**tune_args)
    return jsonify({"ok": True, "autotune": ctl.autotune_status, "nail": nail_id})
@app.route("/api/autotune/stop", methods=["POST"])
def api_autotune_stop():
    """Stop a nail's PID autotune and return the resulting autotune status.

    Responds 404 if the nail is not registered.
    """
    payload = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(payload)
    ctl, _, _ = get_nail_parts(nail_id)
    if ctl is None:
        return jsonify({"error": "Unknown nail"}), 404

    ctl.stop_autotune("Autotune stopped by user")
    return jsonify({"ok": True, "autotune": ctl.autotune_status, "nail": nail_id})
@app.route("/api/safety/reset", methods=["POST"])
def api_safety_reset():
    """Clear a nail's safety trip so its controller can be restarted.

    Resets the controller's private _safety_tripped flag and _safety_reason
    text. Responds 404 if the nail is not registered.
    """
    payload = request.get_json(force=True, silent=True) or {}
    nail_id = resolve_nail_id(payload)
    ctl, _, _ = get_nail_parts(nail_id)
    if ctl is None:
        return jsonify({"error": "Unknown nail"}), 404

    ctl._safety_tripped = False
    ctl._safety_reason = ""
    return jsonify({"ok": True, "nail": nail_id})
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def shutdown_handler(*args):
    """Signal handler: clean up every controller, then exit with status 0.

    Args:
        *args: Signal number and stack frame supplied by the signal module;
            unused.
    """
    log.info("Shutdown signal received")
    # cleanup_all() runs the same per-controller cleanup, logging failures.
    cleanup_all()
    sys.exit(0)
def cleanup_all():
    """Call cleanup() on every registered controller.

    A failure in one controller is logged and does not prevent the
    remaining controllers from being cleaned up.
    """
    for nail_id, controller in controllers.items():
        try:
            controller.cleanup()
        except Exception as exc:
            log.error("Cleanup failed for %s: %s", nail_id, exc)
def main():
    """Load the config, bring up both nails and run the web server.

    Steps: parse --config, load the configuration and make sure it has
    per-nail sections, build a thermocouple and a PID controller for each
    nail id (starting its monitoring), register signal and exit cleanup
    hooks, then start Flask with the host and port from the "web" section.
    """
    global config

    cli = argparse.ArgumentParser(description="piNail2 — E-Nail Temperature Controller")
    cli.add_argument("--config", default="config.json", help="Path to config file")
    options = cli.parse_args()

    # Load config
    config = Config(options.config)
    log.info("Configuration loaded from %s", options.config)
    ensure_nails_config(config)

    for nail_id in NAIL_IDS:
        proxy = ChannelConfigProxy(config, nail_id)
        channel_configs[nail_id] = proxy
        pins = proxy.get("gpio")
        safety = proxy.get("safety")

        probe = Thermocouple(
            clk=pins["clk"],
            cs=pins["cs"],
            do=pins["do"],
            spike_threshold=safety["spike_threshold_f"],
        )
        thermocouples[nail_id] = probe

        pid = PIDController(proxy, probe)
        pid.start_monitoring()
        controllers[nail_id] = pid
        log.info("Initialized %s (relay pin %s)", nail_id, pins.get("relay_pin"))

    # Register cleanup
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, shutdown_handler)
    atexit.register(cleanup_all)

    # Start Flask
    web = config.get("web")
    log.info("Starting web server on %s:%d", web["host"], web["port"])
    app.run(host=web["host"], port=web["port"], debug=False, threaded=True)


if __name__ == "__main__":
    main()