initial mp90 panel renderer
Add the MP90 front panel clock/weather renderer, systemd service, ignore rules, and deployment documentation.
This commit is contained in:
+262
@@ -0,0 +1,262 @@
|
||||
#!/home/linuxbrew/.linuxbrew/bin/python3
|
||||
"""Render a clock and Mannheim weather forecast on the MP90 front TFT."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import datetime as dt
|
||||
import logging
|
||||
import signal
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import hid
|
||||
import requests
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
|
||||
WIDTH = 320
|
||||
HEIGHT = 170
|
||||
PORTRAIT_WIDTH = HEIGHT
|
||||
PORTRAIT_HEIGHT = WIDTH
|
||||
FRAMEBUFFER_BYTES = WIDTH * HEIGHT * 2
|
||||
DATA_BYTES = 4096
|
||||
REPORT_BYTES = 1
|
||||
HEADER_BYTES = 8
|
||||
PACKET_BYTES = REPORT_BYTES + HEADER_BYTES + DATA_BYTES
|
||||
FINAL_CHUNK = 26
|
||||
TZ = ZoneInfo("Europe/Berlin")
|
||||
HID_PATH = b"1-8:1.1"
|
||||
FONT_REGULAR = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
|
||||
FONT_BOLD = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
|
||||
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
|
||||
MANNHEIM = (49.4891, 8.46694)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Forecast:
|
||||
target: dt.datetime
|
||||
temperature: float
|
||||
weather_code: int
|
||||
precipitation_probability: int
|
||||
|
||||
|
||||
WEATHER_TEXT = {
|
||||
0: "Clear",
|
||||
1: "Mostly clear",
|
||||
2: "Partly cloudy",
|
||||
3: "Overcast",
|
||||
45: "Fog",
|
||||
48: "Fog",
|
||||
51: "Light drizzle",
|
||||
53: "Drizzle",
|
||||
55: "Heavy drizzle",
|
||||
61: "Light rain",
|
||||
63: "Rain",
|
||||
65: "Heavy rain",
|
||||
71: "Light snow",
|
||||
73: "Snow",
|
||||
75: "Heavy snow",
|
||||
80: "Showers",
|
||||
81: "Showers",
|
||||
82: "Heavy showers",
|
||||
95: "Thunderstorm",
|
||||
96: "Thunderstorm",
|
||||
99: "Thunderstorm",
|
||||
}
|
||||
|
||||
|
||||
def packet(header: bytes, data: bytes = b"") -> bytes:
|
||||
if len(header) != HEADER_BYTES:
|
||||
raise ValueError("panel headers must be exactly 8 bytes")
|
||||
if len(data) > DATA_BYTES:
|
||||
raise ValueError("panel payload exceeds 4096 bytes")
|
||||
return bytes(REPORT_BYTES) + header + data + bytes(DATA_BYTES - len(data))
|
||||
|
||||
|
||||
def checked_write(device: hid.device, payload: bytes) -> None:
|
||||
written = device.write(payload)
|
||||
if written != len(payload):
|
||||
raise OSError(f"short HID write: expected {len(payload)}, got {written}")
|
||||
|
||||
|
||||
def set_portrait(device: hid.device) -> None:
|
||||
checked_write(device, packet(bytes((0x55, 0xA1, 0xF1, 0x02, 0, 0, 0, 0))))
|
||||
|
||||
|
||||
def heartbeat(device: hid.device, now: dt.datetime) -> None:
|
||||
header = bytes((0x55, 0xA1, 0xF2, now.hour, now.minute, now.second, 0, 0))
|
||||
checked_write(device, packet(header))
|
||||
|
||||
|
||||
def rgb565_bytes(image: Image.Image) -> bytes:
|
||||
result = bytearray()
|
||||
for red, green, blue in image.convert("RGB").get_flattened_data():
|
||||
value = ((red >> 3) << 11) | ((green >> 2) << 5) | (blue >> 3)
|
||||
result.extend((value >> 8, value & 0xFF))
|
||||
return bytes(result)
|
||||
|
||||
|
||||
def redraw(device: hid.device, image: Image.Image) -> None:
|
||||
framebuffer = rgb565_bytes(image)
|
||||
if len(framebuffer) != FRAMEBUFFER_BYTES:
|
||||
raise ValueError("unexpected framebuffer size")
|
||||
|
||||
for index in range(FINAL_CHUNK + 1):
|
||||
offset = index * DATA_BYTES
|
||||
chunk = framebuffer[offset : offset + DATA_BYTES]
|
||||
command = 0xF0 if index == 0 else 0xF2 if index == FINAL_CHUNK else 0xF1
|
||||
header = bytes(
|
||||
(
|
||||
0x55,
|
||||
0xA3,
|
||||
command,
|
||||
index + 1,
|
||||
(offset >> 8) & 0xFF,
|
||||
offset & 0xFF,
|
||||
(len(chunk) >> 8) & 0xFF,
|
||||
len(chunk) & 0xFF,
|
||||
)
|
||||
)
|
||||
checked_write(device, packet(header, chunk))
|
||||
|
||||
|
||||
def fetch_forecasts(now: dt.datetime) -> list[Forecast]:
|
||||
tomorrow = (now + dt.timedelta(days=1)).date()
|
||||
targets = [
|
||||
dt.datetime.combine(tomorrow, dt.time(hour=9), tzinfo=TZ),
|
||||
dt.datetime.combine(tomorrow, dt.time(hour=17), tzinfo=TZ),
|
||||
]
|
||||
response = requests.get(
|
||||
OPEN_METEO_URL,
|
||||
params={
|
||||
"latitude": MANNHEIM[0],
|
||||
"longitude": MANNHEIM[1],
|
||||
"timezone": "Europe/Berlin",
|
||||
"forecast_days": 3,
|
||||
"hourly": "temperature_2m,weather_code,precipitation_probability",
|
||||
},
|
||||
timeout=20,
|
||||
)
|
||||
response.raise_for_status()
|
||||
hourly = response.json()["hourly"]
|
||||
indices = {value: index for index, value in enumerate(hourly["time"])}
|
||||
|
||||
forecasts = []
|
||||
for target in targets:
|
||||
index = indices[target.strftime("%Y-%m-%dT%H:00")]
|
||||
forecasts.append(
|
||||
Forecast(
|
||||
target=target,
|
||||
temperature=float(hourly["temperature_2m"][index]),
|
||||
weather_code=int(hourly["weather_code"][index]),
|
||||
precipitation_probability=int(hourly["precipitation_probability"][index]),
|
||||
)
|
||||
)
|
||||
return forecasts
|
||||
|
||||
|
||||
def condition(code: int) -> str:
|
||||
return WEATHER_TEXT.get(code, "Mixed")
|
||||
|
||||
|
||||
def font(path: str, size: int) -> ImageFont.FreeTypeFont:
|
||||
return ImageFont.truetype(path, size=size)
|
||||
|
||||
|
||||
def draw_card(draw: ImageDraw.ImageDraw, y: int, forecast: Forecast) -> None:
|
||||
draw.rounded_rectangle((12, y, 158, y + 82), radius=12, fill="#183149", outline="#2e5875")
|
||||
draw.text((24, y + 9), forecast.target.strftime("%H:%M"), font=font(FONT_BOLD, 19), fill="#ffce67")
|
||||
draw.text((24, y + 34), f"{forecast.temperature:.1f} C", font=font(FONT_BOLD, 22), fill="#f5fbff")
|
||||
detail = f"{condition(forecast.weather_code)} {forecast.precipitation_probability}% rain"
|
||||
draw.text((24, y + 62), detail, font=font(FONT_REGULAR, 10), fill="#b7d5e8")
|
||||
|
||||
|
||||
def render(now: dt.datetime, forecasts: Iterable[Forecast]) -> Image.Image:
|
||||
image = Image.new("RGB", (PORTRAIT_WIDTH, PORTRAIT_HEIGHT), "#071a29")
|
||||
draw = ImageDraw.Draw(image)
|
||||
draw.rectangle((0, 0, PORTRAIT_WIDTH, 8), fill="#4cb8c4")
|
||||
draw.text((15, 18), now.strftime("%H:%M"), font=font(FONT_BOLD, 45), fill="#f5fbff")
|
||||
draw.text((17, 73), "MANNHEIM", font=font(FONT_BOLD, 18), fill="#ffce67")
|
||||
draw.text((18, 98), now.strftime("%a %d %b"), font=font(FONT_REGULAR, 15), fill="#b7d5e8")
|
||||
draw.text((17, 128), "TOMORROW", font=font(FONT_BOLD, 11), fill="#76d2d9")
|
||||
for y, forecast in zip((148, 239), forecasts):
|
||||
draw_card(draw, y, forecast)
|
||||
return image.rotate(90, expand=True)
|
||||
|
||||
|
||||
class Panel:
|
||||
def __init__(self) -> None:
|
||||
self.device: hid.device | None = None
|
||||
|
||||
def close(self) -> None:
|
||||
if self.device is not None:
|
||||
self.device.close()
|
||||
self.device = None
|
||||
|
||||
def connect(self) -> hid.device:
|
||||
self.close()
|
||||
device = hid.device()
|
||||
device.open_path(HID_PATH)
|
||||
self.device = device
|
||||
set_portrait(device)
|
||||
time.sleep(1.7)
|
||||
logging.info("connected to MP90 front panel")
|
||||
return device
|
||||
|
||||
|
||||
def run(once: bool) -> None:
|
||||
panel = Panel()
|
||||
forecasts: list[Forecast] = []
|
||||
weather_due = 0.0
|
||||
redraw_due = 0.0
|
||||
stop = False
|
||||
|
||||
def request_stop(_signum: int, _frame: object) -> None:
|
||||
nonlocal stop
|
||||
stop = True
|
||||
|
||||
signal.signal(signal.SIGTERM, request_stop)
|
||||
signal.signal(signal.SIGINT, request_stop)
|
||||
|
||||
try:
|
||||
while not stop:
|
||||
now = dt.datetime.now(TZ)
|
||||
monotonic = time.monotonic()
|
||||
try:
|
||||
if monotonic >= weather_due or not forecasts:
|
||||
forecasts = fetch_forecasts(now)
|
||||
weather_due = monotonic + 15 * 60
|
||||
logging.info("updated Mannheim weather forecast")
|
||||
|
||||
device = panel.device or panel.connect()
|
||||
if monotonic >= redraw_due:
|
||||
redraw(device, render(now, forecasts))
|
||||
redraw_due = monotonic + 60
|
||||
logging.info("redrew clock at %s", now.strftime("%H:%M:%S"))
|
||||
if once:
|
||||
return
|
||||
time.sleep(1.7)
|
||||
|
||||
heartbeat(device, dt.datetime.now(TZ))
|
||||
time.sleep(1.7)
|
||||
except Exception:
|
||||
logging.exception("panel update failed; reconnecting")
|
||||
panel.close()
|
||||
time.sleep(3)
|
||||
finally:
|
||||
panel.close()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--once", action="store_true", help="render one frame and exit")
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||||
run(args.once)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user