From 55bcc0c52f6be1a9b217f6fb0aa5e1e129097de5 Mon Sep 17 00:00:00 2001 From: ju6ge Date: Sun, 10 May 2026 14:32:00 +0200 Subject: [PATCH] move all mqtt publishing functions to mqtt module --- rust/src/main.rs | 135 ++------------------------------------ rust/src/mqtt.rs | 167 ++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 154 insertions(+), 148 deletions(-) diff --git a/rust/src/main.rs b/rust/src/main.rs index dd0af60..5a9a839 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -272,9 +272,9 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> { ); if let network::NetworkMode::WIFI { ref ip_address, .. } = network_mode { - publish_firmware_info(&mut board, version, ip_address, &timezone_time.to_rfc3339()).await; - publish_battery_state(&mut board).await; - let _ = publish_mppt_state(&mut board).await; + mqtt::publish_firmware_info(&mut board, version, ip_address, &timezone_time.to_rfc3339()).await; + mqtt::publish_battery_state(&mut board).await; + let _ = mqtt::publish_mppt_state(&mut board).await; } log( @@ -359,7 +359,7 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> { } info!("Water temp is {}", water_temp.as_ref().unwrap_or(&0.)); - publish_tank_state(&mut board, &tank_state, water_temp).await; + mqtt::publish_tank_state(&mut board, &tank_state, water_temp).await; let plantstate: [PlantState; PLANT_COUNT] = [ PlantState::read_hardware_state(0, &mut board).await, @@ -372,7 +372,7 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> { PlantState::read_hardware_state(7, &mut board).await, ]; - publish_plant_states(&mut board, &timezone_time.clone(), &plantstate).await; + mqtt::publish_plant_states(&mut board, &timezone_time.clone(), &plantstate).await; let pump_required = plantstate .iter() @@ -702,131 +702,6 @@ async fn update_charge_indicator( Ok(()) } -async fn publish_tank_state( - board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, - tank_state: &TankState, - water_temp: FatResult, -) { - let state = serde_json::to_string( - &tank_state.as_mqtt_info(&board.board_hal.get_config().tank, &water_temp), - ) - .unwrap(); - let _ = mqtt::publish("/water", &*state).await; -} - -async fn publish_plant_states( - board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, - timezone_time: &DateTime, - plantstate: &[PlantState; 8], -) { - for (plant_id, (plant_state, plant_conf)) in plantstate - .iter() - .zip(&board.board_hal.get_config().plants.clone()) - .enumerate() - { - let state = - serde_json::to_string(&plant_state.to_mqtt_info(plant_conf, timezone_time)).unwrap(); - let plant_topic = format!("/plant{}", plant_id + 1); - let _ = mqtt::publish(&plant_topic, &state).await; - } -} - -async fn publish_firmware_info( - board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, - version: VersionInfo, - ip_address: &str, - timezone_time: &str, -) { - mqtt::publish("/firmware/address", ip_address).await; - mqtt::publish("/firmware/state", &serde_json::to_string(&version).unwrap()) - .await; - mqtt::publish("/firmware/last_online", timezone_time) - .await; - mqtt::publish("/state", "online").await; -} -async fn pump_info( - plant_id: usize, - pump_active: bool, - pump_ineffective: bool, - median_current_ma: u16, - max_current_ma: u16, - min_current_ma: u16, - _error: bool, -) { - let pump_info = mqtt::PumpInfo { - enabled: pump_active, - pump_ineffective, - median_current_ma, - max_current_ma, - min_current_ma, - }; - let pump_topic = format!("/pump{}", plant_id + 1); - - match serde_json::to_string(&pump_info) { - Ok(state) => { - let _ = mqtt::publish(&pump_topic, &state).await; - } - Err(err) => { - warn!("Error publishing pump state {}", err); - } - }; -} - -async fn publish_mppt_state( - board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, -) -> FatResult<()> { - let current = board.board_hal.get_mptt_current().await?; - let voltage = board.board_hal.get_mptt_voltage().await?; - let solar_state = mqtt::Solar { - current_ma: current.as_milliamperes() as u32, - voltage_ma: voltage.as_millivolts() as u32, - }; - if let Ok(serialized_solar_state_bytes) = serde_json::to_string(&solar_state) { - let _ = mqtt::publish("/mppt", &serialized_solar_state_bytes).await; - } - Ok(()) -} - -async fn publish_battery_state( - board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, -) -> () { - let telemetry = match board - .board_hal - .get_battery_monitor() - .get_battery_state() - .await - { - Ok(BatteryState::Info(info)) => info, - Ok(BatteryState::Unknown) => BatteryInfo { - voltage_mv: None, - avg_current_ma: None, - soc_pct: None, - soh_pct: None, - temperature_c: None, - cycle_count: None, - remaining_mah: None, - design_mah: None, - error: Some(BatteryError::NoBatteryMonitor), - }, - Err(e) => BatteryInfo { - voltage_mv: None, - avg_current_ma: None, - soc_pct: None, - soh_pct: None, - temperature_c: None, - cycle_count: None, - remaining_mah: None, - design_mah: None, - error: Some(BatteryError::CommunicationError { - message: alloc::format!("{:?}", e), - }), - }, - }; - if let Ok(json) = serde_json::to_string(&telemetry) { - let _ = mqtt::publish("/battery", &json).await; - } -} - async fn wait_infinity( board: MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, wait_type: WaitType, diff --git a/rust/src/mqtt.rs b/rust/src/mqtt.rs index 2d5718f..052d12f 100644 --- a/rust/src/mqtt.rs +++ b/rust/src/mqtt.rs @@ -1,16 +1,23 @@ -use crate::bail; use crate::config::NetworkConfig; use crate::fat_error::{ContextExt, FatError, FatResult}; -use crate::hal::PlantHal; +use crate::hal::battery::{BatteryError, BatteryInfo, BatteryState}; +use crate::hal::{PlantHal, HAL}; use crate::log::{log, LogMessage}; +use crate::plant_state::PlantState; +use crate::tank::TankState; +use crate::{bail, VersionInfo}; use alloc::string::String; use alloc::{format, string::ToString}; +use chrono::DateTime; +use chrono_tz::Tz; use core::sync::atomic::Ordering; use embassy_executor::Spawner; use embassy_net::Stack; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::mutex::MutexGuard; use embassy_sync::once_lock::OnceLock; use embassy_time::{Duration, Timer, WithTimeout}; -use log::info; +use log::{info, warn}; use mcutie::{ Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, QoS, Topic, @@ -18,21 +25,6 @@ use mcutie::{ use portable_atomic::AtomicBool; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug, PartialEq, Default)] -pub struct PumpInfo { - pub enabled: bool, - pub pump_ineffective: bool, - pub median_current_ma: u16, - pub max_current_ma: u16, - pub min_current_ma: u16, -} - -#[derive(Serialize, Debug, PartialEq)] -pub struct Solar { - pub current_ma: u32, - pub voltage_ma: u32, -} - static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false); static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false); pub static MQTT_STAY_ALIVE: AtomicBool = AtomicBool::new(false); @@ -265,3 +257,142 @@ async fn mqtt_incoming_task( } } } + +pub async fn publish_tank_state( + board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, + tank_state: &TankState, + water_temp: FatResult, +) { + let state = serde_json::to_string( + &tank_state.as_mqtt_info(&board.board_hal.get_config().tank, &water_temp), + ) + .unwrap(); + let _ = publish("/water", &*state).await; +} + +pub async fn publish_plant_states( + board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, + timezone_time: &DateTime, + plantstate: &[PlantState; 8], +) { + for (plant_id, (plant_state, plant_conf)) in plantstate + .iter() + .zip(&board.board_hal.get_config().plants.clone()) + .enumerate() + { + let state = + serde_json::to_string(&plant_state.to_mqtt_info(plant_conf, timezone_time)).unwrap(); + let plant_topic = format!("/plant{}", plant_id + 1); + let _ = publish(&plant_topic, &state).await; + } +} + +pub async fn publish_firmware_info( + board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, + version: VersionInfo, + ip_address: &str, + timezone_time: &str, +) { + publish("/firmware/address", ip_address).await; + publish("/firmware/state", &serde_json::to_string(&version).unwrap()).await; + publish("/firmware/last_online", timezone_time).await; + publish("/state", "online").await; +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Default)] +struct PumpInfo { + pub enabled: bool, + pub pump_ineffective: bool, + pub median_current_ma: u16, + pub max_current_ma: u16, + pub min_current_ma: u16, +} + +pub async fn pump_info( + plant_id: usize, + pump_active: bool, + pump_ineffective: bool, + median_current_ma: u16, + max_current_ma: u16, + min_current_ma: u16, + _error: bool, +) { + let pump_info = PumpInfo { + enabled: pump_active, + pump_ineffective, + median_current_ma, + max_current_ma, + min_current_ma, + }; + let pump_topic = format!("/pump{}", plant_id + 1); + + match serde_json::to_string(&pump_info) { + Ok(state) => { + let _ = publish(&pump_topic, &state).await; + } + Err(err) => { + warn!("Error publishing pump state {}", err); + } + }; +} + +#[derive(Serialize, Debug, PartialEq)] +pub struct Solar { + pub current_ma: u32, + pub voltage_ma: u32, +} + +pub async fn publish_mppt_state( + board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, +) -> FatResult<()> { + let current = board.board_hal.get_mptt_current().await?; + let voltage = board.board_hal.get_mptt_voltage().await?; + let solar_state = Solar { + current_ma: current.as_milliamperes() as u32, + voltage_ma: voltage.as_millivolts() as u32, + }; + if let Ok(serialized_solar_state_bytes) = serde_json::to_string(&solar_state) { + let _ = publish("/mppt", &serialized_solar_state_bytes).await; + } + Ok(()) +} + +pub async fn publish_battery_state( + board: &mut MutexGuard<'_, CriticalSectionRawMutex, HAL<'static>>, +) -> () { + let telemetry = match board + .board_hal + .get_battery_monitor() + .get_battery_state() + .await + { + Ok(BatteryState::Info(info)) => info, + Ok(BatteryState::Unknown) => BatteryInfo { + voltage_mv: None, + avg_current_ma: None, + soc_pct: None, + soh_pct: None, + temperature_c: None, + cycle_count: None, + remaining_mah: None, + design_mah: None, + error: Some(BatteryError::NoBatteryMonitor), + }, + Err(e) => BatteryInfo { + voltage_mv: None, + avg_current_ma: None, + soc_pct: None, + soh_pct: None, + temperature_c: None, + cycle_count: None, + remaining_mah: None, + design_mah: None, + error: Some(BatteryError::CommunicationError { + message: alloc::format!("{:?}", e), + }), + }, + }; + if let Ok(json) = serde_json::to_string(&telemetry) { + let _ = publish("/battery", &json).await; + } +}