#!/usr/bin/env python3 """PTZ Camera Control Panel for two TENVIS cameras proxied via Raspberry Pi.""" import json import urllib.request import urllib.parse from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn CAMERAS = { "cam1": {"host": "192.168.0.102", "http_port": 18080, "label": "Printer Cam 1"}, "cam2": {"host": "192.168.0.102", "http_port": 28080, "label": "Printer Cam 2"}, } GO2RTC = "127.0.0.1:1984" AUTH = ("admin", "admin") PORT = 8090 HTML = """ Camera Control

Camera Control Panel

""" class Handler(BaseHTTPRequestHandler): def do_GET(self): path = self.path.split("?")[0] query = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query) if path == "/": self._respond(200, "text/html", HTML) return # /api//stream -> proxy go2rtc MJPEG parts = path.strip("/").split("/") if len(parts) >= 3 and parts[0] == "api" and parts[1] in CAMERAS: cam_id = parts[1] cam = CAMERAS[cam_id] action = parts[2] if action == "stream": self._proxy_stream(cam_id) return elif action == "frame": self._proxy_frame(cam_id) return elif action == "ptz": act = query.get("act", ["stop"])[0] step = query.get("step", ["0"])[0] resp = self._cam_cgi(cam, f"cmd=ptzctrl&-step={step}&-act={act}") self._respond(200, "text/plain", resp) return elif action == "speed": pan = query.get("pan", ["2"])[0] tilt = query.get("tilt", ["2"])[0] resp = self._cam_cgi(cam, f"cmd=setptzspeed&-panspeed={pan}&-tiltspeed={tilt}") self._respond(200, "text/plain", resp) return elif action == "image": params = "&".join(f"-{k}={v[0]}" for k, v in query.items()) resp = self._cam_cgi(cam, f"cmd=setimageattr&{params}") self._respond(200, "text/plain", resp) return elif action == "imageattr": resp = self._cam_cgi(cam, "cmd=getimageattr") resp2 = self._cam_cgi(cam, "cmd=getptzspeed") self._respond(200, "text/plain", resp + "\n" + resp2) return self._respond(404, "text/plain", "Not found") def _cam_cgi(self, cam, params): url = f"http://{cam['host']}:{cam['http_port']}/cgi-bin/hi3510/param.cgi?{params}" try: req = urllib.request.Request(url) cred = f"{AUTH[0]}:{AUTH[1]}" import base64 req.add_header("Authorization", "Basic " + base64.b64encode(cred.encode()).decode()) with urllib.request.urlopen(req, timeout=5) as resp: return resp.read().decode() except Exception as e: return f"Error: {e}" def _proxy_frame(self, cam_id): url = f"http://{GO2RTC}/api/frame.jpeg?src=printer_{cam_id}" try: req = urllib.request.Request(url) with urllib.request.urlopen(req, timeout=5) as resp: data = resp.read() self.send_response(200) self.send_header("Content-Type", "image/jpeg") self.send_header("Content-Length", str(len(data))) self.send_header("Cache-Control", "no-cache, no-store") self.end_headers() self.wfile.write(data) except Exception as e: self._respond(502, "text/plain", f"Frame error: {e}") def _proxy_stream(self, cam_id): url = f"http://{GO2RTC}/api/stream.mjpeg?src=printer_{cam_id}" try: req = urllib.request.Request(url) resp = urllib.request.urlopen(req, timeout=10) content_type = resp.headers.get("Content-Type", "multipart/x-mixed-replace") self.send_response(200) self.send_header("Content-Type", content_type) self.send_header("Cache-Control", "no-cache") self.end_headers() while True: chunk = resp.read(8192) if not chunk: break self.wfile.write(chunk) except (BrokenPipeError, ConnectionResetError): pass except Exception as e: self._respond(502, "text/plain", f"Stream error: {e}") def _respond(self, code, content_type, body): data = body.encode() if isinstance(body, str) else body self.send_response(code) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) def log_message(self, fmt, *args): pass # suppress request logs class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): daemon_threads = True if __name__ == "__main__": server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler) print(f"Camera Control Panel running on http://0.0.0.0:{PORT}") server.serve_forever()