12 Commits

3 changed files with 337 additions and 358 deletions

View File

@@ -22,7 +22,6 @@ use embassy_net::udp::{PacketMetadata, UdpSocket};
use embassy_net::{DhcpConfig, IpAddress, Ipv4Cidr, Runner, Stack, StackResources, StaticConfigV4};
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::mutex::{Mutex, MutexGuard};
use embassy_sync::once_lock::OnceLock;
use embassy_time::{Duration, Timer, WithTimeout};
use embedded_storage::nor_flash::{check_erase, NorFlash, ReadNorFlash, RmwNorFlashStorage};
use esp_bootloader_esp_idf::ota::OtaImageState::Valid;
@@ -44,10 +43,6 @@ use esp_storage::FlashStorage;
use littlefs2::fs::Filesystem;
use littlefs2_core::{FileType, PathBuf, SeekFrom};
use log::{info, warn, error};
use mcutie::{
Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable,
QoS, Topic,
};
use portable_atomic::AtomicBool;
use sntpc::{NtpContext, NtpTimestampGenerator, NtpUdpSocket, get_time};
@@ -68,11 +63,6 @@ static mut LAST_CORROSION_PROTECTION_CHECK_DAY: i8 = -1;
const CONFIG_FILE: &str = "config.json";
const NTP_SERVER: &str = "pool.ntp.org";
static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false);
static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false);
pub static MQTT_STAY_ALIVE: AtomicBool = AtomicBool::new(false);
static MQTT_BASE_TOPIC: OnceLock<String> = OnceLock::new();
#[derive(Serialize, Debug)]
pub struct FileInfo {
filename: String,
@@ -740,228 +730,6 @@ impl Esp<'_> {
}
}
pub(crate) async fn mqtt(
&mut self,
network_config: &'static NetworkConfig,
stack: Stack<'static>,
spawner: Spawner,
) -> FatResult<()> {
let base_topic = network_config
.base_topic
.as_ref()
.context("missing base topic")?;
if base_topic.is_empty() {
bail!("Mqtt base_topic was empty")
}
MQTT_BASE_TOPIC
.init(base_topic.to_string())
.map_err(|_| FatError::String {
error: "Error setting basetopic".to_string(),
})?;
let mqtt_url = network_config
.mqtt_url
.as_ref()
.context("missing mqtt url")?;
if mqtt_url.is_empty() {
bail!("Mqtt url was empty")
}
let last_will_topic = format!("{base_topic}/state");
let round_trip_topic = format!("{base_topic}/internal/roundtrip");
let stay_alive_topic = format!("{base_topic}/stay_alive");
let mut builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 0> =
McutieBuilder::new(stack, "plant ctrl", mqtt_url);
if let (Some(mqtt_user), Some(mqtt_password)) = (
network_config.mqtt_user.as_ref(),
network_config.mqtt_password.as_ref(),
) {
builder = builder.with_authentication(mqtt_user, mqtt_password);
info!("With authentification");
}
let lwt = Topic::General(last_will_topic);
let lwt = mk_static!(Topic<String>, lwt);
let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce);
builder = builder.with_last_will(lwt);
//TODO make configurable
builder = builder.with_device_id("plantctrl");
let builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 2> = builder
.with_subscriptions([
Topic::General(round_trip_topic.clone()),
Topic::General(stay_alive_topic.clone()),
]);
let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16;
let (receiver, task) = builder.build(keep_alive);
spawner.spawn(mqtt_incoming_task(
receiver,
round_trip_topic.clone(),
stay_alive_topic.clone(),
)?);
spawner.spawn(mqtt_runner(task)?);
log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic);
log(LogMessage::MqttInfo, 0, 0, "", mqtt_url);
let mqtt_timeout = 15000;
let res = async {
while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) {
crate::hal::PlantHal::feed_watchdog();
Timer::after(Duration::from_millis(100)).await;
}
Ok::<(), FatError>(())
}
.with_timeout(Duration::from_millis(mqtt_timeout as u64))
.await;
if res.is_err() {
bail!("Timeout waiting MQTT connect event")
}
let _ = Topic::General(round_trip_topic.clone())
.with_display("online_text")
.publish()
.await;
let res = async {
while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) {
crate::hal::PlantHal::feed_watchdog();
Timer::after(Duration::from_millis(100)).await;
}
Ok::<(), FatError>(())
}
.with_timeout(Duration::from_millis(mqtt_timeout as u64))
.await;
if res.is_err() {
//ensure we do not further try to publish
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
bail!("Timeout waiting MQTT roundtrip")
}
Ok(())
}
pub(crate) async fn mqtt_inner(&mut self, subtopic: &str, message: &str) -> FatResult<()> {
if !subtopic.starts_with("/") {
bail!("Subtopic without / at start {}", subtopic);
}
if subtopic.len() > 192 {
bail!("Subtopic exceeds 192 chars {}", subtopic);
}
let base_topic = MQTT_BASE_TOPIC
.try_get()
.context("missing base topic in static!")?;
let full_topic = format!("{base_topic}{subtopic}");
loop {
let result = Topic::General(full_topic.as_str())
.with_display(message)
.retain(true)
.publish()
.await;
match result {
Ok(()) => return Ok(()),
Err(err) => {
let retry = match err {
Error::IOError => false,
Error::TimedOut => true,
Error::TooLarge => false,
Error::PacketError => false,
Error::Invalid => false,
Error::Rejected => false,
};
if !retry {
bail!(
"Error during mqtt send on topic {} with message {:#?} error is {:?}",
&full_topic,
message,
err
);
}
info!(
"Retransmit for {} with message {:#?} error is {:?} retrying {}",
&full_topic, message, err, retry
);
Timer::after(Duration::from_millis(100)).await;
}
}
}
}
pub(crate) async fn mqtt_publish(&mut self, subtopic: &str, message: &str) {
let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed);
if !online {
return;
}
let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed);
if !roundtrip_ok {
info!("MQTT roundtrip not received yet, dropping message");
return;
}
match self.mqtt_inner(subtopic, message).await {
Ok(()) => {}
Err(err) => {
info!(
"Error during mqtt send on topic {subtopic} with message {message:#?} error is {err:?}"
);
}
};
}
}
#[embassy_executor::task]
async fn mqtt_runner(
task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>,
) {
task.run().await;
}
#[embassy_executor::task]
async fn mqtt_incoming_task(
receiver: McutieReceiver,
round_trip_topic: String,
stay_alive_topic: String,
) {
loop {
let message = receiver.receive().await;
match message {
MqttMessage::Connected => {
info!("Mqtt connected");
MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed);
}
MqttMessage::Publish(topic, payload) => match topic {
Topic::DeviceType(_type_topic) => {}
Topic::Device(_device_topic) => {}
Topic::General(topic) => {
let subtopic = topic.as_str();
if subtopic.eq(round_trip_topic.as_str()) {
MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed);
} else if subtopic.eq(stay_alive_topic.as_str()) {
let value = payload.eq_ignore_ascii_case("true".as_ref())
|| payload.eq_ignore_ascii_case("1".as_ref());
let a = match value {
true => 1,
false => 0,
};
log(LogMessage::MqttStayAliveRec, a, 0, "", "");
MQTT_STAY_ALIVE.store(value, Ordering::Relaxed);
} else {
log(LogMessage::UnknownTopic, 0, 0, "", &topic);
}
}
},
MqttMessage::Disconnected => {
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
info!("Mqtt disconnected");
}
}
}
}
#[embassy_executor::task(pool_size = 2)]

View File

@@ -18,7 +18,7 @@ use hal::PROGRESS_ACTIVE;
use crate::config::{NetworkConfig, PlantConfig};
use crate::fat_error::FatResult;
use crate::hal::esp::MQTT_STAY_ALIVE;
use crate::log::log;
use crate::tank::{determine_tank_state, TankError, TankState, WATER_FROZEN_THRESH};
use crate::webserver::http_server;
@@ -67,6 +67,7 @@ mod config;
mod fat_error;
mod hal;
mod log;
mod mqtt;
mod plant_state;
mod tank;
mod webserver;
@@ -83,12 +84,6 @@ enum WaitType {
MqttConfig,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Solar {
current_ma: u32,
voltage_ma: u32,
}
impl WaitType {
fn blink_pattern(&self) -> u64 {
match self {
@@ -99,31 +94,6 @@ impl WaitType {
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Default)]
/// Light State tracking data for mqtt
struct LightState {
/// is enabled in config
enabled: bool,
/// led is on
active: bool,
/// led should not be on at this time of day
out_of_work_hour: bool,
/// the battery is low so do not use led
battery_low: bool,
/// the sun is up
is_day: bool,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Default)]
///mqtt struct to track pump activities
struct PumpInfo {
enabled: bool,
pump_ineffective: bool,
median_current_ma: u16,
max_current_ma: u16,
min_current_ma: u16,
}
#[derive(Serialize)]
pub struct PumpResult {
median_current_ma: u16,
@@ -135,22 +105,6 @@ pub struct PumpResult {
pump_time_s: u16,
}
#[derive(Serialize, Debug, PartialEq)]
enum SntpMode {
OFFLINE,
SYNC { current: DateTime<Utc> },
}
#[derive(Serialize, Debug, PartialEq)]
enum NetworkMode {
WIFI {
sntp: SntpMode,
mqtt: bool,
ip_address: String,
},
OFFLINE,
}
async fn safe_main(spawner: Spawner) -> FatResult<()> {
info!("Startup Rust");
@@ -239,10 +193,10 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
info!("No wifi configured");
//the current sensors require this amount to stabilize, in the case of Wi-Fi this is already handled due to connect timings;
Timer::after_millis(100).await;
NetworkMode::OFFLINE
mqtt::NetworkMode::OFFLINE
};
if matches!(network_mode, NetworkMode::OFFLINE) && to_config {
if matches!(network_mode, mqtt::NetworkMode::OFFLINE) && to_config {
info!("Could not connect to station and config mode forced, switching to ap mode!");
let res = {
@@ -276,7 +230,7 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
timezone_time
);
if let NetworkMode::WIFI { ref ip_address, .. } = network_mode {
if let mqtt::NetworkMode::WIFI { ref ip_address, .. } = network_mode {
publish_firmware_info(&mut board, version, ip_address, &timezone_time.to_rfc3339()).await;
publish_battery_state(&mut board).await;
let _ = publish_mppt_state(&mut board).await;
@@ -284,15 +238,15 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
log(
LogMessage::StartupInfo,
matches!(network_mode, NetworkMode::WIFI { .. }) as u32,
matches!(network_mode, mqtt::NetworkMode::WIFI { .. }) as u32,
matches!(
network_mode,
NetworkMode::WIFI {
sntp: SntpMode::SYNC { .. },
mqtt::NetworkMode::WIFI {
sntp: mqtt::SntpMode::SYNC { .. },
..
}
) as u32,
matches!(network_mode, NetworkMode::WIFI { mqtt: true, .. })
matches!(network_mode, mqtt::NetworkMode::WIFI { mqtt: true, .. })
.to_string()
.as_str(),
"",
@@ -466,7 +420,7 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
.unwrap_or(BatteryState::Unknown);
info!("Battery state is {:?}", battery_state);
let mut light_state = LightState {
let mut light_state = mqtt::LightState {
enabled: board.board_hal.get_config().night_lamp.enabled,
..Default::default()
};
@@ -536,11 +490,7 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
match &serde_json::to_string(&light_state) {
Ok(state) => {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/light", state)
.await;
let _ = mqtt::publish("/light", state).await;
}
Err(err) => {
info!("Error publishing lightstate {}", err);
@@ -550,29 +500,16 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
let deep_sleep_duration_minutes: u32 =
// if battery soc is unknown assume battery has enough change
if state_of_charge < 10.0 && !matches!(battery_state, BatteryState::Unknown) {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/deepsleep", "low Volt 12h").await;
let _ = mqtt::publish("/deepsleep", "low Volt 12h").await;
12 * 60
} else if is_day {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/deepsleep", "normal 20m").await;
let _ = mqtt::publish("/deepsleep", "normal 20m").await;
20
} else {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/deepsleep", "night 1h").await;
let _ = mqtt::publish("/deepsleep", "night 1h").await;
60
};
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/state", "sleep")
.await;
let _ = mqtt::publish("/state", "sleep").await;
info!("Go to sleep for {} minutes", deep_sleep_duration_minutes);
//determine next event
@@ -582,7 +519,7 @@ async fn safe_main(spawner: Spawner) -> FatResult<()> {
//TODO
//mark_app_valid();
let stay_alive = MQTT_STAY_ALIVE.load(Ordering::Relaxed);
let stay_alive = mqtt::is_stay_alive();
info!("Check stay alive, current state is {}", stay_alive);
if stay_alive {
@@ -737,7 +674,7 @@ async fn publish_tank_state(
&tank_state.as_mqtt_info(&board.board_hal.get_config().tank, &water_temp),
)
.unwrap();
let _ = board.board_hal.get_esp().mqtt_publish("/water", &*state);
let _ = mqtt::publish("/water", &*state).await;
}
async fn publish_plant_states(
@@ -753,11 +690,7 @@ async fn publish_plant_states(
let state =
serde_json::to_string(&plant_state.to_mqtt_info(plant_conf, timezone_time)).unwrap();
let plant_topic = format!("/plant{}", plant_id + 1);
let _ = board
.board_hal
.get_esp()
.mqtt_publish(&plant_topic, &state)
.await;
let _ = mqtt::publish(&plant_topic, &state).await;
}
}
@@ -767,13 +700,12 @@ async fn publish_firmware_info(
ip_address: &str,
timezone_time: &str,
) {
let esp = board.board_hal.get_esp();
esp.mqtt_publish("/firmware/address", ip_address).await;
esp.mqtt_publish("/firmware/state", format!("{:?}", &version).as_str())
mqtt::publish("/firmware/address", ip_address).await;
mqtt::publish("/firmware/state", format!("{:?}", &version).as_str())
.await;
esp.mqtt_publish("/firmware/last_online", timezone_time)
mqtt::publish("/firmware/last_online", timezone_time)
.await;
esp.mqtt_publish("/state", "online").await;
mqtt::publish("/state", "online").await;
}
macro_rules! mk_static {
($t:ty,$val:expr) => {{
@@ -787,13 +719,13 @@ async fn try_connect_wifi_sntp_mqtt(
board: &mut MutexGuard<'static, CriticalSectionRawMutex, HAL<'static>>,
stack_store: &mut OptionLock<Stack<'static>>,
spawner: Spawner,
) -> NetworkMode {
) -> mqtt::NetworkMode {
let nw_conf = &board.board_hal.get_config().network.clone();
match board.board_hal.get_esp().wifi(nw_conf, spawner).await {
Ok(stack) => {
stack_store.replace(stack);
let sntp_mode: SntpMode = match board.board_hal.get_esp().sntp(1000 * 10, stack).await {
let sntp_mode: mqtt::SntpMode = match board.board_hal.get_esp().sntp(1000 * 10, stack).await {
Ok(new_time) => {
info!("Using time from sntp {}", new_time.to_rfc3339());
let _ = board
@@ -801,24 +733,19 @@ async fn try_connect_wifi_sntp_mqtt(
.get_rtc_module()
.set_rtc_time(&new_time)
.await;
SntpMode::SYNC { current: new_time }
mqtt::SntpMode::SYNC { current: new_time }
}
Err(err) => {
warn!("sntp error: {err}");
board.board_hal.general_fault(true).await;
SntpMode::OFFLINE
mqtt::SntpMode::OFFLINE
}
};
let mqtt_connected = if board.board_hal.get_config().network.mqtt_url.is_some() {
let nw_config = board.board_hal.get_config().network.clone();
let nw_config = mk_static!(NetworkConfig, nw_config);
match board
.board_hal
.get_esp()
.mqtt(nw_config, stack, spawner)
.await
{
match mqtt::mqtt_init(nw_config, stack, spawner).await {
Ok(_) => {
info!("Mqtt connection ready");
true
@@ -839,7 +766,7 @@ async fn try_connect_wifi_sntp_mqtt(
None => String::from("No IP"),
},
};
NetworkMode::WIFI {
mqtt::NetworkMode::WIFI {
sntp: sntp_mode,
mqtt: mqtt_connected,
ip_address: ip,
@@ -848,7 +775,7 @@ async fn try_connect_wifi_sntp_mqtt(
Err(err) => {
info!("Offline mode due to {err}");
board.board_hal.general_fault(true).await;
NetworkMode::OFFLINE
mqtt::NetworkMode::OFFLINE
}
}
}
@@ -862,7 +789,7 @@ async fn pump_info(
min_current_ma: u16,
_error: bool,
) {
let pump_info = PumpInfo {
let pump_info = mqtt::PumpInfo {
enabled: pump_active,
pump_ineffective,
median_current_ma,
@@ -873,15 +800,7 @@ async fn pump_info(
match serde_json::to_string(&pump_info) {
Ok(state) => {
BOARD_ACCESS
.get()
.await
.lock()
.await
.board_hal
.get_esp()
.mqtt_publish(&pump_topic, &state)
.await;
let _ = mqtt::publish(&pump_topic, &state).await;
}
Err(err) => {
warn!("Error publishing pump state {}", err);
@@ -894,15 +813,12 @@ async fn publish_mppt_state(
) -> FatResult<()> {
let current = board.board_hal.get_mptt_current().await?;
let voltage = board.board_hal.get_mptt_voltage().await?;
let solar_state = Solar {
let solar_state = mqtt::Solar {
current_ma: current.as_milliamperes() as u32,
voltage_ma: voltage.as_millivolts() as u32,
};
if let Ok(serialized_solar_state_bytes) = serde_json::to_string(&solar_state) {
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/mppt", &serialized_solar_state_bytes);
let _ = mqtt::publish("/mppt", &serialized_solar_state_bytes).await;
}
Ok(())
}
@@ -923,11 +839,7 @@ async fn publish_battery_state(
Err(_) => "error".to_owned(),
};
{
let _ = board
.board_hal
.get_esp()
.mqtt_publish("/battery", &*value)
.await;
let _ = mqtt::publish("/battery", &*value).await;
}
}
@@ -1031,9 +943,8 @@ async fn wait_infinity(
let cur = board.board_hal.get_time().await;
let timezone_time = cur.with_timezone(&timezone);
let esp = board.board_hal.get_esp();
esp.mqtt_publish("/state", "config").await;
esp.mqtt_publish("/firmware/last_online", &timezone_time.to_rfc3339())
mqtt::publish("/state", "config").await;
mqtt::publish("/firmware/last_online", &timezone_time.to_rfc3339())
.await;
last_mqtt_update = Some(now);
}
@@ -1087,7 +998,7 @@ async fn wait_infinity(
hal::PlantHal::feed_watchdog();
if wait_type == WaitType::MqttConfig && !MQTT_STAY_ALIVE.load(Ordering::Relaxed) {
if wait_type == WaitType::MqttConfig && !mqtt::is_stay_alive() {
reboot_now.store(true, Ordering::Relaxed);
}
if reboot_now.load(Ordering::Relaxed) {

300
rust/src/mqtt.rs Normal file
View File

@@ -0,0 +1,300 @@
use crate::bail;
use crate::config::NetworkConfig;
use crate::fat_error::{ContextExt, FatError, FatResult};
use crate::hal::PlantHal;
use crate::log::{log, LogMessage};
use alloc::string::String;
use alloc::{format, string::ToString};
use chrono::{DateTime, Utc};
use core::sync::atomic::Ordering;
use embassy_executor::Spawner;
use embassy_net::Stack;
use embassy_time::{Duration, Timer, WithTimeout};
use log::info;
use mcutie::{
Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable,
QoS, Topic,
};
use portable_atomic::AtomicBool;
use embassy_sync::once_lock::OnceLock;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq, Default)]
pub struct LightState {
pub enabled: bool,
pub active: bool,
pub out_of_work_hour: bool,
pub battery_low: bool,
pub is_day: bool,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Default)]
pub struct PumpInfo {
pub enabled: bool,
pub pump_ineffective: bool,
pub median_current_ma: u16,
pub max_current_ma: u16,
pub min_current_ma: u16,
}
#[derive(Serialize, Debug, PartialEq)]
pub struct Solar {
pub current_ma: u32,
pub voltage_ma: u32,
}
#[derive(Serialize, Debug, PartialEq)]
pub enum SntpMode {
OFFLINE,
SYNC { current: DateTime<Utc> },
}
#[derive(Serialize, Debug, PartialEq)]
pub enum NetworkMode {
WIFI {
sntp: SntpMode,
mqtt: bool,
ip_address: String,
},
OFFLINE,
}
static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false);
static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false);
pub static MQTT_STAY_ALIVE: AtomicBool = AtomicBool::new(false);
static MQTT_BASE_TOPIC: OnceLock<String> = OnceLock::new();
pub fn is_stay_alive() -> bool {
MQTT_STAY_ALIVE.load(Ordering::Relaxed)
}
pub async fn publish(subtopic: &str, message: &str) {
let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed);
if !online {
return;
}
let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed);
if !roundtrip_ok {
info!("MQTT roundtrip not received yet, dropping message");
return;
}
match publish_inner(subtopic, message).await {
Ok(()) => {}
Err(err) => {
info!(
"Error during mqtt send on topic {subtopic} with message {message:#?} error is {err:?}"
);
}
};
}
async fn publish_inner(subtopic: &str, message: &str) -> FatResult<()> {
if !subtopic.starts_with("/") {
bail!("Subtopic without / at start {}", subtopic);
}
if subtopic.len() > 192 {
bail!("Subtopic exceeds 192 chars {}", subtopic);
}
let base_topic = MQTT_BASE_TOPIC
.try_get()
.context("missing base topic in static!")?;
let full_topic = format!("{base_topic}{subtopic}");
loop {
let result = Topic::General(full_topic.as_str())
.with_display(message)
.retain(true)
.publish()
.await;
match result {
Ok(()) => return Ok(()),
Err(err) => {
let retry = match err {
Error::IOError => false,
Error::TimedOut => true,
Error::TooLarge => false,
Error::PacketError => false,
Error::Invalid => false,
Error::Rejected => false,
};
if !retry {
bail!(
"Error during mqtt send on topic {} with message {:#?} error is {:?}",
&full_topic,
message,
err
);
}
info!(
"Retransmit for {} with message {:#?} error is {:?} retrying {}",
&full_topic, message, err, retry
);
Timer::after(Duration::from_millis(100)).await;
}
}
}
}
macro_rules! mk_static {
($t:ty,$val:expr) => {{
static STATIC_CELL: static_cell::StaticCell<$t> = static_cell::StaticCell::new();
#[deny(unused_attributes)]
let x = STATIC_CELL.uninit().write(($val));
x
}};
}
pub async fn mqtt_init(
network_config: &'static NetworkConfig,
stack: Stack<'static>,
spawner: Spawner,
) -> FatResult<()> {
let base_topic = network_config
.base_topic
.as_ref()
.context("missing base topic")?;
if base_topic.is_empty() {
bail!("Mqtt base_topic was empty")
}
MQTT_BASE_TOPIC
.init(base_topic.to_string())
.map_err(|_| FatError::String {
error: "Error setting basetopic".to_string(),
})?;
let mqtt_url = network_config
.mqtt_url
.as_ref()
.context("missing mqtt url")?;
if mqtt_url.is_empty() {
bail!("Mqtt url was empty")
}
let last_will_topic = format!("{base_topic}/state");
let round_trip_topic = format!("{base_topic}/internal/roundtrip");
let stay_alive_topic = format!("{base_topic}/stay_alive");
let mut builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 0> =
McutieBuilder::new(stack, "plant ctrl", mqtt_url);
if let (Some(mqtt_user), Some(mqtt_password)) = (
network_config.mqtt_user.as_ref(),
network_config.mqtt_password.as_ref(),
) {
builder = builder.with_authentication(mqtt_user, mqtt_password);
info!("With authentification");
}
let lwt = Topic::General(last_will_topic);
let lwt = mk_static!(Topic<String>, lwt);
let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce);
builder = builder.with_last_will(lwt);
//TODO make configurable
builder = builder.with_device_id("plantctrl");
let builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 2> = builder
.with_subscriptions([
Topic::General(round_trip_topic.clone()),
Topic::General(stay_alive_topic.clone()),
]);
let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16;
let (receiver, task) = builder.build(keep_alive);
spawner.spawn(mqtt_incoming_task(
receiver,
round_trip_topic.clone(),
stay_alive_topic.clone(),
)?);
spawner.spawn(mqtt_runner(task)?);
log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic);
log(LogMessage::MqttInfo, 0, 0, "", mqtt_url);
let mqtt_timeout = 15000;
let res = async {
while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) {
PlantHal::feed_watchdog();
Timer::after(Duration::from_millis(100)).await;
}
Ok::<(), FatError>(())
}
.with_timeout(Duration::from_millis(mqtt_timeout as u64))
.await;
if res.is_err() {
bail!("Timeout waiting MQTT connect event")
}
let _ = Topic::General(round_trip_topic.clone())
.with_display("online_text")
.publish()
.await;
let res = async {
while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) {
PlantHal::feed_watchdog();
Timer::after(Duration::from_millis(100)).await;
}
Ok::<(), FatError>(())
}
.with_timeout(Duration::from_millis(mqtt_timeout as u64))
.await;
if res.is_err() {
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
bail!("Timeout waiting MQTT roundtrip")
}
Ok(())
}
#[embassy_executor::task]
async fn mqtt_runner(
task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>,
) {
task.run().await;
}
#[embassy_executor::task]
async fn mqtt_incoming_task(
receiver: McutieReceiver,
round_trip_topic: String,
stay_alive_topic: String,
) {
loop {
let message = receiver.receive().await;
match message {
MqttMessage::Connected => {
info!("Mqtt connected");
MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed);
}
MqttMessage::Publish(topic, payload) => match topic {
Topic::DeviceType(_type_topic) => {}
Topic::Device(_device_topic) => {}
Topic::General(topic) => {
let subtopic = topic.as_str();
if subtopic.eq(round_trip_topic.as_str()) {
MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed);
} else if subtopic.eq(stay_alive_topic.as_str()) {
let value = payload.eq_ignore_ascii_case("true".as_ref())
|| payload.eq_ignore_ascii_case("1".as_ref());
let a = match value {
true => 1,
false => 0,
};
log(LogMessage::MqttStayAliveRec, a, 0, "", "");
MQTT_STAY_ALIVE.store(value, Ordering::Relaxed);
} else {
log(LogMessage::UnknownTopic, 0, 0, "", &topic);
}
}
},
MqttMessage::Disconnected => {
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
info!("Mqtt disconnected");
}
}
}
}