336 lines
12 KiB
Python
336 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
piNail2 — TTY Status Display Panel
|
|
|
|
Displays a fancy real-time status panel on the HDMI display showing:
|
|
- Dual nail temperature status and control mode
|
|
- PID parameters and error/output values
|
|
- Flight mode and current phase
|
|
- Safety status
|
|
- Performance metrics
|
|
|
|
Compatible with Python 3.5+
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import json
|
|
import requests
|
|
import signal
|
|
import threading
|
|
from datetime import datetime
|
|
|
|
# Python 3.5 compatibility - no f-strings
|
|
try:
|
|
from http.client import HTTPConnection
|
|
HTTPConnection.debuglevel = 0
|
|
except ImportError:
|
|
from httplib import HTTPConnection
|
|
HTTPConnection.debuglevel = 0
|
|
|
|
# Configuration
|
|
API_BASE = "http://localhost:5000"
|
|
UPDATE_INTERVAL = 0.5 # seconds between fetches
|
|
DISPLAY_INTERVAL = 2.0 # seconds between screen redraws
|
|
TTY_PATH = "/dev/tty1"
|
|
|
|
# ANSI color codes
|
|
class Colors:
|
|
RESET = "\033[0m"
|
|
BOLD = "\033[1m"
|
|
DIM = "\033[2m"
|
|
UNDERLINE = "\033[4m"
|
|
|
|
# Foreground colors
|
|
BLACK = "\033[30m"
|
|
RED = "\033[31m"
|
|
GREEN = "\033[32m"
|
|
YELLOW = "\033[33m"
|
|
BLUE = "\033[34m"
|
|
MAGENTA = "\033[35m"
|
|
CYAN = "\033[36m"
|
|
WHITE = "\033[37m"
|
|
|
|
# Bright colors
|
|
BRIGHT_RED = "\033[91m"
|
|
BRIGHT_GREEN = "\033[92m"
|
|
BRIGHT_YELLOW = "\033[93m"
|
|
BRIGHT_BLUE = "\033[94m"
|
|
BRIGHT_MAGENTA = "\033[95m"
|
|
BRIGHT_CYAN = "\033[96m"
|
|
|
|
# Background colors
|
|
BG_RED = "\033[41m"
|
|
BG_GREEN = "\033[42m"
|
|
BG_YELLOW = "\033[43m"
|
|
BG_BLUE = "\033[44m"
|
|
BG_MAGENTA = "\033[45m"
|
|
BG_CYAN = "\033[46m"
|
|
|
|
|
|
class StatusDisplay:
|
|
def __init__(self):
|
|
self.running = True
|
|
self.last_update = 0
|
|
self.last_draw = 0
|
|
self.last_frame = None
|
|
self.data = {
|
|
"nail1": {},
|
|
"nail2": {},
|
|
}
|
|
self.lock = threading.Lock()
|
|
|
|
def fetch_status(self):
|
|
"""Fetch status from API."""
|
|
try:
|
|
resp = requests.get("{}/api/status/all".format(API_BASE), timeout=2)
|
|
if resp.status_code == 200:
|
|
with self.lock:
|
|
payload = resp.json()
|
|
self.data = payload.get("nails", {})
|
|
self.last_update = time.time()
|
|
return True
|
|
except Exception as e:
|
|
pass
|
|
return False
|
|
|
|
def fetch_worker(self):
|
|
"""Background thread that fetches status periodically."""
|
|
while self.running:
|
|
self.fetch_status()
|
|
time.sleep(UPDATE_INTERVAL)
|
|
|
|
def format_temp(self, temp_f):
|
|
"""Format temperature with color coding."""
|
|
if temp_f is None:
|
|
return "{}ERR{}".format(Colors.RED, Colors.RESET)
|
|
temp_f = float(temp_f)
|
|
|
|
if temp_f < 0:
|
|
color = Colors.RED
|
|
elif temp_f < 100:
|
|
color = Colors.BLUE
|
|
elif temp_f < 400:
|
|
color = Colors.YELLOW
|
|
else:
|
|
color = Colors.BRIGHT_RED
|
|
|
|
return "{}{:6.1f}F{}".format(color, temp_f, Colors.RESET)
|
|
|
|
def format_status_icon(self, enabled, has_error):
|
|
"""Return status icon."""
|
|
if has_error:
|
|
return "{}●{}".format(Colors.RED, Colors.RESET)
|
|
elif enabled:
|
|
return "{}●{}".format(Colors.GREEN, Colors.RESET)
|
|
else:
|
|
return "{}○{}".format(Colors.DIM, Colors.RESET)
|
|
|
|
def format_mode_badge(self, mode):
|
|
"""Format flight mode as a fancy badge."""
|
|
modes = {
|
|
"grounded": (Colors.CYAN, "GROUNDED"),
|
|
"takeoff": (Colors.BRIGHT_YELLOW, "TAKEOFF "),
|
|
"cruise": (Colors.BRIGHT_GREEN, "CRUISE "),
|
|
"descent": (Colors.BRIGHT_MAGENTA, "DESCENT "),
|
|
}
|
|
color, label = modes.get(mode, (Colors.WHITE, "????? "))
|
|
return "{}[{}]{}".format(color, label, Colors.RESET)
|
|
|
|
def format_phase_badge(self, phase):
|
|
"""Format phase as a badge."""
|
|
phases = {
|
|
"heating": (Colors.BRIGHT_RED, "HEATING"),
|
|
"cooling": (Colors.BRIGHT_BLUE, "COOLING"),
|
|
"idle": (Colors.DIM, " IDLE "),
|
|
}
|
|
color, label = phases.get(phase, (Colors.WHITE, " ? "))
|
|
return "{}{}{}".format(color, label, Colors.RESET)
|
|
|
|
def strip_ansi(self, text):
|
|
"""Remove ANSI color codes from text for length calculation."""
|
|
import re
|
|
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
|
return ansi_escape.sub('', text)
|
|
|
|
def draw_nail_panel_compact(self, nail_id, status):
|
|
"""Draw compact status lines for a single nail."""
|
|
if not status:
|
|
return ["No data", "No data", "No data"]
|
|
|
|
enabled = status.get("enabled", False)
|
|
has_error = status.get("error", False)
|
|
state = "ERR" if has_error else ("ON" if enabled else "OFF")
|
|
|
|
current = float(status.get("current_temp", 0))
|
|
setpoint = float(status.get("setpoint", 0))
|
|
current_str = "{:5.0f}F".format(current) if current else "ERROR"
|
|
setpoint_str = "{:5.0f}F".format(setpoint)
|
|
|
|
error = float(status.get("error", 0))
|
|
output = float(status.get("output", 0))
|
|
output_pct = min(100.0, max(0.0, output * 100))
|
|
bar_filled = int(output_pct / 10)
|
|
bar = "[{}{}]".format("=" * bar_filled, " " * (10 - bar_filled))
|
|
|
|
mode = status.get("flight_mode", "grounded")[:8]
|
|
phase = status.get("phase", "idle")[:8]
|
|
safety = status.get("safety", {})
|
|
safe = "OK" if (safety.get("temp_ok", True) and safety.get("tc_ok", True) and safety.get("watchdog_ok", True)) else "WARN"
|
|
|
|
return [
|
|
"State: {} Temp: {} Set: {}".format(state, current_str, setpoint_str),
|
|
"Err: {:+6.1f}F Out: {:5.1f}% {}".format(error, output_pct, bar),
|
|
"Mode: {:<8} Phase: {:<8} Safe: {}".format(mode, phase, safe),
|
|
]
|
|
|
|
def draw_frame(self):
|
|
"""Draw the complete status display frame."""
|
|
lines = []
|
|
|
|
# ASCII Art Title with timestamp
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
lines.append("")
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ######: ###### ### ## :##: ###### ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" #######: ###### ### ## ## ###### ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## :## ## ###: ## #### ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ## ## #### ## #### ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## :## ## ##:#: ## :# #: ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" #######: ## ## ## ## #::# ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ######: ## ## ## ## ## ## ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ## ## :#:## ###### ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ## ## #### .######. ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ## ## :### :## ##: ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ###### ## ### ### ### ###### ######## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ###### ## ### ##: :## ###### ######## ", Colors.RESET))
|
|
|
|
# Timestamp on its own line
|
|
lines.append("")
|
|
lines.append("{}{}{}".format(
|
|
Colors.DIM,
|
|
timestamp.center(70),
|
|
Colors.RESET
|
|
))
|
|
|
|
lines.append("")
|
|
|
|
# Get data safely
|
|
with self.lock:
|
|
nail1_data = self.data.get("nail1", {})
|
|
nail2_data = self.data.get("nail2", {})
|
|
|
|
# Draw vertical distinct boxes (compact, avoids side bleed)
|
|
box_width = 58
|
|
|
|
def pad(text):
|
|
if len(text) >= box_width:
|
|
return text[:box_width]
|
|
return text + (" " * (box_width - len(text)))
|
|
|
|
nail1_lines = self.draw_nail_panel_compact("nail1", nail1_data)
|
|
lines.append("{}┌{}┐{}".format(Colors.CYAN, "─" * box_width, Colors.RESET))
|
|
lines.append("{}│{}│{}".format(
|
|
Colors.CYAN,
|
|
pad(" NAIL 1"),
|
|
Colors.RESET,
|
|
))
|
|
lines.append("{}├{}┤{}".format(Colors.CYAN, "─" * box_width, Colors.RESET))
|
|
for panel_line in nail1_lines:
|
|
lines.append("{}│{}│{}".format(Colors.CYAN, pad(panel_line), Colors.RESET))
|
|
lines.append("{}└{}┘{}".format(Colors.CYAN, "─" * box_width, Colors.RESET))
|
|
|
|
lines.append("")
|
|
|
|
nail2_lines = self.draw_nail_panel_compact("nail2", nail2_data)
|
|
lines.append("{}┌{}┐{}".format(Colors.CYAN, "─" * box_width, Colors.RESET))
|
|
lines.append("{}│{}│{}".format(
|
|
Colors.CYAN,
|
|
pad(" NAIL 2"),
|
|
Colors.RESET,
|
|
))
|
|
lines.append("{}├{}┤{}".format(Colors.CYAN, "─" * box_width, Colors.RESET))
|
|
for panel_line in nail2_lines:
|
|
lines.append("{}│{}│{}".format(Colors.CYAN, pad(panel_line), Colors.RESET))
|
|
lines.append("{}└{}┘{}".format(Colors.CYAN, "─" * box_width, Colors.RESET))
|
|
|
|
return "\n".join(lines)
|
|
|
|
def display_loop(self):
|
|
"""Main display loop."""
|
|
# Start fetch worker thread
|
|
fetch_thread = threading.Thread(target=self.fetch_worker)
|
|
fetch_thread.daemon = True
|
|
fetch_thread.start()
|
|
|
|
# Initial draw
|
|
sys.stdout.write("\033[2J\033[H")
|
|
sys.stdout.flush()
|
|
self.last_draw = time.time()
|
|
|
|
try:
|
|
while self.running:
|
|
now = time.time()
|
|
|
|
# Only redraw if enough time has passed (reduces flicker)
|
|
if now - self.last_draw >= DISPLAY_INTERVAL:
|
|
# Clear screen (ANSI escape)
|
|
sys.stdout.write("\033[2J\033[H")
|
|
sys.stdout.flush()
|
|
|
|
# Draw frame
|
|
frame = self.draw_frame()
|
|
sys.stdout.write(frame)
|
|
sys.stdout.write("\n")
|
|
sys.stdout.flush()
|
|
|
|
self.last_draw = now
|
|
|
|
time.sleep(UPDATE_INTERVAL)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
self.running = False
|
|
|
|
def run(self):
|
|
"""Run the status display."""
|
|
# Redirect stdout to TTY if available
|
|
if os.path.exists(TTY_PATH):
|
|
try:
|
|
tty_fd = os.open(TTY_PATH, os.O_WRONLY)
|
|
os.dup2(tty_fd, 1)
|
|
os.close(tty_fd)
|
|
except Exception:
|
|
pass
|
|
|
|
# Handle signals
|
|
def sigint_handler(signum, frame):
|
|
self.running = False
|
|
|
|
signal.signal(signal.SIGINT, sigint_handler)
|
|
|
|
# Run display loop
|
|
self.display_loop()
|
|
|
|
|
|
def main():
|
|
"""Main entry point."""
|
|
display = StatusDisplay()
|
|
display.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|