diff --git a/rust/src/hal/esp.rs b/rust/src/hal/esp.rs index c595887..4a70d3e 100644 --- a/rust/src/hal/esp.rs +++ b/rust/src/hal/esp.rs @@ -22,7 +22,6 @@ use embassy_net::udp::{PacketMetadata, UdpSocket}; use embassy_net::{DhcpConfig, IpAddress, Ipv4Cidr, Runner, Stack, StackResources, StaticConfigV4}; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::mutex::{Mutex, MutexGuard}; -use embassy_sync::once_lock::OnceLock; use embassy_time::{Duration, Timer, WithTimeout}; use embedded_storage::nor_flash::{check_erase, NorFlash, ReadNorFlash, RmwNorFlashStorage}; use esp_bootloader_esp_idf::ota::OtaImageState::Valid; @@ -44,10 +43,6 @@ use esp_storage::FlashStorage; use littlefs2::fs::Filesystem; use littlefs2_core::{FileType, PathBuf, SeekFrom}; use log::{info, warn, error}; -use mcutie::{ - Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, - QoS, Topic, -}; use portable_atomic::AtomicBool; use sntpc::{NtpContext, NtpTimestampGenerator, NtpUdpSocket, get_time}; @@ -68,11 +63,6 @@ static mut LAST_CORROSION_PROTECTION_CHECK_DAY: i8 = -1; const CONFIG_FILE: &str = "config.json"; const NTP_SERVER: &str = "pool.ntp.org"; -static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false); -static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false); -pub use crate::mqtt::MQTT_STAY_ALIVE; -static MQTT_BASE_TOPIC: OnceLock = OnceLock::new(); - #[derive(Serialize, Debug)] pub struct FileInfo { filename: String, @@ -740,228 +730,6 @@ impl Esp<'_> { } } - pub(crate) async fn mqtt( - &mut self, - network_config: &'static NetworkConfig, - stack: Stack<'static>, - spawner: Spawner, - ) -> FatResult<()> { - let base_topic = network_config - .base_topic - .as_ref() - .context("missing base topic")?; - if base_topic.is_empty() { - bail!("Mqtt base_topic was empty") - } - MQTT_BASE_TOPIC - .init(base_topic.to_string()) - .map_err(|_| FatError::String { - error: "Error setting basetopic".to_string(), - })?; - - let mqtt_url = network_config - .mqtt_url - .as_ref() - .context("missing mqtt url")?; - if mqtt_url.is_empty() { - bail!("Mqtt url was empty") - } - - let last_will_topic = format!("{base_topic}/state"); - let round_trip_topic = format!("{base_topic}/internal/roundtrip"); - let stay_alive_topic = format!("{base_topic}/stay_alive"); - - let mut builder: McutieBuilder<'_, String, PublishDisplay, 0> = - McutieBuilder::new(stack, "plant ctrl", mqtt_url); - if let (Some(mqtt_user), Some(mqtt_password)) = ( - network_config.mqtt_user.as_ref(), - network_config.mqtt_password.as_ref(), - ) { - builder = builder.with_authentication(mqtt_user, mqtt_password); - info!("With authentification"); - } - - let lwt = Topic::General(last_will_topic); - let lwt = mk_static!(Topic, lwt); - let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce); - builder = builder.with_last_will(lwt); - //TODO make configurable - builder = builder.with_device_id("plantctrl"); - - let builder: McutieBuilder<'_, String, PublishDisplay, 2> = builder - .with_subscriptions([ - Topic::General(round_trip_topic.clone()), - Topic::General(stay_alive_topic.clone()), - ]); - - let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16; - let (receiver, task) = builder.build(keep_alive); - - spawner.spawn(mqtt_incoming_task( - receiver, - round_trip_topic.clone(), - stay_alive_topic.clone(), - )?); - spawner.spawn(mqtt_runner(task)?); - - log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic); - - log(LogMessage::MqttInfo, 0, 0, "", mqtt_url); - - let mqtt_timeout = 15000; - let res = async { - while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) { - crate::hal::PlantHal::feed_watchdog(); - Timer::after(Duration::from_millis(100)).await; - } - Ok::<(), FatError>(()) - } - .with_timeout(Duration::from_millis(mqtt_timeout as u64)) - .await; - - if res.is_err() { - bail!("Timeout waiting MQTT connect event") - } - - let _ = Topic::General(round_trip_topic.clone()) - .with_display("online_text") - .publish() - .await; - - let res = async { - while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) { - crate::hal::PlantHal::feed_watchdog(); - Timer::after(Duration::from_millis(100)).await; - } - Ok::<(), FatError>(()) - } - .with_timeout(Duration::from_millis(mqtt_timeout as u64)) - .await; - - if res.is_err() { - //ensure we do not further try to publish - MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); - bail!("Timeout waiting MQTT roundtrip") - } - Ok(()) - } - - pub(crate) async fn mqtt_inner(&mut self, subtopic: &str, message: &str) -> FatResult<()> { - if !subtopic.starts_with("/") { - bail!("Subtopic without / at start {}", subtopic); - } - if subtopic.len() > 192 { - bail!("Subtopic exceeds 192 chars {}", subtopic); - } - let base_topic = MQTT_BASE_TOPIC - .try_get() - .context("missing base topic in static!")?; - - let full_topic = format!("{base_topic}{subtopic}"); - - loop { - let result = Topic::General(full_topic.as_str()) - .with_display(message) - .retain(true) - .publish() - .await; - match result { - Ok(()) => return Ok(()), - Err(err) => { - let retry = match err { - Error::IOError => false, - Error::TimedOut => true, - Error::TooLarge => false, - Error::PacketError => false, - Error::Invalid => false, - Error::Rejected => false, - }; - if !retry { - bail!( - "Error during mqtt send on topic {} with message {:#?} error is {:?}", - &full_topic, - message, - err - ); - } - info!( - "Retransmit for {} with message {:#?} error is {:?} retrying {}", - &full_topic, message, err, retry - ); - Timer::after(Duration::from_millis(100)).await; - } - } - } - } - pub(crate) async fn mqtt_publish(&mut self, subtopic: &str, message: &str) { - let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed); - if !online { - return; - } - let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed); - if !roundtrip_ok { - info!("MQTT roundtrip not received yet, dropping message"); - return; - } - match self.mqtt_inner(subtopic, message).await { - Ok(()) => {} - Err(err) => { - info!( - "Error during mqtt send on topic {subtopic} with message {message:#?} error is {err:?}" - ); - } - }; - } -} - -#[embassy_executor::task] -async fn mqtt_runner( - task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>, -) { - task.run().await; -} - -#[embassy_executor::task] -async fn mqtt_incoming_task( - receiver: McutieReceiver, - round_trip_topic: String, - stay_alive_topic: String, -) { - loop { - let message = receiver.receive().await; - match message { - MqttMessage::Connected => { - info!("Mqtt connected"); - MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed); - } - MqttMessage::Publish(topic, payload) => match topic { - Topic::DeviceType(_type_topic) => {} - Topic::Device(_device_topic) => {} - Topic::General(topic) => { - let subtopic = topic.as_str(); - - if subtopic.eq(round_trip_topic.as_str()) { - MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed); - } else if subtopic.eq(stay_alive_topic.as_str()) { - let value = payload.eq_ignore_ascii_case("true".as_ref()) - || payload.eq_ignore_ascii_case("1".as_ref()); - let a = match value { - true => 1, - false => 0, - }; - log(LogMessage::MqttStayAliveRec, a, 0, "", ""); - MQTT_STAY_ALIVE.store(value, Ordering::Relaxed); - } else { - log(LogMessage::UnknownTopic, 0, 0, "", &topic); - } - } - }, - MqttMessage::Disconnected => { - MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); - info!("Mqtt disconnected"); - } - } - } } #[embassy_executor::task(pool_size = 2)]