Integrate mcutie library for MQTT functionality

- Added `mcutie` as a dependency in `Cargo.toml` and updated `Cargo.lock`.
- Replaced commented-out MQTT logic with fully implemented functionality in `esp.rs`.
- Enhanced MQTT publish and subscription handling with configurable topics and error handling.
- Updated MQTT connection logic to improve reliability and logging.
This commit is contained in:
2026-04-26 19:56:16 +02:00
parent 2e4eb283b5
commit fc0e18da56
3 changed files with 186 additions and 128 deletions

View File

@@ -1426,6 +1426,12 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "httparse" name = "httparse"
version = "1.10.1" version = "1.10.1"
@@ -1625,6 +1631,24 @@ dependencies = [
"regex-automata", "regex-automata",
] ]
[[package]]
name = "mcutie"
version = "3.0.0"
dependencies = [
"embassy-futures",
"embassy-net 0.8.0",
"embassy-sync 0.8.0",
"embassy-time",
"embedded-io 0.7.1",
"embedded-io-async 0.7.0",
"heapless 0.7.17",
"hex",
"log",
"mqttrs",
"once_cell",
"pin-project",
]
[[package]] [[package]]
name = "measurements" name = "measurements"
version = "0.11.1" version = "0.11.1"
@@ -1640,6 +1664,15 @@ version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "mqttrs"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62fc2b40eee1386c55479d534ec95a668e0562d54c6d1dc83bb1962469fec8a7"
dependencies = [
"heapless 0.7.17",
]
[[package]] [[package]]
name = "nb" name = "nb"
version = "0.1.3" version = "0.1.3"
@@ -1734,6 +1767,10 @@ name = "once_cell"
version = "1.21.3" version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
dependencies = [
"critical-section",
"portable-atomic",
]
[[package]] [[package]]
name = "onewire" name = "onewire"
@@ -1810,6 +1847,26 @@ dependencies = [
"siphasher", "siphasher",
] ]
[[package]]
name = "pin-project"
version = "1.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.110",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.16" version = "0.2.16"
@@ -1862,6 +1919,7 @@ dependencies = [
"ina219", "ina219",
"lib-bms-protocol", "lib-bms-protocol",
"log", "log",
"mcutie",
"measurements", "measurements",
"nb 1.1.0", "nb 1.1.0",
"onewire", "onewire",

View File

@@ -109,6 +109,7 @@ unit-enum = "1.4.3"
async-trait = "0.1.89" async-trait = "0.1.89"
option-lock = { version = "0.3.1", default-features = false } option-lock = { version = "0.3.1", default-features = false }
measurements = "0.11.1" measurements = "0.11.1"
mcutie = { path = "src/mcutie_3_0_0", features = ["log"] }
#bq34z100 = { path = "../../bq34z100_rust" } #bq34z100 = { path = "../../bq34z100_rust" }

View File

@@ -39,10 +39,10 @@ use esp_radio::wifi::scan::{ScanConfig, ScanTypeConfig};
use esp_radio::wifi::sta::StationConfig; use esp_radio::wifi::sta::StationConfig;
use esp_radio::wifi::{AuthenticationMethod, Config, Interface, WifiController}; use esp_radio::wifi::{AuthenticationMethod, Config, Interface, WifiController};
use log::{error, info, warn}; use log::{error, info, warn};
// pub use mcutie::{ use mcutie::{
// Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable,
// QoS, Topic, QoS, Topic,
// }; };
use portable_atomic::AtomicBool; use portable_atomic::AtomicBool;
use sntpc::{get_time, NtpContext, NtpTimestampGenerator, NtpUdpSocket}; use sntpc::{get_time, NtpContext, NtpTimestampGenerator, NtpUdpSocket};
@@ -678,62 +678,62 @@ impl Esp<'_> {
let round_trip_topic = format!("{base_topic}/internal/roundtrip"); let round_trip_topic = format!("{base_topic}/internal/roundtrip");
let stay_alive_topic = format!("{base_topic}/stay_alive"); let stay_alive_topic = format!("{base_topic}/stay_alive");
// let mut builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 0> = let mut builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 0> =
// McutieBuilder::new(stack, "plant ctrl", mqtt_url); McutieBuilder::new(stack, "plant ctrl", mqtt_url);
// if let (Some(mqtt_user), Some(mqtt_password)) = ( if let (Some(mqtt_user), Some(mqtt_password)) = (
// network_config.mqtt_user.as_ref(), network_config.mqtt_user.as_ref(),
// network_config.mqtt_password.as_ref(), network_config.mqtt_password.as_ref(),
// ) { ) {
// builder = builder.with_authentication(mqtt_user, mqtt_password); builder = builder.with_authentication(mqtt_user, mqtt_password);
// info!("With authentification"); info!("With authentification");
// } }
//
// let lwt = Topic::General(last_will_topic); let lwt = Topic::General(last_will_topic);
// let lwt = mk_static!(Topic<String>, lwt); let lwt = mk_static!(Topic<String>, lwt);
// let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce); let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce);
// builder = builder.with_last_will(lwt); builder = builder.with_last_will(lwt);
// //TODO make configurable //TODO make configurable
// builder = builder.with_device_id("plantctrl"); builder = builder.with_device_id("plantctrl");
//
// let builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 2> = builder let builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 2> = builder
// .with_subscriptions([ .with_subscriptions([
// Topic::General(round_trip_topic.clone()), Topic::General(round_trip_topic.clone()),
// Topic::General(stay_alive_topic.clone()), Topic::General(stay_alive_topic.clone()),
// ]); ]);
//
// let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16; let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16;
// let (receiver, task) = builder.build(keep_alive); let (receiver, task) = builder.build(keep_alive);
//
// spawner.spawn(mqtt_incoming_task( spawner.spawn(mqtt_incoming_task(
// receiver, receiver,
// round_trip_topic.clone(), round_trip_topic.clone(),
// stay_alive_topic.clone(), stay_alive_topic.clone(),
// )?); )?);
// spawner.spawn(mqtt_runner(task)?); spawner.spawn(mqtt_runner(task)?);
//
// log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic); log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic);
//
// log(LogMessage::MqttInfo, 0, 0, "", mqtt_url); log(LogMessage::MqttInfo, 0, 0, "", mqtt_url);
//
let mqtt_timeout = 15000; let mqtt_timeout = 15000;
// let res = async { let res = async {
// while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) { while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) {
// crate::hal::PlantHal::feed_watchdog(); crate::hal::PlantHal::feed_watchdog();
// Timer::after(Duration::from_millis(100)).await; Timer::after(Duration::from_millis(100)).await;
// } }
// Ok::<(), FatError>(()) Ok::<(), FatError>(())
// } }
// .with_timeout(Duration::from_millis(mqtt_timeout as u64)) .with_timeout(Duration::from_millis(mqtt_timeout as u64))
// .await; .await;
//
// if res.is_err() { if res.is_err() {
// bail!("Timeout waiting MQTT connect event") bail!("Timeout waiting MQTT connect event")
// } }
//
// let _ = Topic::General(round_trip_topic.clone()) let _ = Topic::General(round_trip_topic.clone())
// .with_display("online_text") .with_display("online_text")
// .publish() .publish()
// .await; .await;
let res = async { let res = async {
while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) { while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) {
@@ -767,36 +767,37 @@ impl Esp<'_> {
let full_topic = format!("{base_topic}{subtopic}"); let full_topic = format!("{base_topic}{subtopic}");
loop { loop {
// let result = Topic::General(full_topic.as_str()) let result = Topic::General(full_topic.as_str())
// .with_display(message) .with_display(message)
// .retain(true) .retain(true)
// .publish() .publish()
// .await; .await;
// match result { match result {
// Ok(()) => return Ok(()), Ok(()) => return Ok(()),
// Err(err) => { Err(err) => {
// let retry = match err { let retry = match err {
// Error::IOError => false, Error::IOError => false,
// Error::TimedOut => true, Error::TimedOut => true,
// Error::TooLarge => false, Error::TooLarge => false,
// Error::PacketError => false, Error::PacketError => false,
// Error::Invalid => false, Error::Invalid => false,
// }; Error::Rejected => false,
// if !retry { };
// bail!( if !retry {
// "Error during mqtt send on topic {} with message {:#?} error is {:?}", bail!(
// &full_topic, "Error during mqtt send on topic {} with message {:#?} error is {:?}",
// message, &full_topic,
// err message,
// ); err
// } );
// info!( }
// "Retransmit for {} with message {:#?} error is {:?} retrying {}", info!(
// &full_topic, message, err, retry "Retransmit for {} with message {:#?} error is {:?} retrying {}",
// ); &full_topic, message, err, retry
// Timer::after(Duration::from_millis(100)).await; );
// } Timer::after(Duration::from_millis(100)).await;
// } }
}
} }
} }
pub(crate) async fn mqtt_publish(&mut self, subtopic: &str, message: &str) { pub(crate) async fn mqtt_publish(&mut self, subtopic: &str, message: &str) {
@@ -821,54 +822,52 @@ impl Esp<'_> {
} }
#[embassy_executor::task] #[embassy_executor::task]
async fn mqtt_runner(//task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>, async fn mqtt_runner(
task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>,
) { ) {
//task.run().await; task.run().await;
} }
#[embassy_executor::task] #[embassy_executor::task]
async fn mqtt_incoming_task( async fn mqtt_incoming_task(
//receiver: McutieReceiver, receiver: McutieReceiver,
round_trip_topic: String, round_trip_topic: String,
stay_alive_topic: String, stay_alive_topic: String,
) { ) {
loop { loop {
//let message = receiver.receive().await; let message = receiver.receive().await;
// match message { match message {
// MqttMessage::Connected => { MqttMessage::Connected => {
// info!("Mqtt connected"); info!("Mqtt connected");
// MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed); MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed);
// } }
// MqttMessage::Publish(topic, payload) => match topic { MqttMessage::Publish(topic, payload) => match topic {
// Topic::DeviceType(_type_topic) => {} Topic::DeviceType(_type_topic) => {}
// Topic::Device(_device_topic) => {} Topic::Device(_device_topic) => {}
// Topic::General(topic) => { Topic::General(topic) => {
// let subtopic = topic.as_str(); let subtopic = topic.as_str();
//
// if subtopic.eq(round_trip_topic.as_str()) { if subtopic.eq(round_trip_topic.as_str()) {
// MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed); MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed);
// } else if subtopic.eq(stay_alive_topic.as_str()) { } else if subtopic.eq(stay_alive_topic.as_str()) {
// let value = payload.eq_ignore_ascii_case("true".as_ref()) let value = payload.eq_ignore_ascii_case("true".as_ref())
// || payload.eq_ignore_ascii_case("1".as_ref()); || payload.eq_ignore_ascii_case("1".as_ref());
// let a = match value { let a = match value {
// true => 1, true => 1,
// false => 0, false => 0,
// }; };
// log(LogMessage::MqttStayAliveRec, a, 0, "", ""); log(LogMessage::MqttStayAliveRec, a, 0, "", "");
// MQTT_STAY_ALIVE.store(value, Ordering::Relaxed); MQTT_STAY_ALIVE.store(value, Ordering::Relaxed);
// } else { } else {
// log(LogMessage::UnknownTopic, 0, 0, "", &topic); log(LogMessage::UnknownTopic, 0, 0, "", &topic);
// } }
// } }
// }, },
// MqttMessage::Disconnected => { MqttMessage::Disconnected => {
// MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
// info!("Mqtt disconnected"); info!("Mqtt disconnected");
// } }
// MqttMessage::HomeAssistantOnline => { }
// info!("Home assistant is online");
// }
// }
} }
} }