diff --git a/rust/src/main.rs b/rust/src/main.rs index daf76d7..31e55f4 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -67,6 +67,7 @@ mod config; mod fat_error; mod hal; mod log; +mod mqtt; mod plant_state; mod tank; mod webserver; diff --git a/rust/src/mqtt.rs b/rust/src/mqtt.rs new file mode 100644 index 0000000..9a01897 --- /dev/null +++ b/rust/src/mqtt.rs @@ -0,0 +1,257 @@ +use crate::config::NetworkConfig; +use crate::fat_error::{ContextExt, FatError, FatResult}; +use crate::hal::PlantHal; +use crate::log::{log, LogMessage}; +use alloc::string::String; +use alloc::{format, string::ToString}; +use core::sync::atomic::Ordering; +use embassy_executor::Spawner; +use embassy_net::Stack; +use embassy_time::{Duration, Timer, WithTimeout}; +use log::info; +use mcutie::{ + Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, + QoS, Topic, +}; +use portable_atomic::AtomicBool; +use embassy_sync::once_lock::OnceLock; + +static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false); +static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false); +pub static MQTT_STAY_ALIVE: AtomicBool = AtomicBool::new(false); +static MQTT_BASE_TOPIC: OnceLock = OnceLock::new(); + +pub fn is_stay_alive() -> bool { + MQTT_STAY_ALIVE.load(Ordering::Relaxed) +} + +pub async fn publish(subtopic: &str, message: &str) { + let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed); + if !online { + return; + } + let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed); + if !roundtrip_ok { + info!("MQTT roundtrip not received yet, dropping message"); + return; + } + match publish_inner(subtopic, message).await { + Ok(()) => {} + Err(err) => { + info!( + "Error during mqtt send on topic {subtopic} with message {message:#?} error is {err:?}" + ); + } + }; +} + +async fn publish_inner(subtopic: &str, message: &str) -> FatResult<()> { + if !subtopic.starts_with("/") { + bail!("Subtopic without / at start {}", subtopic); + } + if subtopic.len() > 192 { + bail!("Subtopic exceeds 192 chars {}", subtopic); + } + let base_topic = MQTT_BASE_TOPIC + .try_get() + .context("missing base topic in static!")?; + + let full_topic = format!("{base_topic}{subtopic}"); + + loop { + let result = Topic::General(full_topic.as_str()) + .with_display(message) + .retain(true) + .publish() + .await; + match result { + Ok(()) => return Ok(()), + Err(err) => { + let retry = match err { + Error::IOError => false, + Error::TimedOut => true, + Error::TooLarge => false, + Error::PacketError => false, + Error::Invalid => false, + Error::Rejected => false, + }; + if !retry { + bail!( + "Error during mqtt send on topic {} with message {:#?} error is {:?}", + &full_topic, + message, + err + ); + } + info!( + "Retransmit for {} with message {:#?} error is {:?} retrying {}", + &full_topic, message, err, retry + ); + Timer::after(Duration::from_millis(100)).await; + } + } + } +} + +pub async fn mqtt_init( + network_config: &'static NetworkConfig, + stack: Stack<'static>, + spawner: Spawner, +) -> FatResult<()> { + let base_topic = network_config + .base_topic + .as_ref() + .context("missing base topic")?; + if base_topic.is_empty() { + bail!("Mqtt base_topic was empty") + } + MQTT_BASE_TOPIC + .init(base_topic.to_string()) + .map_err(|_| FatError::String { + error: "Error setting basetopic".to_string(), + })?; + + let mqtt_url = network_config + .mqtt_url + .as_ref() + .context("missing mqtt url")?; + if mqtt_url.is_empty() { + bail!("Mqtt url was empty") + } + + let last_will_topic = format!("{base_topic}/state"); + let round_trip_topic = format!("{base_topic}/internal/roundtrip"); + let stay_alive_topic = format!("{base_topic}/stay_alive"); + + let mut builder: McutieBuilder<'_, String, PublishDisplay, 0> = + McutieBuilder::new(stack, "plant ctrl", mqtt_url); + if let (Some(mqtt_user), Some(mqtt_password)) = ( + network_config.mqtt_user.as_ref(), + network_config.mqtt_password.as_ref(), + ) { + builder = builder.with_authentication(mqtt_user, mqtt_password); + info!("With authentification"); + } + + let lwt = Topic::General(last_will_topic); + let lwt = mk_static!(Topic, lwt); + let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce); + builder = builder.with_last_will(lwt); + //TODO make configurable + builder = builder.with_device_id("plantctrl"); + + let builder: McutieBuilder<'_, String, PublishDisplay, 2> = builder + .with_subscriptions([ + Topic::General(round_trip_topic.clone()), + Topic::General(stay_alive_topic.clone()), + ]); + + let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16; + let (receiver, task) = builder.build(keep_alive); + + spawner.spawn(mqtt_incoming_task( + receiver, + round_trip_topic.clone(), + stay_alive_topic.clone(), + )?); + spawner.spawn(mqtt_runner(task)?); + + log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic); + + log(LogMessage::MqttInfo, 0, 0, "", mqtt_url); + + let mqtt_timeout = 15000; + let res = async { + while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) { + PlantHal::feed_watchdog(); + Timer::after(Duration::from_millis(100)).await; + } + Ok::<(), FatError>(()) + } + .with_timeout(Duration::from_millis(mqtt_timeout as u64)) + .await; + + if res.is_err() { + bail!("Timeout waiting MQTT connect event") + } + + let _ = Topic::General(round_trip_topic.clone()) + .with_display("online_text") + .publish() + .await; + + let res = async { + while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) { + PlantHal::feed_watchdog(); + Timer::after(Duration::from_millis(100)).await; + } + Ok::<(), FatError>(()) + } + .with_timeout(Duration::from_millis(mqtt_timeout as u64)) + .await; + + if res.is_err() { + MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); + bail!("Timeout waiting MQTT roundtrip") + } + Ok(()) +} + +macro_rules! mk_static { + ($t:ty,$val:expr) => {{ + static STATIC_CELL: static_cell::StaticCell<$t> = static_cell::StaticCell::new(); + #[deny(unused_attributes)] + let x = STATIC_CELL.uninit().write(($val)); + x + }}; +} + +#[embassy_executor::task] +async fn mqtt_runner( + task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>, +) { + task.run().await; +} + +#[embassy_executor::task] +async fn mqtt_incoming_task( + receiver: McutieReceiver, + round_trip_topic: String, + stay_alive_topic: String, +) { + loop { + let message = receiver.receive().await; + match message { + MqttMessage::Connected => { + info!("Mqtt connected"); + MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed); + } + MqttMessage::Publish(topic, payload) => match topic { + Topic::DeviceType(_type_topic) => {} + Topic::Device(_device_topic) => {} + Topic::General(topic) => { + let subtopic = topic.as_str(); + + if subtopic.eq(round_trip_topic.as_str()) { + MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed); + } else if subtopic.eq(stay_alive_topic.as_str()) { + let value = payload.eq_ignore_ascii_case("true".as_ref()) + || payload.eq_ignore_ascii_case("1".as_ref()); + let a = match value { + true => 1, + false => 0, + }; + log(LogMessage::MqttStayAliveRec, a, 0, "", ""); + MQTT_STAY_ALIVE.store(value, Ordering::Relaxed); + } else { + log(LogMessage::UnknownTopic, 0, 0, "", &topic); + } + } + }, + MqttMessage::Disconnected => { + MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); + info!("Mqtt disconnected"); + } + } + } +}