use crate::bail; use crate::config::NetworkConfig; use crate::fat_error::{ContextExt, FatError, FatResult}; use crate::hal::PlantHal; use crate::log::{log, LogMessage}; use alloc::string::String; use alloc::{format, string::ToString}; use core::sync::atomic::Ordering; use embassy_executor::Spawner; use embassy_net::Stack; use embassy_time::{Duration, Timer, WithTimeout}; use log::info; use mcutie::{ Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, QoS, Topic, }; use portable_atomic::AtomicBool; use embassy_sync::once_lock::OnceLock; static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false); static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false); pub static MQTT_STAY_ALIVE: AtomicBool = AtomicBool::new(false); static MQTT_BASE_TOPIC: OnceLock = OnceLock::new(); pub fn is_stay_alive() -> bool { MQTT_STAY_ALIVE.load(Ordering::Relaxed) } pub async fn publish(subtopic: &str, message: &str) { let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed); if !online { return; } let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed); if !roundtrip_ok { info!("MQTT roundtrip not received yet, dropping message"); return; } match publish_inner(subtopic, message).await { Ok(()) => {} Err(err) => { info!( "Error during mqtt send on topic {subtopic} with message {message:#?} error is {err:?}" ); } }; } async fn publish_inner(subtopic: &str, message: &str) -> FatResult<()> { if !subtopic.starts_with("/") { bail!("Subtopic without / at start {}", subtopic); } if subtopic.len() > 192 { bail!("Subtopic exceeds 192 chars {}", subtopic); } let base_topic = MQTT_BASE_TOPIC .try_get() .context("missing base topic in static!")?; let full_topic = format!("{base_topic}{subtopic}"); loop { let result = Topic::General(full_topic.as_str()) .with_display(message) .retain(true) .publish() .await; match result { Ok(()) => return Ok(()), Err(err) => { let retry = match err { Error::IOError => false, Error::TimedOut => true, Error::TooLarge => false, Error::PacketError => false, Error::Invalid => false, Error::Rejected => false, }; if !retry { bail!( "Error during mqtt send on topic {} with message {:#?} error is {:?}", &full_topic, message, err ); } info!( "Retransmit for {} with message {:#?} error is {:?} retrying {}", &full_topic, message, err, retry ); Timer::after(Duration::from_millis(100)).await; } } } } macro_rules! mk_static { ($t:ty,$val:expr) => {{ static STATIC_CELL: static_cell::StaticCell<$t> = static_cell::StaticCell::new(); #[deny(unused_attributes)] let x = STATIC_CELL.uninit().write(($val)); x }}; } pub async fn mqtt_init( network_config: &'static NetworkConfig, stack: Stack<'static>, spawner: Spawner, ) -> FatResult<()> { let base_topic = network_config .base_topic .as_ref() .context("missing base topic")?; if base_topic.is_empty() { bail!("Mqtt base_topic was empty") } MQTT_BASE_TOPIC .init(base_topic.to_string()) .map_err(|_| FatError::String { error: "Error setting basetopic".to_string(), })?; let mqtt_url = network_config .mqtt_url .as_ref() .context("missing mqtt url")?; if mqtt_url.is_empty() { bail!("Mqtt url was empty") } let last_will_topic = format!("{base_topic}/state"); let round_trip_topic = format!("{base_topic}/internal/roundtrip"); let stay_alive_topic = format!("{base_topic}/stay_alive"); let mut builder: McutieBuilder<'_, String, PublishDisplay, 0> = McutieBuilder::new(stack, "plant ctrl", mqtt_url); if let (Some(mqtt_user), Some(mqtt_password)) = ( network_config.mqtt_user.as_ref(), network_config.mqtt_password.as_ref(), ) { builder = builder.with_authentication(mqtt_user, mqtt_password); info!("With authentification"); } let lwt = Topic::General(last_will_topic); let lwt = mk_static!(Topic, lwt); let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce); builder = builder.with_last_will(lwt); //TODO make configurable builder = builder.with_device_id("plantctrl"); let builder: McutieBuilder<'_, String, PublishDisplay, 2> = builder .with_subscriptions([ Topic::General(round_trip_topic.clone()), Topic::General(stay_alive_topic.clone()), ]); let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16; let (receiver, task) = builder.build(keep_alive); spawner.spawn(mqtt_incoming_task( receiver, round_trip_topic.clone(), stay_alive_topic.clone(), )?); spawner.spawn(mqtt_runner(task)?); log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic); log(LogMessage::MqttInfo, 0, 0, "", mqtt_url); let mqtt_timeout = 15000; let res = async { while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) { PlantHal::feed_watchdog(); Timer::after(Duration::from_millis(100)).await; } Ok::<(), FatError>(()) } .with_timeout(Duration::from_millis(mqtt_timeout as u64)) .await; if res.is_err() { bail!("Timeout waiting MQTT connect event") } let _ = Topic::General(round_trip_topic.clone()) .with_display("online_text") .publish() .await; let res = async { while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) { PlantHal::feed_watchdog(); Timer::after(Duration::from_millis(100)).await; } Ok::<(), FatError>(()) } .with_timeout(Duration::from_millis(mqtt_timeout as u64)) .await; if res.is_err() { MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); bail!("Timeout waiting MQTT roundtrip") } Ok(()) } #[embassy_executor::task] async fn mqtt_runner( task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>, ) { task.run().await; } #[embassy_executor::task] async fn mqtt_incoming_task( receiver: McutieReceiver, round_trip_topic: String, stay_alive_topic: String, ) { loop { let message = receiver.receive().await; match message { MqttMessage::Connected => { info!("Mqtt connected"); MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed); } MqttMessage::Publish(topic, payload) => match topic { Topic::DeviceType(_type_topic) => {} Topic::Device(_device_topic) => {} Topic::General(topic) => { let subtopic = topic.as_str(); if subtopic.eq(round_trip_topic.as_str()) { MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed); } else if subtopic.eq(stay_alive_topic.as_str()) { let value = payload.eq_ignore_ascii_case("true".as_ref()) || payload.eq_ignore_ascii_case("1".as_ref()); let a = match value { true => 1, false => 0, }; log(LogMessage::MqttStayAliveRec, a, 0, "", ""); MQTT_STAY_ALIVE.store(value, Ordering::Relaxed); } else { log(LogMessage::UnknownTopic, 0, 0, "", &topic); } } }, MqttMessage::Disconnected => { MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); info!("Mqtt disconnected"); } } } }