refactor: create mqtt module with core MQTT statics and tasks

This commit is contained in:
2026-05-05 20:39:03 +02:00
parent e15e78cc26
commit 3feaacd460
3 changed files with 279 additions and 295 deletions

View File

@@ -18,7 +18,7 @@ use hal::PROGRESS_ACTIVE;
use crate::config::{NetworkConfig, PlantConfig};
use crate::fat_error::FatResult;
use crate::hal::esp::MQTT_STAY_ALIVE;
use crate::log::log;
use crate::tank::{determine_tank_state, TankError, TankState, WATER_FROZEN_THRESH};
use crate::webserver::http_server;
@@ -67,6 +67,7 @@ mod config;
mod fat_error;
mod hal;
mod log;
mod mqtt;
mod plant_state;
mod tank;
mod webserver;
@@ -536,11 +537,7 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
match &serde_json::to_string(&light_state) {
Ok(state) => {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/light", state)
.await;
let _ = mqtt::publish("/light", state).await;
}
Err(err) => {
info!("Error publishing lightstate {}", err);
@@ -550,29 +547,16 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
let deep_sleep_duration_minutes: u32 =
// if battery soc is unknown assume battery has enough change
if state_of_charge < 10.0 && !matches!(battery_state, BatteryState::Unknown) {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/deepsleep", "low Volt 12h").await;
let _ = mqtt::publish("/deepsleep", "low Volt 12h").await;
12 * 60
} else if is_day {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/deepsleep", "normal 20m").await;
let _ = mqtt::publish("/deepsleep", "normal 20m").await;
20
} else {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/deepsleep", "night 1h").await;
let _ = mqtt::publish("/deepsleep", "night 1h").await;
60
};
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/state", "sleep")
.await;
let _ = mqtt::publish("/state", "sleep").await;
info!("Go to sleep for {} minutes", deep_sleep_duration_minutes);
//determine next event
@@ -582,7 +566,7 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
//TODO
//mark_app_valid();
let stay_alive = MQTT_STAY_ALIVE.load(Ordering::Relaxed);
let stay_alive = mqtt::is_stay_alive();
info!("Check stay alive, current state is {}", stay_alive);
if stay_alive {
@@ -737,7 +721,7 @@ async fn publish_tank_state(
&tank_state.as_mqtt_info(&board.board_hal.get_config().tank, &water_temp),
)
.unwrap();
let _ = board.board_hal.get_esp().mqtt_publish("/water", &*state);
let _ = mqtt::publish("/water", &*state).await;
}
async fn publish_plant_states(
@@ -753,11 +737,7 @@ async fn publish_plant_states(
let state =
serde_json::to_string(&plant_state.to_mqtt_info(plant_conf, timezone_time)).unwrap();
let plant_topic = format!("/plant{}", plant_id + 1);
let _ = board
.board_hal
.get_esp()
.mqtt_publish(&plant_topic, &state)
.await;
let _ = mqtt::publish(&plant_topic, &state).await;
}
}
@@ -767,13 +747,12 @@ async fn publish_firmware_info(
ip_address: &str,
timezone_time: &str,
) {
let esp = board.board_hal.get_esp();
esp.mqtt_publish("/firmware/address", ip_address).await;
esp.mqtt_publish("/firmware/state", format!("{:?}", &version).as_str())
mqtt::publish("/firmware/address", ip_address).await;
mqtt::publish("/firmware/state", format!("{:?}", &version).as_str())
.await;
esp.mqtt_publish("/firmware/last_online", timezone_time)
mqtt::publish("/firmware/last_online", timezone_time)
.await;
esp.mqtt_publish("/state", "online").await;
mqtt::publish("/state", "online").await;
}
macro_rules! mk_static {
($t:ty,$val:expr) => {{
@@ -813,12 +792,7 @@ async fn try_connect_wifi_sntp_mqtt(
let mqtt_connected = if board.board_hal.get_config().network.mqtt_url.is_some() {
let nw_config = board.board_hal.get_config().network.clone();
let nw_config = mk_static!(NetworkConfig, nw_config);
match board
.board_hal
.get_esp()
.mqtt(nw_config, stack, spawner)
.await
{
match mqtt::mqtt_init(nw_config, stack, spawner).await {
Ok(_) => {
info!("Mqtt connection ready");
true
@@ -873,15 +847,7 @@ async fn pump_info(
match serde_json::to_string(&pump_info) {
Ok(state) => {
BOARD_ACCESS
.get()
.await
.lock()
.await
.board_hal
.get_esp()
.mqtt_publish(&pump_topic, &state)
.await;
let _ = mqtt::publish(&pump_topic, &state).await;
}
Err(err) => {
warn!("Error publishing pump state {}", err);
@@ -899,10 +865,7 @@ async fn publish_mppt_state(
voltage_ma: voltage.as_millivolts() as u32,
};
if let Ok(serialized_solar_state_bytes) = serde_json::to_string(&solar_state) {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/mppt", &serialized_solar_state_bytes);
let _ = mqtt::publish("/mppt", &serialized_solar_state_bytes).await;
}
Ok(())
}
@@ -923,11 +886,7 @@ async fn publish_battery_state(
Err(_) => "error".to_owned(),
};
{
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/battery", &*value)
.await;
let _ = mqtt::publish("/battery", &*value).await;
}
}
@@ -1031,9 +990,8 @@ async fn wait_infinity(
let cur = board.board_hal.get_time().await;
let timezone_time = cur.with_timezone(&timezone);
let esp = board.board_hal.get_esp();
esp.mqtt_publish("/state", "config").await;
esp.mqtt_publish("/firmware/last_online", &timezone_time.to_rfc3339())
mqtt::publish("/state", "config").await;
mqtt::publish("/firmware/last_online", &timezone_time.to_rfc3339())
.await;
last_mqtt_update = Some(now);
}
@@ -1087,7 +1045,7 @@ async fn wait_infinity(
hal::PlantHal::feed_watchdog();
if wait_type == WaitType::MqttConfig && !MQTT_STAY_ALIVE.load(Ordering::Relaxed) {
if wait_type == WaitType::MqttConfig && !mqtt::is_stay_alive() {
reboot_now.store(true, Ordering::Relaxed);
}
if reboot_now.load(Ordering::Relaxed) {