#!/usr/bin/env python3
"""
POS Daily Briefing — Epson TM-m30
Fetches unread news from FreshRSS, summarizes via Ollama, prints ESC/POS receipt.
"""

import json
import os
import random
import re
import socket
import string
import subprocess
from datetime import datetime

import requests
import yfinance as yf

# ── Config ────────────────────────────────────────────────────────────────────
CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
with open(CONFIG_PATH) as f:
    config = json.load(f)

FRESHRSS_URL = config["freshrss_url"]
FRESHRSS_USER = config["freshrss_user"]
FRESHRSS_TOKEN = config["freshrss_api_key"]
OLLAMA_URL = config["ollama_url"]
OLLAMA_MODEL = config["ollama_model"]
YOURLS_URL = config["yourls_url"]
YOURLS_USER = config["yourls_user"]
YOURLS_PASS = config["yourls_pass"]
GRAFANA_URL = config["grafana_url"]
GRAFANA_TOKEN = config["grafana_token"]
REMOTE_HOST = config["remote_host"]
REMOTE_PASS = config["remote_pass"]
PRINTER_IP = config["printer_ip"]
PRINTER_PORT = int(config.get("printer_port", 9100))

# Base URL used to build a link for entries that carry none of their own.
# Optional config key; the default is the value that used to be hard-coded.
FRESHRSS_PUBLIC_URL = config.get("freshrss_public_url", "https://fresh.sethpc.xyz")

# TM-m30 80mm paper, Font B — column width measured from test print
COLS = 57

GREADER_API = f"{FRESHRSS_URL}/api/greader.php"
READ_STATE = "user/-/state/com.google/read"

# Maximum number of stories taken from a single feed.
MAX_PER_SOURCE = 2

# Upper bound (seconds) for the external commands run by the dashboard helpers.
SUBPROCESS_TIMEOUT = 30

_TAG_RE = re.compile(r"<[^<]+?>")

# Open-Meteo "weathercode" value -> short label that fits on the receipt.
WMO_CODES = {
    0: "Sun", 1: "Sun", 2: "PCld", 3: "Ovcst",
    45: "Fog", 48: "Fog",
    51: "Driz", 53: "Driz", 55: "Driz",
    61: "Rain", 63: "Rain", 65: "HvRain",
    71: "Snow", 73: "Snow", 75: "HvSnow", 77: "Sleet",
    80: "Shwr", 81: "Shwr", 82: "HvShwr",
    85: "SnShwr", 86: "HvSnShwr",
    95: "Strm", 96: "Strm", 99: "Strm",
}


# ── FreshRSS ──────────────────────────────────────────────────────────────────
def _auth_headers(auth_token):
    """Return the Authorization header dict for a FreshRSS greader API call."""
    return {"Authorization": f"GoogleLogin auth={auth_token}"}


def _entry_url(item, long_id):
    """Return the link for a feed entry: canonical, then alternate, then a FreshRSS URL."""
    for key in ("canonical", "alternate"):
        links = item.get(key)
        if links:
            href = links[0].get("href", "")
            if href:
                return href
    item_id = long_id.split("/")[-1] if "/" in long_id else ""
    return f"{FRESHRSS_PUBLIC_URL}/i/?c=entry&a=read&id={item_id}"


def fetch_news():
    """Log in to FreshRSS and fetch up to 100 unread entries.

    Returns:
        A tuple ``(items, auth_token)``. ``items`` is a list of dicts with the
        keys ``id``, ``title``, ``summary`` (HTML tags stripped), ``url`` and
        ``source``. On login failure the result is ``([], None)``; on fetch
        failure it is ``([], auth_token)``.
    """
    try:
        resp = requests.post(
            f"{GREADER_API}/accounts/ClientLogin",
            data={"Email": FRESHRSS_USER, "Passwd": FRESHRSS_TOKEN},
            timeout=15,
        )
        if resp.status_code != 200:
            print(f"[!] Login failed: {resp.status_code}")
            return [], None
        auth_token = next(
            (line.split("=", 1)[1] for line in resp.text.splitlines()
             if line.startswith("Auth=")),
            None,
        )
        if not auth_token:
            print("[!] No auth token in login response")
            return [], None
    except Exception as e:
        print(f"[!] Login exception: {e}")
        return [], None

    try:
        r = requests.get(
            f"{GREADER_API}/reader/api/0/stream/contents/user/-/state/com.google/reading-list",
            # "xt" excludes entries that already carry the read state.
            params={"n": 100, "xt": READ_STATE, "output": "json"},
            headers=_auth_headers(auth_token),
            timeout=15,
        )
        if r.status_code != 200:
            print(f"[!] Fetch failed: {r.status_code}")
            return [], auth_token

        items = []
        for item in r.json().get("items", []):
            long_id = item.get("id", "")
            # `or {}` keeps a JSON null field from aborting the whole fetch.
            summary = (
                (item.get("summary") or {}).get("content")
                or (item.get("content") or {}).get("content")
                or ""
            )
            items.append({
                "id": long_id,
                "title": item.get("title", ""),
                "summary": _TAG_RE.sub("", summary).strip(),
                "url": _entry_url(item, long_id),
                "source": (item.get("origin") or {}).get("title", "Unknown"),
            })
        return items, auth_token
    except Exception as e:
        print(f"[!] Fetch exception: {e}")
        return [], auth_token


def mark_read(item_ids, auth_token):
    """Mark the given FreshRSS entry ids as read.

    Does nothing when either argument is empty. Failures are logged, never
    raised; the success message is printed only if both HTTP calls succeeded.
    """
    if not item_ids or not auth_token:
        return
    headers = _auth_headers(auth_token)
    try:
        token_resp = requests.get(
            f"{GREADER_API}/reader/api/0/token", headers=headers, timeout=10
        )
        token_resp.raise_for_status()
        edit_resp = requests.post(
            f"{GREADER_API}/reader/api/0/edit-tag",
            headers=headers,
            data={"i": item_ids, "a": READ_STATE, "T": token_resp.text.strip()},
            timeout=10,
        )
        edit_resp.raise_for_status()
        print(f"Marked {len(item_ids)} articles as read.")
    except Exception as e:
        print(f"[!] mark_read error: {e}")


# ── Ollama ────────────────────────────────────────────────────────────────────
def _ollama_generate(prompt, temperature, timeout, force_json=False):
    """Send a non-streaming generate request to Ollama and return the response text."""
    payload = {
        "model": OLLAMA_MODEL,
        "prompt": prompt,
        "stream": False,
        "options": {"num_ctx": 16384, "temperature": temperature},
    }
    if force_json:
        payload["format"] = "json"
    resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=timeout)
    resp.raise_for_status()
    return resp.json().get("response", "")


def _first_list(parsed):
    """Return `parsed` if it is a list, else the first list value of a dict wrapper.

    Raises:
        ValueError: if no list can be found.
    """
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        for value in parsed.values():
            if isinstance(value, list):
                return value
    raise ValueError("model response did not contain a JSON list")


def _to_index(value, limit):
    """Coerce `value` (int or digit string) to an index in range(limit), else None."""
    if isinstance(value, bool):
        return None
    if isinstance(value, str):
        value = value.strip()
        if not value.isdigit():
            return None
        value = int(value)
    if isinstance(value, int) and 0 <= value < limit:
        return value
    return None


def _select_candidates(items, indices, count):
    """Pick at most `count` items: model-chosen indices first, then backfill in feed order.

    Duplicate indices are ignored and no source contributes more than
    MAX_PER_SOURCE items.
    """
    chosen = []
    seen = set()
    per_source = {}

    def take(pos):
        src = items[pos]["source"]
        if pos in seen or per_source.get(src, 0) >= MAX_PER_SOURCE:
            return
        seen.add(pos)
        per_source[src] = per_source.get(src, 0) + 1
        chosen.append(pos)

    for raw_idx in indices:
        if len(chosen) >= count:
            break
        pos = _to_index(raw_idx, len(items))
        if pos is not None:
            take(pos)

    for pos in range(len(items)):
        if len(chosen) >= count:
            break
        take(pos)

    return [items[pos] for pos in chosen]


def summarize_news(items, count=5, length="30-50 words"):
    """Select up to `count` stories and attach an ``ai_summary`` to each.

    Phase 1 asks the model to choose headline indices; phase 2 asks it to
    summarize the chosen articles. If phase 1 fails, the first items are used.
    If phase 2 fails or yields nothing usable, each candidate gets the first
    200 characters of its feed summary instead.

    Args:
        items: article dicts as returned by fetch_news().
        count: number of stories wanted.
        length: wording inserted into the prompt to describe summary length.

    Returns:
        List of article dicts (the same objects as in `items`), each with an
        added ``ai_summary`` key. Empty when `items` is empty.
    """
    if not items:
        return []

    print(f"Phase 1: Selecting top {count} from {len(items)} items...")
    titles_input = "\n".join(
        f"ID {i} [{item['source']}]: {item['title']}" for i, item in enumerate(items)
    )
    prompt_select = (
        f"Select exactly {count} stories for a daily briefing:\n"
        "1. Priorities: Tech, Science, World News, General, Health.\n"
        "2. Variety: Max 2 stories per source.\n"
        "3. Entertainment: Max 2 total.\n"
        "4. Omit: Lottery, minor crime.\n"
        "5. Sports: Football (College/NFL) only.\n"
        f"Format: [0, 5, 12, ...]\n\nHEADLINES:\n{titles_input}"
    )
    try:
        raw = _ollama_generate(prompt_select, temperature=0.1, timeout=300, force_json=True)
        selected_indices = _first_list(json.loads(raw or "[]"))
    except Exception as e:
        print(f"[!] Selection error: {e}")
        selected_indices = list(range(min(count, len(items))))

    candidates = _select_candidates(items, selected_indices, count)
    if not candidates:
        return []

    print(f"Phase 2: Summarizing {len(candidates)} items ({length})...")
    content_input = "".join(
        f"ARTICLE {i} ({item['title']}):\n{item['summary'][:2000]}\n\n"
        for i, item in enumerate(candidates)
    )
    prompt_summary = (
        "You are a professional news editor. Write a daily briefing.\n"
        f"For EACH article, write a concise summary ({length}).\n"
        "LANGUAGE RULE: Use English. Translate non-English/Spanish articles.\n"
        "Return STRICTLY as a JSON list: [{\"id\": 0, \"summary\": \"...\"}, ...]\n\n"
        f"CONTENT:\n{content_input}"
    )
    try:
        raw = _ollama_generate(prompt_summary, temperature=0.3, timeout=600)
        # The model may wrap the list in prose; grab the outermost [...] if present.
        match = re.search(r"\[.*\]", raw, re.DOTALL)
        summaries = _first_list(json.loads(match.group(0) if match else raw))

        final = []
        used = set()
        for entry in summaries:
            if not isinstance(entry, dict):
                continue
            idx = _to_index(entry.get("id"), len(candidates))
            if idx is None or idx in used:
                continue
            used.add(idx)
            article = candidates[idx]
            text = entry.get("summary")
            if isinstance(text, str) and text.strip():
                article["ai_summary"] = text.strip()
            else:
                article["ai_summary"] = article.get("summary", "")[:200]
            final.append(article)
        if not final:
            raise ValueError("no usable summaries in model response")
        return final
    except Exception as e:
        print(f"[!] Summarize parse error: {e} — using raw summaries")
        for c in candidates:
            c["ai_summary"] = c.get("summary", "")[:200]
        return candidates


# ── Dashboard data ────────────────────────────────────────────────────────────
def get_zfs_status():
    """Run `zpool list tank` on REMOTE_HOST over ssh and return its fields.

    Returns:
        Dict with keys name, size, alloc, free, cap, health, or None when the
        command fails or prints fewer than six fields.
    """
    # Argument list instead of a shell string: no quoting problems, and the
    # password goes through $SSHPASS (sshpass -e) rather than the command line.
    cmd = [
        "sshpass", "-e",
        "ssh", "-o", "StrictHostKeyChecking=no",
        f"root@{REMOTE_HOST}",
        "zpool list tank -H -o name,size,alloc,free,cap,health",
    ]
    try:
        proc = subprocess.run(
            cmd,
            env={**os.environ, "SSHPASS": str(REMOTE_PASS)},
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            timeout=SUBPROCESS_TIMEOUT,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"[!] ZFS status error: {e}")
        return None

    fields = proc.stdout.split()
    if len(fields) >= 6:
        return dict(zip(("name", "size", "alloc", "free", "cap", "health"), fields))
    if proc.returncode != 0:
        print(f"[!] ZFS status failed ({proc.returncode}): {proc.stderr.strip()}")
    return None


def get_grafana_metrics():
    """Query CPU/RAM/Disk percentages for the Proxmox hosts via the Grafana proxy.

    Returns:
        List of strings such as ``"H173:CPU:1.0%RAM:2.0%Disk:3.0%"`` (one per
        host; metrics are joined without a separator), or [] on any error.
    """
    headers = {"Authorization": f"Bearer {GRAFANA_TOKEN}"}
    proxmox_hosts = ["192.168.0.173", "192.168.0.112", "192.168.0.197"]
    metrics_data = {}
    queries = {
        "CPU": '100 * (1 - avg by(instance)(irate(node_cpu_seconds_total{mode="idle"}[5m])))',
        "RAM": '100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)',
        "Disk": '100 - ((node_filesystem_avail_bytes{mountpoint="/"} * 100) / node_filesystem_size_bytes{mountpoint="/"})',
    }
    try:
        for n, q in queries.items():
            r = requests.get(
                f"{GRAFANA_URL}/api/datasources/proxy/1/api/v1/query",
                params={"query": q},
                headers=headers,
                timeout=10,
            ).json()
            for res in r.get("data", {}).get("result", []):
                # Instance labels look like "host:port"; keep the host part.
                inst = res.get("metric", {}).get("instance", "").split(":")[0]
                if inst in proxmox_hosts:
                    metrics_data.setdefault(inst, []).append(
                        f"{n}:{float(res.get('value', [0, 0])[1]):.1f}%"
                    )
        return [f"H{h.split('.')[-1]}:{''.join(s)}" for h, s in metrics_data.items()]
    except Exception as e:
        print(f"[!] Grafana error: {e}")
        return []


def get_weather():
    """Fetch a 4-day Open-Meteo forecast and format it as one compact line.

    Returns:
        ``"Mon:Sun 70/50(10%)|Tue:..."`` style string, or None on any error.
    """
    try:
        r = requests.get(
            "https://api.open-meteo.com/v1/forecast"
            "?latitude=38.8421&longitude=-77.2683"
            "&daily=weathercode,temperature_2m_max,temperature_2m_min,precipitation_probability_max"
            "&temperature_unit=fahrenheit&timezone=auto&forecast_days=4",
            timeout=15,
        ).json()
        daily = r.get("daily", {})
        parts = []
        for i, day in enumerate(daily.get("time", [])):
            d = datetime.strptime(day, "%Y-%m-%d").strftime("%a")
            cond = WMO_CODES.get(daily["weathercode"][i], "Unk")
            hi = daily["temperature_2m_max"][i]
            lo = daily["temperature_2m_min"][i]
            pop = daily["precipitation_probability_max"][i]
            parts.append(f"{d}:{cond} {hi:.0f}/{lo:.0f}({pop}%)")
        return "|".join(parts)
    except Exception as e:
        print(f"[!] Weather error: {e}")
        return None


def get_financial_snapshot():
    """Return the latest close for a fixed ticker set as ``"S&P:500|NAS:400|..."``.

    Tickers whose data is missing are skipped. Returns None if the download
    itself fails.
    """
    tickers = {"SPY": "S&P", "QQQ": "NAS", "BTC-USD": "BTC", "ETH-USD": "ETH", "GOOG": "GOOG"}
    snap = []
    try:
        data = yf.download(list(tickers.keys()), period="5d", progress=False)
        for t, n in tickers.items():
            try:
                snap.append(f"{n}:{data['Close'][t].dropna().iloc[-1]:.0f}")
            except Exception:
                # One ticker without data must not hide the others.
                continue
        return "|".join(snap)
    except Exception as e:
        print(f"[!] Finance error: {e}")
        return None


def get_uptime_kuma_status():
    """Return a comma-separated list of active Uptime Kuma monitors whose last heartbeat is down.

    Runs sqlite3 inside container 147 via `pct exec`. Returns None when nothing
    is down or the command cannot be run.
    """
    query = (
        "SELECT name FROM monitor WHERE active = 1 AND id IN "
        "(SELECT monitor_id FROM heartbeat WHERE id IN "
        "(SELECT MAX(id) FROM heartbeat GROUP BY monitor_id) AND status = 0);"
    )
    cmd = ["pct", "exec", "147", "--", "sqlite3", "/app/uptime-kuma/data/kuma.db", query]
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            timeout=SUBPROCESS_TIMEOUT,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"[!] Uptime Kuma error: {e}")
        return None

    res = proc.stdout.strip()
    if res:
        return res.replace("\n", ", ")
    return None


def get_reddit_top():
    """Return the current top Reddit post as a dict (title, subreddit, score, url), or None."""
    try:
        r = requests.get(
            "https://www.reddit.com/top.json?limit=1",
            headers={"User-Agent": "SethPC/1.0"},
            timeout=10,
        ).json()
        top = r["data"]["children"][0]["data"]
        return {
            "title": top["title"],
            "subreddit": top["subreddit_name_prefixed"],
            "score": top["score"],
            "url": f"https://www.reddit.com{top['permalink']}",
        }
    except Exception as e:
        print(f"[!] Reddit error: {e}")
        return None


# ── YOURLS ────────────────────────────────────────────────────────────────────
def _extract_short_url(data):
    """Return the short URL from a YOURLS JSON response, or None if it has none."""
    if not isinstance(data, dict):
        return None
    if "shorturl" in data:
        return data["shorturl"]
    url_info = data.get("url")
    if isinstance(url_info, dict) and "shorturl" in url_info:
        return url_info["shorturl"]
    return None


def shorten_url(long_url):
    """Shorten `long_url` through YOURLS using a random 4-character keyword.

    If YOURLS reports a keyword error the request is retried once without a
    keyword.

    Returns:
        The short URL, `long_url` unchanged if shortening fails, or "" when
        `long_url` is empty.
    """
    if not long_url:
        return ""
    keyword = "r" + "".join(random.choices(string.ascii_lowercase + string.digits, k=3))
    params = {
        "username": YOURLS_USER,
        "password": YOURLS_PASS,
        "action": "shorturl",
        "url": long_url,
        "keyword": keyword,
        "format": "json",
    }
    try:
        data = requests.get(YOURLS_URL, params=params, timeout=5).json()
        short = _extract_short_url(data)
        if short is not None:
            return short
        if isinstance(data, dict) and data.get("code") == "error:keyword":
            params.pop("keyword")
            data = requests.get(YOURLS_URL, params=params, timeout=5).json()
            short = _extract_short_url(data)
            if short is not None:
                return short
    except Exception as e:
        print(f"[!] YOURLS error: {e}")
    return long_url


# ── ESC/POS receipt builder ───────────────────────────────────────────────────
def build_receipt(articles, zfs, reddit, grafana, weather, finance, kuma):
    """Render the briefing into ESC/POS bytes using python-escpos' Dummy printer.

    Every dashboard argument may be falsy, in which case its line is omitted.

    Args:
        articles: article dicts (source, title, ai_summary/summary, url).
        zfs: dict from get_zfs_status() or None.
        reddit: dict from get_reddit_top() or None.
        grafana: list of strings from get_grafana_metrics().
        weather: string from get_weather() or None.
        finance: string from get_financial_snapshot() or None.
        kuma: string from get_uptime_kuma_status() or None.

    Returns:
        The raw bytes accumulated by the Dummy printer.
    """
    from escpos.printer import Dummy as EscPosDummy

    p = EscPosDummy(profile="default")

    def s(**kwargs):
        p.set(font='b', **kwargs)

    def link_footer(long_url):
        # QR code plus the short URL padded with dashes to a full rule; a plain
        # rule when there is no URL.
        url_s = shorten_url(long_url)
        if url_s:
            s(align='center', bold=False)
            p.qr(url_s, native=True, size=3)
            s(align='left', bold=False)
            p.text(f"{url_s} " + "-" * max(0, COLS - len(url_s) - 1) + "\n")
        else:
            p.text("-" * COLS + "\n")

    # Header
    s(align='center', bold=True)
    p.text("Daily Briefing\n")
    s(align='center', bold=False)
    p.text(datetime.now().strftime('%A, %B %d %I:%M %p') + "\n")
    p.text("=" * COLS + "\n")

    # Dashboard
    s(align='left', bold=True)
    p.text("SYSTEM & WEATHER\n")
    s(align='left', bold=False)
    if weather:
        p.text(f"Wx: {weather}\n")
    if kuma:
        s(align='left', bold=True)
        p.text(f"ALERT: {kuma}\n")
        s(align='left', bold=False)
    if zfs:
        p.text(f"ZFS: {zfs['alloc']}/{zfs['size']} ({zfs['cap']}) {zfs['health']}\n")
    if grafana:
        p.text(" | ".join(grafana) + "\n")
    if finance:
        p.text(f"Mkts: {finance}\n")
    p.text("-" * COLS + "\n")

    # Reddit
    if reddit:
        s(align='left', bold=True)
        p.text(f"REDDIT: {reddit.get('subreddit', '')}\n")
        s(align='left', bold=False)
        p.text(reddit['title'] + "\n")
        link_footer(reddit.get('url', ''))

    # Articles
    for i, item in enumerate(articles, 1):
        s(align='left', bold=True)
        p.text(f"{i}. [{item.get('source', '')}] {item.get('title', '')}\n")
        s(align='left', bold=False)
        summary = item.get('ai_summary', item.get('summary', ''))
        if summary:
            p.text(summary + "\n")
        link_footer(item.get('url', ''))

    # Footer
    p.text("\n")
    s(align='center', bold=False)
    p.text("-- end --\n\n")
    p.cut()

    return p.output


def print_receipt(raw_bytes):
    """Send `raw_bytes` to the printer over a raw TCP socket.

    Returns:
        True if the bytes were sent, False on any error (which is logged).
    """
    try:
        with socket.create_connection((PRINTER_IP, PRINTER_PORT), timeout=10) as sock:
            sock.sendall(raw_bytes)
        print(f"✓ Receipt sent to {PRINTER_IP}:{PRINTER_PORT}")
        return True
    except Exception as e:
        print(f"[!] Print failed: {e}")
        return False


# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Gather news and dashboard data, print the receipt, then mark articles read."""
    items, token = fetch_news()
    zfs = get_zfs_status()
    reddit = get_reddit_top()
    grafana = get_grafana_metrics()
    weather = get_weather()
    finance = get_financial_snapshot()
    kuma = get_uptime_kuma_status()

    if not items:
        print("[!] No unread articles — printing dashboard only.")

    articles = summarize_news(items, count=5, length="30-50 words") if items else []
    raw = build_receipt(articles, zfs, reddit, grafana, weather, finance, kuma)
    printed = print_receipt(raw)

    # Only mark articles read once they are actually on paper; otherwise a
    # printer outage would silently drop them from the next briefing.
    if articles and printed:
        mark_read([a['id'] for a in articles], token)
    elif articles:
        print("[!] Print failed — leaving articles unread.")


if __name__ == "__main__":
    main()