From a02b84d732163f2bbe3f74aebdac4a0e155427f5 Mon Sep 17 00:00:00 2001 From: ju6ge Date: Sun, 10 May 2026 02:34:16 +0200 Subject: [PATCH] refactor: create mqtt module with core MQTT statics and tasks --- Software/MainBoard/rust/src/hal/esp.rs | 231 ---------------------- Software/MainBoard/rust/src/main.rs | 95 +++------ Software/MainBoard/rust/src/mqtt.rs | 258 +++++++++++++++++++++++++ 3 files changed, 284 insertions(+), 300 deletions(-) create mode 100644 Software/MainBoard/rust/src/mqtt.rs diff --git a/Software/MainBoard/rust/src/hal/esp.rs b/Software/MainBoard/rust/src/hal/esp.rs index 56e0e2d..1f35b16 100644 --- a/Software/MainBoard/rust/src/hal/esp.rs +++ b/Software/MainBoard/rust/src/hal/esp.rs @@ -39,10 +39,6 @@ use esp_radio::wifi::scan::{ScanConfig, ScanTypeConfig}; use esp_radio::wifi::sta::StationConfig; use esp_radio::wifi::{AuthenticationMethod, Config, Interface, WifiController}; use log::{error, info, warn}; -use mcutie::{ - Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, - QoS, Topic, -}; use portable_atomic::AtomicBool; use sntpc::{get_time, NtpContext, NtpTimestampGenerator, NtpUdpSocket}; @@ -61,11 +57,6 @@ static mut LAST_CORROSION_PROTECTION_CHECK_DAY: i8 = -1; const NTP_SERVER: &str = "pool.ntp.org"; -static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false); -static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false); -pub static MQTT_STAY_ALIVE: AtomicBool = AtomicBool::new(false); -static MQTT_BASE_TOPIC: OnceLock = OnceLock::new(); - #[derive(Copy, Clone, Default)] struct Timestamp { stamp: DateTime, @@ -665,228 +656,6 @@ impl Esp<'_> { } } - pub(crate) async fn mqtt( - &mut self, - network_config: &'static NetworkConfig, - stack: Stack<'static>, - spawner: Spawner, - ) -> FatResult<()> { - let base_topic = network_config - .base_topic - .as_ref() - .context("missing base topic")?; - if base_topic.is_empty() { - bail!("Mqtt base_topic was empty") - } - MQTT_BASE_TOPIC - .init(base_topic.to_string()) - .map_err(|_| FatError::String { - error: "Error setting basetopic".to_string(), - })?; - - let mqtt_url = network_config - .mqtt_url - .as_ref() - .context("missing mqtt url")?; - if mqtt_url.is_empty() { - bail!("Mqtt url was empty") - } - - let last_will_topic = format!("{base_topic}/state"); - let round_trip_topic = format!("{base_topic}/internal/roundtrip"); - let stay_alive_topic = format!("{base_topic}/stay_alive"); - - let mut builder: McutieBuilder<'_, String, PublishDisplay, 0> = - McutieBuilder::new(stack, "plant ctrl", mqtt_url); - if let (Some(mqtt_user), Some(mqtt_password)) = ( - network_config.mqtt_user.as_ref(), - network_config.mqtt_password.as_ref(), - ) { - builder = builder.with_authentication(mqtt_user, mqtt_password); - info!("With authentification"); - } - - let lwt = Topic::General(last_will_topic); - let lwt = mk_static!(Topic, lwt); - let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce); - builder = builder.with_last_will(lwt); - //TODO make configurable - builder = builder.with_device_id("plantctrl"); - - let builder: McutieBuilder<'_, String, PublishDisplay, 2> = builder - .with_subscriptions([ - Topic::General(round_trip_topic.clone()), - Topic::General(stay_alive_topic.clone()), - ]); - - let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16; - let (receiver, task) = builder.build(keep_alive); - - spawner.spawn(mqtt_incoming_task( - receiver, - round_trip_topic.clone(), - stay_alive_topic.clone(), - )?); - spawner.spawn(mqtt_runner(task)?); - - log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic); - - log(LogMessage::MqttInfo, 0, 0, "", mqtt_url); - - let mqtt_timeout = 15000; - let res = async { - while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) { - crate::hal::PlantHal::feed_watchdog(); - Timer::after(Duration::from_millis(100)).await; - } - Ok::<(), FatError>(()) - } - .with_timeout(Duration::from_millis(mqtt_timeout as u64)) - .await; - - if res.is_err() { - bail!("Timeout waiting MQTT connect event") - } - - let _ = Topic::General(round_trip_topic.clone()) - .with_display("online_text") - .publish() - .await; - - let res = async { - while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) { - crate::hal::PlantHal::feed_watchdog(); - Timer::after(Duration::from_millis(100)).await; - } - Ok::<(), FatError>(()) - } - .with_timeout(Duration::from_millis(mqtt_timeout as u64)) - .await; - - if res.is_err() { - //ensure we do not further try to publish - MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); - bail!("Timeout waiting MQTT roundtrip") - } - Ok(()) - } - - pub(crate) async fn mqtt_inner(&mut self, subtopic: &str, message: &str) -> FatResult<()> { - if !subtopic.starts_with("/") { - bail!("Subtopic without / at start {}", subtopic); - } - if subtopic.len() > 192 { - bail!("Subtopic exceeds 192 chars {}", subtopic); - } - let base_topic = MQTT_BASE_TOPIC - .try_get() - .context("missing base topic in static!")?; - - let full_topic = format!("{base_topic}{subtopic}"); - - loop { - let result = Topic::General(full_topic.as_str()) - .with_display(message) - .retain(true) - .publish() - .await; - match result { - Ok(()) => return Ok(()), - Err(err) => { - let retry = match err { - Error::IOError => false, - Error::TimedOut => true, - Error::TooLarge => false, - Error::PacketError => false, - Error::Invalid => false, - Error::Rejected => false, - }; - if !retry { - bail!( - "Error during mqtt send on topic {} with message {:#?} error is {:?}", - &full_topic, - message, - err - ); - } - info!( - "Retransmit for {} with message {:#?} error is {:?} retrying {}", - &full_topic, message, err, retry - ); - Timer::after(Duration::from_millis(100)).await; - } - } - } - } - pub(crate) async fn mqtt_publish(&mut self, subtopic: &str, message: &str) { - let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed); - if !online { - return; - } - let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed); - if !roundtrip_ok { - info!("MQTT roundtrip not received yet, dropping message"); - return; - } - match self.mqtt_inner(subtopic, message).await { - Ok(()) => {} - Err(err) => { - info!( - "Error during mqtt send on topic {subtopic} with message {message:#?} error is {err:?}" - ); - } - }; - } -} - -#[embassy_executor::task] -async fn mqtt_runner( - task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>, -) { - task.run().await; -} - -#[embassy_executor::task] -async fn mqtt_incoming_task( - receiver: McutieReceiver, - round_trip_topic: String, - stay_alive_topic: String, -) { - loop { - let message = receiver.receive().await; - match message { - MqttMessage::Connected => { - info!("Mqtt connected"); - MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed); - } - MqttMessage::Publish(topic, payload) => match topic { - Topic::DeviceType(_type_topic) => {} - Topic::Device(_device_topic) => {} - Topic::General(topic) => { - let subtopic = topic.as_str(); - - if subtopic.eq(round_trip_topic.as_str()) { - MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed); - } else if subtopic.eq(stay_alive_topic.as_str()) { - let value = payload.eq_ignore_ascii_case("true".as_ref()) - || payload.eq_ignore_ascii_case("1".as_ref()); - let a = match value { - true => 1, - false => 0, - }; - log(LogMessage::MqttStayAliveRec, a, 0, "", ""); - MQTT_STAY_ALIVE.store(value, Ordering::Relaxed); - } else { - log(LogMessage::UnknownTopic, 0, 0, "", &topic); - } - } - }, - MqttMessage::Disconnected => { - MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); - info!("Mqtt disconnected"); - } - } - } } #[embassy_executor::task(pool_size = 2)] diff --git a/Software/MainBoard/rust/src/main.rs b/Software/MainBoard/rust/src/main.rs index 9a35956..cbb7aec 100644 --- a/Software/MainBoard/rust/src/main.rs +++ b/Software/MainBoard/rust/src/main.rs @@ -14,10 +14,10 @@ esp_bootloader_esp_idf::esp_app_desc!(); use esp_backtrace as _; +use crate::hal::PROGRESS_ACTIVE; use crate::config::{NetworkConfig, PlantConfig, PlantControllerConfig}; use crate::fat_error::FatResult; -use crate::hal::esp::MQTT_STAY_ALIVE; -use crate::hal::PROGRESS_ACTIVE; + use crate::log::log; use crate::tank::{determine_tank_state, TankError, TankState, WATER_FROZEN_THRESH}; use crate::webserver::http_server; @@ -67,6 +67,7 @@ mod config; mod fat_error; mod hal; mod log; +mod mqtt; mod plant_state; mod tank; mod webserver; @@ -329,7 +330,9 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> { log(LogMessage::NormalRun, 0, 0, "", ""); } - let dry_run = MQTT_STAY_ALIVE.load(Ordering::Relaxed); + // if stay alive is true then the hardware will determine state and pretend to do all actions with logging + // this is to help debug what the hardware would do with the current settings applied + let dry_run = mqtt::is_stay_alive(); let tank_state = determine_tank_state(&mut board).await; @@ -643,11 +646,7 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> { match &serde_json::to_string(&light_state) { Ok(state) => { - let _ = board - .board_hal - .get_esp() - .mqtt_publish("/light", state) - .await; + let _ = mqtt::publish("/light", state).await; } Err(err) => { info!("Error publishing lightstate {err}"); @@ -657,39 +656,24 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> { let deep_sleep_duration_minutes: u32 = // if battery soc is unknown assume battery has enough change if matches!(battery_state, BatteryState::Info(data) if data.state_of_charge < 10) { - let _ = board - .board_hal - .get_esp() - .mqtt_publish("/deepsleep", "low Volt 12h").await; + let _ = mqtt::publish("/deepsleep", "low Volt 12h").await; 12 * 60 } else if is_day { - let _ = board - .board_hal - .get_esp() - .mqtt_publish("/deepsleep", "normal 20m").await; + let _ = mqtt::publish("/deepsleep", "normal 20m").await; 20 } else { - let _ = board - .board_hal - .get_esp() - .mqtt_publish("/deepsleep", "night 1h").await; + let _ = mqtt::publish("/deepsleep", "night 1h").await; 60 }; - - let _ = board - .board_hal - .get_esp() - .mqtt_publish("/state", "sleep") - .await; - info!("Go to sleep for {deep_sleep_duration_minutes} minutes"); + let _ = mqtt::publish("/state", "sleep").await; //determine next event //is light out of work trigger soon? //is battery low ?? //is deep sleep - let stay_alive = MQTT_STAY_ALIVE.load(Ordering::Relaxed); - info!("Check stay alive, current state is {stay_alive}"); + let stay_alive = mqtt::is_stay_alive(); + info!("Check stay alive, current state is {}", stay_alive); if stay_alive { let reboot_now = Arc::new(AtomicBool::new(false)); @@ -930,11 +914,7 @@ async fn publish_tank_state( let state = serde_json::to_string( &tank_state.as_mqtt_info(&board.board_hal.get_config().tank, &water_temp), )?; - board - .board_hal - .get_esp() - .mqtt_publish("/water", &state) - .await; + let _ = mqtt::publish("/water", &*state).await; Ok(()) } @@ -950,11 +930,7 @@ async fn publish_plant_states( { let state = serde_json::to_string(&plant_state.to_mqtt_info(plant_conf, timezone_time))?; let plant_topic = format!("/plant{}", plant_id + 1); - let _ = board - .board_hal - .get_esp() - .mqtt_publish(&plant_topic, &state) - .await; + let _ = mqtt::publish(&plant_topic, &state).await; } Ok(()) } @@ -965,13 +941,12 @@ async fn publish_firmware_info( ip_address: &str, timezone_time: &str, ) { - let esp = board.board_hal.get_esp(); - esp.mqtt_publish("/firmware/address", ip_address).await; - esp.mqtt_publish("/firmware/state", format!("{:?}", &version).as_str()) + mqtt::publish("/firmware/address", ip_address).await; + mqtt::publish("/firmware/state", format!("{:?}", &version).as_str()) .await; - esp.mqtt_publish("/firmware/last_online", timezone_time) + mqtt::publish("/firmware/last_online", timezone_time) .await; - esp.mqtt_publish("/state", "online").await; + mqtt::publish("/state", "online").await; } macro_rules! mk_static { ($t:ty,$val:expr) => {{ @@ -1011,12 +986,7 @@ async fn try_connect_wifi_sntp_mqtt( let mqtt_connected = if board.board_hal.get_config().network.mqtt_url.is_some() { let nw_config = board.board_hal.get_config().network.clone(); let nw_config = mk_static!(NetworkConfig, nw_config); - match board - .board_hal - .get_esp() - .mqtt(nw_config, stack, spawner) - .await - { + match mqtt::mqtt_init(nw_config, stack, spawner).await { Ok(_) => { info!("Mqtt connection ready"); true @@ -1077,11 +1047,7 @@ async fn pump_info( match serde_json::to_string(&pump_info) { Ok(state) => { - board - .board_hal - .get_esp() - .mqtt_publish(&pump_topic, &state) - .await; + let _ = mqtt::publish(&pump_topic, &state).await; } Err(err) => { warn!("Error publishing pump state {err}"); @@ -1099,11 +1065,7 @@ async fn publish_mppt_state( voltage_ma: voltage.as_millivolts() as u32, }; if let Ok(serialized_solar_state_bytes) = serde_json::to_string(&solar_state) { - board - .board_hal - .get_esp() - .mqtt_publish("/mppt", &serialized_solar_state_bytes) - .await; + let _ = mqtt::publish("/mppt", &serialized_solar_state_bytes).await; } Ok(()) } @@ -1120,11 +1082,7 @@ async fn publish_battery_state( Err(_) => "error".to_owned(), }; { - let _ = board - .board_hal - .get_esp() - .mqtt_publish("/battery", &value) - .await; + let _ = mqtt::publish("/battery", &*value).await; } Ok(()) } @@ -1229,9 +1187,8 @@ async fn wait_infinity( let cur = board.board_hal.get_time().await; let timezone_time = cur.with_timezone(&timezone); - let esp = board.board_hal.get_esp(); - esp.mqtt_publish("/state", "config").await; - esp.mqtt_publish("/firmware/last_online", &timezone_time.to_rfc3339()) + mqtt::publish("/state", "config").await; + mqtt::publish("/firmware/last_online", &timezone_time.to_rfc3339()) .await; last_mqtt_update = Some(now); } @@ -1285,7 +1242,7 @@ async fn wait_infinity( hal::PlantHal::feed_watchdog(); - if wait_type == WaitType::MqttConfig && !MQTT_STAY_ALIVE.load(Ordering::Relaxed) { + if wait_type == WaitType::MqttConfig && !mqtt::is_stay_alive() { reboot_now.store(true, Ordering::Relaxed); } if reboot_now.load(Ordering::Relaxed) { diff --git a/Software/MainBoard/rust/src/mqtt.rs b/Software/MainBoard/rust/src/mqtt.rs new file mode 100644 index 0000000..91955c1 --- /dev/null +++ b/Software/MainBoard/rust/src/mqtt.rs @@ -0,0 +1,258 @@ +use crate::bail; +use crate::config::NetworkConfig; +use crate::fat_error::{ContextExt, FatError, FatResult}; +use crate::hal::PlantHal; +use crate::log::{log, LogMessage}; +use alloc::string::String; +use alloc::{format, string::ToString}; +use core::sync::atomic::Ordering; +use embassy_executor::Spawner; +use embassy_net::Stack; +use embassy_time::{Duration, Timer, WithTimeout}; +use log::info; +use mcutie::{ + Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, + QoS, Topic, +}; +use portable_atomic::AtomicBool; +use embassy_sync::once_lock::OnceLock; + +static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false); +static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false); +pub static MQTT_STAY_ALIVE: AtomicBool = AtomicBool::new(false); +static MQTT_BASE_TOPIC: OnceLock = OnceLock::new(); + +pub fn is_stay_alive() -> bool { + MQTT_STAY_ALIVE.load(Ordering::Relaxed) +} + +pub async fn publish(subtopic: &str, message: &str) { + let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed); + if !online { + return; + } + let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed); + if !roundtrip_ok { + info!("MQTT roundtrip not received yet, dropping message"); + return; + } + match publish_inner(subtopic, message).await { + Ok(()) => {} + Err(err) => { + info!( + "Error during mqtt send on topic {subtopic} with message {message:#?} error is {err:?}" + ); + } + }; +} + +async fn publish_inner(subtopic: &str, message: &str) -> FatResult<()> { + if !subtopic.starts_with("/") { + bail!("Subtopic without / at start {}", subtopic); + } + if subtopic.len() > 192 { + bail!("Subtopic exceeds 192 chars {}", subtopic); + } + let base_topic = MQTT_BASE_TOPIC + .try_get() + .context("missing base topic in static!")?; + + let full_topic = format!("{base_topic}{subtopic}"); + + loop { + let result = Topic::General(full_topic.as_str()) + .with_display(message) + .retain(true) + .publish() + .await; + match result { + Ok(()) => return Ok(()), + Err(err) => { + let retry = match err { + Error::IOError => false, + Error::TimedOut => true, + Error::TooLarge => false, + Error::PacketError => false, + Error::Invalid => false, + Error::Rejected => false, + }; + if !retry { + bail!( + "Error during mqtt send on topic {} with message {:#?} error is {:?}", + &full_topic, + message, + err + ); + } + info!( + "Retransmit for {} with message {:#?} error is {:?} retrying {}", + &full_topic, message, err, retry + ); + Timer::after(Duration::from_millis(100)).await; + } + } + } +} + +macro_rules! mk_static { + ($t:ty,$val:expr) => {{ + static STATIC_CELL: static_cell::StaticCell<$t> = static_cell::StaticCell::new(); + #[deny(unused_attributes)] + let x = STATIC_CELL.uninit().write(($val)); + x + }}; +} + +pub async fn mqtt_init( + network_config: &'static NetworkConfig, + stack: Stack<'static>, + spawner: Spawner, +) -> FatResult<()> { + let base_topic = network_config + .base_topic + .as_ref() + .context("missing base topic")?; + if base_topic.is_empty() { + bail!("Mqtt base_topic was empty") + } + MQTT_BASE_TOPIC + .init(base_topic.to_string()) + .map_err(|_| FatError::String { + error: "Error setting basetopic".to_string(), + })?; + + let mqtt_url = network_config + .mqtt_url + .as_ref() + .context("missing mqtt url")?; + if mqtt_url.is_empty() { + bail!("Mqtt url was empty") + } + + let last_will_topic = format!("{base_topic}/state"); + let round_trip_topic = format!("{base_topic}/internal/roundtrip"); + let stay_alive_topic = format!("{base_topic}/stay_alive"); + + let mut builder: McutieBuilder<'_, String, PublishDisplay, 0> = + McutieBuilder::new(stack, "plant ctrl", mqtt_url); + if let (Some(mqtt_user), Some(mqtt_password)) = ( + network_config.mqtt_user.as_ref(), + network_config.mqtt_password.as_ref(), + ) { + builder = builder.with_authentication(mqtt_user, mqtt_password); + info!("With authentification"); + } + + let lwt = Topic::General(last_will_topic); + let lwt = mk_static!(Topic, lwt); + let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce); + builder = builder.with_last_will(lwt); + //TODO make configurable + builder = builder.with_device_id("plantctrl"); + + let builder: McutieBuilder<'_, String, PublishDisplay, 2> = builder + .with_subscriptions([ + Topic::General(round_trip_topic.clone()), + Topic::General(stay_alive_topic.clone()), + ]); + + let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16; + let (receiver, task) = builder.build(keep_alive); + + spawner.spawn(mqtt_incoming_task( + receiver, + round_trip_topic.clone(), + stay_alive_topic.clone(), + )?); + spawner.spawn(mqtt_runner(task)?); + + log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic); + + log(LogMessage::MqttInfo, 0, 0, "", mqtt_url); + + let mqtt_timeout = 15000; + let res = async { + while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) { + PlantHal::feed_watchdog(); + Timer::after(Duration::from_millis(100)).await; + } + Ok::<(), FatError>(()) + } + .with_timeout(Duration::from_millis(mqtt_timeout as u64)) + .await; + + if res.is_err() { + bail!("Timeout waiting MQTT connect event") + } + + let _ = Topic::General(round_trip_topic.clone()) + .with_display("online_text") + .publish() + .await; + + let res = async { + while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) { + PlantHal::feed_watchdog(); + Timer::after(Duration::from_millis(100)).await; + } + Ok::<(), FatError>(()) + } + .with_timeout(Duration::from_millis(mqtt_timeout as u64)) + .await; + + if res.is_err() { + MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); + bail!("Timeout waiting MQTT roundtrip") + } + Ok(()) +} + +#[embassy_executor::task] +async fn mqtt_runner( + task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>, +) { + task.run().await; +} + +#[embassy_executor::task] +async fn mqtt_incoming_task( + receiver: McutieReceiver, + round_trip_topic: String, + stay_alive_topic: String, +) { + loop { + let message = receiver.receive().await; + match message { + MqttMessage::Connected => { + info!("Mqtt connected"); + MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed); + } + MqttMessage::Publish(topic, payload) => match topic { + Topic::DeviceType(_type_topic) => {} + Topic::Device(_device_topic) => {} + Topic::General(topic) => { + let subtopic = topic.as_str(); + + if subtopic.eq(round_trip_topic.as_str()) { + MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed); + } else if subtopic.eq(stay_alive_topic.as_str()) { + let value = payload.eq_ignore_ascii_case("true".as_ref()) + || payload.eq_ignore_ascii_case("1".as_ref()); + let a = match value { + true => 1, + false => 0, + }; + log(LogMessage::MqttStayAliveRec, a, 0, "", ""); + MQTT_STAY_ALIVE.store(value, Ordering::Relaxed); + } else { + log(LogMessage::UnknownTopic, 0, 0, "", &topic); + } + } + }, + MqttMessage::Disconnected => { + MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); + info!("Mqtt disconnected"); + } + } + } +}