2853fa3f8a
- Added strip_ansi() helper function to remove color codes before length calc - Fixed padding logic to calculate visible character length correctly - ANSI codes no longer mess up the box alignment - Both boxes should now align properly regardless of color content - Deployed and verified on Pi
383 lines
14 KiB
Python
383 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
piNail2 — TTY Status Display Panel
|
|
|
|
Displays a fancy real-time status panel on the HDMI display showing:
|
|
- Dual nail temperature status and control mode
|
|
- PID parameters and error/output values
|
|
- Flight mode and current phase
|
|
- Safety status
|
|
- Performance metrics
|
|
|
|
Compatible with Python 3.5+
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import json
|
|
import requests
|
|
import signal
|
|
import threading
|
|
from datetime import datetime
|
|
|
|
# Python 3.5 compatibility - no f-strings
|
|
try:
|
|
from http.client import HTTPConnection
|
|
HTTPConnection.debuglevel = 0
|
|
except ImportError:
|
|
from httplib import HTTPConnection
|
|
HTTPConnection.debuglevel = 0
|
|
|
|
# Configuration
|
|
API_BASE = "http://localhost:5000"
|
|
UPDATE_INTERVAL = 0.5 # seconds between fetches
|
|
DISPLAY_INTERVAL = 2.0 # seconds between screen redraws
|
|
TTY_PATH = "/dev/tty1"
|
|
|
|
# ANSI color codes
|
|
class Colors:
|
|
RESET = "\033[0m"
|
|
BOLD = "\033[1m"
|
|
DIM = "\033[2m"
|
|
UNDERLINE = "\033[4m"
|
|
|
|
# Foreground colors
|
|
BLACK = "\033[30m"
|
|
RED = "\033[31m"
|
|
GREEN = "\033[32m"
|
|
YELLOW = "\033[33m"
|
|
BLUE = "\033[34m"
|
|
MAGENTA = "\033[35m"
|
|
CYAN = "\033[36m"
|
|
WHITE = "\033[37m"
|
|
|
|
# Bright colors
|
|
BRIGHT_RED = "\033[91m"
|
|
BRIGHT_GREEN = "\033[92m"
|
|
BRIGHT_YELLOW = "\033[93m"
|
|
BRIGHT_BLUE = "\033[94m"
|
|
BRIGHT_MAGENTA = "\033[95m"
|
|
BRIGHT_CYAN = "\033[96m"
|
|
|
|
# Background colors
|
|
BG_RED = "\033[41m"
|
|
BG_GREEN = "\033[42m"
|
|
BG_YELLOW = "\033[43m"
|
|
BG_BLUE = "\033[44m"
|
|
BG_MAGENTA = "\033[45m"
|
|
BG_CYAN = "\033[46m"
|
|
|
|
|
|
class StatusDisplay:
|
|
def __init__(self):
|
|
self.running = True
|
|
self.last_update = 0
|
|
self.last_draw = 0
|
|
self.last_frame = None
|
|
self.data = {
|
|
"nail1": {},
|
|
"nail2": {},
|
|
}
|
|
self.lock = threading.Lock()
|
|
|
|
def fetch_status(self):
|
|
"""Fetch status from API."""
|
|
try:
|
|
resp = requests.get("{}/api/status/all".format(API_BASE), timeout=2)
|
|
if resp.status_code == 200:
|
|
with self.lock:
|
|
payload = resp.json()
|
|
self.data = payload.get("nails", {})
|
|
self.last_update = time.time()
|
|
return True
|
|
except Exception as e:
|
|
pass
|
|
return False
|
|
|
|
def fetch_worker(self):
|
|
"""Background thread that fetches status periodically."""
|
|
while self.running:
|
|
self.fetch_status()
|
|
time.sleep(UPDATE_INTERVAL)
|
|
|
|
def format_temp(self, temp_f):
|
|
"""Format temperature with color coding."""
|
|
if temp_f is None:
|
|
return "{}ERR{}".format(Colors.RED, Colors.RESET)
|
|
temp_f = float(temp_f)
|
|
|
|
if temp_f < 0:
|
|
color = Colors.RED
|
|
elif temp_f < 100:
|
|
color = Colors.BLUE
|
|
elif temp_f < 400:
|
|
color = Colors.YELLOW
|
|
else:
|
|
color = Colors.BRIGHT_RED
|
|
|
|
return "{}{:6.1f}F{}".format(color, temp_f, Colors.RESET)
|
|
|
|
def format_status_icon(self, enabled, has_error):
|
|
"""Return status icon."""
|
|
if has_error:
|
|
return "{}●{}".format(Colors.RED, Colors.RESET)
|
|
elif enabled:
|
|
return "{}●{}".format(Colors.GREEN, Colors.RESET)
|
|
else:
|
|
return "{}○{}".format(Colors.DIM, Colors.RESET)
|
|
|
|
def format_mode_badge(self, mode):
|
|
"""Format flight mode as a fancy badge."""
|
|
modes = {
|
|
"grounded": (Colors.CYAN, "GROUNDED"),
|
|
"takeoff": (Colors.BRIGHT_YELLOW, "TAKEOFF "),
|
|
"cruise": (Colors.BRIGHT_GREEN, "CRUISE "),
|
|
"descent": (Colors.BRIGHT_MAGENTA, "DESCENT "),
|
|
}
|
|
color, label = modes.get(mode, (Colors.WHITE, "????? "))
|
|
return "{}[{}]{}".format(color, label, Colors.RESET)
|
|
|
|
def format_phase_badge(self, phase):
|
|
"""Format phase as a badge."""
|
|
phases = {
|
|
"heating": (Colors.BRIGHT_RED, "HEATING"),
|
|
"cooling": (Colors.BRIGHT_BLUE, "COOLING"),
|
|
"idle": (Colors.DIM, " IDLE "),
|
|
}
|
|
color, label = phases.get(phase, (Colors.WHITE, " ? "))
|
|
return "{}{}{}".format(color, label, Colors.RESET)
|
|
|
|
def strip_ansi(self, text):
|
|
"""Remove ANSI color codes from text for length calculation."""
|
|
import re
|
|
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
|
return ansi_escape.sub('', text)
|
|
|
|
def draw_nail_panel_compact(self, nail_id, status):
|
|
"""Draw compact status panel for a single nail (for side-by-side layout)."""
|
|
if not status:
|
|
return []
|
|
|
|
lines = []
|
|
|
|
# Header
|
|
enabled = status.get("enabled", False)
|
|
has_error = status.get("error", False)
|
|
icon = self.format_status_icon(enabled, has_error)
|
|
|
|
status_text = "ONLINE" if enabled else "OFFLINE"
|
|
lines.append("{} {}{}".format(icon, status_text, Colors.RESET))
|
|
|
|
# Temperature display
|
|
current = float(status.get("current_temp", 0))
|
|
setpoint = float(status.get("setpoint", 0))
|
|
temp_str = "{:5.0f}F".format(current) if current else "ERROR"
|
|
setpt_str = "{:5.0f}F".format(setpoint)
|
|
lines.append("T: {} S: {}".format(temp_str, setpt_str))
|
|
|
|
# Error display
|
|
error = float(status.get("error", 0))
|
|
error_color = Colors.RED if abs(error) > 20 else Colors.YELLOW if abs(error) > 5 else Colors.GREEN
|
|
lines.append("{}Err: {:+5.1f}F{}".format(error_color, error, Colors.RESET))
|
|
|
|
# PID Output bar
|
|
output = float(status.get("output", 0))
|
|
output_pct = min(100.0, max(0.0, output * 100))
|
|
bar_filled = int(output_pct / 5)
|
|
bar = "{}[{}{}]{}".format(
|
|
Colors.BRIGHT_GREEN,
|
|
"=" * bar_filled,
|
|
" " * (5 - bar_filled),
|
|
Colors.RESET
|
|
)
|
|
lines.append("Out: {} {:3.0f}%".format(bar, output_pct))
|
|
|
|
# Flight mode
|
|
mode = status.get("flight_mode", "grounded")
|
|
mode_short = mode[:7]
|
|
lines.append("Mode: {}".format(mode_short))
|
|
|
|
# Phase
|
|
phase = status.get("phase", "idle")
|
|
phase_short = phase[:7]
|
|
lines.append("Phase: {}".format(phase_short))
|
|
|
|
# Safety status
|
|
safety = status.get("safety", {})
|
|
temp_ok = safety.get("temp_ok", True)
|
|
tc_ok = safety.get("tc_ok", True)
|
|
watchdog_ok = safety.get("watchdog_ok", True)
|
|
|
|
safety_status = "{}OK{}".format(Colors.GREEN, Colors.RESET) if (temp_ok and tc_ok and watchdog_ok) else "{}WARN{}".format(Colors.RED, Colors.RESET)
|
|
lines.append("Safety: {}".format(safety_status))
|
|
|
|
return lines
|
|
|
|
def draw_frame(self):
|
|
"""Draw the complete status display frame."""
|
|
lines = []
|
|
|
|
# ASCII Art Title with timestamp
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
lines.append("")
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ######: ###### ### ## :##: ###### ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" #######: ###### ### ## ## ###### ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## :## ## ###: ## #### ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ## ## #### ## #### ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## :## ## ##:#: ## :# #: ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" #######: ## ## ## ## #::# ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ######: ## ## ## ## ## ## ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ## ## :#:## ###### ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ## ## #### .######. ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ## ## :### :## ##: ## ## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ###### ## ### ### ### ###### ######## ", Colors.RESET))
|
|
lines.append("{}{}{}".format(Colors.BRIGHT_MAGENTA + Colors.BOLD,
|
|
" ## ###### ## ### ##: :## ###### ######## ", Colors.RESET))
|
|
|
|
# Timestamp on its own line
|
|
lines.append("")
|
|
lines.append("{}{}{}".format(
|
|
Colors.DIM,
|
|
timestamp.center(70),
|
|
Colors.RESET
|
|
))
|
|
|
|
lines.append("")
|
|
|
|
# Get data safely
|
|
with self.lock:
|
|
nail1_data = self.data.get("nail1", {})
|
|
nail2_data = self.data.get("nail2", {})
|
|
|
|
# Get compact panel lines for both nails
|
|
nail1_lines = self.draw_nail_panel_compact("nail1", nail1_data)
|
|
nail2_lines = self.draw_nail_panel_compact("nail2", nail2_data)
|
|
|
|
# Pad lines to same length
|
|
max_lines = max(len(nail1_lines), len(nail2_lines))
|
|
while len(nail1_lines) < max_lines:
|
|
nail1_lines.append("")
|
|
while len(nail2_lines) < max_lines:
|
|
nail2_lines.append("")
|
|
|
|
# Draw separate boxes for each nail side-by-side
|
|
# Top border
|
|
lines.append("{}┌──────────────────────────┐ ┌──────────────────────────┐{}".format(
|
|
Colors.CYAN, Colors.RESET))
|
|
|
|
# Header line with nail names
|
|
n1_header = "{}NAIL 1{}".format(Colors.BRIGHT_RED + Colors.BOLD, Colors.CYAN)
|
|
n2_header = "{}NAIL 2{}".format(Colors.BRIGHT_BLUE + Colors.BOLD, Colors.CYAN)
|
|
# Manually pad with spaces after stripping ANSI codes
|
|
n1_spaces = 22 - len(self.strip_ansi(n1_header))
|
|
n2_spaces = 22 - len(self.strip_ansi(n2_header))
|
|
lines.append("{}│ {}{:<{}}│ │ {}{:<{}}│{}".format(
|
|
Colors.CYAN, n1_header, "", n1_spaces, n2_header, "", n2_spaces, Colors.RESET))
|
|
|
|
# Middle border
|
|
lines.append("{}├──────────────────────────┤ ├──────────────────────────┤{}".format(
|
|
Colors.CYAN, Colors.RESET))
|
|
|
|
# Content lines
|
|
for n1_line, n2_line in zip(nail1_lines, nail2_lines):
|
|
# Calculate visible length without ANSI codes
|
|
n1_visible = self.strip_ansi(n1_line)[:22]
|
|
n2_visible = self.strip_ansi(n2_line)[:22]
|
|
n1_spaces = 22 - len(n1_visible)
|
|
n2_spaces = 22 - len(n2_visible)
|
|
|
|
lines.append("{}│ {}{:<{}}│ │ {}{:<{}}│{}".format(
|
|
Colors.CYAN,
|
|
n1_line,
|
|
"",
|
|
n1_spaces,
|
|
n2_line,
|
|
"",
|
|
n2_spaces,
|
|
Colors.RESET
|
|
))
|
|
|
|
# Bottom border
|
|
lines.append("{}└──────────────────────────┘ └──────────────────────────┘{}".format(
|
|
Colors.CYAN, Colors.RESET))
|
|
|
|
return "\n".join(lines)
|
|
|
|
def display_loop(self):
|
|
"""Main display loop."""
|
|
# Start fetch worker thread
|
|
fetch_thread = threading.Thread(target=self.fetch_worker)
|
|
fetch_thread.daemon = True
|
|
fetch_thread.start()
|
|
|
|
# Initial draw
|
|
sys.stdout.write("\033[2J\033[H")
|
|
sys.stdout.flush()
|
|
self.last_draw = time.time()
|
|
|
|
try:
|
|
while self.running:
|
|
now = time.time()
|
|
|
|
# Only redraw if enough time has passed (reduces flicker)
|
|
if now - self.last_draw >= DISPLAY_INTERVAL:
|
|
# Clear screen (ANSI escape)
|
|
sys.stdout.write("\033[2J\033[H")
|
|
sys.stdout.flush()
|
|
|
|
# Draw frame
|
|
frame = self.draw_frame()
|
|
sys.stdout.write(frame)
|
|
sys.stdout.write("\n")
|
|
sys.stdout.flush()
|
|
|
|
self.last_draw = now
|
|
|
|
time.sleep(UPDATE_INTERVAL)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
self.running = False
|
|
|
|
def run(self):
|
|
"""Run the status display."""
|
|
# Redirect stdout to TTY if available
|
|
if os.path.exists(TTY_PATH):
|
|
try:
|
|
tty_fd = os.open(TTY_PATH, os.O_WRONLY)
|
|
os.dup2(tty_fd, 1)
|
|
os.close(tty_fd)
|
|
except Exception:
|
|
pass
|
|
|
|
# Handle signals
|
|
def sigint_handler(signum, frame):
|
|
self.running = False
|
|
|
|
signal.signal(signal.SIGINT, sigint_handler)
|
|
|
|
# Run display loop
|
|
self.display_loop()
|
|
|
|
|
|
def main():
|
|
"""Main entry point."""
|
|
display = StatusDisplay()
|
|
display.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|