Replace briefing system with fancy CLI status display panel
- Removed hourly news briefing system (briefing service/timer) - Added tty_status_display.py: real-time status panel showing dual nail status - Temperature, setpoint, error, PID output with visual bar - Flight mode and phase badges with color coding - PID coefficients and integral/derivative values - Safety status indicators - Continuous color-coded display with ANSI formatting - Added pinail-status.service: systemd service for status display - Runs as root with direct TTY output to /dev/tty1 - Auto-restart on failure, requires pinail2.service - Logs errors to /tmp/status_display.log - Deployed and verified running on Pi (service active)
This commit is contained in:
@@ -0,0 +1,20 @@
|
||||
[Unit]
|
||||
Description=piNail2 Status Display Panel
|
||||
After=pinail2.service network-online.target
|
||||
Wants=network-online.target
|
||||
Requires=pinail2.service
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=root
|
||||
WorkingDirectory=/home/pi/piNail2
|
||||
ExecStart=/usr/bin/python3 /home/pi/piNail2/tty_status_display.py
|
||||
Restart=always
|
||||
RestartSec=5
|
||||
StandardOutput=tty
|
||||
StandardError=file:/tmp/status_display.log
|
||||
TTYPath=/dev/tty1
|
||||
SyslogIdentifier=pinail-status
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
@@ -0,0 +1,327 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
piNail2 — TTY Status Display Panel
|
||||
|
||||
Displays a fancy real-time status panel on the HDMI display showing:
|
||||
- Dual nail temperature status and control mode
|
||||
- PID parameters and error/output values
|
||||
- Flight mode and current phase
|
||||
- Safety status
|
||||
- Performance metrics
|
||||
|
||||
Compatible with Python 3.5+
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
import requests
|
||||
import signal
|
||||
import threading
|
||||
from datetime import datetime
|
||||
|
||||
# Python 3.5 compatibility - no f-strings
|
||||
try:
|
||||
from http.client import HTTPConnection
|
||||
HTTPConnection.debuglevel = 0
|
||||
except ImportError:
|
||||
from httplib import HTTPConnection
|
||||
HTTPConnection.debuglevel = 0
|
||||
|
||||
# Configuration
|
||||
API_BASE = "http://localhost:5000"
|
||||
UPDATE_INTERVAL = 1.0 # seconds
|
||||
TTY_PATH = "/dev/tty1"
|
||||
|
||||
# ANSI color codes
|
||||
class Colors:
|
||||
RESET = "\033[0m"
|
||||
BOLD = "\033[1m"
|
||||
DIM = "\033[2m"
|
||||
UNDERLINE = "\033[4m"
|
||||
|
||||
# Foreground colors
|
||||
BLACK = "\033[30m"
|
||||
RED = "\033[31m"
|
||||
GREEN = "\033[32m"
|
||||
YELLOW = "\033[33m"
|
||||
BLUE = "\033[34m"
|
||||
MAGENTA = "\033[35m"
|
||||
CYAN = "\033[36m"
|
||||
WHITE = "\033[37m"
|
||||
|
||||
# Bright colors
|
||||
BRIGHT_RED = "\033[91m"
|
||||
BRIGHT_GREEN = "\033[92m"
|
||||
BRIGHT_YELLOW = "\033[93m"
|
||||
BRIGHT_BLUE = "\033[94m"
|
||||
BRIGHT_MAGENTA = "\033[95m"
|
||||
BRIGHT_CYAN = "\033[96m"
|
||||
|
||||
# Background colors
|
||||
BG_RED = "\033[41m"
|
||||
BG_GREEN = "\033[42m"
|
||||
BG_YELLOW = "\033[43m"
|
||||
BG_BLUE = "\033[44m"
|
||||
BG_MAGENTA = "\033[45m"
|
||||
BG_CYAN = "\033[46m"
|
||||
|
||||
|
||||
class StatusDisplay:
|
||||
def __init__(self):
|
||||
self.running = True
|
||||
self.last_update = 0
|
||||
self.data = {
|
||||
"nail1": {},
|
||||
"nail2": {},
|
||||
}
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def fetch_status(self):
|
||||
"""Fetch status from API."""
|
||||
try:
|
||||
resp = requests.get("{}/api/status/all".format(API_BASE), timeout=2)
|
||||
if resp.status_code == 200:
|
||||
with self.lock:
|
||||
payload = resp.json()
|
||||
self.data = payload.get("nails", {})
|
||||
self.last_update = time.time()
|
||||
return True
|
||||
except Exception as e:
|
||||
pass
|
||||
return False
|
||||
|
||||
def fetch_worker(self):
|
||||
"""Background thread that fetches status periodically."""
|
||||
while self.running:
|
||||
self.fetch_status()
|
||||
time.sleep(UPDATE_INTERVAL)
|
||||
|
||||
def format_temp(self, temp_f):
|
||||
"""Format temperature with color coding."""
|
||||
if temp_f is None:
|
||||
return "{}ERR{}".format(Colors.RED, Colors.RESET)
|
||||
temp_f = float(temp_f)
|
||||
|
||||
if temp_f < 0:
|
||||
color = Colors.RED
|
||||
elif temp_f < 100:
|
||||
color = Colors.BLUE
|
||||
elif temp_f < 400:
|
||||
color = Colors.YELLOW
|
||||
else:
|
||||
color = Colors.BRIGHT_RED
|
||||
|
||||
return "{}{:6.1f}F{}".format(color, temp_f, Colors.RESET)
|
||||
|
||||
def format_status_icon(self, enabled, has_error):
|
||||
"""Return status icon."""
|
||||
if has_error:
|
||||
return "{}●{}".format(Colors.RED, Colors.RESET)
|
||||
elif enabled:
|
||||
return "{}●{}".format(Colors.GREEN, Colors.RESET)
|
||||
else:
|
||||
return "{}○{}".format(Colors.DIM, Colors.RESET)
|
||||
|
||||
def format_mode_badge(self, mode):
|
||||
"""Format flight mode as a fancy badge."""
|
||||
modes = {
|
||||
"grounded": (Colors.CYAN, "GROUNDED"),
|
||||
"takeoff": (Colors.BRIGHT_YELLOW, "TAKEOFF "),
|
||||
"cruise": (Colors.BRIGHT_GREEN, "CRUISE "),
|
||||
"descent": (Colors.BRIGHT_MAGENTA, "DESCENT "),
|
||||
}
|
||||
color, label = modes.get(mode, (Colors.WHITE, "????? "))
|
||||
return "{}[{}]{}".format(color, label, Colors.RESET)
|
||||
|
||||
def format_phase_badge(self, phase):
|
||||
"""Format phase as a badge."""
|
||||
phases = {
|
||||
"heating": (Colors.BRIGHT_RED, "HEATING"),
|
||||
"cooling": (Colors.BRIGHT_BLUE, "COOLING"),
|
||||
"idle": (Colors.DIM, " IDLE "),
|
||||
}
|
||||
color, label = phases.get(phase, (Colors.WHITE, " ? "))
|
||||
return "{}{}{}".format(color, label, Colors.RESET)
|
||||
|
||||
def draw_nail_panel(self, nail_id, status):
|
||||
"""Draw status panel for a single nail."""
|
||||
if not status:
|
||||
return []
|
||||
|
||||
lines = []
|
||||
|
||||
# Header
|
||||
nail_name = "NAIL 1" if nail_id == "nail1" else "NAIL 2"
|
||||
enabled = status.get("enabled", False)
|
||||
has_error = status.get("error", False)
|
||||
icon = self.format_status_icon(enabled, has_error)
|
||||
|
||||
lines.append("{}{} {} {}{}".format(
|
||||
Colors.BOLD, nail_name, icon,
|
||||
"ONLINE" if enabled else "OFFLINE",
|
||||
Colors.RESET
|
||||
))
|
||||
|
||||
# Temperature display
|
||||
current = float(status.get("current_temp", 0))
|
||||
setpoint = float(status.get("setpoint", 0))
|
||||
lines.append(" Temp: {} / {}{}C".format(
|
||||
self.format_temp(current),
|
||||
self.format_temp(setpoint),
|
||||
Colors.RESET
|
||||
))
|
||||
|
||||
# Error display
|
||||
error = float(status.get("error", 0))
|
||||
error_color = Colors.RED if abs(error) > 20 else Colors.YELLOW if abs(error) > 5 else Colors.GREEN
|
||||
lines.append(" Error: {}{:+7.1f}F{}".format(error_color, error, Colors.RESET))
|
||||
|
||||
# PID Output
|
||||
output = float(status.get("output", 0))
|
||||
output_pct = min(100.0, max(0.0, output * 100))
|
||||
bar_filled = int(output_pct / 5)
|
||||
bar = "{}|{}{}|{}".format(
|
||||
Colors.BRIGHT_GREEN,
|
||||
"=" * bar_filled,
|
||||
" " * (20 - bar_filled),
|
||||
Colors.RESET
|
||||
)
|
||||
lines.append(" Output: {} {:5.1f}%".format(bar, output_pct))
|
||||
|
||||
# Flight mode and phase
|
||||
mode = status.get("flight_mode", "grounded")
|
||||
phase = status.get("phase", "idle")
|
||||
lines.append(" Mode: {} Phase: {}".format(
|
||||
self.format_mode_badge(mode),
|
||||
self.format_phase_badge(phase)
|
||||
))
|
||||
|
||||
# PID coefficients
|
||||
pid = status.get("pid", {})
|
||||
kp = float(pid.get("kP", 0))
|
||||
ki = float(pid.get("kI", 0))
|
||||
kd = float(pid.get("kD", 0))
|
||||
lines.append(" PID: kP={:6.2f} kI={:6.2f} kD={:6.2f}".format(kp, ki, kd))
|
||||
|
||||
# Integral and derivative
|
||||
integral = float(pid.get("integral", 0))
|
||||
derivative = float(pid.get("derivative", 0))
|
||||
lines.append(" Err-I: {:8.1f} Err-D: {:8.3f}".format(integral, derivative))
|
||||
|
||||
# Safety status
|
||||
safety = status.get("safety", {})
|
||||
temp_ok = safety.get("temp_ok", True)
|
||||
tc_ok = safety.get("tc_ok", True)
|
||||
watchdog_ok = safety.get("watchdog_ok", True)
|
||||
|
||||
safety_status = "{}OK{}".format(Colors.GREEN, Colors.RESET) if (temp_ok and tc_ok and watchdog_ok) else "{}WARN{}".format(Colors.RED, Colors.RESET)
|
||||
lines.append(" Safety: {}".format(safety_status))
|
||||
|
||||
return lines
|
||||
|
||||
def draw_frame(self):
|
||||
"""Draw the complete status display frame."""
|
||||
lines = []
|
||||
|
||||
# Title
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
lines.append("")
|
||||
lines.append("{}{}═════════════════════════════════════════════════════{}".format(
|
||||
Colors.BOLD + Colors.CYAN,
|
||||
" piNail2 STATUS DISPLAY ",
|
||||
Colors.RESET
|
||||
))
|
||||
lines.append("{}═════════════════════════════════════════════════════{}".format(
|
||||
Colors.CYAN,
|
||||
Colors.RESET
|
||||
))
|
||||
lines.append("{}{}{}".format(Colors.DIM, timestamp, Colors.RESET))
|
||||
lines.append("")
|
||||
|
||||
# Get data safely
|
||||
with self.lock:
|
||||
nail1_data = self.data.get("nail1", {})
|
||||
nail2_data = self.data.get("nail2", {})
|
||||
|
||||
# Nail 1 panel
|
||||
lines.append("{}┌─ NAIL 1 {}".format(Colors.CYAN, Colors.RESET))
|
||||
lines.append("│")
|
||||
for line in self.draw_nail_panel("nail1", nail1_data):
|
||||
lines.append("│ {}".format(line))
|
||||
lines.append("│")
|
||||
|
||||
# Separator
|
||||
lines.append("")
|
||||
|
||||
# Nail 2 panel
|
||||
lines.append("{}┌─ NAIL 2 {}".format(Colors.CYAN, Colors.RESET))
|
||||
lines.append("│")
|
||||
for line in self.draw_nail_panel("nail2", nail2_data):
|
||||
lines.append("│ {}".format(line))
|
||||
lines.append("│")
|
||||
|
||||
# Footer
|
||||
lines.append("")
|
||||
lines.append("{}═════════════════════════════════════════════════════{}".format(
|
||||
Colors.CYAN,
|
||||
Colors.RESET
|
||||
))
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def display_loop(self):
|
||||
"""Main display loop."""
|
||||
# Start fetch worker thread
|
||||
fetch_thread = threading.Thread(target=self.fetch_worker)
|
||||
fetch_thread.daemon = True
|
||||
fetch_thread.start()
|
||||
|
||||
try:
|
||||
while self.running:
|
||||
# Clear screen (ANSI escape)
|
||||
sys.stdout.write("\033[2J\033[H")
|
||||
sys.stdout.flush()
|
||||
|
||||
# Draw frame
|
||||
frame = self.draw_frame()
|
||||
sys.stdout.write(frame)
|
||||
sys.stdout.write("\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
time.sleep(UPDATE_INTERVAL)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
self.running = False
|
||||
|
||||
def run(self):
|
||||
"""Run the status display."""
|
||||
# Redirect stdout to TTY if available
|
||||
if os.path.exists(TTY_PATH):
|
||||
try:
|
||||
tty_fd = os.open(TTY_PATH, os.O_WRONLY)
|
||||
os.dup2(tty_fd, 1)
|
||||
os.close(tty_fd)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Handle signals
|
||||
def sigint_handler(signum, frame):
|
||||
self.running = False
|
||||
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
|
||||
# Run display loop
|
||||
self.display_loop()
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point."""
|
||||
display = StatusDisplay()
|
||||
display.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user