2c53b4fd2d
Add the MP90 front panel clock/weather renderer, systemd service, ignore rules, and deployment documentation.
263 lines
8.1 KiB
Python
263 lines
8.1 KiB
Python
#!/home/linuxbrew/.linuxbrew/bin/python3
|
|
"""Render a clock and Mannheim weather forecast on the MP90 front TFT."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import logging
|
|
import signal
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Iterable
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import hid
|
|
import requests
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
|
|
|
|
# Framebuffer geometry as transmitted to the panel (landscape), in pixels.
WIDTH = 320
HEIGHT = 170
# Canvas geometry used while drawing; render() rotates the canvas by 90 degrees
# (expand=True) so the result is WIDTH x HEIGHT again.
PORTRAIT_WIDTH = HEIGHT
PORTRAIT_HEIGHT = WIDTH
# Two bytes per pixel (RGB565), see rgb565_bytes().
FRAMEBUFFER_BYTES = WIDTH * HEIGHT * 2
# Layout of one HID packet built by packet(): a leading zero byte, an 8-byte
# command header, then a zero-padded payload area.
DATA_BYTES = 4096
# NOTE(review): presumably the HID report-ID byte (0) - confirm against the device.
REPORT_BYTES = 1
HEADER_BYTES = 8
PACKET_BYTES = REPORT_BYTES + HEADER_BYTES + DATA_BYTES
# Zero-based index of the last framebuffer chunk: 108800 bytes split into
# 4096-byte chunks gives 27 chunks (0..26), the last one holding 2304 bytes.
FINAL_CHUNK = 26
# Timezone used for the clock and for the forecast target hours.
TZ = ZoneInfo("Europe/Berlin")
# Path handed to hid.device().open_path() in Panel.connect().
HID_PATH = b"1-8:1.1"
# TrueType files loaded through font().
FONT_REGULAR = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
FONT_BOLD = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
# Forecast endpoint queried by fetch_forecasts().
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
# (latitude, longitude) sent as the forecast location.
MANNHEIM = (49.4891, 8.46694)
|
|
|
|
|
|
@dataclass(frozen=True)
class Forecast:
    """One hourly forecast sample selected for display on a card."""

    # Timezone-aware hour the sample refers to (fetch_forecasts builds it with tzinfo=TZ).
    target: dt.datetime
    # Value taken from the API's "temperature_2m" series; drawn with a " C" suffix.
    temperature: float
    # Value taken from the API's "weather_code" series; translated via WEATHER_TEXT.
    weather_code: int
    # Value taken from the API's "precipitation_probability" series; drawn as "<n>% rain".
    precipitation_probability: int
|
|
|
|
|
|
# Short display label for each WMO weather code reported by the forecast API.
# Codes that are not listed fall back to "Mixed" in condition().
# Labels are kept no longer than the longest pre-existing entry (13 characters)
# because they share one small-font line with the rain probability.
WEATHER_TEXT = {
    0: "Clear",
    1: "Mostly clear",
    2: "Partly cloudy",
    3: "Overcast",
    45: "Fog",
    48: "Fog",
    51: "Light drizzle",
    53: "Drizzle",
    55: "Heavy drizzle",
    # Freezing drizzle (light / dense).
    56: "Icy drizzle",
    57: "Icy drizzle",
    61: "Light rain",
    63: "Rain",
    65: "Heavy rain",
    # Freezing rain (light / heavy).
    66: "Freezing rain",
    67: "Freezing rain",
    71: "Light snow",
    73: "Snow",
    75: "Heavy snow",
    77: "Snow grains",
    80: "Showers",
    81: "Showers",
    82: "Heavy showers",
    # Snow showers (slight / heavy).
    85: "Snow showers",
    86: "Snow showers",
    95: "Thunderstorm",
    96: "Thunderstorm",
    99: "Thunderstorm",
}
|
|
|
|
|
|
def packet(header: bytes, data: bytes = b"") -> bytes:
    """Assemble one fixed-size panel packet.

    The result is a leading zero byte (REPORT_BYTES long), the 8-byte
    *header*, then *data* padded with zero bytes up to DATA_BYTES.

    Raises:
        ValueError: if *header* is not HEADER_BYTES long or *data* is longer
            than DATA_BYTES.
    """
    if len(header) != HEADER_BYTES:
        raise ValueError("panel headers must be exactly 8 bytes")

    padding_length = DATA_BYTES - len(data)
    if padding_length < 0:
        raise ValueError("panel payload exceeds 4096 bytes")

    parts = (bytes(REPORT_BYTES), header, data, bytes(padding_length))
    return b"".join(parts)
|
|
|
|
|
|
def checked_write(device: hid.device, payload: bytes) -> None:
    """Write *payload* to *device* and insist that all of it was accepted.

    Raises:
        OSError: if the value returned by ``device.write`` differs from
            ``len(payload)``.
    """
    expected = len(payload)
    actual = device.write(payload)
    if actual == expected:
        return
    raise OSError(f"short HID write: expected {expected}, got {actual}")
|
|
|
|
|
|
def set_portrait(device: hid.device) -> None:
    """Send the payload-less 55 A1 F1 02 command packet to *device*.

    NOTE(review): per the function name this selects portrait orientation;
    the meaning of the individual bytes is not visible here - confirm against
    the panel protocol.
    """
    orientation_header = bytes([0x55, 0xA1, 0xF1, 0x02, 0, 0, 0, 0])
    command = packet(orientation_header)
    checked_write(device, command)
|
|
|
|
|
|
def heartbeat(device: hid.device, now: dt.datetime) -> None:
    """Send a 55 A1 F2 packet carrying the hour, minute and second of *now*."""
    clock_fields = (now.hour, now.minute, now.second)
    header = bytes((0x55, 0xA1, 0xF2, *clock_fields, 0, 0))
    checked_write(device, packet(header))
|
|
|
|
|
|
def rgb565_bytes(image: Image.Image) -> bytes:
    """Pack *image* as RGB565, two bytes per pixel, high byte first.

    The image is converted to "RGB" and each pixel keeps the top 5 bits of
    red, the top 6 bits of green and the top 5 bits of blue.
    """
    packed = bytearray()
    for r, g, b in image.convert("RGB").get_flattened_data():
        word = ((r >> 3) << 11) | ((g >> 2) << 5) | (b >> 3)
        packed += word.to_bytes(2, "big")
    return bytes(packed)
|
|
|
|
|
|
def redraw(device: hid.device, image: Image.Image) -> None:
    """Stream *image* to the panel as a sequence of 55 A3 packets.

    The RGB565 framebuffer is cut into DATA_BYTES-sized chunks numbered
    0..FINAL_CHUNK. Each header carries the command byte (0xF0 for the first
    chunk, 0xF2 for the last, 0xF1 otherwise), the 1-based chunk number, the
    byte offset and the chunk length, the latter two as big-endian byte pairs.

    Raises:
        ValueError: if the converted image is not FRAMEBUFFER_BYTES long.
    """
    pixels = rgb565_bytes(image)
    if len(pixels) != FRAMEBUFFER_BYTES:
        raise ValueError("unexpected framebuffer size")

    for sequence in range(FINAL_CHUNK + 1):
        start = sequence * DATA_BYTES
        payload = pixels[start : start + DATA_BYTES]
        size = len(payload)

        if sequence == 0:
            opcode = 0xF0
        elif sequence == FINAL_CHUNK:
            opcode = 0xF2
        else:
            opcode = 0xF1

        # NOTE(review): only the low 16 bits of the offset fit in the header,
        # so offsets past 0xFFFF wrap; this matches the existing wire format.
        header = bytes(
            [
                0x55,
                0xA3,
                opcode,
                sequence + 1,
                (start >> 8) & 0xFF,
                start & 0xFF,
                (size >> 8) & 0xFF,
                size & 0xFF,
            ]
        )
        checked_write(device, packet(header, payload))
|
|
|
|
|
|
def fetch_forecasts(now: dt.datetime) -> list[Forecast]:
    """Fetch the 09:00 and 17:00 forecasts for the day after *now*.

    Queries OPEN_METEO_URL for three days of hourly data at the MANNHEIM
    coordinates and picks the two samples whose timestamps match the targets.

    Raises:
        requests.HTTPError: if the response status indicates an error.
        KeyError: if the response lacks the expected series or a target hour.
    """
    next_day = (now + dt.timedelta(days=1)).date()
    latitude, longitude = MANNHEIM[0], MANNHEIM[1]
    query = {
        "latitude": latitude,
        "longitude": longitude,
        "timezone": "Europe/Berlin",
        "forecast_days": 3,
        "hourly": "temperature_2m,weather_code,precipitation_probability",
    }

    response = requests.get(OPEN_METEO_URL, params=query, timeout=20)
    response.raise_for_status()
    hourly = response.json()["hourly"]

    # Map each ISO timestamp string from the API to its position in the series.
    position_by_stamp = {stamp: position for position, stamp in enumerate(hourly["time"])}

    selected = []
    for hour in (9, 17):
        target = dt.datetime.combine(next_day, dt.time(hour=hour), tzinfo=TZ)
        position = position_by_stamp[target.strftime("%Y-%m-%dT%H:00")]
        sample = Forecast(
            target=target,
            temperature=float(hourly["temperature_2m"][position]),
            weather_code=int(hourly["weather_code"][position]),
            precipitation_probability=int(hourly["precipitation_probability"][position]),
        )
        selected.append(sample)
    return selected
|
|
|
|
|
|
def condition(code: int) -> str:
    """Return the display label for weather *code*, or "Mixed" if it is unlisted."""
    try:
        return WEATHER_TEXT[code]
    except KeyError:
        return "Mixed"
|
|
|
|
|
|
def font(path: str, size: int) -> ImageFont.FreeTypeFont:
    """Load the TrueType font file at *path* at the requested *size*."""
    loaded = ImageFont.truetype(path, size)
    return loaded
|
|
|
|
|
|
def draw_card(draw: ImageDraw.ImageDraw, y: int, forecast: Forecast) -> None:
    """Draw one forecast card whose top edge is at *y*.

    The card is a rounded rectangle holding three text rows: the target time,
    the temperature, and the condition with the rain probability.
    """
    text_left = 24
    draw.rounded_rectangle((12, y, 158, y + 82), radius=12, fill="#183149", outline="#2e5875")

    summary = f"{condition(forecast.weather_code)} {forecast.precipitation_probability}% rain"
    # (vertical offset inside the card, text, font file, font size, colour)
    rows = (
        (9, forecast.target.strftime("%H:%M"), FONT_BOLD, 19, "#ffce67"),
        (34, f"{forecast.temperature:.1f} C", FONT_BOLD, 22, "#f5fbff"),
        (62, summary, FONT_REGULAR, 10, "#b7d5e8"),
    )
    for offset, text, face, size, colour in rows:
        draw.text((text_left, y + offset), text, font=font(face, size), fill=colour)
|
|
|
|
|
|
def render(now: dt.datetime, forecasts: Iterable[Forecast]) -> Image.Image:
    """Compose the clock screen and return it rotated for the panel.

    The screen is drawn on a PORTRAIT_WIDTH x PORTRAIT_HEIGHT canvas: an accent
    bar, the time, the city name, the date, a "TOMORROW" caption and up to two
    forecast cards. The canvas is then rotated by 90 degrees with expansion.
    """
    canvas = Image.new("RGB", (PORTRAIT_WIDTH, PORTRAIT_HEIGHT), "#071a29")
    pen = ImageDraw.Draw(canvas)
    pen.rectangle((0, 0, PORTRAIT_WIDTH, 8), fill="#4cb8c4")

    # (position, text, font file, font size, colour)
    captions = (
        ((15, 18), now.strftime("%H:%M"), FONT_BOLD, 45, "#f5fbff"),
        ((17, 73), "MANNHEIM", FONT_BOLD, 18, "#ffce67"),
        ((18, 98), now.strftime("%a %d %b"), FONT_REGULAR, 15, "#b7d5e8"),
        ((17, 128), "TOMORROW", FONT_BOLD, 11, "#76d2d9"),
    )
    for position, text, face, size, colour in captions:
        pen.text(position, text, font=font(face, size), fill=colour)

    # Only two card slots exist; zip() ignores any further forecasts.
    card_tops = (148, 239)
    for top, forecast in zip(card_tops, forecasts):
        draw_card(pen, top, forecast)

    return canvas.rotate(90, expand=True)
|
|
|
|
|
|
class Panel:
    """Holder for the currently open HID handle of the front panel, if any."""

    def __init__(self) -> None:
        # None while no device is open.
        self.device: hid.device | None = None

    def close(self) -> None:
        """Close the open device, if there is one, and forget it."""
        handle = self.device
        if handle is None:
            return
        handle.close()
        self.device = None

    def connect(self) -> hid.device:
        """Open the device at HID_PATH, send the portrait command and return it.

        Any previously open device is closed first.
        """
        self.close()
        handle = hid.device()
        handle.open_path(HID_PATH)
        # Stored before the first write so a failing write can still be
        # cleaned up through close().
        self.device = handle
        set_portrait(handle)
        # NOTE(review): presumably gives the panel time to apply the
        # orientation command before further traffic - confirm.
        time.sleep(1.7)
        logging.info("connected to MP90 front panel")
        return handle
|
|
|
|
|
|
def run(once: bool) -> None:
    """Drive the panel until SIGTERM/SIGINT: weather, redraws and heartbeats.

    Each loop iteration refreshes the forecast when it is due, makes sure the
    panel is connected, redraws the screen at most once per minute and sends
    a heartbeat packet carrying the current time.

    Weather and panel failures are handled separately. A failed forecast
    request is logged and retried later while the clock keeps running with
    the previous (or no) forecast cards; it never closes the panel. A failed
    panel operation closes the device, forces a redraw on the next successful
    connection and retries after a short pause.

    Args:
        once: when true, return right after the first successful redraw.
    """
    weather_refresh_seconds = 15 * 60
    # Retry sooner than the normal refresh after a failure, but not on every
    # loop iteration, so an outage does not turn into a request flood.
    weather_retry_seconds = 60

    panel = Panel()
    forecasts: list[Forecast] = []
    weather_due = 0.0
    redraw_due = 0.0
    stop = False

    def request_stop(_signum: int, _frame: object) -> None:
        nonlocal stop
        stop = True

    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)

    try:
        while not stop:
            now = dt.datetime.now(TZ)
            monotonic = time.monotonic()

            if monotonic >= weather_due:
                # Boundary around the network call: whatever goes wrong here
                # must not take the clock display down with it.
                try:
                    forecasts = fetch_forecasts(now)
                except Exception:
                    logging.exception("weather update failed; keeping previous forecast")
                    weather_due = monotonic + weather_retry_seconds
                    # Drop cards that no longer describe "tomorrow" so stale
                    # data is not shown under that caption.
                    today = now.date()
                    forecasts = [item for item in forecasts if item.target.date() > today]
                else:
                    weather_due = monotonic + weather_refresh_seconds
                    logging.info("updated Mannheim weather forecast")

            try:
                device = panel.device or panel.connect()
                if monotonic >= redraw_due:
                    redraw(device, render(now, forecasts))
                    redraw_due = monotonic + 60
                    logging.info("redrew clock at %s", now.strftime("%H:%M:%S"))
                    if once:
                        return
                    time.sleep(1.7)

                heartbeat(device, dt.datetime.now(TZ))
                time.sleep(1.7)
            except Exception:
                logging.exception("panel update failed; reconnecting")
                panel.close()
                # A freshly reconnected panel has lost its picture; do not
                # wait for the regular one-minute redraw.
                redraw_due = 0.0
                time.sleep(3)
    finally:
        panel.close()
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, configure logging and start the render loop."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--once", action="store_true", help="render one frame and exit")
    options = cli.parse_args()

    log_format = "%(asctime)s %(levelname)s %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    run(options.once)
|
|
|
|
|
|
# Start the renderer only when this file is executed as a script.
if __name__ == "__main__":
    main()
|