Files
3d-printing/cam-control/server.py
T
Mortdecai 9daf6acd66 feat: initial project setup with camera control panel
3D printing project with Ender 3 V3 SE documentation, OctoPrint/Pi
infrastructure docs, and PTZ camera control webapp for two TENVIS
cameras proxied through Raspberry Pi to Frigate NVR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 19:14:30 -04:00

308 lines
12 KiB
Python

#!/usr/bin/env python3
"""PTZ Camera Control Panel for two TENVIS cameras proxied via Raspberry Pi."""
import json
import urllib.request
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
CAMERAS = {
"cam1": {"host": "192.168.0.102", "http_port": 18080, "label": "Printer Cam 1"},
"cam2": {"host": "192.168.0.102", "http_port": 28080, "label": "Printer Cam 2"},
}
GO2RTC = "127.0.0.1:1984"
AUTH = ("admin", "admin")
PORT = 8090
HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Camera Control</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { background: #1a1a2e; color: #e0e0e0; font-family: -apple-system, system-ui, sans-serif; }
h1 { text-align: center; padding: 16px 0 8px; font-size: 1.4rem; color: #c0c0c0; }
.grid { display: flex; flex-wrap: wrap; justify-content: center; gap: 16px; padding: 8px 16px 16px; }
.cam-panel {
background: #16213e; border-radius: 12px; padding: 12px;
flex: 1 1 480px; max-width: 640px; min-width: 320px;
}
.cam-label { text-align: center; font-size: 1.1rem; font-weight: 600; margin-bottom: 8px; color: #a0c4ff; }
.feed { width: 100%; aspect-ratio: 16/9; background: #000; border-radius: 8px; object-fit: contain; }
.controls { display: flex; gap: 12px; margin-top: 10px; align-items: flex-start; justify-content: center; flex-wrap: wrap; }
.dpad {
display: grid; grid-template-columns: repeat(3, 48px); grid-template-rows: repeat(3, 48px);
gap: 4px;
}
.dpad button {
background: #0f3460; border: 1px solid #1a5276; border-radius: 8px; color: #e0e0e0;
font-size: 1.2rem; cursor: pointer; transition: background 0.15s;
display: flex; align-items: center; justify-content: center;
}
.dpad button:hover { background: #1a5276; }
.dpad button:active { background: #2980b9; }
.dpad .center { background: #c0392b; border-color: #e74c3c; font-size: 0.7rem; font-weight: 700; }
.dpad .center:hover { background: #e74c3c; }
.dpad .empty { visibility: hidden; }
.side-controls { display: flex; flex-direction: column; gap: 8px; }
.side-controls label { font-size: 0.8rem; color: #888; }
.side-controls select, .side-controls button {
background: #0f3460; border: 1px solid #1a5276; border-radius: 6px; color: #e0e0e0;
padding: 6px 10px; font-size: 0.85rem; cursor: pointer;
}
.side-controls select:hover, .side-controls button:hover { background: #1a5276; }
.toggle-row { display: flex; gap: 6px; }
.toggle-row button { flex: 1; font-size: 0.8rem; padding: 6px 8px; }
.toggle-row button.active { background: #2980b9; border-color: #3498db; }
.status { text-align: center; font-size: 0.75rem; color: #666; margin-top: 4px; min-height: 1.2em; }
</style>
</head>
<body>
<h1>Camera Control Panel</h1>
<div class="grid" id="grid"></div>
<script>
const CAMS = """ + json.dumps({k: v["label"] for k, v in CAMERAS.items()}) + """;
function buildPanel(camId, label) {
return `
<div class="cam-panel" id="panel-${camId}">
<div class="cam-label">${label}</div>
<img class="feed" id="feed-${camId}" alt="${label} feed">
<div class="controls">
<div class="dpad">
<div class="empty"></div>
<button onmousedown="ptz('${camId}','up',0)" onmouseup="ptz('${camId}','stop',0)" ontouchstart="ptz('${camId}','up',0)" ontouchend="ptz('${camId}','stop',0)">&#9650;</button>
<div class="empty"></div>
<button onmousedown="ptz('${camId}','left',0)" onmouseup="ptz('${camId}','stop',0)" ontouchstart="ptz('${camId}','left',0)" ontouchend="ptz('${camId}','stop',0)">&#9664;</button>
<button class="center" onclick="ptz('${camId}','home',1)">HOME</button>
<button onmousedown="ptz('${camId}','right',0)" onmouseup="ptz('${camId}','stop',0)" ontouchstart="ptz('${camId}','right',0)" ontouchend="ptz('${camId}','stop',0)">&#9654;</button>
<div class="empty"></div>
<button onmousedown="ptz('${camId}','down',0)" onmouseup="ptz('${camId}','stop',0)" ontouchstart="ptz('${camId}','down',0)" ontouchend="ptz('${camId}','stop',0)">&#9660;</button>
<div class="empty"></div>
</div>
<div class="side-controls">
<div>
<label>Step Mode</label>
<select id="step-${camId}">
<option value="0">Continuous (hold)</option>
<option value="1">Single step</option>
</select>
</div>
<div>
<label>Speed</label>
<select id="speed-${camId}" onchange="setSpeed('${camId}')">
<option value="1">1 (slow)</option>
<option value="2" selected>2</option>
<option value="3">3</option>
<option value="4">4</option>
<option value="5">5 (fast)</option>
</select>
</div>
<div>
<label>Image</label>
<div class="toggle-row">
<button id="flip-${camId}" onclick="toggleAttr('${camId}','flip')">Flip</button>
<button id="mirror-${camId}" onclick="toggleAttr('${camId}','mirror')">Mirror</button>
</div>
</div>
</div>
</div>
<div class="status" id="status-${camId}"></div>
</div>`;
}
document.getElementById('grid').innerHTML = Object.entries(CAMS).map(([id, label]) => buildPanel(id, label)).join('');
async function ptz(cam, act, step) {
const stepMode = document.getElementById('step-' + cam).value;
const s = step !== undefined ? step : parseInt(stepMode);
const realStep = act === 'stop' ? 0 : (stepMode === '1' ? 1 : s);
try {
const r = await fetch(`/api/${cam}/ptz?act=${act}&step=${realStep}`);
const t = await r.text();
showStatus(cam, `${act} - ${t.trim()}`);
} catch(e) { showStatus(cam, 'Error: ' + e.message); }
}
async function setSpeed(cam) {
const spd = document.getElementById('speed-' + cam).value;
try {
const r = await fetch(`/api/${cam}/speed?pan=${spd}&tilt=${spd}`);
const t = await r.text();
showStatus(cam, `Speed set to ${spd}`);
} catch(e) { showStatus(cam, 'Error: ' + e.message); }
}
async function toggleAttr(cam, attr) {
const btn = document.getElementById(attr + '-' + cam);
const isActive = btn.classList.contains('active');
const val = isActive ? 'off' : 'on';
try {
const r = await fetch(`/api/${cam}/image?${attr}=${val}`);
const t = await r.text();
if (t.includes('ok')) {
btn.classList.toggle('active');
showStatus(cam, `${attr} ${val}`);
} else { showStatus(cam, t.trim()); }
} catch(e) { showStatus(cam, 'Error: ' + e.message); }
}
async function loadImageState(cam) {
try {
const r = await fetch(`/api/${cam}/imageattr`);
const t = await r.text();
const flip = t.match(/flip="(\w+)"/);
const mirror = t.match(/mirror="(\w+)"/);
if (flip && flip[1] === 'on') document.getElementById('flip-' + cam).classList.add('active');
if (mirror && mirror[1] === 'on') document.getElementById('mirror-' + cam).classList.add('active');
const speed = t.match(/panspeed="(\d+)"/);
if (speed) document.getElementById('speed-' + cam).value = speed[1];
} catch(e) {}
}
function showStatus(cam, msg) {
const el = document.getElementById('status-' + cam);
el.textContent = msg;
clearTimeout(el._t);
el._t = setTimeout(() => el.textContent = '', 3000);
}
Object.keys(CAMS).forEach(loadImageState);
// Snapshot polling for live feeds
function startFeed(camId) {
const img = document.getElementById('feed-' + camId);
let running = true;
async function poll() {
if (!running) return;
const next = new Image();
next.onload = () => { img.src = next.src; setTimeout(poll, 200); };
next.onerror = () => { setTimeout(poll, 1000); };
next.src = '/api/' + camId + '/frame?t=' + Date.now();
}
poll();
}
Object.keys(CAMS).forEach(startFeed);
</script>
</body>
</html>"""
class Handler(BaseHTTPRequestHandler):
def do_GET(self):
path = self.path.split("?")[0]
query = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
if path == "/":
self._respond(200, "text/html", HTML)
return
# /api/<cam>/stream -> proxy go2rtc MJPEG
parts = path.strip("/").split("/")
if len(parts) >= 3 and parts[0] == "api" and parts[1] in CAMERAS:
cam_id = parts[1]
cam = CAMERAS[cam_id]
action = parts[2]
if action == "stream":
self._proxy_stream(cam_id)
return
elif action == "frame":
self._proxy_frame(cam_id)
return
elif action == "ptz":
act = query.get("act", ["stop"])[0]
step = query.get("step", ["0"])[0]
resp = self._cam_cgi(cam, f"cmd=ptzctrl&-step={step}&-act={act}")
self._respond(200, "text/plain", resp)
return
elif action == "speed":
pan = query.get("pan", ["2"])[0]
tilt = query.get("tilt", ["2"])[0]
resp = self._cam_cgi(cam, f"cmd=setptzspeed&-panspeed={pan}&-tiltspeed={tilt}")
self._respond(200, "text/plain", resp)
return
elif action == "image":
params = "&".join(f"-{k}={v[0]}" for k, v in query.items())
resp = self._cam_cgi(cam, f"cmd=setimageattr&{params}")
self._respond(200, "text/plain", resp)
return
elif action == "imageattr":
resp = self._cam_cgi(cam, "cmd=getimageattr")
resp2 = self._cam_cgi(cam, "cmd=getptzspeed")
self._respond(200, "text/plain", resp + "\n" + resp2)
return
self._respond(404, "text/plain", "Not found")
def _cam_cgi(self, cam, params):
url = f"http://{cam['host']}:{cam['http_port']}/cgi-bin/hi3510/param.cgi?{params}"
try:
req = urllib.request.Request(url)
cred = f"{AUTH[0]}:{AUTH[1]}"
import base64
req.add_header("Authorization", "Basic " + base64.b64encode(cred.encode()).decode())
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.read().decode()
except Exception as e:
return f"Error: {e}"
def _proxy_frame(self, cam_id):
url = f"http://{GO2RTC}/api/frame.jpeg?src=printer_{cam_id}"
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=5) as resp:
data = resp.read()
self.send_response(200)
self.send_header("Content-Type", "image/jpeg")
self.send_header("Content-Length", str(len(data)))
self.send_header("Cache-Control", "no-cache, no-store")
self.end_headers()
self.wfile.write(data)
except Exception as e:
self._respond(502, "text/plain", f"Frame error: {e}")
def _proxy_stream(self, cam_id):
url = f"http://{GO2RTC}/api/stream.mjpeg?src=printer_{cam_id}"
try:
req = urllib.request.Request(url)
resp = urllib.request.urlopen(req, timeout=10)
content_type = resp.headers.get("Content-Type", "multipart/x-mixed-replace")
self.send_response(200)
self.send_header("Content-Type", content_type)
self.send_header("Cache-Control", "no-cache")
self.end_headers()
while True:
chunk = resp.read(8192)
if not chunk:
break
self.wfile.write(chunk)
except (BrokenPipeError, ConnectionResetError):
pass
except Exception as e:
self._respond(502, "text/plain", f"Stream error: {e}")
def _respond(self, code, content_type, body):
data = body.encode() if isinstance(body, str) else body
self.send_response(code)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def log_message(self, fmt, *args):
pass # suppress request logs
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True
if __name__ == "__main__":
server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler)
print(f"Camera Control Panel running on http://0.0.0.0:{PORT}")
server.serve_forever()