#!/usr/bin/env python3
"""PTZ Camera Control Panel for two TENVIS cameras proxied via Raspberry Pi."""
import json
import urllib.request
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
# Cameras known to the panel, keyed by the id that appears in /api/<id>/... URLs.
# "host"/"http_port" locate the camera's CGI endpoint; "label" is a display name.
CAMERAS = {
    "cam1": {
        "host": "192.168.0.102",
        "http_port": 18080,
        "label": "Printer Cam 1",
    },
    "cam2": {
        "host": "192.168.0.102",
        "http_port": 28080,
        "label": "Printer Cam 2",
    },
}

# host:port of the go2rtc HTTP API that single frames and MJPEG streams are fetched from.
GO2RTC = "127.0.0.1:1984"

# (user, password) sent as HTTP Basic credentials with every camera CGI call.
# NOTE(review): credentials are hard-coded in source.
AUTH = ("admin", "admin")

# TCP port this control panel listens on.
PORT = 8090

# Page body served at "/".
HTML = """
Camera Control
Camera Control Panel
"""
class Handler(BaseHTTPRequestHandler):
    """Serve the control page and proxy camera/go2rtc requests.

    GET routes:
      /                        the HTML control panel
      /api/<cam_id>/stream     MJPEG stream relayed from go2rtc
      /api/<cam_id>/frame      single JPEG frame relayed from go2rtc
      /api/<cam_id>/ptz        camera "ptzctrl" command (query: act, step)
      /api/<cam_id>/speed      camera "setptzspeed" command (query: pan, tilt)
      /api/<cam_id>/image      camera "setimageattr" command (all query keys forwarded)
      /api/<cam_id>/imageattr  output of "getimageattr" followed by "getptzspeed"

    Any other path, unknown camera id, or unknown action gets a 404.
    """

    def do_GET(self):
        """Dispatch a GET request to the matching route."""
        path = self.path.split("?")[0]
        query = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)

        if path == "/":
            self._respond(200, "text/html", HTML)
            return

        # Expected shape: /api/<cam_id>/<action>
        parts = path.strip("/").split("/")
        if len(parts) < 3 or parts[0] != "api" or parts[1] not in CAMERAS:
            self._respond(404, "text/plain", "Not found")
            return

        cam_id, action = parts[1], parts[2]
        cam = CAMERAS[cam_id]

        if action == "stream":
            self._proxy_stream(cam_id)
        elif action == "frame":
            self._proxy_frame(cam_id)
        elif action == "ptz":
            act = query.get("act", ["stop"])[0]
            step = query.get("step", ["0"])[0]
            params = self._cgi_query("ptzctrl", [("step", step), ("act", act)])
            self._respond(200, "text/plain", self._cam_cgi(cam, params))
        elif action == "speed":
            pan = query.get("pan", ["2"])[0]
            tilt = query.get("tilt", ["2"])[0]
            params = self._cgi_query(
                "setptzspeed", [("panspeed", pan), ("tiltspeed", tilt)]
            )
            self._respond(200, "text/plain", self._cam_cgi(cam, params))
        elif action == "image":
            # Every query key is forwarded as a "-key=value" CGI attribute
            # (first value only when a key is repeated).
            params = self._cgi_query(
                "setimageattr", [(key, values[0]) for key, values in query.items()]
            )
            self._respond(200, "text/plain", self._cam_cgi(cam, params))
        elif action == "imageattr":
            image_attrs = self._cam_cgi(cam, "cmd=getimageattr")
            ptz_speed = self._cam_cgi(cam, "cmd=getptzspeed")
            self._respond(200, "text/plain", image_attrs + "\n" + ptz_speed)
        else:
            self._respond(404, "text/plain", "Not found")

    @staticmethod
    def _cgi_query(cmd, pairs):
        """Build a param.cgi query string: ``cmd=<cmd>&-<name>=<value>...``.

        Names and values originate from the client's (already decoded) query
        string, so they are percent-encoded here. Without that, a value
        containing ``&`` or ``=`` would inject additional CGI parameters and
        a value containing a space would produce an invalid URL.

        Args:
            cmd: CGI command name (trusted, supplied by this module).
            pairs: iterable of ``(name, value)`` string pairs.

        Returns:
            The encoded query string (without a leading ``?``).
        """
        quote = urllib.parse.quote
        fields = [f"cmd={cmd}"]
        fields.extend(
            f"-{quote(name, safe='')}={quote(value, safe='')}"
            for name, value in pairs
        )
        return "&".join(fields)

    def _cam_cgi(self, cam, params):
        """Call the camera's hi3510 ``param.cgi`` with HTTP Basic auth.

        Args:
            cam: camera entry providing ``host`` and ``http_port``.
            params: ready-to-send query string (already encoded).

        Returns:
            The decoded response body, or ``"Error: <exception>"`` if the
            request or decoding fails. Failures are reported in-band on
            purpose; callers forward the text to the browser as-is.
        """
        import base64

        url = f"http://{cam['host']}:{cam['http_port']}/cgi-bin/hi3510/param.cgi?{params}"
        cred = f"{AUTH[0]}:{AUTH[1]}"
        try:
            req = urllib.request.Request(url)
            req.add_header(
                "Authorization",
                "Basic " + base64.b64encode(cred.encode()).decode(),
            )
            with urllib.request.urlopen(req, timeout=5) as resp:
                return resp.read().decode()
        except Exception as e:
            return f"Error: {e}"

    def _proxy_frame(self, cam_id):
        """Fetch one JPEG frame from go2rtc and send it to the client.

        Responds 502 with the error text if the upstream fetch fails.
        """
        url = f"http://{GO2RTC}/api/frame.jpeg?src=printer_{cam_id}"
        try:
            with urllib.request.urlopen(url, timeout=5) as upstream:
                data = upstream.read()
        except Exception as e:
            self._respond(502, "text/plain", f"Frame error: {e}")
            return

        # The client write is handled separately: a failure here means the
        # client went away, and a 502 could not be delivered on that socket.
        try:
            self.send_response(200)
            self.send_header("Content-Type", "image/jpeg")
            self.send_header("Content-Length", str(len(data)))
            self.send_header("Cache-Control", "no-cache, no-store")
            self.end_headers()
            self.wfile.write(data)
        except (BrokenPipeError, ConnectionResetError):
            pass

    def _proxy_stream(self, cam_id):
        """Relay the go2rtc MJPEG stream to the client until either side stops.

        Responds 502 if the upstream cannot be opened. Once the 200 response
        has started, later errors simply end the stream.
        """
        url = f"http://{GO2RTC}/api/stream.mjpeg?src=printer_{cam_id}"
        response_started = False
        try:
            # "with" guarantees the upstream connection is released when the
            # client disconnects or the relay fails.
            with urllib.request.urlopen(url, timeout=10) as upstream:
                content_type = upstream.headers.get(
                    "Content-Type", "multipart/x-mixed-replace"
                )
                response_started = True
                self.send_response(200)
                self.send_header("Content-Type", content_type)
                self.send_header("Cache-Control", "no-cache")
                self.end_headers()
                while True:
                    chunk = upstream.read(8192)
                    if not chunk:
                        break
                    self.wfile.write(chunk)
        except (BrokenPipeError, ConnectionResetError):
            # Client closed the stream; nothing left to do.
            pass
        except Exception as e:
            # A second status line must not be written into a stream whose
            # headers are already out, so only report before the response starts.
            if not response_started:
                self._respond(502, "text/plain", f"Stream error: {e}")

    def _respond(self, code, content_type, body):
        """Send a complete response; ``body`` may be ``str`` (encoded as UTF-8) or bytes."""
        data = body.encode() if isinstance(body, str) else body
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, fmt, *args):
        """Suppress the default per-request logging."""
        pass
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer variant that handles each request in its own thread."""

    # Handler threads are daemonic, so they do not keep the process alive on exit.
    daemon_threads = True
if __name__ == "__main__":
    # Bind on all interfaces (0.0.0.0) at the configured port.
    server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler)
    print(f"Camera Control Panel running on http://0.0.0.0:{PORT}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop the panel; exit without a traceback.
        pass
    finally:
        # Release the listening socket explicitly rather than relying on process exit.
        server.server_close()