From b5f34868d19e0935dfa09d9257bdb76147e042f5 Mon Sep 17 00:00:00 2001
From: acidburns
Date: Sun, 22 Feb 2026 02:11:31 +0100
Subject: [PATCH] Add secure 6h timeline with CSV merge and timestamped serial events

---
 README.md               |  27 ++
 src/main.py             |  18 +-
 src/serial_bridge.py    |  35 ++-
 src/timeline_service.py | 659 ++++++++++++++++++++++++++++++++++++++++
 src/webapp.py           | 227 +++++++++++++-
 static/style.css        |  98 ++++++
 templates/index.html    |   3 +-
 templates/serial.html   |   7 +-
 templates/timeline.html | 264 ++++++++++++++++
 9 files changed, 1315 insertions(+), 23 deletions(-)
 create mode 100644 src/timeline_service.py
 create mode 100644 templates/timeline.html

diff --git a/README.md b/README.md
index 398d11d..7ff10f1 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,7 @@ Headless Raspberry Pi Zero W project:
 - WiFi client or fallback AP (`serial` / `serialserial`)
 - Web portal (`http://192.168.4.1/` in AP mode)
 - ESP32 USB serial bridge with live SSE stream and daily log files (`/home/pi/xxx_YYYY-MM-DD_HH-MM-SS.log`)
+- 6-hour timeline page with CSV upload + merged timeline downloads
 - Stable symlink to active log (`/home/pi/xxx.log`)
 - RTC boot restore + NTP sync + RTC write-back
 - Autostart via systemd
@@ -21,12 +22,29 @@ Current implementation status:
 - Web portal is available on port `80`:
   - `/` WiFi scan + connect UI + system actions (reboot/shutdown)
   - `/serial` live serial console (SSE via `/events/serial`)
+  - `/timeline` last-6-hours split timeline (serial + uploaded CSV data)
   - `/api/status` polling is reduced/throttled (15s) for Pi Zero W performance
 - ESP32 serial bridge:
   - Auto-detects `/dev/ttyUSB*`, `/dev/ttyACM*`, `/dev/serial/by-id/*`
   - Reconnects automatically on unplug/replug
   - Daily log rollover at midnight with datetime filename
+  - Each line written to file is timestamped with full local ISO datetime (including UTC offset)
+  - SSE payload includes `line`, `ts_iso`, `ts_hms`, `source`
 - No log file is created while no serial device is connected
+- Timeline CSV upload + merge:
+  - Upload endpoint: `POST /api/timeline/uploads` (multipart field `file`)
+  - Timeline data endpoint: `GET /api/timeline?hours=6&upload_id=<id>`
+  - Download endpoint: `GET /api/timeline/download?kind=serial|merged&hours=6&upload_id=<id>&csrf_token=<token>`
+  - Upload persistence: `/home/pi/timeline_uploads/<upload_id>.csv` + sidecar metadata `/home/pi/timeline_uploads/<upload_id>.json`
+  - CSV parsing supports auto-detected timestamp columns (`ts_utc`, `timestamp`, `ts`, `unix`, `epoch`, `datetime`, `ts_local`, `ts_hms_local`, `time`)
+  - Timestamp parsing supports epoch seconds/milliseconds, ISO datetime strings, and `HH:MM:SS` with date inferred from filename (`YYYY-MM-DD`) or upload date
+  - Downloads include CSV formula-injection hardening (`=`, `+`, `-`, `@` prefixed with `'`)
+- Upload/download hardening:
+  - Same-origin checks required for upload/delete/download timeline endpoints
+  - CSRF token required for upload/delete/download timeline endpoints
+  - In-memory rate limiting for upload/delete/download endpoints
+  - Strict upload ID validation and fixed server-side storage paths
+  - Upload caps: `10 MiB` per file, `20` files max, `200 MiB` total, `250000` CSV rows max, `64` columns max, `4096` chars per cell max
 - Once internet is available, NTP sync runs and writes corrected time back to RTC (`hwclock -w`).
 - After boot is ready, power/activity LED is set to 1 Hz blink (`timer`, 500ms on / 500ms off), if LED sysfs control is available.
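+
+Each `/events/serial` SSE event carries a JSON payload with the fields above, for example (illustrative values):
+```
+data: {"line": "boot ok", "ts_iso": "2026-02-22T02:11:31+01:00", "ts_hms": "02:11:31", "source": "serial"}
+```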
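+
+A minimal CSV the upload parser accepts (illustrative; `ts` is one of the auto-detected column names, and the rows show epoch seconds, ISO datetime, and bare `HH:MM:SS` — the last takes its date from the filename or the upload date):
+```
+ts,value
+1771722691,42.0
+2026-02-22T02:11:31+01:00,43.5
+02:12:00,44.1
+```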
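+
+Example client flow (a sketch; assumes AP mode at `http://192.168.4.1` — the guarded endpoints reject requests without a same-origin `Origin`/`Referer` header and a valid session CSRF token):
+```bash
+# Establish a session; the /timeline page embeds the csrf_token for this cookie jar
+curl -s -c cookies.txt -o /dev/null http://192.168.4.1/timeline
+TOKEN="..."  # copy the csrf_token value embedded in the /timeline page
+
+# Upload a CSV (multipart field "file"; the filename here is hypothetical)
+curl -b cookies.txt -H "Origin: http://192.168.4.1" -H "X-CSRF-Token: $TOKEN" \
+  -F "file=@sensors_2026-02-21.csv" http://192.168.4.1/api/timeline/uploads
+
+# Read the 6-hour window (read-only endpoint, no CSRF required);
+# <id> is an upload id returned by the upload call
+curl "http://192.168.4.1/api/timeline?hours=6&upload_id=<id>"
+
+# Download the merged CSV export under the server-chosen filename
+curl -OJ -b cookies.txt -H "Origin: http://192.168.4.1" \
+  "http://192.168.4.1/api/timeline/download?kind=merged&hours=6&upload_id=<id>&csrf_token=$TOKEN"
+```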
@@ -39,9 +57,18 @@ systemctl status serial-bridge journalctl -u serial-bridge -f ip a show wlan0 ls -l /home/pi/xxx.log /home/pi/xxx_*.log +ls -l /home/pi/timeline_uploads sudo hwclock -r ``` +Optional environment variables: +```bash +SERIAL_LOG_DIR=/home/pi +SERIAL_LOG_PREFIX=xxx +TIMELINE_UPLOAD_DIR=/home/pi/timeline_uploads +SERIAL_WEB_SECRET= +``` + ## RTC GPIO Wiring (Raspberry Pi Zero W) Use I2C1 pins on the 40-pin header: diff --git a/src/main.py b/src/main.py index 6daae85..0f468e6 100644 --- a/src/main.py +++ b/src/main.py @@ -105,11 +105,18 @@ def configure_logging() -> None: def main() -> None: configure_logging() state = AppState() + log_dir = os.environ.get("SERIAL_LOG_DIR", "/home/pi") + log_prefix = os.environ.get("SERIAL_LOG_PREFIX", "xxx") + upload_dir = os.environ.get("TIMELINE_UPLOAD_DIR", os.path.join(log_dir, "timeline_uploads")) nm = NetworkManager(state=state) rtc = RTCAndNTPManager(state=state) broadcaster = SerialBroadcaster() - bridge = SerialBridge(broadcaster=broadcaster) + bridge = SerialBridge( + broadcaster=broadcaster, + log_dir=log_dir, + log_prefix=log_prefix, + ) supervisor = Supervisor(state=state, nm=nm, rtc=rtc) state.update_status("Initializing", "") @@ -151,7 +158,14 @@ def main() -> None: signal.signal(signal.SIGTERM, _handle_signal) signal.signal(signal.SIGINT, _handle_signal) - web = WebPortal(state=state, network_manager=nm, broadcaster=broadcaster) + web = WebPortal( + state=state, + network_manager=nm, + broadcaster=broadcaster, + log_dir=log_dir, + log_prefix=log_prefix, + upload_dir=upload_dir, + ) try: web_thread = threading.Thread( diff --git a/src/serial_bridge.py b/src/serial_bridge.py index a6790e3..aa44e46 100644 --- a/src/serial_bridge.py +++ b/src/serial_bridge.py @@ -4,7 +4,7 @@ import queue import threading import time from datetime import datetime, timedelta -from typing import List, Optional +from typing import Dict, List, Optional import serial @@ -25,20 +25,20 @@ class SerialBroadcaster: if q in self._subscribers: self._subscribers.remove(q) - def publish(self, line: str) -> None: + def publish(self, event: Dict[str, str]) -> None: with self._lock: subscribers = list(self._subscribers) for q in subscribers: try: - q.put_nowait(line) + q.put_nowait(event) except queue.Full: try: q.get_nowait() except queue.Empty: pass try: - q.put_nowait(line) + q.put_nowait(event) except queue.Full: pass @@ -81,10 +81,10 @@ class SerialBridge(threading.Thread): def _open_serial(self, device: str) -> bool: try: self._serial = serial.Serial(device, self.baudrate, timeout=1) - self.broadcaster.publish(f"[bridge] connected: {device} @ {self.baudrate}") + self.broadcaster.publish(self._build_event(f"[bridge] connected: {device} @ {self.baudrate}")) return True except Exception as exc: - self.broadcaster.publish(f"[bridge] open failed ({device}): {exc}") + self.broadcaster.publish(self._build_event(f"[bridge] open failed ({device}): {exc}")) self._serial = None return False @@ -97,7 +97,7 @@ class SerialBridge(threading.Thread): self._serial = None def _current_time(self) -> datetime: - return datetime.now() + return datetime.now().astimezone() def _next_midnight_epoch(self, now: datetime) -> float: next_midnight = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) @@ -150,12 +150,20 @@ class SerialBridge(threading.Thread): self._close_log() self._open_log() - def _write_line(self, line: str) -> None: + def _build_event(self, line: str, ts: Optional[datetime] = None) -> Dict[str, str]: + moment = ts.astimezone() if ts else 
self._current_time() + return { + "line": line, + "ts_iso": moment.isoformat(timespec="seconds"), + "ts_hms": moment.strftime("%H:%M:%S"), + "source": "serial", + } + + def _write_line(self, ts_iso: str, line: str) -> None: if self._log_file is None: self._open_log() self._rotate_log_if_needed() - ts = self._current_time().strftime("%Y-%m-%dT%H:%M:%S") - self._log_file.write(f"{ts} {line}\n") + self._log_file.write(f"{ts_iso} {line}\n") def run(self) -> None: try: @@ -174,10 +182,11 @@ class SerialBridge(threading.Thread): if not raw: continue text = raw.decode("utf-8", errors="replace").rstrip("\r\n") - self._write_line(text) - self.broadcaster.publish(text) + event = self._build_event(text) + self._write_line(event["ts_iso"], text) + self.broadcaster.publish(event) except Exception as exc: - self.broadcaster.publish(f"[bridge] disconnected: {exc}") + self.broadcaster.publish(self._build_event(f"[bridge] disconnected: {exc}")) self._close_serial() self._close_log() time.sleep(2) diff --git a/src/timeline_service.py b/src/timeline_service.py new file mode 100644 index 0000000..645cc5e --- /dev/null +++ b/src/timeline_service.py @@ -0,0 +1,659 @@ +import csv +import glob +import io +import json +import os +import re +import secrets +import threading +from datetime import date, datetime, time as dt_time, timedelta, timezone +from typing import Any, Dict, List, Optional, Sequence, Tuple + + +class TimelineServiceError(Exception): + pass + + +class TimelineValidationError(TimelineServiceError): + pass + + +class TimelineNotFoundError(TimelineServiceError): + pass + + +class TimelineStorageError(TimelineServiceError): + pass + + +class TimelineService: + MAX_UPLOAD_SIZE = 10 * 1024 * 1024 + MAX_UPLOAD_FILES = 20 + MAX_UPLOAD_TOTAL = 200 * 1024 * 1024 + MAX_CSV_ROWS = 250000 + MAX_CSV_COLUMNS = 64 + MAX_CELL_CHARS = 4096 + MAX_TIMELINE_HOURS = 6 + TIMESTAMP_COLUMN_CANDIDATES: Sequence[str] = ( + "ts_utc", + "timestamp", + "ts", + "unix", + "epoch", + "datetime", + "ts_local", + "ts_hms_local", + "time", + ) + + _UPLOAD_ID_RE = re.compile(r"^[a-f0-9]{32}$") + _DATE_IN_NAME_RE = re.compile(r"(\d{4}-\d{2}-\d{2})") + _TIME_ONLY_RE = re.compile(r"^(\d{2}):(\d{2}):(\d{2})$") + + def __init__( + self, + log_dir: str = "/home/pi", + log_prefix: str = "xxx", + upload_dir: str = "/home/pi/timeline_uploads", + ) -> None: + self.log_dir = log_dir + self.log_prefix = log_prefix + self.upload_dir = upload_dir + self._lock = threading.Lock() + self._cache: Dict[str, Dict[str, Any]] = {} + + os.makedirs(self.upload_dir, exist_ok=True) + self._enforce_upload_limits() + + def upload_csv(self, filename: str, raw_data: bytes) -> Dict[str, Any]: + safe_name = os.path.basename((filename or "").strip()) or "upload.csv" + if not safe_name.lower().endswith(".csv"): + raise TimelineValidationError("Only .csv files are allowed") + + if len(raw_data) > self.MAX_UPLOAD_SIZE: + raise TimelineValidationError( + f"File is too large (max {self.MAX_UPLOAD_SIZE // (1024 * 1024)} MiB)" + ) + + text = self._decode_csv_bytes(raw_data) + header, time_column, has_parseable_ts = self._inspect_csv(text, safe_name) + if not has_parseable_ts: + raise TimelineValidationError("CSV has no parseable timestamp values") + + inferred_date = self._extract_date_from_name(safe_name) + uploaded_dt = self._now_local() + upload_id = secrets.token_hex(16) + + metadata = { + "id": upload_id, + "filename": safe_name, + "size": len(raw_data), + "uploaded_at": uploaded_dt.isoformat(timespec="seconds"), + "uploaded_epoch": uploaded_dt.timestamp(), + 
"time_column": time_column, + "header": header, + "inferred_date": inferred_date.isoformat() if inferred_date else "", + } + + with self._lock: + self._reserve_capacity(len(raw_data)) + csv_path = self._csv_path(upload_id) + meta_path = self._meta_path(upload_id) + + self._write_bytes_atomic(csv_path, raw_data) + self._write_json_atomic(meta_path, metadata) + self._cache.pop(upload_id, None) + self._enforce_upload_limits() + + return self._public_metadata(metadata) + + def list_uploads(self) -> List[Dict[str, Any]]: + metas = self._load_all_metadata() + metas.sort(key=lambda item: float(item.get("uploaded_epoch", 0.0)), reverse=True) + return [self._public_metadata(meta) for meta in metas] + + def delete_upload(self, upload_id: str) -> None: + upload_id = self._validated_upload_id(upload_id) + csv_path = self._csv_path(upload_id) + meta_path = self._meta_path(upload_id) + + removed_any = False + with self._lock: + for path in (csv_path, meta_path): + if os.path.exists(path): + os.remove(path) + removed_any = True + self._cache.pop(upload_id, None) + + if not removed_any: + raise TimelineNotFoundError("Upload not found") + + def get_timeline(self, hours: int = 6, upload_id: str = "") -> Dict[str, Any]: + hours = self._clamp_hours(hours) + now = self._now_local() + start = now - timedelta(hours=hours) + + serial_events = self._load_serial_events(start=start, end=now) + csv_events: List[Dict[str, Any]] = [] + csv_columns: List[str] = [] + upload_meta: Optional[Dict[str, Any]] = None + + selected_id = (upload_id or "").strip() + if selected_id: + upload_meta = self._load_metadata_by_id(selected_id) + if upload_meta is None: + raise TimelineNotFoundError("Upload not found") + csv_columns, all_csv_events = self._load_csv_events(upload_meta) + csv_events = [evt for evt in all_csv_events if start.timestamp() <= evt["ts_epoch"] <= now.timestamp()] + + return { + "hours": hours, + "window_start": start.isoformat(timespec="seconds"), + "window_end": now.isoformat(timespec="seconds"), + "serial": [self._event_for_api(evt) for evt in serial_events], + "csv": [self._event_for_api(evt) for evt in csv_events], + "csv_columns": csv_columns, + "selected_upload": self._public_metadata(upload_meta) if upload_meta else None, + "counts": {"serial": len(serial_events), "csv": len(csv_events)}, + } + + def build_download(self, kind: str, hours: int = 6, upload_id: str = "") -> Tuple[str, bytes]: + kind = (kind or "").strip().lower() + if kind not in {"serial", "merged"}: + raise TimelineValidationError("Invalid download kind") + + hours = self._clamp_hours(hours) + now = self._now_local() + start = now - timedelta(hours=hours) + + serial_events = self._load_serial_events(start=start, end=now) + csv_columns: List[str] = [] + csv_events: List[Dict[str, Any]] = [] + selected_id = (upload_id or "").strip() + if selected_id: + upload_meta = self._load_metadata_by_id(selected_id) + if upload_meta is None: + raise TimelineNotFoundError("Upload not found") + csv_columns, parsed = self._load_csv_events(upload_meta) + csv_events = [evt for evt in parsed if start.timestamp() <= evt["ts_epoch"] <= now.timestamp()] + + if kind == "serial": + header = ["ts_iso_local", "ts_hms", "source", "serial_line"] + rows = [ + [evt["ts_iso_local"], evt["ts_hms"], evt["source"], evt.get("serial_line", "")] + for evt in serial_events + ] + else: + header = ["ts_iso_local", "ts_hms", "source", "serial_line", *csv_columns] + rows = [] + merged = self._merge_events(serial_events, csv_events) + for evt in merged: + row = [ + 
evt["ts_iso_local"], + evt["ts_hms"], + evt["source"], + evt.get("serial_line", ""), + ] + values = evt.get("csv_values", {}) + for col in csv_columns: + row.append(values.get(col, "")) + rows.append(row) + + csv_bytes = self._encode_export_csv(header, rows) + stamp = now.strftime("%Y%m%d_%H%M%S") + name = f"timeline_{kind}_{stamp}.csv" + return name, csv_bytes + + def _event_for_api(self, event: Dict[str, Any]) -> Dict[str, Any]: + return { + "ts_iso": event["ts_iso_local"], + "ts_hms": event["ts_hms"], + "source": event["source"], + "line": event.get("serial_line", ""), + "csv_values": event.get("csv_values", {}), + } + + def _merge_events( + self, serial_events: List[Dict[str, Any]], csv_events: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + merged = list(serial_events) + list(csv_events) + merged.sort( + key=lambda evt: ( + float(evt["ts_epoch"]), + 0 if evt.get("source") == "serial" else 1, + evt.get("serial_line", ""), + ) + ) + return merged + + def _decode_csv_bytes(self, raw: bytes) -> str: + try: + return raw.decode("utf-8-sig") + except UnicodeDecodeError as exc: + raise TimelineValidationError(f"CSV must be UTF-8 encoded: {exc}") from exc + + def _inspect_csv(self, text: str, filename: str) -> Tuple[List[str], str, bool]: + reader = csv.reader(io.StringIO(text)) + try: + header_row = next(reader) + except StopIteration as exc: + raise TimelineValidationError("CSV file is empty") from exc + + header = [self._trim_header_cell(cell) for cell in header_row] + if not header: + raise TimelineValidationError("CSV header is empty") + if len(header) > self.MAX_CSV_COLUMNS: + raise TimelineValidationError(f"CSV has too many columns (max {self.MAX_CSV_COLUMNS})") + + time_column = self._detect_time_column(header) + if not time_column: + raise TimelineValidationError("No supported timestamp column found") + + time_idx = header.index(time_column) + fallback_date = self._extract_date_from_name(filename) or self._now_local().date() + + row_count = 0 + parseable_found = False + for row in reader: + row_count += 1 + if row_count > self.MAX_CSV_ROWS: + raise TimelineValidationError(f"CSV has too many rows (max {self.MAX_CSV_ROWS})") + if len(row) > self.MAX_CSV_COLUMNS: + raise TimelineValidationError(f"CSV row has too many columns (max {self.MAX_CSV_COLUMNS})") + for cell in row: + if len(cell) > self.MAX_CELL_CHARS: + raise TimelineValidationError( + f"CSV cell exceeds max length ({self.MAX_CELL_CHARS} characters)" + ) + + if time_idx >= len(row): + continue + if self._parse_csv_timestamp(row[time_idx], fallback_date) is not None: + parseable_found = True + + if row_count == 0: + raise TimelineValidationError("CSV has no data rows") + + return header, time_column, parseable_found + + def _detect_time_column(self, header: Sequence[str]) -> str: + mapping = {col.strip().lower(): idx for idx, col in enumerate(header)} + for candidate in self.TIMESTAMP_COLUMN_CANDIDATES: + if candidate in mapping: + return header[mapping[candidate]] + return "" + + def _load_serial_events(self, start: datetime, end: datetime) -> List[Dict[str, Any]]: + paths = self._candidate_log_paths(start) + events: List[Dict[str, Any]] = [] + start_epoch = start.timestamp() + end_epoch = end.timestamp() + + for path in paths: + try: + with open(path, "r", encoding="utf-8", errors="replace") as f: + for raw_line in f: + line = raw_line.rstrip("\r\n") + if not line: + continue + first_space = line.find(" ") + if first_space <= 0: + continue + token = line[:first_space] + payload = line[first_space + 1 :] + ts_dt = 
self._parse_log_timestamp(token) + if ts_dt is None: + continue + ts_epoch = ts_dt.timestamp() + if ts_epoch < start_epoch or ts_epoch > end_epoch: + continue + events.append( + { + "ts_epoch": ts_epoch, + "ts_iso_local": ts_dt.isoformat(timespec="seconds"), + "ts_hms": ts_dt.strftime("%H:%M:%S"), + "source": "serial", + "serial_line": payload, + } + ) + except OSError: + continue + + events.sort(key=lambda evt: (float(evt["ts_epoch"]), evt.get("serial_line", ""))) + return events + + def _candidate_log_paths(self, start: datetime) -> List[str]: + pattern = os.path.join(self.log_dir, f"{self.log_prefix}_*.log") + paths = [] + cutoff = start.timestamp() - 36 * 3600 + for path in glob.glob(pattern): + try: + if os.path.getmtime(path) < cutoff: + continue + except OSError: + continue + paths.append(path) + paths.sort() + return paths + + def _parse_log_timestamp(self, token: str) -> Optional[datetime]: + token = (token or "").strip() + if not token: + return None + raw = token.replace("Z", "+00:00") + try: + parsed = datetime.fromisoformat(raw) + except ValueError: + return None + if parsed.tzinfo is None: + return parsed.replace(tzinfo=self._local_tz()) + return parsed.astimezone(self._local_tz()) + + def _load_csv_events(self, metadata: Dict[str, Any]) -> Tuple[List[str], List[Dict[str, Any]]]: + upload_id = metadata["id"] + with self._lock: + cached = self._cache.get(upload_id) + if cached: + return list(cached["columns"]), list(cached["events"]) + + csv_path = self._csv_path(upload_id) + if not os.path.exists(csv_path): + raise TimelineNotFoundError("Upload file is missing") + + fallback_date = self._metadata_fallback_date(metadata) + expected_time_column = metadata.get("time_column", "") + events: List[Dict[str, Any]] = [] + columns: List[str] = [] + parseable_count = 0 + + try: + with open(csv_path, "r", encoding="utf-8-sig", errors="strict", newline="") as f: + reader = csv.reader(f) + try: + raw_header = next(reader) + except StopIteration as exc: + raise TimelineValidationError("CSV file is empty") from exc + + columns = [self._trim_header_cell(cell) for cell in raw_header] + if len(columns) > self.MAX_CSV_COLUMNS: + raise TimelineValidationError(f"CSV has too many columns (max {self.MAX_CSV_COLUMNS})") + time_column = expected_time_column or self._detect_time_column(columns) + if not time_column: + raise TimelineValidationError("No supported timestamp column found") + if time_column not in columns: + raise TimelineValidationError("Configured timestamp column is missing") + time_idx = columns.index(time_column) + + for idx, row in enumerate(reader, start=1): + if idx > self.MAX_CSV_ROWS: + raise TimelineValidationError(f"CSV has too many rows (max {self.MAX_CSV_ROWS})") + if len(row) > self.MAX_CSV_COLUMNS: + raise TimelineValidationError( + f"CSV row has too many columns (max {self.MAX_CSV_COLUMNS})" + ) + + values: Dict[str, str] = {} + for col_idx, col in enumerate(columns): + val = row[col_idx] if col_idx < len(row) else "" + if len(val) > self.MAX_CELL_CHARS: + raise TimelineValidationError( + f"CSV cell exceeds max length ({self.MAX_CELL_CHARS} characters)" + ) + values[col] = val + + ts_raw = values.get(time_column, "") + parsed_ts = self._parse_csv_timestamp(ts_raw, fallback_date) + if parsed_ts is None: + continue + + parseable_count += 1 + events.append( + { + "ts_epoch": parsed_ts.timestamp(), + "ts_iso_local": parsed_ts.isoformat(timespec="seconds"), + "ts_hms": parsed_ts.strftime("%H:%M:%S"), + "source": "csv", + "csv_values": values, + } + ) + except OSError as exc: + 
raise TimelineStorageError(str(exc)) from exc + except UnicodeDecodeError as exc: + raise TimelineValidationError(f"CSV must be UTF-8 encoded: {exc}") from exc + + if parseable_count == 0: + raise TimelineValidationError("CSV has no parseable timestamp values") + + events.sort(key=lambda evt: float(evt["ts_epoch"])) + with self._lock: + self._cache[upload_id] = {"columns": list(columns), "events": list(events)} + return columns, events + + def _metadata_fallback_date(self, metadata: Dict[str, Any]) -> date: + value = (metadata.get("inferred_date") or "").strip() + if value: + try: + return date.fromisoformat(value) + except ValueError: + pass + + uploaded = metadata.get("uploaded_at", "") + if uploaded: + try: + parsed = datetime.fromisoformat(uploaded) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=self._local_tz()) + return parsed.astimezone(self._local_tz()).date() + except ValueError: + pass + return self._now_local().date() + + def _parse_csv_timestamp(self, raw_value: str, fallback_date: date) -> Optional[datetime]: + value = (raw_value or "").strip() + if not value: + return None + + numeric = self._try_parse_numeric_timestamp(value) + if numeric is not None: + return numeric + + iso = self._try_parse_iso_timestamp(value) + if iso is not None: + return iso + + match = self._TIME_ONLY_RE.match(value) + if match: + hour = int(match.group(1)) + minute = int(match.group(2)) + second = int(match.group(3)) + local = datetime.combine( + fallback_date, + dt_time(hour=hour, minute=minute, second=second), + tzinfo=self._local_tz(), + ) + return local.astimezone(self._local_tz()) + return None + + def _try_parse_numeric_timestamp(self, value: str) -> Optional[datetime]: + try: + number = float(value) + except ValueError: + return None + + if abs(number) >= 1_000_000_000_000: + number /= 1000.0 + try: + return datetime.fromtimestamp(number, tz=timezone.utc).astimezone(self._local_tz()) + except (OverflowError, OSError, ValueError): + return None + + def _try_parse_iso_timestamp(self, value: str) -> Optional[datetime]: + cleaned = value.replace("Z", "+00:00") + try: + parsed = datetime.fromisoformat(cleaned) + except ValueError: + return None + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=self._local_tz()) + return parsed.astimezone(self._local_tz()) + + def _encode_export_csv(self, header: Sequence[str], rows: Sequence[Sequence[Any]]) -> bytes: + buffer = io.StringIO() + writer = csv.writer(buffer, lineterminator="\n") + writer.writerow([self._sanitize_export_cell(col) for col in header]) + for row in rows: + writer.writerow([self._sanitize_export_cell(cell) for cell in row]) + return buffer.getvalue().encode("utf-8") + + def _sanitize_export_cell(self, value: Any) -> str: + text = str(value if value is not None else "") + if text and text[0] in {"=", "+", "-", "@"}: + return "'" + text + return text + + def _reserve_capacity(self, new_size: int) -> None: + if new_size > self.MAX_UPLOAD_TOTAL: + raise TimelineValidationError("Upload would exceed total storage limit") + + self._enforce_upload_limits() + metas = self._load_all_metadata() + metas.sort(key=lambda item: float(item.get("uploaded_epoch", 0.0))) + + total = sum(int(meta.get("size", 0)) for meta in metas) + while metas and ( + len(metas) >= self.MAX_UPLOAD_FILES or total + new_size > self.MAX_UPLOAD_TOTAL + ): + oldest = metas.pop(0) + self._delete_upload_files(oldest.get("id", "")) + total = sum(int(meta.get("size", 0)) for meta in metas) + + if total + new_size > self.MAX_UPLOAD_TOTAL: + raise 
TimelineValidationError("Upload would exceed total storage limit") + + def _enforce_upload_limits(self) -> None: + metas = self._load_all_metadata() + metas.sort(key=lambda item: float(item.get("uploaded_epoch", 0.0))) + total = sum(int(meta.get("size", 0)) for meta in metas) + + while metas and (len(metas) > self.MAX_UPLOAD_FILES or total > self.MAX_UPLOAD_TOTAL): + oldest = metas.pop(0) + self._delete_upload_files(oldest.get("id", "")) + total = sum(int(meta.get("size", 0)) for meta in metas) + + def _load_all_metadata(self) -> List[Dict[str, Any]]: + metas: List[Dict[str, Any]] = [] + pattern = os.path.join(self.upload_dir, "*.json") + for path in glob.glob(pattern): + try: + with open(path, "r", encoding="utf-8") as f: + meta = json.load(f) + except (OSError, json.JSONDecodeError): + continue + + upload_id = str(meta.get("id", "")) + if not self._UPLOAD_ID_RE.match(upload_id): + continue + if not os.path.exists(self._csv_path(upload_id)): + continue + metas.append(meta) + return metas + + def _load_metadata_by_id(self, upload_id: str) -> Optional[Dict[str, Any]]: + upload_id = self._validated_upload_id(upload_id) + meta_path = self._meta_path(upload_id) + csv_path = self._csv_path(upload_id) + if not (os.path.exists(meta_path) and os.path.exists(csv_path)): + return None + try: + with open(meta_path, "r", encoding="utf-8") as f: + meta = json.load(f) + except (OSError, json.JSONDecodeError): + return None + if str(meta.get("id", "")) != upload_id: + return None + return meta + + def _delete_upload_files(self, upload_id: str) -> None: + if not upload_id or not self._UPLOAD_ID_RE.match(upload_id): + return + csv_path = self._csv_path(upload_id) + meta_path = self._meta_path(upload_id) + for path in (csv_path, meta_path): + if os.path.exists(path): + try: + os.remove(path) + except OSError: + pass + self._cache.pop(upload_id, None) + + def _public_metadata(self, meta: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + if meta is None: + return None + return { + "id": str(meta.get("id", "")), + "filename": str(meta.get("filename", "")), + "size": int(meta.get("size", 0)), + "uploaded_at": str(meta.get("uploaded_at", "")), + "time_column": str(meta.get("time_column", "")), + "row_limit": self.MAX_CSV_ROWS, + } + + def _write_bytes_atomic(self, path: str, raw: bytes) -> None: + temp_path = f"{path}.tmp-{secrets.token_hex(4)}" + with open(temp_path, "xb") as f: + f.write(raw) + os.replace(temp_path, path) + + def _write_json_atomic(self, path: str, data: Dict[str, Any]) -> None: + temp_path = f"{path}.tmp-{secrets.token_hex(4)}" + with open(temp_path, "x", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=True) + os.replace(temp_path, path) + + def _meta_path(self, upload_id: str) -> str: + return os.path.join(self.upload_dir, f"{upload_id}.json") + + def _csv_path(self, upload_id: str) -> str: + return os.path.join(self.upload_dir, f"{upload_id}.csv") + + def _validated_upload_id(self, upload_id: str) -> str: + upload_id = (upload_id or "").strip().lower() + if not self._UPLOAD_ID_RE.match(upload_id): + raise TimelineValidationError("Invalid upload identifier") + return upload_id + + def _trim_header_cell(self, cell: str) -> str: + value = (cell or "").strip() + if not value: + return "" + if len(value) > self.MAX_CELL_CHARS: + raise TimelineValidationError( + f"CSV header cell exceeds max length ({self.MAX_CELL_CHARS} characters)" + ) + return value + + def _extract_date_from_name(self, filename: str) -> Optional[date]: + match = self._DATE_IN_NAME_RE.search(filename or "") + 
if not match: + return None + try: + return date.fromisoformat(match.group(1)) + except ValueError: + return None + + def _clamp_hours(self, hours: int) -> int: + try: + value = int(hours) + except (TypeError, ValueError): + value = 6 + if value < 1: + value = 1 + if value > self.MAX_TIMELINE_HOURS: + value = self.MAX_TIMELINE_HOURS + return value + + def _now_local(self) -> datetime: + return datetime.now().astimezone() + + def _local_tz(self): + return self._now_local().tzinfo diff --git a/src/webapp.py b/src/webapp.py index 366d91b..af23a1a 100644 --- a/src/webapp.py +++ b/src/webapp.py @@ -1,16 +1,48 @@ +import hmac import json +import os import queue +import secrets import subprocess import threading import time -from typing import Any, Dict +from collections import defaultdict, deque +from typing import Any, Deque, Dict, Optional +from urllib.parse import urlparse -from flask import Flask, Response, jsonify, render_template, request, stream_with_context +from flask import Flask, Response, jsonify, render_template, request, session, stream_with_context from waitress import serve +from werkzeug.exceptions import RequestEntityTooLarge from app_state import AppState from network_manager import NetworkManager from serial_bridge import SerialBroadcaster +from timeline_service import ( + TimelineNotFoundError, + TimelineService, + TimelineStorageError, + TimelineValidationError, +) + + +class SlidingWindowRateLimiter: + def __init__(self, limit: int, window_seconds: int) -> None: + self.limit = int(limit) + self.window_seconds = int(window_seconds) + self._hits: Dict[str, Deque[float]] = defaultdict(deque) + self._lock = threading.Lock() + + def allow(self, key: str) -> bool: + now = time.monotonic() + cutoff = now - self.window_seconds + with self._lock: + bucket = self._hits[key] + while bucket and bucket[0] < cutoff: + bucket.popleft() + if len(bucket) >= self.limit: + return False + bucket.append(now) + return True class WebPortal: @@ -19,6 +51,9 @@ class WebPortal: state: AppState, network_manager: NetworkManager, broadcaster: SerialBroadcaster, + log_dir: str = "/home/pi", + log_prefix: str = "xxx", + upload_dir: str = "/home/pi/timeline_uploads", template_folder: str = "../templates", static_folder: str = "../static", ) -> None: @@ -28,10 +63,82 @@ class WebPortal: self._status_refresh_interval_s = 15.0 self._last_status_refresh_mono = 0.0 self._status_lock = threading.Lock() + self.timeline_service = TimelineService(log_dir=log_dir, log_prefix=log_prefix, upload_dir=upload_dir) + self._upload_limiter = SlidingWindowRateLimiter(limit=10, window_seconds=60) + self._download_limiter = SlidingWindowRateLimiter(limit=30, window_seconds=60) self.app = Flask(__name__, template_folder=template_folder, static_folder=static_folder) + self.app.secret_key = os.environ.get("SERIAL_WEB_SECRET") or secrets.token_hex(32) + self.app.config["SESSION_COOKIE_HTTPONLY"] = True + self.app.config["SESSION_COOKIE_SAMESITE"] = "Strict" + self.app.config["MAX_CONTENT_LENGTH"] = TimelineService.MAX_UPLOAD_SIZE + (1024 * 1024) self._register_routes() + self._register_error_handlers() + + def _register_error_handlers(self) -> None: + @self.app.errorhandler(RequestEntityTooLarge) + def too_large(_exc: RequestEntityTooLarge) -> Response: + if request.path.startswith("/api/timeline/uploads"): + return jsonify({"ok": False, "message": "Upload exceeds size limit"}), 413 + return jsonify({"ok": False, "message": "Request too large"}), 413 + + def _client_ip(self) -> str: + if request.access_route: + return 
request.access_route[0] + return request.remote_addr or "unknown" + + def _ensure_csrf_token(self) -> str: + token = session.get("csrf_token") + if not token: + token = secrets.token_urlsafe(32) + session["csrf_token"] = token + return str(token) + + def _is_valid_csrf(self) -> bool: + expected = session.get("csrf_token") + provided = ( + request.headers.get("X-CSRF-Token") + or request.form.get("csrf_token") + or request.args.get("csrf_token") + ) + if not expected or not provided: + return False + return hmac.compare_digest(str(expected), str(provided)) + + def _is_same_origin(self) -> bool: + expected_origin = request.host_url.rstrip("/") + for header in ("Origin", "Referer"): + value = request.headers.get(header, "").strip() + if not value: + continue + try: + parsed = urlparse(value) + except ValueError: + return False + if not parsed.scheme or not parsed.netloc: + return False + origin = f"{parsed.scheme}://{parsed.netloc}".rstrip("/") + return origin == expected_origin + return False + + def _guard_secure_endpoint( + self, + *, + require_csrf: bool = True, + limiter: Optional[SlidingWindowRateLimiter] = None, + ) -> Optional[Response]: + if not self._is_same_origin(): + return jsonify({"ok": False, "message": "Cross-origin requests are blocked"}), 403 + if require_csrf and not self._is_valid_csrf(): + return jsonify({"ok": False, "message": "Invalid CSRF token"}), 403 + if limiter and not limiter.allow(self._client_ip()): + return jsonify({"ok": False, "message": "Too many requests"}), 429 + return None def _register_routes(self) -> None: + @self.app.context_processor + def inject_template_vars() -> Dict[str, Any]: + return {"csrf_token": self._ensure_csrf_token()} + @self.app.route("/") def index() -> str: return render_template("index.html") @@ -40,6 +147,14 @@ class WebPortal: def serial_page() -> str: return render_template("serial.html") + @self.app.route("/timeline") + def timeline_page() -> str: + return render_template( + "timeline.html", + max_upload_mib=TimelineService.MAX_UPLOAD_SIZE // (1024 * 1024), + max_upload_files=TimelineService.MAX_UPLOAD_FILES, + ) + @self.app.route("/api/status", methods=["GET"]) def status() -> Response: try: @@ -110,8 +225,17 @@ class WebPortal: yield "retry: 2000\n\n" while True: try: - line = q.get(timeout=15) - data = json.dumps({"line": line}) + event = q.get(timeout=15) + if isinstance(event, dict): + payload = { + "line": event.get("line", ""), + "ts_iso": event.get("ts_iso", ""), + "ts_hms": event.get("ts_hms", ""), + "source": event.get("source", "serial"), + } + else: + payload = {"line": str(event), "ts_iso": "", "ts_hms": "", "source": "serial"} + data = json.dumps(payload) yield f"data: {data}\n\n" except queue.Empty: yield ": keepalive\n\n" @@ -120,5 +244,100 @@ class WebPortal: return Response(generate(), mimetype="text/event-stream") + @self.app.route("/api/timeline/uploads", methods=["GET"]) + def list_timeline_uploads() -> Response: + uploads = self.timeline_service.list_uploads() + return jsonify({"ok": True, "uploads": uploads}) + + @self.app.route("/api/timeline/uploads", methods=["POST"]) + def upload_timeline_csv() -> Response: + guard = self._guard_secure_endpoint(limiter=self._upload_limiter) + if guard is not None: + return guard + + csv_file = request.files.get("file") + if csv_file is None: + return jsonify({"ok": False, "message": "Missing file field"}), 400 + if not csv_file.filename: + return jsonify({"ok": False, "message": "Filename is required"}), 400 + + raw = 
csv_file.stream.read(TimelineService.MAX_UPLOAD_SIZE + 1) + if len(raw) > TimelineService.MAX_UPLOAD_SIZE: + return jsonify({"ok": False, "message": "Upload exceeds size limit"}), 413 + + try: + meta = self.timeline_service.upload_csv(csv_file.filename, raw) + except TimelineValidationError as exc: + return jsonify({"ok": False, "message": str(exc)}), 400 + except TimelineStorageError as exc: + return jsonify({"ok": False, "message": str(exc)}), 503 + return jsonify({"ok": True, "upload": meta}), 201 + + @self.app.route("/api/timeline/uploads/", methods=["DELETE"]) + def delete_timeline_csv(upload_id: str) -> Response: + guard = self._guard_secure_endpoint(limiter=self._upload_limiter) + if guard is not None: + return guard + try: + self.timeline_service.delete_upload(upload_id) + except TimelineValidationError as exc: + return jsonify({"ok": False, "message": str(exc)}), 400 + except TimelineNotFoundError: + return jsonify({"ok": False, "message": "Upload not found"}), 404 + return jsonify({"ok": True}) + + @self.app.route("/api/timeline", methods=["GET"]) + def timeline_data() -> Response: + hours_arg = request.args.get("hours", "6") + upload_id = request.args.get("upload_id", "") + try: + hours = int(hours_arg) + except ValueError: + hours = 6 + + try: + data = self.timeline_service.get_timeline(hours=hours, upload_id=upload_id) + except TimelineValidationError as exc: + return jsonify({"ok": False, "message": str(exc)}), 400 + except TimelineNotFoundError: + return jsonify({"ok": False, "message": "Upload not found"}), 404 + except TimelineStorageError as exc: + return jsonify({"ok": False, "message": str(exc)}), 503 + return jsonify({"ok": True, **data}) + + @self.app.route("/api/timeline/download", methods=["GET"]) + def download_timeline() -> Response: + guard = self._guard_secure_endpoint(limiter=self._download_limiter) + if guard is not None: + return guard + + kind = request.args.get("kind", "serial") + upload_id = request.args.get("upload_id", "") + hours_arg = request.args.get("hours", "6") + try: + hours = int(hours_arg) + except ValueError: + hours = 6 + + try: + filename, payload = self.timeline_service.build_download( + kind=kind, + hours=hours, + upload_id=upload_id, + ) + except TimelineValidationError as exc: + return jsonify({"ok": False, "message": str(exc)}), 400 + except TimelineNotFoundError: + return jsonify({"ok": False, "message": "Upload not found"}), 404 + except TimelineStorageError as exc: + return jsonify({"ok": False, "message": str(exc)}), 503 + + headers = { + "Content-Disposition": f'attachment; filename="{filename}"', + "Cache-Control": "no-store", + "X-Content-Type-Options": "nosniff", + } + return Response(payload, headers=headers, mimetype="text/csv") + def run(self, host: str = "0.0.0.0", port: int = 80) -> None: serve(self.app, host=host, port=port, threads=6) diff --git a/static/style.css b/static/style.css index cb00c9e..e59a93d 100644 --- a/static/style.css +++ b/static/style.css @@ -79,6 +79,12 @@ button:hover { min-height: 20px; } +.hint { + color: #4b5968; + font-size: 14px; + min-height: 20px; +} + .terminal { height: 70vh; overflow-y: auto; @@ -91,3 +97,95 @@ button:hover { font-family: "DejaVu Sans Mono", "Noto Sans Mono", monospace; white-space: pre-wrap; } + +.timeline-page { + max-width: 1280px; +} + +.timeline-upload-row { + display: grid; + grid-template-columns: 1fr auto; + gap: 10px; + align-items: center; +} + +.timeline-actions { + margin-top: 6px; +} + +.download-links { + display: flex; + gap: 16px; + flex-wrap: wrap; + 
margin-top: 8px; +} + +.timeline-grid { + display: grid; + grid-template-columns: 1fr 1fr; + gap: 16px; +} + +.timeline-panel { + min-height: 70vh; +} + +.timeline-terminal { + height: 60vh; +} + +.timeline-table-wrap { + border: 1px solid #d0d7de; + border-radius: 8px; + overflow: auto; + max-height: 60vh; + background: #ffffff; +} + +.timeline-table { + width: 100%; + border-collapse: collapse; + min-width: 640px; + font-size: 13px; +} + +.timeline-table th, +.timeline-table td { + border-bottom: 1px solid #e6ebf0; + border-right: 1px solid #e6ebf0; + padding: 6px 8px; + text-align: left; + white-space: nowrap; +} + +.timeline-table th:last-child, +.timeline-table td:last-child { + border-right: none; +} + +.timeline-table thead th { + background: #f7f9fb; + position: sticky; + top: 0; + z-index: 1; +} + +@media (max-width: 960px) { + .timeline-upload-row { + grid-template-columns: 1fr; + } + + .timeline-grid { + grid-template-columns: 1fr; + } + + .timeline-panel { + min-height: auto; + } + + .timeline-terminal, + .timeline-table-wrap { + max-height: 50vh; + height: 50vh; + } +} diff --git a/templates/index.html b/templates/index.html index 73eab09..374c969 100644 --- a/templates/index.html +++ b/templates/index.html @@ -34,7 +34,8 @@

       <h2>Serial Monitor</h2>
-      <p><a href="/serial">To the live serial page</a></p>
+      <p><a href="/serial">To the live serial page</a></p>
+      <p><a href="/timeline">To the 6h timeline + CSV merge</a></p>

diff --git a/templates/serial.html b/templates/serial.html
index 35aec1d..46b3cb5 100644
--- a/templates/serial.html
+++ b/templates/serial.html
@@ -1,4 +1,4 @@
 [one changed line in the document head was lost in extraction]
@@ -9,7 +9,7 @@
       <h1>ESP32 Serial Live</h1>
-      <p><a href="/">Back to the WiFi portal</a></p>
+      <p><a href="/">Back to the WiFi portal</a> | <a href="/timeline">To the timeline</a></p>
       <pre id="terminal" class="terminal"></pre>
@@ -31,7 +31,8 @@
     events.onmessage = (evt) => {
       try {
         const payload = JSON.parse(evt.data);
-        appendLine(payload.line || '');
+        const ts = payload.ts_hms ? `[${payload.ts_hms}] ` : '';
+        appendLine(`${ts}${payload.line || ''}`);
       } catch (e) {
         appendLine(evt.data || '');
       }
diff --git a/templates/timeline.html b/templates/timeline.html
new file mode 100644
index 0000000..b60ddcf
--- /dev/null
+++ b/templates/timeline.html
@@ -0,0 +1,264 @@
[The 264 added template lines were garbled in extraction: the HTML markup and the inline script did not survive. Recoverable text content: page title "Timeline"; heading "Timeline of the last 6 hours"; nav links "Back to the WiFi portal" and "To the live serial page"; a "CSV Upload" section with file picker and upload button plus the hint "Limit: {{ max_upload_mib }} MiB per file, at most {{ max_upload_files }} uploads."; upload-selection and download controls; and side-by-side "Serial" and "CSV" panels (a terminal-style pane and a data table).]