""" piNail2 — Flask Web Application Main entry point. Runs the Flask web server and initializes the PID controller. Provides REST API endpoints and serves the single-page dashboard. Usage: python3 app.py python3 app.py --config /path/to/config.json """ import sys import os import signal import atexit import logging import argparse import json import time import threading import copy from datetime import datetime from flask import Flask, render_template, jsonify, request from config import Config from thermocouple import Thermocouple from pid_controller import PIDController # --------------------------------------------------------------------------- # Logging setup # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.StreamHandler(sys.stdout), ] ) log = logging.getLogger("piNail2") APP_INSTANCE_ID = str(int(time.time())) APP_VERSION = "v2.1.0" # --------------------------------------------------------------------------- # Flask app # --------------------------------------------------------------------------- app = Flask(__name__) # These are set in main() before the app starts config = None # type: Config controllers = {} thermocouples = {} channel_configs = {} NAIL_IDS = ("nail1", "nail2") CONFIG_WRITE_LOCK = threading.Lock() CONTROL_LIMITS = { "loop_size_ms": {"min": 1500, "max": 5000}, "sleep_time": {"min": 0.15, "max": 0.6}, } PID_GUARDRAILS = { "balanced": {"kP_max": 80.0, "kI_max": 4.0, "kD_max": 50.0}, "responsive": {"kP_max": 50.0, "kI_max": 2.5, "kD_max": 25.0}, "extreme": {"kP_max": 35.0, "kI_max": 1.8, "kD_max": 15.0}, } def classify_timing_profile(loop_size_ms, sleep_time): """Classify timing aggressiveness for safety guardrails.""" if loop_size_ms <= 1400 or sleep_time <= 0.14: return "extreme" if loop_size_ms <= 1800 or sleep_time <= 0.20: return "responsive" if loop_size_ms <= 2800 or sleep_time <= 0.32: return "balanced" return "conservative" def clamp_pid_for_profile(profile, pid_status): """Clamp PID gains to guardrails for aggressive timing profiles.""" limits = PID_GUARDRAILS.get(profile) if not limits: return None before = { "kP": float(pid_status.get("kP", 0.0)), "kI": float(pid_status.get("kI", 0.0)), "kD": float(pid_status.get("kD", 0.0)), } after = { "kP": min(before["kP"], limits["kP_max"]), "kI": min(before["kI"], limits["kI_max"]), "kD": min(before["kD"], limits["kD_max"]), } changed = ( abs(before["kP"] - after["kP"]) > 1e-9 or abs(before["kI"] - after["kI"]) > 1e-9 or abs(before["kD"] - after["kD"]) > 1e-9 ) if not changed: return None return { "before": before, "after": after, "limits": limits, } def normalize_nail_id(raw_nail): if raw_nail is None: return "nail1" text = str(raw_nail).strip().lower() if text in ("1", "nail1"): return "nail1" if text in ("2", "nail2"): return "nail2" return None def resolve_nail_id(body=None): q_nail = request.args.get("nail") if q_nail is not None: nail_id = normalize_nail_id(q_nail) if nail_id: return nail_id if body and "nail" in body: nail_id = normalize_nail_id(body.get("nail")) if nail_id: return nail_id return "nail1" def get_nail_parts(nail_id): controller = controllers.get(nail_id) tc = thermocouples.get(nail_id) ch_cfg = channel_configs.get(nail_id) if controller is None or tc is None or ch_cfg is None: return None, None, None return controller, tc, ch_cfg def ensure_nails_config(base_config): nails = base_config.get("nails") if isinstance(nails, dict) and "nail1" in nails and "nail2" in nails: return data = base_config.data default_nails = copy.deepcopy(data.get("nails", {})) nail1 = default_nails.get("nail1", {}) nail2 = default_nails.get("nail2", {}) for section in ("pid", "control", "flight", "scheduler", "safety", "logging", "autotune"): if section in data: nail1[section] = copy.deepcopy(data[section]) if "gpio" in data: gpio1 = copy.deepcopy(data["gpio"]) gpio2 = copy.deepcopy(nail2.get("gpio", {})) nail1["gpio"] = gpio1 nail2["gpio"] = { "relay_pin": gpio2.get("relay_pin", 9), "clk": gpio2.get("clk", 17), "cs": gpio2.get("cs", 18), "do": gpio2.get("do", 27), } default_nails["nail1"] = nail1 default_nails["nail2"] = nail2 base_config._data["nails"] = default_nails base_config.save() class ChannelConfigProxy: def __init__(self, root_config, nail_id): self._root = root_config self._nail_id = nail_id def _channel_dict(self): nails = self._root._data.setdefault("nails", {}) return nails.setdefault(self._nail_id, {}) def get(self, section, key=None): section_data = self._root._data.get("nails", {}).get(self._nail_id, {}).get(section) if section_data is None: section_data = self._root.get(section) if key is None: return copy.deepcopy(section_data if isinstance(section_data, dict) else {}) return copy.deepcopy((section_data or {}).get(key)) def set(self, section, key, value): with CONFIG_WRITE_LOCK: section_data = self._channel_dict().setdefault(section, {}) section_data[key] = value self._root.save() def update_section(self, section, values): with CONFIG_WRITE_LOCK: section_data = self._channel_dict().setdefault(section, {}) section_data.update(values) self._root.save() def reload_if_changed(self): return self._root.reload_if_changed() # --------------------------------------------------------------------------- # Routes — Pages # --------------------------------------------------------------------------- @app.route("/") def index(): """Serve the main dashboard.""" return render_template( "index.html", app_version=APP_VERSION, copyright_year=datetime.now().year, ) # --------------------------------------------------------------------------- # Routes — API # --------------------------------------------------------------------------- @app.route("/api/status") def api_status(): """Return current controller state.""" nail_id = resolve_nail_id() controller, tc, ch_cfg = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 status = controller.status status["nail"] = nail_id status["instance_id"] = APP_INSTANCE_ID status["thermocouple"] = tc.stats status["presets"] = config.get("presets") status["config"] = { "loop_size_ms": ch_cfg.get("control", "loop_size_ms"), "sleep_time": ch_cfg.get("control", "sleep_time"), "safety": ch_cfg.get("safety"), } return jsonify(status) @app.route("/api/status/all") def api_status_all(): payload = { "instance_id": APP_INSTANCE_ID, "presets": config.get("presets"), "nails": {}, } for nail_id in NAIL_IDS: controller, tc, ch_cfg = get_nail_parts(nail_id) if controller is None: continue status = controller.status status["nail"] = nail_id status["thermocouple"] = tc.stats status["config"] = { "loop_size_ms": ch_cfg.get("control", "loop_size_ms"), "sleep_time": ch_cfg.get("control", "sleep_time"), "safety": ch_cfg.get("safety"), } payload["nails"][nail_id] = status return jsonify(payload) @app.route("/api/heartbeat") def api_heartbeat(): """Lightweight health endpoint for frontend reconnect logic.""" return jsonify({ "ok": True, "instance_id": APP_INSTANCE_ID, "ts": time.time(), "controller_alive": {n: controllers[n].is_alive for n in controllers}, "watchdog_ok": {n: controllers[n].watchdog_ok for n in controllers}, }) @app.route("/api/history") def api_history(): """Return recent temperature history for charting.""" nail_id = resolve_nail_id() controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 since = request.args.get("since", 0, type=float) if since > 0: data = controller.get_history_since(since) else: data = controller.history return jsonify(data) @app.route("/api/power", methods=["POST"]) def api_power(): """Toggle controller on/off.""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 enable = body.get("enabled") if enable is None: # Toggle enable = not controller.status["enabled"] controller.set_power(enable) return jsonify({"enabled": enable, "ok": True, "nail": nail_id}) @app.route("/api/setpoint", methods=["POST"]) def api_setpoint(): """Change the target temperature.""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, ch_cfg = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 value = body.get("setpoint") if value is None: return jsonify({"error": "Missing 'setpoint' field"}), 400 try: value = float(value) except (TypeError, ValueError): return jsonify({"error": "Invalid setpoint value"}), 400 safety = ch_cfg.get("safety") if value > safety["max_temp_f"]: return jsonify({"error": "Setpoint {} exceeds max {}F".format(value, safety['max_temp_f'])}), 400 if value < safety["min_temp_f"]: return jsonify({"error": "Setpoint {} below min {}F".format(value, safety['min_temp_f'])}), 400 controller.set_setpoint(value) return jsonify({"setpoint": value, "ok": True, "nail": nail_id}) @app.route("/api/control", methods=["POST"]) def api_control(): """Update loop timing controls (loop_size_ms and sleep_time).""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, ch_cfg = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 updates = {} confirm_extreme = bool(body.get("confirm_extreme", False)) if "loop_size_ms" in body: try: loop_size_ms = int(body.get("loop_size_ms")) except (TypeError, ValueError): return jsonify({"error": "Invalid loop_size_ms"}), 400 if loop_size_ms < CONTROL_LIMITS["loop_size_ms"]["min"] or loop_size_ms > CONTROL_LIMITS["loop_size_ms"]["max"]: return jsonify({ "error": "loop_size_ms out of range", "min": CONTROL_LIMITS["loop_size_ms"]["min"], "max": CONTROL_LIMITS["loop_size_ms"]["max"], }), 400 updates["loop_size_ms"] = loop_size_ms if "sleep_time" in body: try: sleep_time = float(body.get("sleep_time")) except (TypeError, ValueError): return jsonify({"error": "Invalid sleep_time"}), 400 if sleep_time < CONTROL_LIMITS["sleep_time"]["min"] or sleep_time > CONTROL_LIMITS["sleep_time"]["max"]: return jsonify({ "error": "sleep_time out of range", "min": CONTROL_LIMITS["sleep_time"]["min"], "max": CONTROL_LIMITS["sleep_time"]["max"], }), 400 updates["sleep_time"] = sleep_time if not updates: return jsonify({"error": "No control fields provided"}), 400 current_loop = ch_cfg.get("control", "loop_size_ms") current_sleep = ch_cfg.get("control", "sleep_time") new_loop = updates.get("loop_size_ms", current_loop) new_sleep = updates.get("sleep_time", current_sleep) profile = classify_timing_profile(new_loop, new_sleep) if profile == "extreme" and not confirm_extreme: return jsonify({ "error": "Extreme timing requires confirm_extreme=true", "profile": profile, "control": { "loop_size_ms": new_loop, "sleep_time": new_sleep, }, }), 400 ch_cfg.update_section("control", updates) pid_status = controller.status.get("pid", {}) clamp_info = clamp_pid_for_profile(profile, pid_status) if clamp_info: mode = pid_status.get("proportional_mode", "error") controller.set_pid_tuning( clamp_info["after"]["kP"], clamp_info["after"]["kI"], clamp_info["after"]["kD"], None, mode, ) return jsonify({ "ok": True, "control": { "loop_size_ms": ch_cfg.get("control", "loop_size_ms"), "sleep_time": ch_cfg.get("control", "sleep_time"), }, "nail": nail_id, "profile": profile, "pid_clamped": bool(clamp_info), "pid_guardrail": clamp_info, "limits": CONTROL_LIMITS, }) @app.route("/api/flight", methods=["POST"]) def api_flight(): """Update flight behavior settings and optional mode transition.""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 takeoff_seconds = body.get("takeoff_seconds") descent_seconds = body.get("descent_seconds") turbo = body.get("turbo") descent_target_f = body.get("descent_target_f") mode = body.get("mode") if takeoff_seconds is not None: try: takeoff_seconds = float(takeoff_seconds) except (TypeError, ValueError): return jsonify({"error": "Invalid takeoff_seconds"}), 400 if takeoff_seconds < 5 or takeoff_seconds > 1800: return jsonify({"error": "takeoff_seconds out of range (5-1800)"}), 400 if descent_seconds is not None: try: descent_seconds = float(descent_seconds) except (TypeError, ValueError): return jsonify({"error": "Invalid descent_seconds"}), 400 if descent_seconds < 5 or descent_seconds > 1800: return jsonify({"error": "descent_seconds out of range (5-1800)"}), 400 if descent_target_f is not None: try: descent_target_f = float(descent_target_f) except (TypeError, ValueError): return jsonify({"error": "Invalid descent_target_f"}), 400 controller.set_flight_config( takeoff_seconds=takeoff_seconds, descent_seconds=descent_seconds, turbo=turbo, descent_target_f=descent_target_f, ) if mode == "takeoff": controller.start_takeoff() elif mode == "cruise": controller.start_cruise() elif mode == "descent": controller.start_descent() elif mode == "grounded": controller.set_power(False) return jsonify({"ok": True, "status": controller.status, "nail": nail_id}) @app.route("/api/scheduler", methods=["POST"]) def api_scheduler(): """Update daily cutoff scheduler settings.""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 enabled = body.get("enabled") cutoff_times = body.get("cutoff_times") if cutoff_times is not None: if not isinstance(cutoff_times, list): return jsonify({"error": "cutoff_times must be a list of HH:MM strings"}), 400 normalized = [] for t in cutoff_times: if not isinstance(t, str) or len(t) != 5 or t[2] != ":": return jsonify({"error": "Invalid time '{}'; expected HH:MM".format(t)}), 400 hh, mm = t.split(":", 1) if not hh.isdigit() or not mm.isdigit(): return jsonify({"error": "Invalid time '{}'; expected HH:MM".format(t)}), 400 hhv = int(hh) mmv = int(mm) if hhv < 0 or hhv > 23 or mmv < 0 or mmv > 59: return jsonify({"error": "Invalid time '{}'; expected HH:MM".format(t)}), 400 normalized.append("{:02d}:{:02d}".format(hhv, mmv)) cutoff_times = sorted(list(set(normalized))) controller.set_scheduler(enabled=enabled, cutoff_times=cutoff_times) return jsonify({"ok": True, "scheduler": controller.status.get("scheduler"), "nail": nail_id}) @app.route("/api/pid", methods=["POST"]) def api_pid(): """Update PID tuning parameters.""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 kp = body.get("kP") ki = body.get("kI") kd = body.get("kD") p_on_m = body.get("proportional_on_measurement") p_mode = body.get("proportional_mode") if kp is None or ki is None or kd is None: return jsonify({"error": "Missing kP, kI, or kD"}), 400 try: kp, ki, kd = float(kp), float(ki), float(kd) except (TypeError, ValueError): return jsonify({"error": "Invalid PID values"}), 400 controller.set_pid_tuning(kp, ki, kd, p_on_m, p_mode) mode_out = "measurement" if controller.status["pid"]["proportional_on_measurement"] else "error" return jsonify({ "kP": kp, "kI": ki, "kD": kd, "nail": nail_id, "proportional_on_measurement": controller.status["pid"]["proportional_on_measurement"], "proportional_mode": mode_out, "ok": True, }) @app.route("/api/preset/", methods=["POST"]) def api_preset(name): """Apply a named temperature preset.""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 presets = config.get("presets") if name not in presets: return jsonify({"error": "Unknown preset '{}'".format(name), "available": list(presets.keys())}), 404 value = presets[name] controller.set_setpoint(value) return jsonify({"preset": name, "setpoint": value, "ok": True, "nail": nail_id}) @app.route("/api/presets", methods=["GET"]) def api_presets(): """List all presets.""" return jsonify(config.get("presets")) @app.route("/api/presets", methods=["POST"]) def api_presets_update(): """Add or update a preset.""" body = request.get_json(force=True, silent=True) or {} name = body.get("name") value = body.get("setpoint") if not name or value is None: return jsonify({"error": "Missing 'name' or 'setpoint'"}), 400 try: value = float(value) except (TypeError, ValueError): return jsonify({"error": "Invalid setpoint value"}), 400 presets = config.get("presets") presets[name] = value config.update_section("presets", presets) return jsonify({"ok": True, "presets": presets}) @app.route("/api/presets/", methods=["DELETE"]) def api_preset_delete(name): """Delete a preset.""" presets = config.get("presets") if name not in presets: return jsonify({"error": "Unknown preset '{}'".format(name)}), 404 del presets[name] # Overwrite entire presets section config._data["presets"] = presets config.save() return jsonify({"ok": True, "presets": presets}) @app.route("/api/config", methods=["GET"]) def api_config(): """Return full config (read-only view).""" return jsonify(config.data) @app.route("/api/pid/reset", methods=["POST"]) def api_pid_reset(): """Reset PID controller internals (clears integral windup).""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 controller._pid.reset() return jsonify({"ok": True, "message": "PID reset", "nail": nail_id}) @app.route("/api/autotune", methods=["GET"]) def api_autotune_status(): """Return current autotune state.""" nail_id = resolve_nail_id() controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 payload = controller.autotune_status payload["nail"] = nail_id return jsonify(payload) @app.route("/api/autotune/start", methods=["POST"]) def api_autotune_start(): """Start relay-based PID autotune.""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 if not controller.status.get("enabled"): controller.start() time.sleep(0.2) setpoint = body.get("setpoint") hysteresis = body.get("hysteresis") cycles = body.get("cycles") controller.start_autotune(setpoint=setpoint, hysteresis=hysteresis, cycles=cycles) return jsonify({"ok": True, "autotune": controller.autotune_status, "nail": nail_id}) @app.route("/api/autotune/stop", methods=["POST"]) def api_autotune_stop(): """Stop PID autotune.""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 controller.stop_autotune("Autotune stopped by user") return jsonify({"ok": True, "autotune": controller.autotune_status, "nail": nail_id}) @app.route("/api/safety/reset", methods=["POST"]) def api_safety_reset(): """Reset safety trip (clears the safety flag so controller can be restarted).""" body = request.get_json(force=True, silent=True) or {} nail_id = resolve_nail_id(body) controller, _, _ = get_nail_parts(nail_id) if controller is None: return jsonify({"error": "Unknown nail"}), 404 controller._safety_tripped = False controller._safety_reason = "" return jsonify({"ok": True, "nail": nail_id}) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def shutdown_handler(*args): """Handle SIGTERM/SIGINT gracefully.""" log.info("Shutdown signal received") for nail_id in controllers: try: controllers[nail_id].cleanup() except Exception as e: log.error("Cleanup failed for %s: %s", nail_id, e) sys.exit(0) def cleanup_all(): for nail_id in controllers: try: controllers[nail_id].cleanup() except Exception as e: log.error("Cleanup failed for %s: %s", nail_id, e) def main(): global config parser = argparse.ArgumentParser(description="piNail2 — E-Nail Temperature Controller") parser.add_argument("--config", default="config.json", help="Path to config file") args = parser.parse_args() # Load config config = Config(args.config) log.info("Configuration loaded from %s", args.config) ensure_nails_config(config) for nail_id in NAIL_IDS: ch_cfg = ChannelConfigProxy(config, nail_id) channel_configs[nail_id] = ch_cfg gpio_cfg = ch_cfg.get("gpio") safety_cfg = ch_cfg.get("safety") tc = Thermocouple( clk=gpio_cfg["clk"], cs=gpio_cfg["cs"], do=gpio_cfg["do"], spike_threshold=safety_cfg["spike_threshold_f"] ) thermocouples[nail_id] = tc controller = PIDController(ch_cfg, tc) controller.start_monitoring() controllers[nail_id] = controller log.info("Initialized %s (relay pin %s)", nail_id, gpio_cfg.get("relay_pin")) # Register cleanup signal.signal(signal.SIGTERM, shutdown_handler) signal.signal(signal.SIGINT, shutdown_handler) atexit.register(cleanup_all) # Start Flask web_cfg = config.get("web") log.info("Starting web server on %s:%d", web_cfg["host"], web_cfg["port"]) app.run( host=web_cfg["host"], port=web_cfg["port"], debug=False, threaded=True ) if __name__ == "__main__": main()