use crate::bail; use crate::config::{NetworkConfig, PlantControllerConfig}; use crate::hal::{get_next_slot, PLANT_COUNT, TIME_ACCESS}; use crate::log::{LogMessage, LOG_ACCESS}; use chrono::{DateTime, Utc}; use serde::Serialize; use crate::fat_error::{ContextExt, FatError, FatResult}; use crate::hal::little_fs2storage_adapter::LittleFs2Filesystem; use alloc::string::ToString; use alloc::sync::Arc; use alloc::{format, string::String, vec, vec::Vec}; use core::net::{IpAddr, Ipv4Addr, SocketAddr}; use core::str::FromStr; use core::sync::atomic::Ordering; use embassy_executor::Spawner; use embassy_net::udp::UdpSocket; use embassy_net::{DhcpConfig, Ipv4Cidr, Runner, Stack, StackResources, StaticConfigV4}; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::mutex::{Mutex, MutexGuard}; use embassy_sync::once_lock::OnceLock; use embassy_time::{Duration, Timer}; use embedded_storage::nor_flash::{check_erase, NorFlash, ReadNorFlash}; use esp_bootloader_esp_idf::ota::OtaImageState::Valid; use esp_bootloader_esp_idf::ota::{Ota, OtaImageState}; use esp_bootloader_esp_idf::partitions::FlashRegion; use esp_hal::gpio::{Input, RtcPinWithResistors}; use esp_hal::rng::Rng; use esp_hal::rtc_cntl::{ sleep::{TimerWakeupSource, WakeupLevel}, Rtc, }; use esp_hal::system::software_reset; use esp_println::println; use esp_storage::FlashStorage; use esp_wifi::wifi::{ AccessPointConfiguration, AccessPointInfo, AuthMethod, ClientConfiguration, Configuration, ScanConfig, ScanTypeConfig, WifiController, WifiDevice, WifiState, }; use littlefs2::fs::Filesystem; use littlefs2_core::{FileType, PathBuf, SeekFrom}; use log::{info, warn}; use mcutie::{ Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, QoS, Topic, }; use portable_atomic::AtomicBool; use smoltcp::socket::udp::PacketMetadata; use smoltcp::wire::DnsQueryType; use sntpc::{get_time, NtpContext, NtpTimestampGenerator}; #[esp_hal::ram(rtc_fast, persistent)] static mut LAST_WATERING_TIMESTAMP: [i64; PLANT_COUNT] = [0; PLANT_COUNT]; #[esp_hal::ram(rtc_fast, persistent)] static mut CONSECUTIVE_WATERING_PLANT: [u32; PLANT_COUNT] = [0; PLANT_COUNT]; #[esp_hal::ram(rtc_fast, persistent)] static mut LOW_VOLTAGE_DETECTED: i8 = 0; #[esp_hal::ram(rtc_fast, persistent)] static mut RESTART_TO_CONF: i8 = 0; const CONFIG_FILE: &str = "config.json"; const NTP_SERVER: &str = "pool.ntp.org"; static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false); static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false); pub static MQTT_STAY_ALIVE: AtomicBool = AtomicBool::new(false); static MQTT_BASE_TOPIC: OnceLock = OnceLock::new(); #[derive(Serialize, Debug)] pub struct FileInfo { filename: String, size: usize, } #[derive(Serialize, Debug)] pub struct FileList { total: usize, used: usize, files: Vec, } #[derive(Copy, Clone, Default)] struct Timestamp { stamp: DateTime, } // Minimal esp-idf equivalent for gpio_hold on esp32c6 via ROM functions extern "C" { fn gpio_pad_hold(gpio_num: u32); fn gpio_pad_unhold(gpio_num: u32); } #[inline(always)] pub fn hold_enable(gpio_num: u8) { unsafe { gpio_pad_hold(gpio_num as u32) } } #[inline(always)] pub fn hold_disable(gpio_num: u8) { unsafe { gpio_pad_unhold(gpio_num as u32) } } impl NtpTimestampGenerator for Timestamp { fn init(&mut self) { self.stamp = DateTime::default(); } fn timestamp_sec(&self) -> u64 { self.stamp.timestamp() as u64 } fn timestamp_subsec_micros(&self) -> u32 { self.stamp.timestamp_subsec_micros() } } pub struct Esp<'a> { pub fs: Arc>>, pub rng: Rng, //first starter (ap or sta will take these) pub interface_sta: Option>, pub interface_ap: Option>, pub controller: Arc>>, pub boot_button: Input<'a>, // RTC-capable GPIO used as external wake source (store the raw peripheral) pub wake_gpio1: esp_hal::peripherals::GPIO1<'static>, pub ota: Ota<'static, FlashStorage>, pub ota_next: &'static mut FlashRegion<'static, FlashStorage>, } // SAFETY: On this target we never move Esp across OS threads; the firmware runs single-core // cooperative tasks with Embassy. All interior mutability of non-Send peripherals is gated // behind &mut self or embassy_sync Mutex with CriticalSectionRawMutex, which does not rely on // thread scheduling. Therefore it is sound to mark Esp as Send to satisfy trait object bounds // (e.g., Box). If you add fields that are accessed from multiple // CPU cores/threads, reconsider this. unsafe impl Send for Esp<'_> {} macro_rules! mk_static { ($t:ty,$val:expr) => {{ static STATIC_CELL: static_cell::StaticCell<$t> = static_cell::StaticCell::new(); #[deny(unused_attributes)] let x = STATIC_CELL.uninit().write(($val)); x }}; } impl Esp<'_> { pub(crate) async fn delete_file(&self, filename: String) -> FatResult<()> { let file = PathBuf::try_from(filename.as_str())?; let access = self.fs.lock().await; access.remove(&*file)?; Ok(()) } pub(crate) async fn write_file( &mut self, filename: String, offset: u32, buf: &[u8], ) -> Result<(), FatError> { let file = PathBuf::try_from(filename.as_str())?; let access = self.fs.lock().await; access.open_file_with_options_and_then( |options| options.read(true).write(true).create(true), &*file, |file| { file.seek(SeekFrom::Start(offset))?; file.write(buf)?; Ok(()) }, )?; Ok(()) } pub async fn get_size(&mut self, filename: String) -> FatResult { let file = PathBuf::try_from(filename.as_str())?; let access = self.fs.lock().await; let data = access.metadata(&*file)?; Ok(data.len()) } pub(crate) async fn get_file( &mut self, filename: String, chunk: u32, ) -> FatResult<([u8; 512], usize)> { use littlefs2::io::Error as lfs2Error; let file = PathBuf::try_from(filename.as_str())?; let access = self.fs.lock().await; let mut buf = [0_u8; 512]; let mut read = 0; let offset = chunk * buf.len() as u32; access.open_file_with_options_and_then( |options| options.read(true), &*file, |file| { let length = file.len()? as u32; if length == 0 { Err(lfs2Error::IO) } else if length > offset { file.seek(SeekFrom::Start(offset))?; read = file.read(&mut buf)?; Ok(()) } else { //exactly at end, do nothing Ok(()) } }, )?; Ok((buf, read)) } pub(crate) fn get_current_ota_slot(&mut self) -> String { match get_next_slot(&mut self.ota) { Ok(slot) => { format!("{:?}", slot.next()) } Err(err) => { format!("{:?}", err) } } } pub(crate) fn get_ota_state(&mut self) -> String { match self.ota.current_ota_state() { Ok(state) => { format!("{:?}", state) } Err(err) => { format!("{:?}", err) } } } pub(crate) async fn write_ota(&mut self, offset: u32, buf: &[u8]) -> Result<(), FatError> { if self.ota.current_ota_state() == Ok(OtaImageState::Invalid) { bail!("Invalid OTA state, refusing ota write") } if self.ota.current_ota_state() == Ok(OtaImageState::Undefined) { bail!("Invalid OTA state, refusing ota write") } let _ = check_erase(self.ota_next, offset, offset + 4096); self.ota_next.erase(offset, offset + 4096)?; let mut temp = vec![0; buf.len()]; let read_back = temp.as_mut_slice(); //change to nor flash, align writes! self.ota_next.write(offset, buf)?; self.ota_next.read(offset, read_back)?; if buf != read_back { info!("Expected {:?} but got {:?}", buf, read_back); bail!( "Flash error, read back does not match write buffer at offset {:x}", offset ) } Ok(()) } pub(crate) async fn finalize_ota(&mut self) -> Result<(), FatError> { if self.ota.current_ota_state() == Ok(OtaImageState::Invalid) { bail!("Invalid OTA state, refusing ota write") } if self.ota.current_ota_state() == Ok(OtaImageState::Undefined) { bail!("Invalid OTA state, refusing ota write") } let current_state = self.ota.current_ota_state()?; info!("current state {:?}", current_state); let next_slot = get_next_slot(&mut self.ota)?; info!("current slot {:?}", next_slot.next()); if current_state == OtaImageState::PendingVerify { info!("verifying ota image from pending"); self.ota.set_current_ota_state(Valid)?; } self.ota.set_current_slot(next_slot)?; info!("switched slot"); self.ota.set_current_ota_state(OtaImageState::New)?; info!("switched state for new partition"); let state_new = self.ota.current_ota_state()?; info!("state on new partition now {:?}", state_new); //determine nextslot crc self.set_restart_to_conf(true); Ok(()) } pub(crate) fn mode_override_pressed(&mut self) -> bool { self.boot_button.is_low() } pub(crate) async fn sntp( &mut self, _max_wait_ms: u32, stack: Stack<'_>, ) -> FatResult> { println!("start sntp"); let mut rx_meta = [PacketMetadata::EMPTY; 16]; let mut rx_buffer = [0; 4096]; let mut tx_meta = [PacketMetadata::EMPTY; 16]; let mut tx_buffer = [0; 4096]; let mut socket = UdpSocket::new( stack, &mut rx_meta, &mut rx_buffer, &mut tx_meta, &mut tx_buffer, ); socket.bind(123).unwrap(); let context = NtpContext::new(Timestamp::default()); let ntp_addrs = stack .dns_query(NTP_SERVER, DnsQueryType::A) .await .expect("Failed to resolve DNS"); if ntp_addrs.is_empty() { bail!("Failed to resolve DNS"); } info!("NTP server: {:?}", ntp_addrs); let mut counter = 0; loop { let addr: IpAddr = ntp_addrs[0].into(); let result = get_time(SocketAddr::from((addr, 123)), &socket, context).await; match result { Ok(time) => { info!("Time: {:?}", time); return DateTime::from_timestamp(time.seconds as i64, 0) .context("Could not convert Sntp result"); } Err(e) => { warn!("Error: {:?}", e); counter += 1; if counter > 10 { bail!("Failed to get time from NTP server"); } Timer::after(Duration::from_millis(100)).await; } } } } pub(crate) async fn wifi_scan(&mut self) -> FatResult> { info!("start wifi scan"); let mut lock = self.controller.try_lock()?; info!("start wifi scan lock"); let scan_config = ScanConfig { ssid: None, bssid: None, channel: None, show_hidden: false, scan_type: ScanTypeConfig::Active { min: Default::default(), max: Default::default(), }, }; let rv = lock.scan_with_config_async(scan_config).await?; info!("end wifi scan lock"); Ok(rv) } pub(crate) fn last_pump_time(&self, plant: usize) -> Option> { let ts = unsafe { LAST_WATERING_TIMESTAMP }[plant]; DateTime::from_timestamp_millis(ts) } pub(crate) fn store_last_pump_time(&mut self, plant: usize, time: DateTime) { unsafe { LAST_WATERING_TIMESTAMP[plant] = time.timestamp_millis(); } } pub(crate) fn set_low_voltage_in_cycle(&mut self) { unsafe { LOW_VOLTAGE_DETECTED = 1; } } pub(crate) fn clear_low_voltage_in_cycle(&mut self) { unsafe { LOW_VOLTAGE_DETECTED = 0; } } pub(crate) fn low_voltage_in_cycle(&mut self) -> bool { unsafe { LOW_VOLTAGE_DETECTED == 1 } } pub(crate) fn store_consecutive_pump_count(&mut self, plant: usize, count: u32) { unsafe { CONSECUTIVE_WATERING_PLANT[plant] = count; } } pub(crate) fn consecutive_pump_count(&mut self, plant: usize) -> u32 { unsafe { CONSECUTIVE_WATERING_PLANT[plant] } } pub(crate) fn get_restart_to_conf(&mut self) -> bool { unsafe { RESTART_TO_CONF == 1 } } pub(crate) fn set_restart_to_conf(&mut self, to_conf: bool) { unsafe { if to_conf { RESTART_TO_CONF = 1; } else { RESTART_TO_CONF = 0; } } } pub(crate) async fn wifi_ap(&mut self) -> FatResult> { let ssid = match self.load_config().await { Ok(config) => config.network.ap_ssid.as_str().to_string(), Err(_) => "PlantCtrl Emergency Mode".to_string(), }; let spawner = Spawner::for_current_executor().await; let device = self.interface_ap.take().unwrap(); let gw_ip_addr_str = "192.168.71.1"; let gw_ip_addr = Ipv4Addr::from_str(gw_ip_addr_str).expect("failed to parse gateway ip"); let config = embassy_net::Config::ipv4_static(StaticConfigV4 { address: Ipv4Cidr::new(gw_ip_addr, 24), gateway: Some(gw_ip_addr), dns_servers: Default::default(), }); let seed = (self.rng.random() as u64) << 32 | self.rng.random() as u64; println!("init secondary stack"); // Init network stack let (stack, runner) = embassy_net::new( device, config, mk_static!(StackResources<4>, StackResources::<4>::new()), seed, ); let stack = mk_static!(Stack, stack); let client_config = Configuration::AccessPoint(AccessPointConfiguration { ssid: ssid.clone(), ..Default::default() }); self.controller .lock() .await .set_configuration(&client_config)?; println!("start new"); self.controller.lock().await.start()?; println!("start net task"); spawner.spawn(net_task(runner)).ok(); println!("run dhcp"); spawner.spawn(run_dhcp(stack.clone(), gw_ip_addr_str)).ok(); loop { if stack.is_link_up() { break; } Timer::after(Duration::from_millis(500)).await; } while !stack.is_config_up() { Timer::after(Duration::from_millis(100)).await } println!("Connect to the AP `${ssid}` and point your browser to http://{gw_ip_addr_str}/"); stack .config_v4() .inspect(|c| println!("ipv4 config: {c:?}")); Ok(stack.clone()) } pub(crate) async fn wifi( &mut self, network_config: &NetworkConfig, ) -> FatResult> { esp_wifi::wifi_set_log_verbose(); let ssid = network_config.ssid.clone(); match &ssid { Some(ssid) => { if ssid.is_empty() { bail!("Wifi ssid was empty") } } None => { bail!("Wifi ssid was empty") } } let ssid = ssid.unwrap().to_string(); info!("attempting to connect wifi {ssid}"); let password = match network_config.password { Some(ref password) => password.to_string(), None => "".to_string(), }; let max_wait = network_config.max_wait; let spawner = Spawner::for_current_executor().await; let device = self.interface_sta.take().unwrap(); let config = embassy_net::Config::dhcpv4(DhcpConfig::default()); let seed = (self.rng.random() as u64) << 32 | self.rng.random() as u64; // Init network stack let (stack, runner) = embassy_net::new( device, config, mk_static!(StackResources<8>, StackResources::<8>::new()), seed, ); let stack = mk_static!(Stack, stack); let client_config = Configuration::Client(ClientConfiguration { ssid, bssid: None, auth_method: AuthMethod::WPA2Personal, //FIXME read from config, fill via scan password, channel: None, }); self.controller .lock() .await .set_configuration(&client_config)?; spawner.spawn(net_task(runner)).ok(); self.controller.lock().await.start_async().await?; let timeout = { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } + max_wait as u64 * 1000; loop { let state = esp_wifi::wifi::sta_state(); match state { WifiState::StaStarted => { self.controller.lock().await.connect()?; break; } _ => {} } if { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } > timeout { bail!("Timeout waiting for wifi sta ready") } Timer::after(Duration::from_millis(500)).await; } let timeout = { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } + max_wait as u64 * 1000; loop { let state = esp_wifi::wifi::sta_state(); match state { WifiState::StaConnected => { break; } _ => {} } if { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } > timeout { bail!("Timeout waiting for wifi sta connected") } Timer::after(Duration::from_millis(500)).await; } let timeout = { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } + max_wait as u64 * 1000; while !stack.is_link_up() { if { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } > timeout { bail!("Timeout waiting for wifi link up") } Timer::after(Duration::from_millis(500)).await; } let timeout = { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } + max_wait as u64 * 1000; while !stack.is_config_up() { if { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } > timeout { bail!("Timeout waiting for wifi config up") } Timer::after(Duration::from_millis(100)).await } info!("Connected WIFI, dhcp: {:?}", stack.config_v4()); Ok(stack.clone()) } pub fn deep_sleep( &mut self, duration_in_ms: u64, mut rtc: MutexGuard, ) -> ! { // Configure and enter deep sleep using esp-hal. Also keep prior behavior where // duration_in_ms == 0 triggers an immediate reset. // Mark the current OTA image as valid if we reached here while in pending verify. if let Ok(cur) = self.ota.current_ota_state() { if cur == OtaImageState::PendingVerify { self.ota .set_current_ota_state(OtaImageState::Valid) .expect("Could not set image to valid"); } } if duration_in_ms == 0 { software_reset(); } else { ///let timer = TimerWakeupSource::new(core::time::Duration::from_millis(duration_in_ms)); let timer = TimerWakeupSource::new(core::time::Duration::from_millis(5000)); let mut wake_pins: [(&mut dyn RtcPinWithResistors, WakeupLevel); 1] = [(&mut self.wake_gpio1, WakeupLevel::Low)]; let ext1 = esp_hal::rtc_cntl::sleep::Ext1WakeupSource::new(&mut wake_pins); rtc.sleep_deep(&[&timer, &ext1]); } } pub(crate) async fn load_config(&mut self) -> FatResult { let cfg = PathBuf::try_from(CONFIG_FILE)?; let config_exist = self.fs.lock().await.exists(&cfg); if !config_exist { bail!("No config file stored") } let data = self.fs.lock().await.read::<4096>(&cfg)?; let config: PlantControllerConfig = serde_json::from_slice(&data)?; return Ok(config); } pub(crate) async fn save_config(&mut self, config: Vec) -> FatResult<()> { let filesystem = self.fs.lock().await; let cfg = PathBuf::try_from(CONFIG_FILE)?; filesystem.write(&cfg, &*config)?; Ok(()) } pub(crate) async fn list_files(&self) -> FatResult { let path = PathBuf::new(); let fs = self.fs.lock().await; let free_size = fs.available_space()?; let total_size = fs.total_space(); let mut result = FileList { total: total_size, used: total_size - free_size, files: Vec::new(), }; fs.read_dir_and_then(&path, |dir| { for entry in dir { let e = entry?; if e.file_type() == FileType::File { result.files.push(FileInfo { filename: e.path().to_string(), size: e.metadata().len(), }); } } Ok(()) })?; Ok(result) } pub(crate) async fn init_rtc_deepsleep_memory( &self, init_rtc_store: bool, to_config_mode: bool, ) { if init_rtc_store { unsafe { LAST_WATERING_TIMESTAMP = [0; PLANT_COUNT]; CONSECUTIVE_WATERING_PLANT = [0; PLANT_COUNT]; LOW_VOLTAGE_DETECTED = 0; if to_config_mode { RESTART_TO_CONF = 1 } else { RESTART_TO_CONF = 0; } }; } else { unsafe { if to_config_mode { RESTART_TO_CONF = 1; } LOG_ACCESS .lock() .await .log( LogMessage::RestartToConfig, RESTART_TO_CONF as u32, 0, "", "", ) .await; LOG_ACCESS .lock() .await .log( LogMessage::LowVoltage, LOW_VOLTAGE_DETECTED as u32, 0, "", "", ) .await; for i in 0..PLANT_COUNT { log::info!( "LAST_WATERING_TIMESTAMP[{}] = UTC {}", i, LAST_WATERING_TIMESTAMP[i] ); } for i in 0..PLANT_COUNT { log::info!( "CONSECUTIVE_WATERING_PLANT[{}] = {}", i, CONSECUTIVE_WATERING_PLANT[i] ); } } } } pub(crate) async fn mqtt( &mut self, network_config: &'static NetworkConfig, stack: Stack<'static>, ) -> FatResult<()> { let base_topic = network_config .base_topic .as_ref() .context("missing base topic")?; if base_topic.is_empty() { bail!("Mqtt base_topic was empty") } MQTT_BASE_TOPIC .init(base_topic.to_string()) .map_err(|_| FatError::String { error: "Error setting basetopic".to_string(), })?; let mqtt_url = network_config .mqtt_url .as_ref() .context("missing mqtt url")?; if mqtt_url.is_empty() { bail!("Mqtt url was empty") } let last_will_topic = format!("{}/state", base_topic); let round_trip_topic = format!("{}/internal/roundtrip", base_topic); let stay_alive_topic = format!("{}/stay_alive", base_topic); let mut builder: McutieBuilder<'_, String, PublishDisplay, 0> = McutieBuilder::new(stack, "plant ctrl", mqtt_url); if network_config.mqtt_user.is_some() && network_config.mqtt_password.is_some() { builder = builder.with_authentication( network_config.mqtt_user.as_ref().unwrap().as_str(), network_config.mqtt_password.as_ref().unwrap().as_str(), ); info!("With authentification"); } let lwt = Topic::General(last_will_topic); let lwt = mk_static!(Topic, lwt); let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce); builder = builder.with_last_will(lwt); //TODO make configurable builder = builder.with_device_id("plantctrl"); let builder: McutieBuilder<'_, String, PublishDisplay, 2> = builder .with_subscriptions([ Topic::General(round_trip_topic.clone()), Topic::General(stay_alive_topic.clone()), ]); let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16; let (receiver, task) = builder.build(keep_alive); let spawner = Spawner::for_current_executor().await; spawner.spawn(mqtt_incoming_task( receiver, round_trip_topic.clone(), stay_alive_topic.clone(), ))?; spawner.spawn(mqtt_runner(task))?; LOG_ACCESS .lock() .await .log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic) .await; LOG_ACCESS .lock() .await .log(LogMessage::MqttInfo, 0, 0, "", mqtt_url) .await; let mqtt_timeout = 15000; let timeout = { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } + mqtt_timeout as u64 * 1000; while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) { let cur = TIME_ACCESS.get().await.lock().await.current_time_us(); if cur > timeout { bail!("Timeout waiting MQTT connect event") } Timer::after(Duration::from_millis(100)).await; } Topic::General(round_trip_topic.clone()) .with_display("online_text") .publish() .await .unwrap(); let timeout = { let guard = TIME_ACCESS.get().await.lock().await; guard.current_time_us() } + mqtt_timeout as u64 * 1000; while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) { let cur = TIME_ACCESS.get().await.lock().await.current_time_us(); if cur > timeout { //ensure we do not further try to publish MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); bail!("Timeout waiting MQTT roundtrip") } Timer::after(Duration::from_millis(100)).await; } Ok(()) } pub(crate) async fn mqtt_inner(&mut self, subtopic: &str, message: &str) -> FatResult<()> { if !subtopic.starts_with("/") { bail!("Subtopic without / at start {}", subtopic); } if subtopic.len() > 192 { bail!("Subtopic exceeds 192 chars {}", subtopic); } let base_topic = MQTT_BASE_TOPIC .try_get() .context("missing base topic in static!")?; let full_topic = format!("{base_topic}{subtopic}"); loop { let result = Topic::General(full_topic.as_str()) .with_display(message) .retain(true) .publish() .await; match result { Ok(()) => return Ok(()), Err(err) => { let retry = match err { Error::IOError => false, Error::TimedOut => true, Error::TooLarge => false, Error::PacketError => false, Error::Invalid => false, }; if !retry { bail!( "Error during mqtt send on topic {} with message {:#?} error is {:?}", &full_topic, message, err ); } info!( "Retransmit for {} with message {:#?} error is {:?} retrying {}", &full_topic, message, err, retry ); Timer::after(Duration::from_millis(100)).await; } } } } pub(crate) async fn mqtt_publish(&mut self, subtopic: &str, message: &str) { let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed); if !online { return; } let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed); if !roundtrip_ok { info!("MQTT roundtrip not received yet, dropping message"); return; } match self.mqtt_inner(subtopic, message).await { Ok(()) => {} Err(err) => { info!( "Error during mqtt send on topic {} with message {:#?} error is {:?}", subtopic, message, err ); } }; } } #[embassy_executor::task] async fn mqtt_runner( task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>, ) { task.run().await; } #[embassy_executor::task] async fn mqtt_incoming_task( receiver: McutieReceiver, round_trip_topic: String, stay_alive_topic: String, ) { loop { let message = receiver.receive().await; match message { MqttMessage::Connected => { info!("Mqtt connected"); MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed); } MqttMessage::Publish(topic, payload) => match topic { Topic::DeviceType(_type_topic) => {} Topic::Device(_device_topic) => {} Topic::General(topic) => { let subtopic = topic.as_str(); if subtopic.eq(round_trip_topic.as_str()) { MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed); } else if subtopic.eq(stay_alive_topic.as_str()) { let value = payload.eq_ignore_ascii_case("true".as_ref()) || payload.eq_ignore_ascii_case("1".as_ref()); let a = match value { true => 1, false => 0, }; LOG_ACCESS .lock() .await .log(LogMessage::MqttStayAliveRec, a, 0, "", "") .await; MQTT_STAY_ALIVE.store(value, Ordering::Relaxed); } else { LOG_ACCESS .lock() .await .log(LogMessage::UnknownTopic, 0, 0, "", &*topic) .await; } } }, MqttMessage::Disconnected => { MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); info!("Mqtt disconnected"); } MqttMessage::HomeAssistantOnline => { info!("Home assistant is online"); } } } } #[embassy_executor::task(pool_size = 2)] async fn net_task(mut runner: Runner<'static, WifiDevice<'static>>) { runner.run().await; } #[embassy_executor::task] async fn run_dhcp(stack: Stack<'static>, gw_ip_addr: &'static str) { use core::net::{Ipv4Addr, SocketAddrV4}; use edge_dhcp::{ io::{self, DEFAULT_SERVER_PORT}, server::{Server, ServerOptions}, }; use edge_nal::UdpBind; use edge_nal_embassy::{Udp, UdpBuffers}; let ip = Ipv4Addr::from_str(gw_ip_addr).expect("dhcp task failed to parse gw ip"); let mut buf = [0u8; 1500]; let mut gw_buf = [Ipv4Addr::UNSPECIFIED]; let buffers = UdpBuffers::<3, 1024, 1024, 10>::new(); let unbound_socket = Udp::new(stack, &buffers); let mut bound_socket = unbound_socket .bind(SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::UNSPECIFIED, DEFAULT_SERVER_PORT, ))) .await .unwrap(); loop { _ = io::server::run( &mut Server::<_, 64>::new_with_et(ip), &ServerOptions::new(ip, Some(&mut gw_buf)), &mut bound_socket, &mut buf, ) .await .inspect_err(|e| log::warn!("DHCP server error: {e:?}")); Timer::after(Duration::from_millis(500)).await; } }