#!/home/linuxbrew/.linuxbrew/bin/python3
"""Render a clock and Mannheim weather forecast on the MP90 front TFT.

The script talks to the panel over HID, pushes a full RGB565 frame roughly
once a minute, and sends a time-carrying heartbeat packet in between.  The
forecast for tomorrow 09:00 and 17:00 (Europe/Berlin) is fetched from
Open-Meteo every 15 minutes.
"""

from __future__ import annotations

import argparse
import datetime as dt
import functools
import logging
import signal
import time
from dataclasses import dataclass
from typing import Iterable
from zoneinfo import ZoneInfo

import hid
import requests
from PIL import Image, ImageDraw, ImageFont

# Panel geometry.  The image is composed in portrait and rotated before upload.
WIDTH = 320
HEIGHT = 170
PORTRAIT_WIDTH = HEIGHT
PORTRAIT_HEIGHT = WIDTH
FRAMEBUFFER_BYTES = WIDTH * HEIGHT * 2  # two bytes per pixel (RGB565)

# HID packet layout: leading zero byte(s) + 8-byte header + 4096-byte payload.
DATA_BYTES = 4096
REPORT_BYTES = 1
HEADER_BYTES = 8
PACKET_BYTES = REPORT_BYTES + HEADER_BYTES + DATA_BYTES
FINAL_CHUNK = 26  # index of the last 4096-byte slice of the framebuffer

TZ = ZoneInfo("Europe/Berlin")
HID_PATH = b"1-8:1.1"
FONT_REGULAR = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
FONT_BOLD = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
MANNHEIM = (49.4891, 8.46694)  # (latitude, longitude)

# Scheduling, in seconds.
WEATHER_REFRESH_SECONDS = 15 * 60
WEATHER_RETRY_SECONDS = 60
HEARTBEAT_PAUSE_SECONDS = 1.7
PANEL_SETTLE_SECONDS = 1.7
RECONNECT_DELAY_SECONDS = 3


@dataclass(frozen=True)
class Forecast:
    """One hourly forecast sample shown as a card on the panel."""

    target: dt.datetime  # local time the sample applies to
    temperature: float  # value of Open-Meteo's ``temperature_2m`` series
    weather_code: int  # value of Open-Meteo's ``weather_code`` series
    precipitation_probability: int  # rendered as "<n>% rain"


# Short labels for the weather codes the display knows about; any other code
# is rendered as "Mixed" by condition().
WEATHER_TEXT = {
    0: "Clear",
    1: "Mostly clear",
    2: "Partly cloudy",
    3: "Overcast",
    45: "Fog",
    48: "Fog",
    51: "Light drizzle",
    53: "Drizzle",
    55: "Heavy drizzle",
    61: "Light rain",
    63: "Rain",
    65: "Heavy rain",
    71: "Light snow",
    73: "Snow",
    75: "Heavy snow",
    80: "Showers",
    81: "Showers",
    82: "Heavy showers",
    95: "Thunderstorm",
    96: "Thunderstorm",
    99: "Thunderstorm",
}


def packet(header: bytes, data: bytes = b"") -> bytes:
    """Build one fixed-size panel packet.

    The result is ``REPORT_BYTES`` zero bytes, the 8-byte *header*, then
    *data* zero-padded to ``DATA_BYTES``.

    Raises:
        ValueError: if *header* is not exactly 8 bytes or *data* is too long.
    """
    if len(header) != HEADER_BYTES:
        raise ValueError("panel headers must be exactly 8 bytes")
    if len(data) > DATA_BYTES:
        raise ValueError("panel payload exceeds 4096 bytes")
    # The leading zero byte is presumably the HID report ID -- TODO confirm.
    return bytes(REPORT_BYTES) + header + data + bytes(DATA_BYTES - len(data))


def checked_write(device: hid.device, payload: bytes) -> None:
    """Write *payload* to *device*, raising ``OSError`` on a short write."""
    written = device.write(payload)
    if written != len(payload):
        raise OSError(f"short HID write: expected {len(payload)}, got {written}")


def set_portrait(device: hid.device) -> None:
    """Send the 0xA1/0xF1 command with argument 0x02.

    NOTE(review): judging by the name this selects portrait orientation;
    the command semantics are not visible in this file.
    """
    checked_write(device, packet(bytes((0x55, 0xA1, 0xF1, 0x02, 0, 0, 0, 0))))


def heartbeat(device: hid.device, now: dt.datetime) -> None:
    """Send the 0xA1/0xF2 packet carrying the hour, minute and second of *now*."""
    header = bytes((0x55, 0xA1, 0xF2, now.hour, now.minute, now.second, 0, 0))
    checked_write(device, packet(header))


def rgb565_bytes(image: Image.Image) -> bytes:
    """Convert *image* to RGB565, two bytes per pixel, high byte first.

    Each pixel is ``(r >> 3) << 11 | (g >> 2) << 5 | (b >> 3)``.  The raw RGB
    buffer is split into per-channel strides so the conversion does not need
    a per-pixel tuple, and it only relies on ``Image.tobytes()``.
    """
    raw = image.convert("RGB").tobytes()
    reds, greens, blues = raw[0::3], raw[1::3], raw[2::3]
    result = bytearray(2 * len(reds))
    # High byte: five red bits followed by the top three green bits.
    result[0::2] = bytes((r & 0xF8) | (g >> 5) for r, g in zip(reds, greens))
    # Low byte: the remaining three green bits followed by five blue bits.
    result[1::2] = bytes(((g << 3) & 0xE0) | (b >> 3) for g, b in zip(greens, blues))
    return bytes(result)


def redraw(device: hid.device, image: Image.Image) -> None:
    """Upload *image* to the panel as ``FINAL_CHUNK + 1`` framebuffer packets.

    The first packet uses command 0xF0, the last 0xF2 and all others 0xF1.

    Raises:
        ValueError: if the converted image is not ``FRAMEBUFFER_BYTES`` long.
        OSError: if any HID write is short.
    """
    framebuffer = rgb565_bytes(image)
    if len(framebuffer) != FRAMEBUFFER_BYTES:
        raise ValueError("unexpected framebuffer size")

    for index in range(FINAL_CHUNK + 1):
        offset = index * DATA_BYTES
        chunk = framebuffer[offset : offset + DATA_BYTES]
        if index == 0:
            command = 0xF0
        elif index == FINAL_CHUNK:
            command = 0xF2
        else:
            command = 0xF1
        # NOTE(review): only the low 16 bits of the offset fit in the header,
        # so offsets >= 0x10000 wrap.  Left unchanged because the expected
        # header layout is not documented here -- confirm against the panel.
        header = bytes(
            (
                0x55,
                0xA3,
                command,
                index + 1,
                (offset >> 8) & 0xFF,
                offset & 0xFF,
                (len(chunk) >> 8) & 0xFF,
                len(chunk) & 0xFF,
            )
        )
        checked_write(device, packet(header, chunk))


def fetch_forecasts(now: dt.datetime) -> list[Forecast]:
    """Fetch tomorrow's 09:00 and 17:00 forecast for Mannheim from Open-Meteo.

    "Tomorrow" is the calendar day after *now*.

    Raises:
        requests.RequestException: on network errors or a non-2xx response.
        KeyError: if the response lacks an expected field or target hour.
        TypeError, ValueError: if a value cannot be converted to a number
            (for example when the API returns ``null`` for it).
    """
    tomorrow = (now + dt.timedelta(days=1)).date()
    targets = [
        dt.datetime.combine(tomorrow, dt.time(hour=9), tzinfo=TZ),
        dt.datetime.combine(tomorrow, dt.time(hour=17), tzinfo=TZ),
    ]
    response = requests.get(
        OPEN_METEO_URL,
        params={
            "latitude": MANNHEIM[0],
            "longitude": MANNHEIM[1],
            "timezone": "Europe/Berlin",
            "forecast_days": 3,
            "hourly": "temperature_2m,weather_code,precipitation_probability",
        },
        timeout=20,
    )
    response.raise_for_status()
    hourly = response.json()["hourly"]

    # Map each timestamp string to its position in the parallel value arrays.
    indices = {value: index for index, value in enumerate(hourly["time"])}
    forecasts = []
    for target in targets:
        index = indices[target.strftime("%Y-%m-%dT%H:00")]
        forecasts.append(
            Forecast(
                target=target,
                temperature=float(hourly["temperature_2m"][index]),
                weather_code=int(hourly["weather_code"][index]),
                precipitation_probability=int(hourly["precipitation_probability"][index]),
            )
        )
    return forecasts


def condition(code: int) -> str:
    """Return the display label for a weather *code* ("Mixed" if unknown)."""
    return WEATHER_TEXT.get(code, "Mixed")


@functools.cache
def font(path: str, size: int) -> ImageFont.FreeTypeFont:
    """Load the TrueType font at *path* in the given *size*.

    Results are cached: the same handful of (path, size) pairs is requested
    on every redraw, and reloading the file each time is wasted work.
    """
    return ImageFont.truetype(path, size=size)


def draw_card(draw: ImageDraw.ImageDraw, y: int, forecast: Forecast) -> None:
    """Draw one forecast card whose top edge is at vertical position *y*."""
    draw.rounded_rectangle((12, y, 158, y + 82), radius=12, fill="#183149", outline="#2e5875")
    draw.text((24, y + 9), forecast.target.strftime("%H:%M"), font=font(FONT_BOLD, 19), fill="#ffce67")
    draw.text((24, y + 34), f"{forecast.temperature:.1f} C", font=font(FONT_BOLD, 22), fill="#f5fbff")
    detail = f"{condition(forecast.weather_code)} {forecast.precipitation_probability}% rain"
    draw.text((24, y + 62), detail, font=font(FONT_REGULAR, 10), fill="#b7d5e8")


def render(now: dt.datetime, forecasts: Iterable[Forecast]) -> Image.Image:
    """Compose the clock face and up to two forecast cards.

    The picture is drawn in portrait (170x320) and rotated by 90 degrees with
    ``expand=True`` before it is returned.  Forecasts beyond the second are
    ignored; with fewer than two, the missing cards are simply not drawn.
    """
    image = Image.new("RGB", (PORTRAIT_WIDTH, PORTRAIT_HEIGHT), "#071a29")
    draw = ImageDraw.Draw(image)
    draw.rectangle((0, 0, PORTRAIT_WIDTH, 8), fill="#4cb8c4")
    draw.text((15, 18), now.strftime("%H:%M"), font=font(FONT_BOLD, 45), fill="#f5fbff")
    draw.text((17, 73), "MANNHEIM", font=font(FONT_BOLD, 18), fill="#ffce67")
    draw.text((18, 98), now.strftime("%a %d %b"), font=font(FONT_REGULAR, 15), fill="#b7d5e8")
    draw.text((17, 128), "TOMORROW", font=font(FONT_BOLD, 11), fill="#76d2d9")
    for y, forecast in zip((148, 239), forecasts):
        draw_card(draw, y, forecast)
    return image.rotate(90, expand=True)


class Panel:
    """Owns the HID handle of the front panel and reconnects on demand."""

    def __init__(self) -> None:
        # None while disconnected.
        self.device: hid.device | None = None

    def close(self) -> None:
        """Close the HID handle if one is open; safe to call repeatedly."""
        if self.device is not None:
            self.device.close()
            self.device = None

    def connect(self) -> hid.device:
        """(Re)open the panel at ``HID_PATH`` and send the portrait command.

        Any previously open handle is closed first.  The handle is stored
        before the first write so that ``close()`` can release it if that
        write fails.
        """
        self.close()
        device = hid.device()
        device.open_path(HID_PATH)
        self.device = device
        set_portrait(device)
        # Pause after the mode command before sending anything else.
        time.sleep(PANEL_SETTLE_SECONDS)
        logging.info("connected to MP90 front panel")
        return device


def _seconds_until_next_minute(now: dt.datetime) -> float:
    """Return the time left, in seconds, until the minute of *now* rolls over."""
    return 60.0 - (now.second + now.microsecond / 1_000_000)


def run(once: bool) -> None:
    """Drive the panel until SIGTERM/SIGINT, or for a single frame if *once*.

    Weather and panel failures are handled separately: a failed weather
    fetch keeps the previous forecast (possibly none) and is retried after
    ``WEATHER_RETRY_SECONDS`` without touching the HID connection, while a
    panel failure closes the handle and reconnects on the next iteration.
    With *once*, a failed fetch therefore still draws the clock (without
    cards) and exits.
    """
    panel = Panel()
    forecasts: list[Forecast] = []
    weather_due = 0.0
    redraw_due = 0.0
    stop = False

    def request_stop(_signum: int, _frame: object) -> None:
        nonlocal stop
        stop = True

    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)

    try:
        while not stop:
            if time.monotonic() >= weather_due:
                # Top-level boundary: any fetch/parse failure is logged and
                # retried later; it must not stop the clock from updating.
                try:
                    forecasts = fetch_forecasts(dt.datetime.now(TZ))
                except Exception:
                    logging.exception("weather update failed; keeping previous forecast")
                    weather_due = time.monotonic() + WEATHER_RETRY_SECONDS
                else:
                    weather_due = time.monotonic() + WEATHER_REFRESH_SECONDS
                    logging.info("updated Mannheim weather forecast")

            # Sample the clock after the (possibly slow) fetch so the frame
            # shows the current minute.
            now = dt.datetime.now(TZ)
            monotonic = time.monotonic()
            try:
                device = panel.device or panel.connect()
                if monotonic >= redraw_due:
                    redraw(device, render(now, forecasts))
                    # Align the next redraw with the minute boundary so the
                    # displayed HH:MM does not trail the wall clock.
                    redraw_due = monotonic + _seconds_until_next_minute(now)
                    logging.info("redrew clock at %s", now.strftime("%H:%M:%S"))
                if once:
                    return
                time.sleep(HEARTBEAT_PAUSE_SECONDS)
                heartbeat(device, dt.datetime.now(TZ))
                time.sleep(HEARTBEAT_PAUSE_SECONDS)
            except Exception:
                logging.exception("panel update failed; reconnecting")
                panel.close()
                time.sleep(RECONNECT_DELAY_SECONDS)
    finally:
        panel.close()


def main() -> None:
    """Parse the command line, configure logging and start the update loop."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--once", action="store_true", help="render one frame and exit")
    args = parser.parse_args()
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    run(args.once)


if __name__ == "__main__":
    main()