Files
acidburns 2c53b4fd2d initial mp90 panel renderer
Add the MP90 front panel clock/weather renderer, systemd service, ignore rules, and deployment documentation.
2026-06-30 12:41:41 +02:00

263 lines
8.1 KiB
Python

#!/home/linuxbrew/.linuxbrew/bin/python3
"""Render a clock and Mannheim weather forecast on the MP90 front TFT."""
from __future__ import annotations
import argparse
import datetime as dt
import logging
import signal
import time
from dataclasses import dataclass
from typing import Iterable
from zoneinfo import ZoneInfo
import hid
import requests
from PIL import Image, ImageDraw, ImageFont
# Framebuffer geometry as transmitted to the panel: 320x170 pixels. The image
# is drawn on a canvas with the axes swapped and rotated before transmission.
WIDTH = 320
HEIGHT = 170
PORTRAIT_WIDTH = HEIGHT
PORTRAIT_HEIGHT = WIDTH
# Two bytes per pixel (the frame is sent as 16-bit RGB565 values).
FRAMEBUFFER_BYTES = WIDTH * HEIGHT * 2

# HID packet layout: leading zero byte(s) (presumably the HID report ID --
# confirm), an 8-byte command header, then a fixed-size zero-padded payload.
DATA_BYTES = 4096
REPORT_BYTES = 1
HEADER_BYTES = 8
PACKET_BYTES = REPORT_BYTES + HEADER_BYTES + DATA_BYTES

# Zero-based index of the last framebuffer chunk. Derived from the sizes above
# instead of being hard-coded so it cannot drift out of sync with them; with
# the current geometry this is 26 (27 chunks, the last one partially filled).
FINAL_CHUNK = (FRAMEBUFFER_BYTES - 1) // DATA_BYTES
# Time zone used for the displayed clock and for the forecast target hours.
TZ = ZoneInfo("Europe/Berlin")
# Device path handed to hid.device.open_path().
# NOTE(review): looks like a fixed USB bus/port/interface path -- confirm it is
# stable across reboots and hardware changes.
HID_PATH = b"1-8:1.1"
# TrueType font files loaded through ImageFont.truetype() when drawing.
FONT_REGULAR = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
FONT_BOLD = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
# Forecast endpoint queried with requests.get().
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
# (latitude, longitude) sent as the forecast location.
MANNHEIM = (49.4891, 8.46694)
@dataclass(frozen=True)
class Forecast:
    """Immutable weather values for a single forecast hour.

    Attributes:
        target: The point in time the values apply to.
        temperature: Temperature reading for that hour.
        weather_code: Numeric weather code, translated to text elsewhere.
        precipitation_probability: Chance of precipitation in percent.
    """

    target: dt.datetime
    temperature: float
    weather_code: int
    precipitation_probability: int
# Short display labels for the WMO weather interpretation codes reported by
# the forecast API. Labels are kept to at most 13 characters because they
# share one small-font line with the precipitation percentage on each card.
# Codes without an entry are rendered with a generic fallback by the lookup
# helper.
WEATHER_TEXT = {
    0: "Clear",
    1: "Mostly clear",
    2: "Partly cloudy",
    3: "Overcast",
    45: "Fog",
    48: "Fog",
    51: "Light drizzle",
    53: "Drizzle",
    55: "Heavy drizzle",
    # Freezing drizzle (light / dense).
    56: "Icy drizzle",
    57: "Icy drizzle",
    61: "Light rain",
    63: "Rain",
    65: "Heavy rain",
    # Freezing rain (light / heavy).
    66: "Freezing rain",
    67: "Freezing rain",
    71: "Light snow",
    73: "Snow",
    75: "Heavy snow",
    77: "Snow grains",
    80: "Showers",
    81: "Showers",
    82: "Heavy showers",
    # Snow showers (slight / heavy).
    85: "Snow showers",
    86: "Snow showers",
    95: "Thunderstorm",
    96: "Thunderstorm",
    99: "Thunderstorm",
}
def packet(header: bytes, data: bytes = b"") -> bytes:
    """Assemble one fixed-size packet for the panel.

    The result is ``REPORT_BYTES`` zero bytes, followed by *header*, followed
    by *data* padded with zero bytes up to ``DATA_BYTES``.

    Raises:
        ValueError: If *header* is not ``HEADER_BYTES`` long or *data* is
            longer than ``DATA_BYTES``.
    """
    if len(header) != HEADER_BYTES:
        raise ValueError("panel headers must be exactly 8 bytes")
    if len(data) > DATA_BYTES:
        raise ValueError("panel payload exceeds 4096 bytes")
    prefix = bytes(REPORT_BYTES)
    padding = bytes(DATA_BYTES - len(data))
    return b"".join((prefix, header, data, padding))
def checked_write(device: hid.device, payload: bytes) -> None:
    """Write *payload* to *device* and require that all of it was accepted.

    Raises:
        OSError: If the count returned by ``device.write`` differs from the
            payload length.
    """
    expected = len(payload)
    written = device.write(payload)
    if written == expected:
        return
    raise OSError(f"short HID write: expected {expected}, got {written}")
def set_portrait(device: hid.device) -> None:
    """Send the panel command this module uses to select portrait mode.

    The command is a payload-free packet with header 55 A1 F1 02 00 00 00 00.
    NOTE(review): the 0x02 argument presumably selects the orientation --
    confirm against the panel protocol.
    """
    header = bytes([0x55, 0xA1, 0xF1, 0x02, 0x00, 0x00, 0x00, 0x00])
    checked_write(device, packet(header))
def heartbeat(device: hid.device, now: dt.datetime) -> None:
    """Send a payload-free heartbeat packet carrying the time of day.

    The header is 55 A1 F2 followed by the hour, minute and second of *now*
    and two zero bytes.
    """
    time_fields = (now.hour, now.minute, now.second)
    header = bytes((0x55, 0xA1, 0xF2, *time_fields, 0, 0))
    checked_write(device, packet(header))
def rgb565_bytes(image: Image.Image) -> bytes:
    """Convert *image* to big-endian RGB565, two bytes per pixel.

    The image is converted to RGB first; each pixel keeps the top 5 bits of
    red, the top 6 bits of green and the top 5 bits of blue.
    """
    pixels = image.convert("RGB").get_flattened_data()
    packed = bytearray()
    for r, g, b in pixels:
        word = ((r >> 3) << 11) | ((g >> 2) << 5) | (b >> 3)
        # High byte first.
        packed += word.to_bytes(2, "big")
    return bytes(packed)
def redraw(device: hid.device, image: Image.Image) -> None:
    """Send *image* to the panel as a sequence of framebuffer chunks.

    The image is converted to RGB565 and split into ``DATA_BYTES``-sized
    chunks numbered 0..``FINAL_CHUNK``. Each chunk goes out in its own packet
    whose header holds: 55 A3, a command byte (F0 for the first chunk, F2 for
    the last, F1 otherwise), the 1-based chunk number, the chunk offset and
    the chunk length (both as two big-endian bytes).

    Raises:
        ValueError: If the converted image is not ``FRAMEBUFFER_BYTES`` long.
        OSError: If a packet is not written completely.
    """
    framebuffer = rgb565_bytes(image)
    if len(framebuffer) != FRAMEBUFFER_BYTES:
        raise ValueError("unexpected framebuffer size")
    for index in range(FINAL_CHUNK + 1):
        start = index * DATA_BYTES
        chunk = framebuffer[start : start + DATA_BYTES]
        if index == 0:
            command = 0xF0
        elif index == FINAL_CHUNK:
            command = 0xF2
        else:
            command = 0xF1
        # NOTE(review): only the low 16 bits of the offset are transmitted, so
        # offsets of 0x10000 and above wrap. Presumably the panel relies on
        # the chunk number instead -- confirm against the protocol.
        header = (
            bytes((0x55, 0xA3, command, index + 1))
            + (start & 0xFFFF).to_bytes(2, "big")
            + (len(chunk) & 0xFFFF).to_bytes(2, "big")
        )
        checked_write(device, packet(header, chunk))
def fetch_forecasts(now: dt.datetime) -> list[Forecast]:
    """Fetch tomorrow's 09:00 and 17:00 forecasts from the weather API.

    "Tomorrow" is the calendar day after *now*. The hourly series returned by
    the API are indexed by their timestamp strings, and the entries matching
    the two target hours are turned into ``Forecast`` objects.

    Returns:
        The forecasts in target order (09:00 first, then 17:00).

    Raises:
        requests.HTTPError: If the API answers with an error status.
        KeyError: If the response lacks an expected field or target hour.
    """
    next_day = (now + dt.timedelta(days=1)).date()
    targets = [
        dt.datetime.combine(next_day, dt.time(hour=hour), tzinfo=TZ)
        for hour in (9, 17)
    ]
    latitude, longitude = MANNHEIM
    query = {
        "latitude": latitude,
        "longitude": longitude,
        "timezone": "Europe/Berlin",
        "forecast_days": 3,
        "hourly": "temperature_2m,weather_code,precipitation_probability",
    }
    response = requests.get(OPEN_METEO_URL, params=query, timeout=20)
    response.raise_for_status()
    hourly = response.json()["hourly"]
    # Map each timestamp string to its position in the parallel value lists.
    position = {stamp: slot for slot, stamp in enumerate(hourly["time"])}

    def forecast_for(target: dt.datetime) -> Forecast:
        slot = position[target.strftime("%Y-%m-%dT%H:00")]
        return Forecast(
            target=target,
            temperature=float(hourly["temperature_2m"][slot]),
            weather_code=int(hourly["weather_code"][slot]),
            precipitation_probability=int(hourly["precipitation_probability"][slot]),
        )

    return [forecast_for(target) for target in targets]
def condition(code: int) -> str:
    """Return the display label for *code*, or "Mixed" when it has none."""
    try:
        return WEATHER_TEXT[code]
    except KeyError:
        return "Mixed"
def font(path: str, size: int) -> ImageFont.FreeTypeFont:
    """Load the TrueType font file at *path* at the requested *size*."""
    loaded = ImageFont.truetype(font=path, size=size)
    return loaded
def draw_card(draw: ImageDraw.ImageDraw, y: int, forecast: Forecast) -> None:
    """Draw one forecast card whose top edge is at *y*.

    The card is a rounded rectangle containing three lines of text: the
    target time, the temperature, and the condition label together with the
    precipitation probability.
    """
    text_x = 24
    draw.rounded_rectangle(
        (12, y, 158, y + 82),
        radius=12,
        fill="#183149",
        outline="#2e5875",
    )

    hour_label = forecast.target.strftime("%H:%M")
    draw.text((text_x, y + 9), hour_label, font=font(FONT_BOLD, 19), fill="#ffce67")

    temperature_label = f"{forecast.temperature:.1f} C"
    draw.text((text_x, y + 34), temperature_label, font=font(FONT_BOLD, 22), fill="#f5fbff")

    label = condition(forecast.weather_code)
    detail = f"{label} {forecast.precipitation_probability}% rain"
    draw.text((text_x, y + 62), detail, font=font(FONT_REGULAR, 10), fill="#b7d5e8")
def render(now: dt.datetime, forecasts: Iterable[Forecast]) -> Image.Image:
    """Draw the full panel image for *now* and return it rotated by 90 degrees.

    The layout is drawn on a ``PORTRAIT_WIDTH`` x ``PORTRAIT_HEIGHT`` canvas:
    an accent bar, the time, the city name, the date, a "TOMORROW" heading and
    up to two forecast cards (extra forecasts are ignored). The canvas is then
    rotated with ``expand=True``, which swaps its width and height.
    """
    canvas = Image.new("RGB", (PORTRAIT_WIDTH, PORTRAIT_HEIGHT), "#071a29")
    pen = ImageDraw.Draw(canvas)
    pen.rectangle((0, 0, PORTRAIT_WIDTH, 8), fill="#4cb8c4")

    # (position, text, font file, font size, colour) for each header line.
    header_lines = (
        ((15, 18), now.strftime("%H:%M"), FONT_BOLD, 45, "#f5fbff"),
        ((17, 73), "MANNHEIM", FONT_BOLD, 18, "#ffce67"),
        ((18, 98), now.strftime("%a %d %b"), FONT_REGULAR, 15, "#b7d5e8"),
        ((17, 128), "TOMORROW", FONT_BOLD, 11, "#76d2d9"),
    )
    for position, text, font_path, font_size, colour in header_lines:
        pen.text(position, text, font=font(font_path, font_size), fill=colour)

    card_tops = (148, 239)
    for top, forecast in zip(card_tops, forecasts):
        draw_card(pen, top, forecast)
    return canvas.rotate(90, expand=True)
class Panel:
    """Holds the (at most one) open HID handle to the front panel."""

    def __init__(self) -> None:
        # None while no device is open.
        self.device: hid.device | None = None

    def close(self) -> None:
        """Close the current handle, if any.

        The handle is forgotten before it is closed, so a failing
        ``device.close()`` cannot leave a dead handle stored that every later
        ``close()`` call would try to close again. The exception itself still
        propagates to the caller.
        """
        device, self.device = self.device, None
        if device is not None:
            device.close()

    def connect(self) -> hid.device:
        """Open the panel at ``HID_PATH`` and switch it to portrait mode.

        Any previously held handle is closed first. The new handle is stored
        on the instance before the portrait command is sent.

        Returns:
            The newly opened device.
        """
        self.close()
        device = hid.device()
        device.open_path(HID_PATH)
        self.device = device
        set_portrait(device)
        # NOTE(review): presumably gives the panel time to apply the mode
        # switch before further packets arrive -- confirm.
        time.sleep(1.7)
        logging.info("connected to MP90 front panel")
        return device
# Seconds between successful weather refreshes, and before retrying a failed one.
_WEATHER_REFRESH_SECONDS = 15 * 60
_WEATHER_RETRY_SECONDS = 60


def _refresh_forecasts(previous: list[Forecast]) -> tuple[list[Forecast], float]:
    """Fetch new forecasts and return them with the next refresh deadline.

    On failure the error is logged and the previous forecasts are kept, minus
    any whose target day is no longer in the future (they would be shown under
    the "TOMORROW" heading), together with a shorter retry deadline.
    """
    now = dt.datetime.now(TZ)
    try:
        forecasts = fetch_forecasts(now)
    except Exception:
        logging.exception("weather update failed; keeping previous forecast")
        still_valid = [item for item in previous if item.target.date() > now.date()]
        return still_valid, time.monotonic() + _WEATHER_RETRY_SECONDS
    logging.info("updated Mannheim weather forecast")
    return forecasts, time.monotonic() + _WEATHER_REFRESH_SECONDS


def run(once: bool) -> None:
    """Drive the panel until SIGTERM/SIGINT, or for one frame when *once* is set.

    Each iteration refreshes the forecast when due, connects to the panel if
    necessary, redraws the screen when due, and then sends a heartbeat between
    two short sleeps. Panel errors are logged and lead to a reconnect on the
    next iteration.

    Weather failures are handled separately from panel failures: they neither
    close the panel connection nor stop the clock from being drawn, and they
    are retried after ``_WEATHER_RETRY_SECONDS`` instead of on every loop.
    With *once* set, a frame is therefore drawn even if the forecast could
    not be fetched.

    Redraws are scheduled for the next minute boundary, so the displayed
    HH:MM changes within one loop period of the real minute change.
    """
    panel = Panel()
    forecasts: list[Forecast] = []
    # -inf makes both actions due on the first iteration regardless of the
    # (unspecified) reference point of time.monotonic().
    weather_due = float("-inf")
    redraw_due = float("-inf")
    stop = False

    def request_stop(_signum: int, _frame: object) -> None:
        nonlocal stop
        stop = True

    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)
    try:
        while not stop:
            if time.monotonic() >= weather_due:
                forecasts, weather_due = _refresh_forecasts(forecasts)
            # Sample the clock after the (possibly slow) weather request so
            # the rendered time is current.
            now = dt.datetime.now(TZ)
            monotonic = time.monotonic()
            try:
                device = panel.device or panel.connect()
                if monotonic >= redraw_due:
                    redraw(device, render(now, forecasts))
                    # Next redraw at the upcoming minute boundary.
                    seconds_into_minute = now.second + now.microsecond / 1_000_000
                    redraw_due = monotonic + 60 - seconds_into_minute
                    logging.info("redrew clock at %s", now.strftime("%H:%M:%S"))
                    if once:
                        return
                time.sleep(1.7)
                heartbeat(device, dt.datetime.now(TZ))
                time.sleep(1.7)
            except Exception:
                logging.exception("panel update failed; reconnecting")
                panel.close()
                time.sleep(3)
    finally:
        panel.close()
def main() -> None:
    """Parse the command line, configure logging and start the panel loop."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--once", action="store_true", help="render one frame and exit")
    options = cli.parse_args()
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=logging.INFO,
    )
    run(options.once)


if __name__ == "__main__":
    main()