mqtt via mcutie

This commit is contained in:
2025-09-29 01:00:11 +02:00
parent 3d18b0dbf6
commit cfe23c8a09
15 changed files with 482 additions and 482 deletions

View File

@@ -7,26 +7,23 @@ use serde::Serialize;
use crate::fat_error::{ContextExt, FatError, FatResult};
use crate::hal::little_fs2storage_adapter::LittleFs2Filesystem;
use alloc::borrow::ToOwned;
use alloc::string::ToString;
use alloc::sync::Arc;
use alloc::{format, string::String, vec::Vec};
use core::marker::PhantomData;
use core::net::{IpAddr, Ipv4Addr, SocketAddr};
use core::str::FromStr;
use core::sync::atomic::Ordering;
use core::sync::atomic::Ordering::Relaxed;
use edge_dhcp::io::server::run;
use embassy_executor::Spawner;
use embassy_net::udp::UdpSocket;
use embassy_net::{DhcpConfig, Ipv4Cidr, Runner, Stack, StackResources, StaticConfigV4};
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::mutex::{Mutex, MutexGuard};
use embassy_sync::once_lock::OnceLock;
use embassy_time::{Duration, Timer};
use embedded_storage::nor_flash::ReadNorFlash;
use esp_bootloader_esp_idf::ota::{Ota, OtaImageState};
use esp_bootloader_esp_idf::partitions::FlashRegion;
use esp_hal::gpio::{Input, InputConfig, Pull, RtcPinWithResistors};
use esp_hal::gpio::{Input, RtcPinWithResistors};
use esp_hal::rng::Rng;
use esp_hal::rtc_cntl::{
sleep::{TimerWakeupSource, WakeupLevel},
@@ -37,11 +34,15 @@ use esp_println::println;
use esp_storage::FlashStorage;
use esp_wifi::wifi::{
AccessPointConfiguration, AccessPointInfo, AuthMethod, ClientConfiguration, Configuration,
Interfaces, ScanConfig, ScanTypeConfig, WifiController, WifiDevice, WifiState,
ScanConfig, ScanTypeConfig, WifiController, WifiDevice, WifiState,
};
use littlefs2::fs::Filesystem;
use littlefs2_core::{FileType, PathBuf, SeekFrom};
use log::{info, warn};
use mcutie::{
Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable,
QoS, Topic,
};
use portable_atomic::AtomicBool;
use smoltcp::socket::udp::PacketMetadata;
use smoltcp::wire::DnsQueryType;
@@ -59,6 +60,11 @@ static mut RESTART_TO_CONF: i8 = 0;
const CONFIG_FILE: &str = "config.json";
const NTP_SERVER: &str = "pool.ntp.org";
static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false);
static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false);
pub static MQTT_STAY_ALIVE: AtomicBool = AtomicBool::new(false);
static MQTT_BASE_TOPIC: OnceLock<String> = OnceLock::new();
#[derive(Serialize, Debug)]
pub struct FileInfo {
filename: String,
@@ -72,18 +78,6 @@ pub struct FileList {
files: Vec<FileInfo>,
}
pub struct FileSystemSizeInfo {
pub total_size: usize,
pub used_size: usize,
pub free_size: usize,
}
pub struct MqttClient<'a> {
dummy: PhantomData<&'a ()>,
//mqtt_client: EspMqttClient<'a>,
base_topic: String,
}
#[derive(Copy, Clone, Default)]
struct Timestamp {
stamp: DateTime<Utc>,
@@ -127,9 +121,6 @@ pub struct Esp<'a> {
pub interface_ap: Option<WifiDevice<'static>>,
pub controller: Arc<Mutex<CriticalSectionRawMutex, WifiController<'static>>>,
//only filled, if a useable mqtt client with working roundtrip could be established
pub(crate) mqtt_client: Option<MqttClient<'a>>,
pub boot_button: Input<'a>,
// RTC-capable GPIO used as external wake source (store the raw peripheral)
@@ -147,12 +138,6 @@ pub struct Esp<'a> {
// CPU cores/threads, reconsider this.
unsafe impl Send for Esp<'_> {}
pub struct IpInfo {
pub(crate) ip: IpAddr,
netmask: IpAddr,
gateway: IpAddr,
}
macro_rules! mk_static {
($t:ty,$val:expr) => {{
static STATIC_CELL: static_cell::StaticCell<$t> = static_cell::StaticCell::new();
@@ -164,7 +149,7 @@ macro_rules! mk_static {
impl Esp<'_> {
pub(crate) async fn delete_file(&self, filename: String) -> FatResult<()> {
let file = PathBuf::try_from(filename.as_str()).unwrap();
let file = PathBuf::try_from(filename.as_str())?;
let access = self.fs.lock().await;
access.remove(&*file)?;
Ok(())
@@ -383,7 +368,7 @@ impl Esp<'_> {
}
}
pub(crate) async fn wifi_ap(&mut self, fallback: bool) -> FatResult<Stack<'static>> {
pub(crate) async fn wifi_ap(&mut self) -> FatResult<Stack<'static>> {
let ssid = match self.load_config().await {
Ok(config) => config.network.ap_ssid.as_str().to_string(),
Err(_) => "PlantCtrl Emergency Mode".to_string(),
@@ -482,7 +467,7 @@ impl Esp<'_> {
let (stack, runner) = embassy_net::new(
device,
config,
mk_static!(StackResources<4>, StackResources::<4>::new()),
mk_static!(StackResources<8>, StackResources::<8>::new()),
seed,
);
let stack = mk_static!(Stack, stack);
@@ -572,6 +557,8 @@ impl Esp<'_> {
}
Timer::after(Duration::from_millis(100)).await
}
info!("Connected WIFI, dhcp: {:?}", stack.config_v4());
Ok(stack.clone())
}
@@ -602,9 +589,6 @@ impl Esp<'_> {
let ext1 = esp_hal::rtc_cntl::sleep::Ext1WakeupSource::new(&mut wake_pins);
rtc.sleep_deep(&[&timer, &ext1]);
}
// We should never reach here because sleep_deep never returns, but just in case, reset.
software_reset();
}
pub(crate) async fn load_config(&mut self) -> FatResult<PlantControllerConfig> {
@@ -708,7 +692,11 @@ impl Esp<'_> {
}
}
pub(crate) async fn mqtt(&mut self, network_config: &NetworkConfig) -> FatResult<()> {
pub(crate) async fn mqtt(
&mut self,
network_config: &'static NetworkConfig,
stack: Stack<'static>,
) -> FatResult<()> {
let base_topic = network_config
.base_topic
.as_ref()
@@ -716,7 +704,12 @@ impl Esp<'_> {
if base_topic.is_empty() {
bail!("Mqtt base_topic was empty")
}
let _base_topic_copy = base_topic.clone();
MQTT_BASE_TOPIC
.init(base_topic.to_string())
.map_err(|_| FatError::String {
error: "Error setting basetopic".to_string(),
})?;
let mqtt_url = network_config
.mqtt_url
.as_ref()
@@ -725,202 +718,223 @@ impl Esp<'_> {
bail!("Mqtt url was empty")
}
bail!("todo");
//
// let last_will_topic = format!("{}/state", base_topic);
// let mqtt_client_config = MqttClientConfiguration {
// lwt: Some(LwtConfiguration {
// topic: &last_will_topic,
// payload: "lost".as_bytes(),
// qos: AtLeastOnce,
// retain: true,
// }),
// client_id: Some("plantctrl"),
// keep_alive_interval: Some(Duration::from_secs(60 * 60 * 2)),
// username: network_config.mqtt_user.as_ref().map(|v| &**v),
// password: network_config.mqtt_password.as_ref().map(|v| &**v),
// //room for improvement
// ..Default::default()
// };
//
// let mqtt_connected_event_received = Arc::new(AtomicBool::new(false));
// let mqtt_connected_event_ok = Arc::new(AtomicBool::new(false));
//
// let round_trip_ok = Arc::new(AtomicBool::new(false));
// let round_trip_topic = format!("{}/internal/roundtrip", base_topic);
// let stay_alive_topic = format!("{}/stay_alive", base_topic);
// log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic);
//
// let mqtt_connected_event_received_copy = mqtt_connected_event_received.clone();
// let mqtt_connected_event_ok_copy = mqtt_connected_event_ok.clone();
// let stay_alive_topic_copy = stay_alive_topic.clone();
// let round_trip_topic_copy = round_trip_topic.clone();
// let round_trip_ok_copy = round_trip_ok.clone();
// let client_id = mqtt_client_config.client_id.unwrap_or("not set");
// log(LogMessage::MqttInfo, 0, 0, client_id, mqtt_url);
// let mut client = EspMqttClient::new_cb(mqtt_url, &mqtt_client_config, move |event| {
// let payload = event.payload();
// match payload {
// embedded_svc::mqtt::client::EventPayload::Received {
// id: _,
// topic,
// data,
// details: _,
// } => {
// let data = String::from_utf8_lossy(data);
// if let Some(topic) = topic {
// //todo use enums
// if topic.eq(round_trip_topic_copy.as_str()) {
// round_trip_ok_copy.store(true, std::sync::atomic::Ordering::Relaxed);
// } else if topic.eq(stay_alive_topic_copy.as_str()) {
// let value =
// data.eq_ignore_ascii_case("true") || data.eq_ignore_ascii_case("1");
// log(LogMessage::MqttStayAliveRec, 0, 0, &data, "");
// STAY_ALIVE.store(value, std::sync::atomic::Ordering::Relaxed);
// } else {
// log(LogMessage::UnknownTopic, 0, 0, "", topic);
// }
// }
// }
// esp_idf_svc::mqtt::client::EventPayload::Connected(_) => {
// mqtt_connected_event_received_copy
// .store(true, std::sync::atomic::Ordering::Relaxed);
// mqtt_connected_event_ok_copy.store(true, std::sync::atomic::Ordering::Relaxed);
// log::info!("Mqtt connected");
// }
// esp_idf_svc::mqtt::client::EventPayload::Disconnected => {
// mqtt_connected_event_received_copy
// .store(true, std::sync::atomic::Ordering::Relaxed);
// mqtt_connected_event_ok_copy.store(false, std::sync::atomic::Ordering::Relaxed);
// log::info!("Mqtt disconnected");
// }
// esp_idf_svc::mqtt::client::EventPayload::Error(esp_error) => {
// log::info!("EspMqttError reported {:?}", esp_error);
// mqtt_connected_event_received_copy
// .store(true, std::sync::atomic::Ordering::Relaxed);
// mqtt_connected_event_ok_copy.store(false, std::sync::atomic::Ordering::Relaxed);
// log::info!("Mqtt error");
// }
// esp_idf_svc::mqtt::client::EventPayload::BeforeConnect => {
// log::info!("Mqtt before connect")
// }
// esp_idf_svc::mqtt::client::EventPayload::Subscribed(_) => {
// log::info!("Mqtt subscribed")
// }
// esp_idf_svc::mqtt::client::EventPayload::Unsubscribed(_) => {
// log::info!("Mqtt unsubscribed")
// }
// esp_idf_svc::mqtt::client::EventPayload::Published(_) => {
// log::info!("Mqtt published")
// }
// esp_idf_svc::mqtt::client::EventPayload::Deleted(_) => {
// log::info!("Mqtt deleted")
// }
// }
// })?;
//
// let mut wait_for_connections_event = 0;
// while wait_for_connections_event < 100 {
// wait_for_connections_event += 1;
// match mqtt_connected_event_received.load(std::sync::atomic::Ordering::Relaxed) {
// true => {
// log::info!("Mqtt connection callback received, progressing");
// match mqtt_connected_event_ok.load(std::sync::atomic::Ordering::Relaxed) {
// true => {
// log::info!(
// "Mqtt did callback as connected, testing with roundtrip now"
// );
// //subscribe to roundtrip
// client.subscribe(round_trip_topic.as_str(), ExactlyOnce)?;
// client.subscribe(stay_alive_topic.as_str(), ExactlyOnce)?;
// //publish to roundtrip
// client.publish(
// round_trip_topic.as_str(),
// ExactlyOnce,
// false,
// "online_test".as_bytes(),
// )?;
//
// let mut wait_for_roundtrip = 0;
// while wait_for_roundtrip < 100 {
// wait_for_roundtrip += 1;
// match round_trip_ok.load(std::sync::atomic::Ordering::Relaxed) {
// true => {
// log::info!("Round trip registered, proceeding");
// self.mqtt_client = Some(MqttClient {
// mqtt_client: client,
// base_topic: base_topic_copy,
// });
// return anyhow::Ok(());
// }
// false => {
// unsafe { vTaskDelay(10) };
// }
// }
// }
// bail!("Mqtt did not complete roundtrip in time");
// }
// false => {
// bail!("Mqtt did respond but with failure")
// }
// }
// }
// false => {
// unsafe { vTaskDelay(10) };
// }
// }
// }
// bail!("Mqtt did not fire connection callback in time");
let last_will_topic = format!("{}/state", base_topic);
let round_trip_topic = format!("{}/internal/roundtrip", base_topic);
let stay_alive_topic = format!("{}/stay_alive", base_topic);
let mut builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 0> =
McutieBuilder::new(stack, "plant ctrl", mqtt_url);
if network_config.mqtt_user.is_some() && network_config.mqtt_password.is_some() {
builder = builder.with_authentication(
network_config.mqtt_user.as_ref().unwrap().as_str(),
network_config.mqtt_password.as_ref().unwrap().as_str(),
);
info!("With authentification");
}
let lwt = Topic::General(last_will_topic);
let lwt = mk_static!(Topic<String>, lwt);
let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce);
builder = builder.with_last_will(lwt);
//TODO make configurable
builder = builder.with_device_id("plantctrl");
let builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 2> = builder
.with_subscriptions([
Topic::General(round_trip_topic.clone()),
Topic::General(stay_alive_topic.clone()),
]);
let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16;
let (receiver, task) = builder.build(keep_alive);
let spawner = Spawner::for_current_executor().await;
spawner.spawn(mqtt_incoming_task(
receiver,
round_trip_topic.clone(),
stay_alive_topic.clone(),
))?;
spawner.spawn(mqtt_runner(task))?;
LOG_ACCESS
.lock()
.await
.log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic)
.await;
LOG_ACCESS
.lock()
.await
.log(LogMessage::MqttInfo, 0, 0, "", mqtt_url)
.await;
let mqtt_timeout = 15000;
let timeout = {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} + mqtt_timeout as u64 * 1000;
while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) {
let cur = TIME_ACCESS.get().await.lock().await.current_time_us();
if cur > timeout {
bail!("Timeout waiting MQTT connect event")
}
Timer::after(Duration::from_millis(100)).await;
}
Topic::General(round_trip_topic.clone())
.with_display("online_text")
.publish()
.await
.unwrap();
let timeout = {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} + mqtt_timeout as u64 * 1000;
while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) {
let cur = TIME_ACCESS.get().await.lock().await.current_time_us();
if cur > timeout {
//ensure we do not further try to publish
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
bail!("Timeout waiting MQTT roundtrip")
}
Timer::after(Duration::from_millis(100)).await;
}
Ok(())
}
pub(crate) async fn mqtt_publish(&mut self, _subtopic: &str, _message: &[u8]) -> FatResult<()> {
bail!("todo");
//
// if self.mqtt_client.is_none() {
// return anyhow::Ok(());
// }
// if !subtopic.starts_with("/") {
// log::info!("Subtopic without / at start {}", subtopic);
// bail!("Subtopic without / at start {}", subtopic);
// }
// if subtopic.len() > 192 {
// log::info!("Subtopic exceeds 192 chars {}", subtopic);
// bail!("Subtopic exceeds 192 chars {}", subtopic);
// }
// let client = self.mqtt_client.as_mut().unwrap();
// let mut full_topic: heapless::String<256> = heapless::String::new();
// if full_topic.push_str(client.base_topic.as_str()).is_err() {
// log::info!("Some error assembling full_topic 1");
// bail!("Some error assembling full_topic 1")
// };
// if full_topic.push_str(subtopic).is_err() {
// log::info!("Some error assembling full_topic 2");
// bail!("Some error assembling full_topic 2")
// };
// let publish = client
// .mqtt_client
// .publish(&full_topic, ExactlyOnce, true, message);
// Timer::after_millis(10).await;
// match publish {
// OkStd(message_id) => {
// log::info!(
// "Published mqtt topic {} with message {:#?} msgid is {:?}",
// full_topic,
// String::from_utf8_lossy(message),
// message_id
// );
// anyhow::Ok(())
// }
// Err(err) => {
// log::info!(
// "Error during mqtt send on topic {} with message {:#?} error is {:?}",
// full_topic,
// String::from_utf8_lossy(message),
// err
// );
// Err(err)?
// }
// }
pub(crate) async fn mqtt_inner(&mut self, subtopic: &str, message: &str) -> FatResult<()> {
if !subtopic.starts_with("/") {
bail!("Subtopic without / at start {}", subtopic);
}
if subtopic.len() > 192 {
bail!("Subtopic exceeds 192 chars {}", subtopic);
}
let base_topic = MQTT_BASE_TOPIC
.try_get()
.context("missing base topic in static!")?;
let full_topic = format!("{base_topic}{subtopic}");
loop {
let result = Topic::General(full_topic.as_str())
.with_display(message)
.retain(true)
.publish()
.await;
match result {
Ok(()) => return Ok(()),
Err(err) => {
let retry = match err {
Error::IOError => false,
Error::TimedOut => true,
Error::TooLarge => false,
Error::PacketError => false,
Error::Invalid => false,
};
if !retry {
bail!(
"Error during mqtt send on topic {} with message {:#?} error is {:?}",
&full_topic,
message,
err
);
}
info!(
"Retransmit for {} with message {:#?} error is {:?} retrying {}",
&full_topic, message, err, retry
);
Timer::after(Duration::from_millis(100)).await;
}
}
}
}
pub(crate) async fn mqtt_publish(&mut self, subtopic: &str, message: &str) {
let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed);
if !online {
return;
}
let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed);
if !roundtrip_ok {
info!("MQTT roundtrip not received yet, dropping message");
return;
}
match self.mqtt_inner(subtopic, message).await {
Ok(()) => {}
Err(err) => {
info!(
"Error during mqtt send on topic {} with message {:#?} error is {:?}",
subtopic, message, err
);
}
};
}
}
#[embassy_executor::task]
async fn mqtt_runner(
task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>,
) {
task.run().await;
}
#[embassy_executor::task]
async fn mqtt_incoming_task(
receiver: McutieReceiver,
round_trip_topic: String,
stay_alive_topic: String,
) {
loop {
let message = receiver.receive().await;
match message {
MqttMessage::Connected => {
info!("Mqtt connected");
MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed);
}
MqttMessage::Publish(topic, payload) => match topic {
Topic::DeviceType(type_topic) => {}
Topic::Device(device_topic) => {}
Topic::General(topic) => {
let subtopic = topic.as_str();
if subtopic.eq(round_trip_topic.as_str()) {
MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed);
} else if subtopic.eq(stay_alive_topic.as_str()) {
let value = payload.eq_ignore_ascii_case("true".as_ref())
|| payload.eq_ignore_ascii_case("1".as_ref());
let a = match value {
true => 1,
false => 0,
};
LOG_ACCESS
.lock()
.await
.log(LogMessage::MqttStayAliveRec, a, 0, "", "")
.await;
MQTT_STAY_ALIVE.store(value, Ordering::Relaxed);
} else {
LOG_ACCESS
.lock()
.await
.log(LogMessage::UnknownTopic, 0, 0, "", &*topic)
.await;
}
}
},
MqttMessage::Disconnected => {
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
info!("Mqtt disconnected");
}
MqttMessage::HomeAssistantOnline => {
info!("Home assistant is online");
}
}
}
}
#[embassy_executor::task(pool_size = 2)]
async fn net_task(mut runner: Runner<'static, WifiDevice<'static>>) {
runner.run().await;
}
#[embassy_executor::task]
@@ -943,7 +957,7 @@ async fn run_dhcp(stack: Stack<'static>, gw_ip_addr: &'static str) {
let buffers = UdpBuffers::<3, 1024, 1024, 10>::new();
let unbound_socket = Udp::new(stack, &buffers);
let mut bound_socket = unbound_socket
.bind(core::net::SocketAddr::V4(SocketAddrV4::new(
.bind(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
DEFAULT_SERVER_PORT,
)))
@@ -962,8 +976,3 @@ async fn run_dhcp(stack: Stack<'static>, gw_ip_addr: &'static str) {
Timer::after(Duration::from_millis(500)).await;
}
}
#[embassy_executor::task(pool_size = 2)]
async fn net_task(mut runner: Runner<'static, WifiDevice<'static>>) {
runner.run().await
}