import re
import shlex
import socket
import subprocess
import time
from typing import Dict, List, Optional, Tuple

from app_state import AppState


class NetworkManager:
    """Switch one wireless interface between access-point and client mode.

    All work is done by shelling out to ``systemctl``, ``ip``, ``rfkill``,
    ``wpa_cli`` and ``iw``; outcomes are mirrored into the shared ``AppState``
    passed to the constructor.
    """

    def __init__(
        self,
        state: AppState,
        interface: str = "wlan0",
        hostapd_unit: str = "serial-hostapd.service",
        dnsmasq_unit: str = "serial-dnsmasq.service",
        ap_cidr: str = "192.168.4.1/24",
    ) -> None:
        """Store the shared state object and the interface/unit names to use.

        Args:
            state: Shared application state; its ``lock``, status helpers and
                flags are read and written by the methods below.
            interface: Wireless interface name handed to every command.
            hostapd_unit: systemd unit started/stopped for the access point.
            dnsmasq_unit: systemd unit started/stopped alongside hostapd.
            ap_cidr: Address (CIDR form) assigned to the interface in AP mode.
        """
        self.state = state
        self.interface = interface
        self.hostapd_unit = hostapd_unit
        self.dnsmasq_unit = dnsmasq_unit
        self.ap_cidr = ap_cidr

    # ------------------------------------------------------------------
    # Command helpers
    # ------------------------------------------------------------------

    def _run(
        self,
        args: List[str],
        timeout: int = 12,
        check: bool = False,
    ) -> subprocess.CompletedProcess:
        """Run ``args`` without a shell, capturing stdout/stderr as text.

        Raises:
            subprocess.TimeoutExpired: if the command exceeds ``timeout``.
            subprocess.CalledProcessError: if ``check`` is true and the
                command exits non-zero.
            OSError: if the executable cannot be started.
        """
        return subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=check,
        )

    def _run_quiet(self, args: List[str], timeout: int = 12) -> bool:
        """Run ``args`` and report success as a bool; never raises.

        Deliberately best-effort: a timeout, a missing binary or any other
        error is reported as ``False``.
        """
        try:
            proc = self._run(args, timeout=timeout, check=False)
        except Exception:
            return False
        return proc.returncode == 0

    @staticmethod
    def _format_command(cmd: List[str]) -> str:
        """Render ``cmd`` for an error message with any psk value masked."""
        shown = list(cmd)
        # Shape of the secret-bearing command: ... set_network <id> psk <value>
        if len(shown) >= 4 and shown[-4] == "set_network" and shown[-2] == "psk":
            shown[-1] = "REDACTED"
        return " ".join(shlex.quote(part) for part in shown)

    def _service_action(self, action: str, unit: str) -> bool:
        """Run ``systemctl <action> <unit>`` and return whether it succeeded."""
        return self._run_quiet(["systemctl", action, unit], timeout=20)

    def _stop_wpa_services(self) -> None:
        """Stop both the per-interface and the generic wpa_supplicant unit."""
        self._service_action("stop", f"wpa_supplicant@{self.interface}.service")
        self._service_action("stop", "wpa_supplicant.service")

    def _start_wpa_services(self) -> None:
        """Start the per-interface wpa_supplicant unit, else the generic one."""
        if not self._service_action("start", f"wpa_supplicant@{self.interface}.service"):
            self._service_action("start", "wpa_supplicant.service")

    # ------------------------------------------------------------------
    # Connectivity probes
    # ------------------------------------------------------------------

    def _wpa_status(self) -> Dict[str, str]:
        """Return ``wpa_cli status`` as a key/value dict, or ``{}`` on failure."""
        try:
            proc = self._run(["wpa_cli", "-i", self.interface, "status"], timeout=8)
        except Exception:
            return {}
        if proc.returncode != 0:
            return {}

        status: Dict[str, str] = {}
        for line in proc.stdout.splitlines():
            if "=" not in line:
                continue
            key, value = line.strip().split("=", 1)
            status[key] = value
        return status

    def is_wifi_connected(self) -> bool:
        """Return True when wpa_state is COMPLETED and an IPv4 address exists.

        The address reported by ``wpa_cli`` is trusted first; otherwise the
        interface is queried through ``get_ipv4_address``.
        """
        status = self._wpa_status()
        if status.get("wpa_state") != "COMPLETED":
            return False
        if status.get("ip_address"):
            return True
        return bool(self.get_ipv4_address())

    def get_ipv4_address(self) -> Optional[str]:
        """Return the first IPv4 address listed for the interface, if any."""
        try:
            proc = self._run(["ip", "-4", "addr", "show", "dev", self.interface], timeout=5)
        except Exception:
            return None
        if proc.returncode != 0:
            return None
        match = re.search(r"inet\s+(\d+\.\d+\.\d+\.\d+)/", proc.stdout)
        return match.group(1) if match else None

    def has_default_route(self) -> bool:
        """Return True when ``ip route show default`` prints at least one route."""
        try:
            proc = self._run(["ip", "route", "show", "default"], timeout=5)
        except Exception:
            return False
        if proc.returncode != 0:
            return False
        return bool(proc.stdout.strip())

    def has_internet(self) -> bool:
        """Return True when Wi-Fi is up, routed, reachable and DNS resolves.

        Checks, in order: Wi-Fi association, a default route, a TCP connection
        to 1.1.1.1 port 53 (2 s timeout), and resolution of ``pool.ntp.org``.
        """
        if not self.is_wifi_connected():
            return False
        if not self.has_default_route():
            return False
        try:
            with socket.create_connection(("1.1.1.1", 53), timeout=2):
                pass
        except OSError:
            return False
        try:
            # NOTE(review): no timeout is passed here, so this call presumably
            # waits for the system resolver's own timeout - confirm if that matters.
            socket.gethostbyname("pool.ntp.org")
        except OSError:
            return False
        return True

    # ------------------------------------------------------------------
    # Access-point mode
    # ------------------------------------------------------------------

    def start_ap(self) -> bool:
        """Bring the interface up as an access point.

        Returns True immediately when ``state.ap_mode`` is already set.
        Otherwise client services are stopped, the interface is re-addressed
        with ``ap_cidr`` and hostapd/dnsmasq are started. The intermediate
        steps are best-effort; only the two unit starts decide the result.
        """
        with self.state.lock:
            if self.state.ap_mode:
                return True

        self._run_quiet(["rfkill", "unblock", "wlan"], timeout=6)
        self._service_action("stop", "dhcpcd.service")
        self._stop_wpa_services()

        self._run_quiet(["ip", "link", "set", self.interface, "down"], timeout=6)
        self._run_quiet(["ip", "addr", "flush", "dev", self.interface], timeout=6)
        self._run_quiet(["ip", "addr", "add", self.ap_cidr, "dev", self.interface], timeout=6)
        self._run_quiet(["ip", "link", "set", self.interface, "up"], timeout=6)

        ok_hostapd = self._service_action("start", self.hostapd_unit)
        ok_dnsmasq = self._service_action("start", self.dnsmasq_unit)

        if ok_hostapd and ok_dnsmasq:
            with self.state.lock:
                self.state.ap_mode = True
            # Called after releasing the lock so this works whether or not
            # the state lock is reentrant.
            self.state.update_status("AP mode active", "")
            return True

        self.state.update_status("AP start failed", "hostapd/dnsmasq could not be started")
        return False

    def stop_ap(self) -> bool:
        """Tear down the access point and restart the client-mode services.

        Every step is best-effort; the method always clears ``state.ap_mode``
        and returns True.
        """
        self._service_action("stop", self.dnsmasq_unit)
        self._service_action("stop", self.hostapd_unit)

        self._run_quiet(["ip", "link", "set", self.interface, "down"], timeout=6)
        self._run_quiet(["ip", "addr", "flush", "dev", self.interface], timeout=6)
        self._run_quiet(["ip", "link", "set", self.interface, "up"], timeout=6)

        self._start_wpa_services()
        self._service_action("restart", "dhcpcd.service")

        with self.state.lock:
            self.state.ap_mode = False
        self.state.update_status("Client mode active", "")
        return True

    # ------------------------------------------------------------------
    # Scanning
    # ------------------------------------------------------------------

    def _parse_scan_results(self, output: str) -> List[Tuple[str, int]]:
        """Parse tab-separated scan output into ``(ssid, signal)`` pairs.

        The first line is skipped as a header. Rows with fewer than five
        fields or an empty fifth field (SSID) are ignored. The third field is
        read as the signal level, falling back to -100 when it is not an
        integer. Only the strongest entry per SSID is kept, and the result is
        sorted strongest first.
        """
        strongest: Dict[str, int] = {}
        for line in output.splitlines()[1:]:
            parts = line.split("\t")
            if len(parts) < 5:
                continue
            ssid = parts[4].strip()
            if not ssid:
                continue
            try:
                signal = int(parts[2])
            except ValueError:
                signal = -100
            if ssid not in strongest or signal > strongest[ssid]:
                strongest[ssid] = signal
        return sorted(strongest.items(), key=lambda item: item[1], reverse=True)

    def _scan_with_wpa_cli(self) -> List[str]:
        """Scan through ``wpa_cli`` and return SSIDs, strongest first.

        After triggering the scan, results are polled once per second for up
        to eight attempts. Returns ``[]`` on any failure.
        """
        try:
            proc = self._run(["wpa_cli", "-i", self.interface, "scan"], timeout=15)
            if proc.returncode != 0 or "OK" not in proc.stdout:
                return []

            for _ in range(8):
                time.sleep(1)
                proc_results = self._run(
                    ["wpa_cli", "-i", self.interface, "scan_results"], timeout=10
                )
                if proc_results.returncode != 0:
                    continue
                # A lone header line means no results are available yet.
                if len(proc_results.stdout.splitlines()) <= 1:
                    continue
                parsed = self._parse_scan_results(proc_results.stdout)
                if parsed:
                    return [ssid for ssid, _signal in parsed]
        except Exception:
            return []
        return []

    def _scan_with_iw(self) -> List[str]:
        """Scan through ``iw ... scan ap-force`` and return unique SSIDs.

        SSIDs are returned in the order they first appear in the output.
        Returns ``[]`` on any failure.
        """
        try:
            proc = self._run(["iw", "dev", self.interface, "scan", "ap-force"], timeout=20)
        except Exception:
            return []
        if proc.returncode != 0:
            return []

        prefix = "SSID: "
        ssids: List[str] = []
        seen = set()
        for raw_line in proc.stdout.splitlines():
            line = raw_line.strip()
            if not line.startswith(prefix):
                continue
            ssid = line[len(prefix):].strip()
            if ssid and ssid not in seen:
                seen.add(ssid)
                ssids.append(ssid)
        return ssids

    def scan_networks(self) -> List[str]:
        """Return visible SSIDs, falling back to the cached list on failure.

        ``wpa_cli`` is tried first and ``iw`` second. A non-empty result is
        stored through ``state.set_known_ssids``; otherwise the
        ``known_ssids`` entry of ``state.snapshot()`` is returned.
        """
        try:
            ssids = self._scan_with_wpa_cli() or self._scan_with_iw()
        except Exception:
            ssids = []

        if ssids:
            self.state.set_known_ssids(ssids)
            self.state.update_status(f"Scan found {len(ssids)} network(s)", "")
            return ssids

        cached = self.state.snapshot().get("known_ssids", [])
        self.state.update_status("Scan failed, returning cached list", "No fresh scan results")
        return list(cached)

    # ------------------------------------------------------------------
    # Client mode
    # ------------------------------------------------------------------

    def connect_to_wifi(self, ssid: str, password: str, timeout: int = 50) -> Tuple[bool, str]:
        """Configure a network through ``wpa_cli`` and wait for association.

        Args:
            ssid: Network name; an empty value is rejected.
            password: Passphrase; an empty value configures an open network.
            timeout: Seconds to wait for ``is_wifi_connected`` to turn true.

        Returns:
            ``(True, "Connected")`` on success, otherwise ``(False, reason)``.
            The reason never contains the passphrase.

        ``state.connecting`` is set for the duration of the call and always
        cleared on exit.
        """
        if not ssid:
            return False, "SSID is required"

        with self.state.lock:
            self.state.connecting = True
            self.state.status_message = f"Connecting to {ssid}"
            self.state.last_error = ""
            # Wall-clock on purpose: this value is stored in shared state.
            self.state.ap_grace_until = time.time() + 90

        try:
            self.stop_ap()
            self._start_wpa_services()
            self._service_action("restart", "dhcpcd.service")

            base = ["wpa_cli", "-i", self.interface]
            add_proc = self._run(base + ["add_network"], timeout=10)
            if add_proc.returncode != 0:
                return False, "add_network failed"

            # Empty output must not raise IndexError; treat it as a bad id.
            output_lines = add_proc.stdout.strip().splitlines()
            network_id = output_lines[-1].strip() if output_lines else ""
            if not network_id.isdigit():
                return False, f"invalid network id: {network_id}"

            set_network = base + ["set_network", network_id]
            commands = [set_network + ["ssid", f'"{ssid}"']]
            if password:
                commands.append(set_network + ["psk", f'"{password}"'])
                commands.append(set_network + ["key_mgmt", "WPA-PSK"])
            else:
                commands.append(set_network + ["key_mgmt", "NONE"])
            commands.extend(
                [
                    set_network + ["scan_ssid", "1"],
                    base + ["enable_network", network_id],
                    base + ["select_network", network_id],
                    base + ["save_config"],
                    base + ["reconfigure"],
                ]
            )

            saved = False
            for cmd in commands:
                try:
                    proc = self._run(cmd, timeout=10)
                except subprocess.TimeoutExpired:
                    # str(TimeoutExpired) embeds the argv, passphrase included,
                    # so it is handled here rather than by the generic handler.
                    failure = f"Command timed out: {self._format_command(cmd)}"
                else:
                    if proc.returncode == 0 and "FAIL" not in proc.stdout:
                        if cmd[-1] == "save_config":
                            saved = True
                        continue
                    failure = f"Command failed: {self._format_command(cmd)}"

                if not saved:
                    # Never persisted: drop the half-configured entry so it
                    # cannot be written out by a later save_config.
                    self._run_quiet(base + ["remove_network", network_id], timeout=10)
                return False, failure

            # Monotonic clock: unaffected by wall-clock adjustments while waiting.
            deadline = time.monotonic() + timeout
            while time.monotonic() < deadline:
                if self.is_wifi_connected():
                    self.state.update_status(f"Connected to {ssid}", "")
                    return True, "Connected"
                time.sleep(2)

            return False, "Timeout waiting for Wi-Fi association"
        except Exception as exc:
            return False, f"Connect error: {exc}"
        finally:
            with self.state.lock:
                self.state.connecting = False

    def refresh_state(self) -> None:
        """Re-probe connectivity and store the result in the shared state.

        The internet check is skipped when Wi-Fi is not connected.
        """
        wifi = self.is_wifi_connected()
        internet = self.has_internet() if wifi else False
        with self.state.lock:
            self.state.wifi_connected = wifi
            self.state.internet_available = internet