diff --git a/Software/MainBoard/rust/Cargo.lock b/Software/MainBoard/rust/Cargo.lock index eadc66f..adb3691 100644 --- a/Software/MainBoard/rust/Cargo.lock +++ b/Software/MainBoard/rust/Cargo.lock @@ -1426,6 +1426,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "httparse" version = "1.10.1" @@ -1625,6 +1631,24 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "mcutie" +version = "3.0.0" +dependencies = [ + "embassy-futures", + "embassy-net 0.8.0", + "embassy-sync 0.8.0", + "embassy-time", + "embedded-io 0.7.1", + "embedded-io-async 0.7.0", + "heapless 0.7.17", + "hex", + "log", + "mqttrs", + "once_cell", + "pin-project", +] + [[package]] name = "measurements" version = "0.11.1" @@ -1640,6 +1664,15 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "mqttrs" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62fc2b40eee1386c55479d534ec95a668e0562d54c6d1dc83bb1962469fec8a7" +dependencies = [ + "heapless 0.7.17", +] + [[package]] name = "nb" version = "0.1.3" @@ -1734,6 +1767,10 @@ name = "once_cell" version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +dependencies = [ + "critical-section", + "portable-atomic", +] [[package]] name = "onewire" @@ -1810,6 +1847,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.110", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1862,6 +1919,7 @@ dependencies = [ "ina219", "lib-bms-protocol", "log", + "mcutie", "measurements", "nb 1.1.0", "onewire", diff --git a/Software/MainBoard/rust/Cargo.toml b/Software/MainBoard/rust/Cargo.toml index 5a69d8e..ffa0418 100644 --- a/Software/MainBoard/rust/Cargo.toml +++ b/Software/MainBoard/rust/Cargo.toml @@ -109,6 +109,7 @@ unit-enum = "1.4.3" async-trait = "0.1.89" option-lock = { version = "0.3.1", default-features = false } measurements = "0.11.1" +mcutie = { path = "src/mcutie_3_0_0", features = ["log"] } #bq34z100 = { path = "../../bq34z100_rust" } diff --git a/Software/MainBoard/rust/src/hal/esp.rs b/Software/MainBoard/rust/src/hal/esp.rs index d2a339d..7a2bdbf 100644 --- a/Software/MainBoard/rust/src/hal/esp.rs +++ b/Software/MainBoard/rust/src/hal/esp.rs @@ -39,10 +39,10 @@ use esp_radio::wifi::scan::{ScanConfig, ScanTypeConfig}; use esp_radio::wifi::sta::StationConfig; use esp_radio::wifi::{AuthenticationMethod, Config, Interface, WifiController}; use log::{error, info, warn}; -// pub use mcutie::{ -// Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, -// QoS, Topic, -// }; +use mcutie::{ + Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable, + QoS, Topic, +}; use portable_atomic::AtomicBool; use sntpc::{get_time, NtpContext, NtpTimestampGenerator, NtpUdpSocket}; @@ -678,62 +678,62 @@ impl Esp<'_> { let round_trip_topic = format!("{base_topic}/internal/roundtrip"); let stay_alive_topic = format!("{base_topic}/stay_alive"); - // let mut builder: McutieBuilder<'_, String, PublishDisplay, 0> = - // McutieBuilder::new(stack, "plant ctrl", mqtt_url); - // if let (Some(mqtt_user), Some(mqtt_password)) = ( - // network_config.mqtt_user.as_ref(), - // network_config.mqtt_password.as_ref(), - // ) { - // builder = builder.with_authentication(mqtt_user, mqtt_password); - // info!("With authentification"); - // } - // - // let lwt = Topic::General(last_will_topic); - // let lwt = mk_static!(Topic, lwt); - // let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce); - // builder = builder.with_last_will(lwt); - // //TODO make configurable - // builder = builder.with_device_id("plantctrl"); - // - // let builder: McutieBuilder<'_, String, PublishDisplay, 2> = builder - // .with_subscriptions([ - // Topic::General(round_trip_topic.clone()), - // Topic::General(stay_alive_topic.clone()), - // ]); - // - // let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16; - // let (receiver, task) = builder.build(keep_alive); - // - // spawner.spawn(mqtt_incoming_task( - // receiver, - // round_trip_topic.clone(), - // stay_alive_topic.clone(), - // )?); - // spawner.spawn(mqtt_runner(task)?); - // - // log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic); - // - // log(LogMessage::MqttInfo, 0, 0, "", mqtt_url); - // + let mut builder: McutieBuilder<'_, String, PublishDisplay, 0> = + McutieBuilder::new(stack, "plant ctrl", mqtt_url); + if let (Some(mqtt_user), Some(mqtt_password)) = ( + network_config.mqtt_user.as_ref(), + network_config.mqtt_password.as_ref(), + ) { + builder = builder.with_authentication(mqtt_user, mqtt_password); + info!("With authentification"); + } + + let lwt = Topic::General(last_will_topic); + let lwt = mk_static!(Topic, lwt); + let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce); + builder = builder.with_last_will(lwt); + //TODO make configurable + builder = builder.with_device_id("plantctrl"); + + let builder: McutieBuilder<'_, String, PublishDisplay, 2> = builder + .with_subscriptions([ + Topic::General(round_trip_topic.clone()), + Topic::General(stay_alive_topic.clone()), + ]); + + let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16; + let (receiver, task) = builder.build(keep_alive); + + spawner.spawn(mqtt_incoming_task( + receiver, + round_trip_topic.clone(), + stay_alive_topic.clone(), + )?); + spawner.spawn(mqtt_runner(task)?); + + log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic); + + log(LogMessage::MqttInfo, 0, 0, "", mqtt_url); + let mqtt_timeout = 15000; - // let res = async { - // while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) { - // crate::hal::PlantHal::feed_watchdog(); - // Timer::after(Duration::from_millis(100)).await; - // } - // Ok::<(), FatError>(()) - // } - // .with_timeout(Duration::from_millis(mqtt_timeout as u64)) - // .await; - // - // if res.is_err() { - // bail!("Timeout waiting MQTT connect event") - // } - // - // let _ = Topic::General(round_trip_topic.clone()) - // .with_display("online_text") - // .publish() - // .await; + let res = async { + while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) { + crate::hal::PlantHal::feed_watchdog(); + Timer::after(Duration::from_millis(100)).await; + } + Ok::<(), FatError>(()) + } + .with_timeout(Duration::from_millis(mqtt_timeout as u64)) + .await; + + if res.is_err() { + bail!("Timeout waiting MQTT connect event") + } + + let _ = Topic::General(round_trip_topic.clone()) + .with_display("online_text") + .publish() + .await; let res = async { while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) { @@ -767,36 +767,37 @@ impl Esp<'_> { let full_topic = format!("{base_topic}{subtopic}"); loop { - // let result = Topic::General(full_topic.as_str()) - // .with_display(message) - // .retain(true) - // .publish() - // .await; - // match result { - // Ok(()) => return Ok(()), - // Err(err) => { - // let retry = match err { - // Error::IOError => false, - // Error::TimedOut => true, - // Error::TooLarge => false, - // Error::PacketError => false, - // Error::Invalid => false, - // }; - // if !retry { - // bail!( - // "Error during mqtt send on topic {} with message {:#?} error is {:?}", - // &full_topic, - // message, - // err - // ); - // } - // info!( - // "Retransmit for {} with message {:#?} error is {:?} retrying {}", - // &full_topic, message, err, retry - // ); - // Timer::after(Duration::from_millis(100)).await; - // } - // } + let result = Topic::General(full_topic.as_str()) + .with_display(message) + .retain(true) + .publish() + .await; + match result { + Ok(()) => return Ok(()), + Err(err) => { + let retry = match err { + Error::IOError => false, + Error::TimedOut => true, + Error::TooLarge => false, + Error::PacketError => false, + Error::Invalid => false, + Error::Rejected => false, + }; + if !retry { + bail!( + "Error during mqtt send on topic {} with message {:#?} error is {:?}", + &full_topic, + message, + err + ); + } + info!( + "Retransmit for {} with message {:#?} error is {:?} retrying {}", + &full_topic, message, err, retry + ); + Timer::after(Duration::from_millis(100)).await; + } + } } } pub(crate) async fn mqtt_publish(&mut self, subtopic: &str, message: &str) { @@ -821,54 +822,52 @@ impl Esp<'_> { } #[embassy_executor::task] -async fn mqtt_runner(//task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>, +async fn mqtt_runner( + task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>, ) { - //task.run().await; + task.run().await; } #[embassy_executor::task] async fn mqtt_incoming_task( - //receiver: McutieReceiver, + receiver: McutieReceiver, round_trip_topic: String, stay_alive_topic: String, ) { loop { - //let message = receiver.receive().await; - // match message { - // MqttMessage::Connected => { - // info!("Mqtt connected"); - // MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed); - // } - // MqttMessage::Publish(topic, payload) => match topic { - // Topic::DeviceType(_type_topic) => {} - // Topic::Device(_device_topic) => {} - // Topic::General(topic) => { - // let subtopic = topic.as_str(); - // - // if subtopic.eq(round_trip_topic.as_str()) { - // MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed); - // } else if subtopic.eq(stay_alive_topic.as_str()) { - // let value = payload.eq_ignore_ascii_case("true".as_ref()) - // || payload.eq_ignore_ascii_case("1".as_ref()); - // let a = match value { - // true => 1, - // false => 0, - // }; - // log(LogMessage::MqttStayAliveRec, a, 0, "", ""); - // MQTT_STAY_ALIVE.store(value, Ordering::Relaxed); - // } else { - // log(LogMessage::UnknownTopic, 0, 0, "", &topic); - // } - // } - // }, - // MqttMessage::Disconnected => { - // MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); - // info!("Mqtt disconnected"); - // } - // MqttMessage::HomeAssistantOnline => { - // info!("Home assistant is online"); - // } - // } + let message = receiver.receive().await; + match message { + MqttMessage::Connected => { + info!("Mqtt connected"); + MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed); + } + MqttMessage::Publish(topic, payload) => match topic { + Topic::DeviceType(_type_topic) => {} + Topic::Device(_device_topic) => {} + Topic::General(topic) => { + let subtopic = topic.as_str(); + + if subtopic.eq(round_trip_topic.as_str()) { + MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed); + } else if subtopic.eq(stay_alive_topic.as_str()) { + let value = payload.eq_ignore_ascii_case("true".as_ref()) + || payload.eq_ignore_ascii_case("1".as_ref()); + let a = match value { + true => 1, + false => 0, + }; + log(LogMessage::MqttStayAliveRec, a, 0, "", ""); + MQTT_STAY_ALIVE.store(value, Ordering::Relaxed); + } else { + log(LogMessage::UnknownTopic, 0, 0, "", &topic); + } + } + }, + MqttMessage::Disconnected => { + MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed); + info!("Mqtt disconnected"); + } + } } }