Files
DD3-Lora-Bridge-Raspi-Debugger/src/rtc_sync.py

83 lines
3.4 KiB
Python

import os
import subprocess
from datetime import datetime, timezone
from typing import Optional, Tuple
import ntplib
from app_state import AppState
class RTCAndNTPManager:
def __init__(self, state: AppState, rtc_device: str = "/dev/rtc0", ntp_server: str = "pool.ntp.org") -> None:
self.state = state
self.rtc_device = rtc_device
self.ntp_server = ntp_server
self._resolved_rtc_device: Optional[str] = None
def _run(self, args, timeout: int = 12) -> subprocess.CompletedProcess:
return subprocess.run(args, capture_output=True, text=True, timeout=timeout, check=False)
def _resolve_rtc_device(self) -> Optional[str]:
if self._resolved_rtc_device and os.path.exists(self._resolved_rtc_device):
return self._resolved_rtc_device
candidates = [self.rtc_device, "/dev/rtc", "/dev/rtc0", "/dev/rtc1"]
for candidate in candidates:
if os.path.exists(candidate):
self._resolved_rtc_device = candidate
return candidate
return None
def rtc_available(self) -> bool:
return self._resolve_rtc_device() is not None
def sync_from_rtc(self) -> Tuple[bool, str]:
rtc_device = self._resolve_rtc_device()
if rtc_device is None:
return False, f"RTC not found (checked: {self.rtc_device}, /dev/rtc, /dev/rtc0, /dev/rtc1)"
proc = self._run(["hwclock", "-s", "--utc", "--rtc", rtc_device], timeout=10)
if proc.returncode == 0:
self.state.update_status(f"System time loaded from {rtc_device}", "")
return True, f"{rtc_device} -> system time ok"
return False, (proc.stderr or proc.stdout or "hwclock -s failed").strip()
def sync_ntp_to_system(self, timeout: int = 6) -> Tuple[bool, str]:
try:
client = ntplib.NTPClient()
response = client.request(self.ntp_server, version=3, timeout=timeout)
ts = float(response.tx_time)
dt_utc = datetime.fromtimestamp(ts, tz=timezone.utc)
iso_utc = dt_utc.strftime("%Y-%m-%d %H:%M:%S")
proc = self._run(["date", "-u", "-s", iso_utc], timeout=10)
if proc.returncode != 0:
return False, (proc.stderr or proc.stdout or "date -s failed").strip()
self.state.update_status("System time synced via NTP", "")
return True, f"NTP sync ok ({iso_utc} UTC)"
except Exception as exc:
return False, f"NTP sync failed: {exc}"
def write_system_time_to_rtc(self) -> Tuple[bool, str]:
rtc_device = self._resolve_rtc_device()
if rtc_device is None:
return False, f"RTC not found (checked: {self.rtc_device}, /dev/rtc, /dev/rtc0, /dev/rtc1)"
proc = self._run(["hwclock", "-w", "--utc", "--rtc", rtc_device], timeout=10)
if proc.returncode == 0:
self.state.update_status(f"{rtc_device} updated from system time", "")
return True, f"system -> {rtc_device} ok"
return False, (proc.stderr or proc.stdout or "hwclock -w failed").strip()
def sync_ntp_and_rtc(self) -> Tuple[bool, str]:
ok_ntp, msg_ntp = self.sync_ntp_to_system()
if not ok_ntp:
return False, msg_ntp
ok_rtc, msg_rtc = self.write_system_time_to_rtc()
if not ok_rtc:
return False, f"NTP ok, RTC write failed: {msg_rtc}"
return True, "NTP + RTC sync successful"