Files
DD3-Lora-Bridge-Raspi-Debugger/src/network_manager.py

298 lines
11 KiB
Python

import re
import shlex
import socket
import subprocess
import time
from typing import Dict, List, Optional, Tuple
from app_state import AppState
class NetworkManager:
def __init__(
self,
state: AppState,
interface: str = "wlan0",
hostapd_unit: str = "serial-hostapd.service",
dnsmasq_unit: str = "serial-dnsmasq.service",
ap_cidr: str = "192.168.4.1/24",
) -> None:
self.state = state
self.interface = interface
self.hostapd_unit = hostapd_unit
self.dnsmasq_unit = dnsmasq_unit
self.ap_cidr = ap_cidr
def _run(
self,
args: List[str],
timeout: int = 12,
check: bool = False,
) -> subprocess.CompletedProcess:
return subprocess.run(
args,
capture_output=True,
text=True,
timeout=timeout,
check=check,
)
def _run_quiet(self, args: List[str], timeout: int = 12) -> bool:
try:
proc = self._run(args, timeout=timeout, check=False)
return proc.returncode == 0
except Exception:
return False
def _wpa_status(self) -> Dict[str, str]:
try:
proc = self._run(["wpa_cli", "-i", self.interface, "status"], timeout=8)
if proc.returncode != 0:
return {}
status: Dict[str, str] = {}
for line in proc.stdout.splitlines():
if "=" not in line:
continue
key, value = line.strip().split("=", 1)
status[key] = value
return status
except Exception:
return {}
def is_wifi_connected(self) -> bool:
status = self._wpa_status()
if status.get("wpa_state") != "COMPLETED":
return False
if status.get("ip_address"):
return True
return bool(self.get_ipv4_address())
def get_ipv4_address(self) -> Optional[str]:
try:
proc = self._run(["ip", "-4", "addr", "show", "dev", self.interface], timeout=5)
if proc.returncode != 0:
return None
match = re.search(r"inet\s+(\d+\.\d+\.\d+\.\d+)/", proc.stdout)
return match.group(1) if match else None
except Exception:
return None
def has_default_route(self) -> bool:
try:
proc = self._run(["ip", "route", "show", "default"], timeout=5)
if proc.returncode != 0:
return False
return bool(proc.stdout.strip())
except Exception:
return False
def has_internet(self) -> bool:
if not self.is_wifi_connected():
return False
if not self.has_default_route():
return False
try:
with socket.create_connection(("1.1.1.1", 53), timeout=2):
pass
except OSError:
return False
try:
socket.gethostbyname("pool.ntp.org")
except OSError:
return False
return True
def _service_action(self, action: str, unit: str) -> bool:
return self._run_quiet(["systemctl", action, unit], timeout=20)
def _stop_wpa_services(self) -> None:
self._service_action("stop", f"wpa_supplicant@{self.interface}.service")
self._service_action("stop", "wpa_supplicant.service")
def _start_wpa_services(self) -> None:
if not self._service_action("start", f"wpa_supplicant@{self.interface}.service"):
self._service_action("start", "wpa_supplicant.service")
def start_ap(self) -> bool:
with self.state.lock:
if self.state.ap_mode:
return True
self._run_quiet(["rfkill", "unblock", "wlan"], timeout=6)
self._service_action("stop", "dhcpcd.service")
self._stop_wpa_services()
self._run_quiet(["ip", "link", "set", self.interface, "down"], timeout=6)
self._run_quiet(["ip", "addr", "flush", "dev", self.interface], timeout=6)
self._run_quiet(["ip", "addr", "add", self.ap_cidr, "dev", self.interface], timeout=6)
self._run_quiet(["ip", "link", "set", self.interface, "up"], timeout=6)
ok_hostapd = self._service_action("start", self.hostapd_unit)
ok_dnsmasq = self._service_action("start", self.dnsmasq_unit)
if ok_hostapd and ok_dnsmasq:
with self.state.lock:
self.state.ap_mode = True
self.state.update_status("AP mode active", "")
return True
self.state.update_status("AP start failed", "hostapd/dnsmasq could not be started")
return False
def stop_ap(self) -> bool:
self._service_action("stop", self.dnsmasq_unit)
self._service_action("stop", self.hostapd_unit)
self._run_quiet(["ip", "link", "set", self.interface, "down"], timeout=6)
self._run_quiet(["ip", "addr", "flush", "dev", self.interface], timeout=6)
self._run_quiet(["ip", "link", "set", self.interface, "up"], timeout=6)
self._start_wpa_services()
self._service_action("restart", "dhcpcd.service")
with self.state.lock:
self.state.ap_mode = False
self.state.update_status("Client mode active", "")
return True
def _parse_scan_results(self, output: str) -> List[Tuple[str, int]]:
results: Dict[str, int] = {}
for line in output.splitlines()[1:]:
parts = line.split("\t")
if len(parts) < 5:
continue
ssid = parts[4].strip()
if not ssid:
continue
try:
signal = int(parts[2])
except ValueError:
signal = -100
if ssid not in results or signal > results[ssid]:
results[ssid] = signal
sorted_items = sorted(results.items(), key=lambda x: x[1], reverse=True)
return sorted_items
def _scan_with_wpa_cli(self) -> List[str]:
cmd_scan = ["wpa_cli", "-i", self.interface, "scan"]
proc = self._run(cmd_scan, timeout=15)
if proc.returncode != 0 or "OK" not in proc.stdout:
return []
for _ in range(8):
time.sleep(1)
proc_results = self._run(["wpa_cli", "-i", self.interface, "scan_results"], timeout=10)
if proc_results.returncode == 0 and len(proc_results.stdout.splitlines()) > 1:
parsed = self._parse_scan_results(proc_results.stdout)
if parsed:
return [ssid for ssid, _signal in parsed]
return []
def _scan_with_iw(self) -> List[str]:
try:
proc = self._run(["iw", "dev", self.interface, "scan", "ap-force"], timeout=20)
except Exception:
return []
if proc.returncode != 0:
return []
ssids: List[str] = []
seen = set()
for line in proc.stdout.splitlines():
line = line.strip()
if line.startswith("SSID: "):
ssid = line.replace("SSID: ", "", 1).strip()
if ssid and ssid not in seen:
seen.add(ssid)
ssids.append(ssid)
return ssids
def scan_networks(self) -> List[str]:
ssids = self._scan_with_wpa_cli()
if not ssids:
ssids = self._scan_with_iw()
if ssids:
self.state.set_known_ssids(ssids)
self.state.update_status(f"Scan found {len(ssids)} network(s)", "")
return ssids
snapshot = self.state.snapshot()
cached = snapshot.get("known_ssids", [])
self.state.update_status("Scan failed, returning cached list", "No fresh scan results")
return list(cached)
def connect_to_wifi(self, ssid: str, password: str, timeout: int = 50) -> Tuple[bool, str]:
if not ssid:
return False, "SSID is required"
with self.state.lock:
self.state.connecting = True
self.state.status_message = f"Connecting to {ssid}"
self.state.last_error = ""
self.state.ap_grace_until = time.time() + 90
try:
self.stop_ap()
self._start_wpa_services()
self._service_action("restart", "dhcpcd.service")
add_proc = self._run(["wpa_cli", "-i", self.interface, "add_network"], timeout=10)
if add_proc.returncode != 0:
return False, "add_network failed"
network_id = add_proc.stdout.strip().splitlines()[-1].strip()
if not network_id.isdigit():
return False, f"invalid network id: {network_id}"
commands = [
["wpa_cli", "-i", self.interface, "set_network", network_id, "ssid", f'"{ssid}"'],
]
if password:
commands.extend(
[
["wpa_cli", "-i", self.interface, "set_network", network_id, "psk", f'"{password}"'],
["wpa_cli", "-i", self.interface, "set_network", network_id, "key_mgmt", "WPA-PSK"],
]
)
else:
commands.append(["wpa_cli", "-i", self.interface, "set_network", network_id, "key_mgmt", "NONE"])
commands.extend(
[
["wpa_cli", "-i", self.interface, "set_network", network_id, "scan_ssid", "1"],
["wpa_cli", "-i", self.interface, "enable_network", network_id],
["wpa_cli", "-i", self.interface, "select_network", network_id],
["wpa_cli", "-i", self.interface, "save_config"],
["wpa_cli", "-i", self.interface, "reconfigure"],
]
)
for cmd in commands:
proc = self._run(cmd, timeout=10)
if proc.returncode != 0 or "FAIL" in proc.stdout:
joined = " ".join(shlex.quote(c) for c in cmd)
return False, f"Command failed: {joined}"
deadline = time.time() + timeout
while time.time() < deadline:
if self.is_wifi_connected():
self.state.update_status(f"Connected to {ssid}", "")
return True, "Connected"
time.sleep(2)
return False, "Timeout waiting for Wi-Fi association"
except Exception as exc:
return False, f"Connect error: {exc}"
finally:
with self.state.lock:
self.state.connecting = False
def refresh_state(self) -> None:
wifi = self.is_wifi_connected()
internet = self.has_internet() if wifi else False
with self.state.lock:
self.state.wifi_connected = wifi
self.state.internet_available = internet