"""Flask web portal: Wi-Fi provisioning, live serial stream and timeline CSV endpoints."""

import hmac
import json
import logging
import os
import queue
import secrets
import subprocess
import threading
import time
from collections import defaultdict, deque
from typing import Any, Deque, Dict, Optional
from urllib.parse import urlparse

from flask import Flask, Response, jsonify, render_template, request, session, stream_with_context
from waitress import serve
from werkzeug.exceptions import RequestEntityTooLarge

from app_state import AppState
from network_manager import NetworkManager
from serial_bridge import SerialBroadcaster
from timeline_service import (
    TimelineNotFoundError,
    TimelineService,
    TimelineStorageError,
    TimelineValidationError,
)

logger = logging.getLogger(__name__)


class SlidingWindowRateLimiter:
    """Thread-safe sliding-window limiter: at most ``limit`` hits per key per window.

    Hit timestamps come from ``time.monotonic()`` and are kept in one deque per
    key. Keys that have had no hit within the window are evicted periodically so
    the table does not grow without bound.
    """

    def __init__(self, limit: int, window_seconds: int) -> None:
        """Create a limiter.

        Args:
            limit: Maximum number of accepted hits per key inside one window.
            window_seconds: Length of the sliding window, in seconds.
        """
        self.limit = int(limit)
        self.window_seconds = int(window_seconds)
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)
        self._lock = threading.Lock()
        # Monotonic time at which idle keys are next evicted.
        self._next_sweep = time.monotonic() + self.window_seconds

    def _drop_stale(self, cutoff: float) -> None:
        """Remove every bucket whose newest hit is older than *cutoff*.

        Must be called with ``self._lock`` held.
        """
        stale = [key for key, hits in self._hits.items() if not hits or hits[-1] < cutoff]
        for key in stale:
            del self._hits[key]

    def allow(self, key: str) -> bool:
        """Record a hit for *key* and report whether it is within the limit.

        Args:
            key: Identifier of the caller being limited.

        Returns:
            ``True`` when the hit was accepted (and recorded); ``False`` when
            the key already has ``limit`` hits inside the current window.
        """
        now = time.monotonic()
        cutoff = now - self.window_seconds
        with self._lock:
            # Sweep at most once per window so the cost stays amortised.
            # Evicted buckets hold only expired hits, so this never changes
            # the outcome of a call.
            if now >= self._next_sweep:
                self._drop_stale(cutoff)
                self._next_sweep = now + self.window_seconds

            bucket = self._hits[key]
            while bucket and bucket[0] < cutoff:
                bucket.popleft()
            if len(bucket) >= self.limit:
                return False
            bucket.append(now)
            return True


class WebPortal:
    """Flask application wrapper exposing the device's web UI and JSON API."""

    def __init__(
        self,
        state: AppState,
        network_manager: NetworkManager,
        broadcaster: SerialBroadcaster,
        log_dir: str = "/home/pi",
        log_prefix: str = "xxx",
        upload_dir: str = "/home/pi/timeline_uploads",
        template_folder: str = "../templates",
        static_folder: str = "../static",
    ) -> None:
        """Build the Flask app and register routes and error handlers.

        Args:
            state: Shared application state; its snapshot backs ``/api/status``.
            network_manager: Used to refresh state, scan, connect and start the AP.
            broadcaster: Source of serial events for the SSE stream.
            log_dir: Forwarded to ``TimelineService``.
            log_prefix: Forwarded to ``TimelineService``.
            upload_dir: Forwarded to ``TimelineService``.
            template_folder: Flask template folder.
            static_folder: Flask static folder.
        """
        self.state = state
        self.network_manager = network_manager
        self.broadcaster = broadcaster

        # /api/status re-queries the network manager at most this often.
        self._status_refresh_interval_s = 15.0
        self._last_status_refresh_mono = 0.0
        self._status_lock = threading.Lock()

        self.timeline_service = TimelineService(
            log_dir=log_dir,
            log_prefix=log_prefix,
            upload_dir=upload_dir,
        )
        self._upload_limiter = SlidingWindowRateLimiter(limit=10, window_seconds=60)
        self._download_limiter = SlidingWindowRateLimiter(limit=30, window_seconds=60)

        self.app = Flask(__name__, template_folder=template_folder, static_folder=static_folder)
        # Without SERIAL_WEB_SECRET a fresh random key is generated per instance,
        # so sessions (and their CSRF tokens) do not survive a restart.
        self.app.secret_key = os.environ.get("SERIAL_WEB_SECRET") or secrets.token_hex(32)
        self.app.config["SESSION_COOKIE_HTTPONLY"] = True
        self.app.config["SESSION_COOKIE_SAMESITE"] = "Strict"
        # One MiB above the upload limit; presumably headroom for multipart framing.
        self.app.config["MAX_CONTENT_LENGTH"] = TimelineService.MAX_UPLOAD_SIZE + (1024 * 1024)

        self._register_routes()
        self._register_error_handlers()

    def _register_error_handlers(self) -> None:
        """Install a JSON 413 handler for oversized request bodies."""

        @self.app.errorhandler(RequestEntityTooLarge)
        def too_large(_exc: RequestEntityTooLarge) -> Response:
            if request.path.startswith("/api/timeline/uploads"):
                return jsonify({"ok": False, "message": "Upload exceeds size limit"}), 413
            return jsonify({"ok": False, "message": "Request too large"}), 413

    def _client_ip(self) -> str:
        """Return the address used as the rate-limit key for the current request.

        NOTE(review): ``request.access_route`` is built from ``X-Forwarded-For``
        when that header is present, so the key is client-controlled unless a
        trusted proxy sets the header. Confirm the deployment before relying on
        this for abuse protection.
        """
        if request.access_route:
            return request.access_route[0]
        return request.remote_addr or "unknown"

    def _ensure_csrf_token(self) -> str:
        """Return the session's CSRF token, creating and storing one if absent."""
        token = session.get("csrf_token")
        if not token:
            token = secrets.token_urlsafe(32)
            session["csrf_token"] = token
        return str(token)

    def _is_valid_csrf(self) -> bool:
        """Check the request's CSRF token against the session's.

        The token is read from the ``X-CSRF-Token`` header, then the
        ``csrf_token`` form field, then the ``csrf_token`` query parameter.
        Returns ``False`` when either side is missing.
        """
        expected = session.get("csrf_token")
        provided = (
            request.headers.get("X-CSRF-Token")
            or request.form.get("csrf_token")
            or request.args.get("csrf_token")
        )
        if not expected or not provided:
            return False
        # Constant-time comparison.
        return hmac.compare_digest(str(expected), str(provided))

    def _is_same_origin(self) -> bool:
        """Return True when ``Origin`` (or else ``Referer``) matches this host.

        The first of the two headers that is non-empty decides the result; a
        request carrying neither header is rejected.
        """
        expected_origin = request.host_url.rstrip("/")
        for header in ("Origin", "Referer"):
            value = request.headers.get(header, "").strip()
            if not value:
                continue
            try:
                parsed = urlparse(value)
            except ValueError:
                return False
            if not parsed.scheme or not parsed.netloc:
                return False
            origin = f"{parsed.scheme}://{parsed.netloc}".rstrip("/")
            return origin == expected_origin
        return False

    def _guard_secure_endpoint(
        self,
        *,
        require_csrf: bool = True,
        limiter: Optional[SlidingWindowRateLimiter] = None,
    ) -> Optional[Response]:
        """Run the origin, CSRF and rate-limit checks, in that order.

        Args:
            require_csrf: When true, the request must carry a valid CSRF token.
            limiter: Optional rate limiter keyed by ``_client_ip()``.

        Returns:
            ``None`` when every check passes, otherwise the JSON error response
            (403 or 429) the route should return unchanged.
        """
        if not self._is_same_origin():
            return jsonify({"ok": False, "message": "Cross-origin requests are blocked"}), 403
        if require_csrf and not self._is_valid_csrf():
            return jsonify({"ok": False, "message": "Invalid CSRF token"}), 403
        if limiter and not limiter.allow(self._client_ip()):
            return jsonify({"ok": False, "message": "Too many requests"}), 429
        return None

    @staticmethod
    def _parse_hours(raw: str, default: int = 6) -> int:
        """Parse the ``hours`` query value, falling back to *default* if not an integer."""
        try:
            return int(raw)
        except ValueError:
            return default

    def _register_routes(self) -> None:
        """Register the HTML pages, the JSON API and the SSE stream on ``self.app``."""

        @self.app.context_processor
        def inject_template_vars() -> Dict[str, Any]:
            # Every rendered template gets the session's CSRF token.
            return {"csrf_token": self._ensure_csrf_token()}

        @self.app.route("/")
        def index() -> str:
            return render_template("index.html")

        @self.app.route("/serial")
        def serial_page() -> str:
            return render_template("serial.html")

        @self.app.route("/timeline")
        def timeline_page() -> str:
            return render_template(
                "timeline.html",
                max_upload_mib=TimelineService.MAX_UPLOAD_SIZE // (1024 * 1024),
                max_upload_files=TimelineService.MAX_UPLOAD_FILES,
            )

        @self.app.route("/api/status", methods=["GET"])
        def status() -> Response:
            """Return the state snapshot, refreshing network state at most every 15 s."""
            try:
                now = time.monotonic()
                with self._status_lock:
                    if now - self._last_status_refresh_mono >= self._status_refresh_interval_s:
                        self.network_manager.refresh_state()
                        self._last_status_refresh_mono = now
                return jsonify(self.state.snapshot())
            except Exception as exc:
                self.state.update_status("Status update failed", str(exc))
                return jsonify(self.state.snapshot()), 503

        # NOTE(review): /api/scan, /api/connect and /api/system/<action> change
        # device state but do not call _guard_secure_endpoint(). Consider
        # guarding them once the frontend is confirmed to send the CSRF token.
        @self.app.route("/api/scan", methods=["POST", "GET"])
        def scan() -> Response:
            """Scan for Wi-Fi networks and return their SSIDs."""
            try:
                ssids = self.network_manager.scan_networks()
                return jsonify({"ok": True, "ssids": ssids})
            except Exception as exc:
                self.state.update_status("Scan failed", str(exc))
                return jsonify({"ok": False, "ssids": [], "message": str(exc)}), 503

        @self.app.route("/api/connect", methods=["POST"])
        def connect() -> Response:
            """Connect to the Wi-Fi network named in the JSON body.

            On failure the access point is restarted (best effort) and a 400 is
            returned with the network manager's message.
            """
            payload = request.get_json(silent=True)
            if not isinstance(payload, dict):
                # A JSON array/string/number body has no fields to read.
                payload = {}
            ssid = str(payload.get("ssid") or "").strip()
            password = payload.get("password") or ""

            ok, message = self.network_manager.connect_to_wifi(ssid, password)
            if not ok:
                self.state.update_status("Connect failed", message)
                try:
                    self.network_manager.start_ap()
                except Exception:
                    # Best effort: the 400 below is still the right answer.
                    logger.exception("Failed to restart access point after connect failure")
                return jsonify({"ok": False, "message": message}), 400

            self.network_manager.refresh_state()
            return jsonify({"ok": True, "message": message})

        @self.app.route("/api/system/<action>", methods=["POST"])
        def system_action(action: str) -> Response:
            """Trigger ``reboot`` or ``shutdown`` via systemctl in a background thread."""
            commands = {
                "reboot": ["systemctl", "reboot"],
                "shutdown": ["systemctl", "poweroff"],
            }
            cmd = commands.get(action)
            if cmd is None:
                return jsonify({"ok": False, "message": "Unknown action"}), 400

            self.state.update_status(f"System action requested: {action}", "")

            def _exec() -> None:
                try:
                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        timeout=20,
                        check=False,
                    )
                except Exception:
                    logger.exception("System action %r failed to run", action)
                    return
                if result.returncode != 0:
                    logger.warning(
                        "System action %r exited with %s: %s",
                        action,
                        result.returncode,
                        (result.stderr or "").strip(),
                    )

            # Run off-thread so the HTTP response is sent before the command runs.
            threading.Thread(target=_exec, daemon=True, name=f"system-{action}").start()
            return jsonify({"ok": True, "message": f"{action} triggered"})

        @self.app.route("/events/serial")
        def serial_events() -> Response:
            """Stream serial events as Server-Sent Events."""

            @stream_with_context
            def generate():
                q = self.broadcaster.subscribe()
                try:
                    yield "retry: 2000\n\n"
                    while True:
                        try:
                            event = q.get(timeout=15)
                        except queue.Empty:
                            # Comment line keeps the connection alive when idle.
                            yield ": keepalive\n\n"
                            continue

                        if isinstance(event, dict):
                            payload = {
                                "line": event.get("line", ""),
                                "ts_iso": event.get("ts_iso", ""),
                                "ts_hms": event.get("ts_hms", ""),
                                "source": event.get("source", "serial"),
                            }
                        else:
                            payload = {
                                "line": str(event),
                                "ts_iso": "",
                                "ts_hms": "",
                                "source": "serial",
                            }
                        data = json.dumps(payload)
                        yield f"data: {data}\n\n"
                finally:
                    # Runs when the client disconnects and the generator is closed.
                    self.broadcaster.unsubscribe(q)

            return Response(generate(), mimetype="text/event-stream")

        @self.app.route("/api/timeline/uploads", methods=["GET"])
        def list_timeline_uploads() -> Response:
            uploads = self.timeline_service.list_uploads()
            return jsonify({"ok": True, "uploads": uploads})

        @self.app.route("/api/timeline/uploads", methods=["POST"])
        def upload_timeline_csv() -> Response:
            """Store an uploaded CSV (multipart field ``file``); guarded and rate-limited."""
            guard = self._guard_secure_endpoint(limiter=self._upload_limiter)
            if guard is not None:
                return guard

            csv_file = request.files.get("file")
            if csv_file is None:
                return jsonify({"ok": False, "message": "Missing file field"}), 400
            if not csv_file.filename:
                return jsonify({"ok": False, "message": "Filename is required"}), 400

            # Read one byte past the limit so an oversized file is detectable
            # without loading all of it.
            raw = csv_file.stream.read(TimelineService.MAX_UPLOAD_SIZE + 1)
            if len(raw) > TimelineService.MAX_UPLOAD_SIZE:
                return jsonify({"ok": False, "message": "Upload exceeds size limit"}), 413

            try:
                meta = self.timeline_service.upload_csv(csv_file.filename, raw)
            except TimelineValidationError as exc:
                return jsonify({"ok": False, "message": str(exc)}), 400
            except TimelineStorageError as exc:
                return jsonify({"ok": False, "message": str(exc)}), 503
            return jsonify({"ok": True, "upload": meta}), 201

        @self.app.route("/api/timeline/uploads/<upload_id>", methods=["DELETE"])
        def delete_timeline_csv(upload_id: str) -> Response:
            """Delete a previously uploaded CSV; guarded and rate-limited."""
            guard = self._guard_secure_endpoint(limiter=self._upload_limiter)
            if guard is not None:
                return guard

            try:
                self.timeline_service.delete_upload(upload_id)
            except TimelineValidationError as exc:
                return jsonify({"ok": False, "message": str(exc)}), 400
            except TimelineNotFoundError:
                return jsonify({"ok": False, "message": "Upload not found"}), 404
            except TimelineStorageError as exc:
                # Same mapping as the other timeline endpoints.
                return jsonify({"ok": False, "message": str(exc)}), 503
            return jsonify({"ok": True})

        @self.app.route("/api/timeline", methods=["GET"])
        def timeline_data() -> Response:
            """Return timeline data for ``hours`` (default 6) and optional ``upload_id``."""
            hours = self._parse_hours(request.args.get("hours", "6"))
            upload_id = request.args.get("upload_id", "")

            try:
                data = self.timeline_service.get_timeline(hours=hours, upload_id=upload_id)
            except TimelineValidationError as exc:
                return jsonify({"ok": False, "message": str(exc)}), 400
            except TimelineNotFoundError:
                return jsonify({"ok": False, "message": "Upload not found"}), 404
            except TimelineStorageError as exc:
                return jsonify({"ok": False, "message": str(exc)}), 503
            return jsonify({"ok": True, **data})

        @self.app.route("/api/timeline/download", methods=["GET"])
        def download_timeline() -> Response:
            """Return a CSV attachment built by the timeline service; guarded and rate-limited."""
            guard = self._guard_secure_endpoint(limiter=self._download_limiter)
            if guard is not None:
                return guard

            kind = request.args.get("kind", "serial")
            upload_id = request.args.get("upload_id", "")
            hours = self._parse_hours(request.args.get("hours", "6"))

            try:
                filename, payload = self.timeline_service.build_download(
                    kind=kind,
                    hours=hours,
                    upload_id=upload_id,
                )
            except TimelineValidationError as exc:
                return jsonify({"ok": False, "message": str(exc)}), 400
            except TimelineNotFoundError:
                return jsonify({"ok": False, "message": "Upload not found"}), 404
            except TimelineStorageError as exc:
                return jsonify({"ok": False, "message": str(exc)}), 503

            headers = {
                "Content-Disposition": f'attachment; filename="{filename}"',
                "Cache-Control": "no-store",
                "X-Content-Type-Options": "nosniff",
            }
            return Response(payload, headers=headers, mimetype="text/csv")

    def run(self, host: str = "0.0.0.0", port: int = 80) -> None:
        """Serve the app with waitress on *host*:*port* using 6 worker threads."""
        serve(self.app, host=host, port=port, threads=6)