refactor: remove MQTT code from esp.rs
This commit is contained in:
@@ -22,7 +22,6 @@ use embassy_net::udp::{PacketMetadata, UdpSocket};
|
|||||||
use embassy_net::{DhcpConfig, IpAddress, Ipv4Cidr, Runner, Stack, StackResources, StaticConfigV4};
|
use embassy_net::{DhcpConfig, IpAddress, Ipv4Cidr, Runner, Stack, StackResources, StaticConfigV4};
|
||||||
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
||||||
use embassy_sync::mutex::{Mutex, MutexGuard};
|
use embassy_sync::mutex::{Mutex, MutexGuard};
|
||||||
use embassy_sync::once_lock::OnceLock;
|
|
||||||
use embassy_time::{Duration, Timer, WithTimeout};
|
use embassy_time::{Duration, Timer, WithTimeout};
|
||||||
use embedded_storage::nor_flash::{check_erase, NorFlash, ReadNorFlash, RmwNorFlashStorage};
|
use embedded_storage::nor_flash::{check_erase, NorFlash, ReadNorFlash, RmwNorFlashStorage};
|
||||||
use esp_bootloader_esp_idf::ota::OtaImageState::Valid;
|
use esp_bootloader_esp_idf::ota::OtaImageState::Valid;
|
||||||
@@ -44,10 +43,6 @@ use esp_storage::FlashStorage;
|
|||||||
use littlefs2::fs::Filesystem;
|
use littlefs2::fs::Filesystem;
|
||||||
use littlefs2_core::{FileType, PathBuf, SeekFrom};
|
use littlefs2_core::{FileType, PathBuf, SeekFrom};
|
||||||
use log::{info, warn, error};
|
use log::{info, warn, error};
|
||||||
use mcutie::{
|
|
||||||
Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable,
|
|
||||||
QoS, Topic,
|
|
||||||
};
|
|
||||||
use portable_atomic::AtomicBool;
|
use portable_atomic::AtomicBool;
|
||||||
use sntpc::{NtpContext, NtpTimestampGenerator, NtpUdpSocket, get_time};
|
use sntpc::{NtpContext, NtpTimestampGenerator, NtpUdpSocket, get_time};
|
||||||
|
|
||||||
@@ -68,11 +63,6 @@ static mut LAST_CORROSION_PROTECTION_CHECK_DAY: i8 = -1;
|
|||||||
const CONFIG_FILE: &str = "config.json";
|
const CONFIG_FILE: &str = "config.json";
|
||||||
const NTP_SERVER: &str = "pool.ntp.org";
|
const NTP_SERVER: &str = "pool.ntp.org";
|
||||||
|
|
||||||
static MQTT_CONNECTED_EVENT_RECEIVED: AtomicBool = AtomicBool::new(false);
|
|
||||||
static MQTT_ROUND_TRIP_RECEIVED: AtomicBool = AtomicBool::new(false);
|
|
||||||
pub use crate::mqtt::MQTT_STAY_ALIVE;
|
|
||||||
static MQTT_BASE_TOPIC: OnceLock<String> = OnceLock::new();
|
|
||||||
|
|
||||||
#[derive(Serialize, Debug)]
|
#[derive(Serialize, Debug)]
|
||||||
pub struct FileInfo {
|
pub struct FileInfo {
|
||||||
filename: String,
|
filename: String,
|
||||||
@@ -740,228 +730,6 @@ impl Esp<'_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn mqtt(
|
|
||||||
&mut self,
|
|
||||||
network_config: &'static NetworkConfig,
|
|
||||||
stack: Stack<'static>,
|
|
||||||
spawner: Spawner,
|
|
||||||
) -> FatResult<()> {
|
|
||||||
let base_topic = network_config
|
|
||||||
.base_topic
|
|
||||||
.as_ref()
|
|
||||||
.context("missing base topic")?;
|
|
||||||
if base_topic.is_empty() {
|
|
||||||
bail!("Mqtt base_topic was empty")
|
|
||||||
}
|
|
||||||
MQTT_BASE_TOPIC
|
|
||||||
.init(base_topic.to_string())
|
|
||||||
.map_err(|_| FatError::String {
|
|
||||||
error: "Error setting basetopic".to_string(),
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let mqtt_url = network_config
|
|
||||||
.mqtt_url
|
|
||||||
.as_ref()
|
|
||||||
.context("missing mqtt url")?;
|
|
||||||
if mqtt_url.is_empty() {
|
|
||||||
bail!("Mqtt url was empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
let last_will_topic = format!("{base_topic}/state");
|
|
||||||
let round_trip_topic = format!("{base_topic}/internal/roundtrip");
|
|
||||||
let stay_alive_topic = format!("{base_topic}/stay_alive");
|
|
||||||
|
|
||||||
let mut builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 0> =
|
|
||||||
McutieBuilder::new(stack, "plant ctrl", mqtt_url);
|
|
||||||
if let (Some(mqtt_user), Some(mqtt_password)) = (
|
|
||||||
network_config.mqtt_user.as_ref(),
|
|
||||||
network_config.mqtt_password.as_ref(),
|
|
||||||
) {
|
|
||||||
builder = builder.with_authentication(mqtt_user, mqtt_password);
|
|
||||||
info!("With authentification");
|
|
||||||
}
|
|
||||||
|
|
||||||
let lwt = Topic::General(last_will_topic);
|
|
||||||
let lwt = mk_static!(Topic<String>, lwt);
|
|
||||||
let lwt = lwt.with_display("lost").retain(true).qos(QoS::AtLeastOnce);
|
|
||||||
builder = builder.with_last_will(lwt);
|
|
||||||
//TODO make configurable
|
|
||||||
builder = builder.with_device_id("plantctrl");
|
|
||||||
|
|
||||||
let builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 2> = builder
|
|
||||||
.with_subscriptions([
|
|
||||||
Topic::General(round_trip_topic.clone()),
|
|
||||||
Topic::General(stay_alive_topic.clone()),
|
|
||||||
]);
|
|
||||||
|
|
||||||
let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16;
|
|
||||||
let (receiver, task) = builder.build(keep_alive);
|
|
||||||
|
|
||||||
spawner.spawn(mqtt_incoming_task(
|
|
||||||
receiver,
|
|
||||||
round_trip_topic.clone(),
|
|
||||||
stay_alive_topic.clone(),
|
|
||||||
)?);
|
|
||||||
spawner.spawn(mqtt_runner(task)?);
|
|
||||||
|
|
||||||
log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic);
|
|
||||||
|
|
||||||
log(LogMessage::MqttInfo, 0, 0, "", mqtt_url);
|
|
||||||
|
|
||||||
let mqtt_timeout = 15000;
|
|
||||||
let res = async {
|
|
||||||
while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) {
|
|
||||||
crate::hal::PlantHal::feed_watchdog();
|
|
||||||
Timer::after(Duration::from_millis(100)).await;
|
|
||||||
}
|
|
||||||
Ok::<(), FatError>(())
|
|
||||||
}
|
|
||||||
.with_timeout(Duration::from_millis(mqtt_timeout as u64))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if res.is_err() {
|
|
||||||
bail!("Timeout waiting MQTT connect event")
|
|
||||||
}
|
|
||||||
|
|
||||||
let _ = Topic::General(round_trip_topic.clone())
|
|
||||||
.with_display("online_text")
|
|
||||||
.publish()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let res = async {
|
|
||||||
while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) {
|
|
||||||
crate::hal::PlantHal::feed_watchdog();
|
|
||||||
Timer::after(Duration::from_millis(100)).await;
|
|
||||||
}
|
|
||||||
Ok::<(), FatError>(())
|
|
||||||
}
|
|
||||||
.with_timeout(Duration::from_millis(mqtt_timeout as u64))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if res.is_err() {
|
|
||||||
//ensure we do not further try to publish
|
|
||||||
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
|
|
||||||
bail!("Timeout waiting MQTT roundtrip")
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn mqtt_inner(&mut self, subtopic: &str, message: &str) -> FatResult<()> {
|
|
||||||
if !subtopic.starts_with("/") {
|
|
||||||
bail!("Subtopic without / at start {}", subtopic);
|
|
||||||
}
|
|
||||||
if subtopic.len() > 192 {
|
|
||||||
bail!("Subtopic exceeds 192 chars {}", subtopic);
|
|
||||||
}
|
|
||||||
let base_topic = MQTT_BASE_TOPIC
|
|
||||||
.try_get()
|
|
||||||
.context("missing base topic in static!")?;
|
|
||||||
|
|
||||||
let full_topic = format!("{base_topic}{subtopic}");
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let result = Topic::General(full_topic.as_str())
|
|
||||||
.with_display(message)
|
|
||||||
.retain(true)
|
|
||||||
.publish()
|
|
||||||
.await;
|
|
||||||
match result {
|
|
||||||
Ok(()) => return Ok(()),
|
|
||||||
Err(err) => {
|
|
||||||
let retry = match err {
|
|
||||||
Error::IOError => false,
|
|
||||||
Error::TimedOut => true,
|
|
||||||
Error::TooLarge => false,
|
|
||||||
Error::PacketError => false,
|
|
||||||
Error::Invalid => false,
|
|
||||||
Error::Rejected => false,
|
|
||||||
};
|
|
||||||
if !retry {
|
|
||||||
bail!(
|
|
||||||
"Error during mqtt send on topic {} with message {:#?} error is {:?}",
|
|
||||||
&full_topic,
|
|
||||||
message,
|
|
||||||
err
|
|
||||||
);
|
|
||||||
}
|
|
||||||
info!(
|
|
||||||
"Retransmit for {} with message {:#?} error is {:?} retrying {}",
|
|
||||||
&full_topic, message, err, retry
|
|
||||||
);
|
|
||||||
Timer::after(Duration::from_millis(100)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub(crate) async fn mqtt_publish(&mut self, subtopic: &str, message: &str) {
|
|
||||||
let online = MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed);
|
|
||||||
if !online {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let roundtrip_ok = MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed);
|
|
||||||
if !roundtrip_ok {
|
|
||||||
info!("MQTT roundtrip not received yet, dropping message");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
match self.mqtt_inner(subtopic, message).await {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => {
|
|
||||||
info!(
|
|
||||||
"Error during mqtt send on topic {subtopic} with message {message:#?} error is {err:?}"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[embassy_executor::task]
|
|
||||||
async fn mqtt_runner(
|
|
||||||
task: McutieTask<'static, String, PublishDisplay<'static, String, &'static str>, 2>,
|
|
||||||
) {
|
|
||||||
task.run().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[embassy_executor::task]
|
|
||||||
async fn mqtt_incoming_task(
|
|
||||||
receiver: McutieReceiver,
|
|
||||||
round_trip_topic: String,
|
|
||||||
stay_alive_topic: String,
|
|
||||||
) {
|
|
||||||
loop {
|
|
||||||
let message = receiver.receive().await;
|
|
||||||
match message {
|
|
||||||
MqttMessage::Connected => {
|
|
||||||
info!("Mqtt connected");
|
|
||||||
MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
MqttMessage::Publish(topic, payload) => match topic {
|
|
||||||
Topic::DeviceType(_type_topic) => {}
|
|
||||||
Topic::Device(_device_topic) => {}
|
|
||||||
Topic::General(topic) => {
|
|
||||||
let subtopic = topic.as_str();
|
|
||||||
|
|
||||||
if subtopic.eq(round_trip_topic.as_str()) {
|
|
||||||
MQTT_ROUND_TRIP_RECEIVED.store(true, Ordering::Relaxed);
|
|
||||||
} else if subtopic.eq(stay_alive_topic.as_str()) {
|
|
||||||
let value = payload.eq_ignore_ascii_case("true".as_ref())
|
|
||||||
|| payload.eq_ignore_ascii_case("1".as_ref());
|
|
||||||
let a = match value {
|
|
||||||
true => 1,
|
|
||||||
false => 0,
|
|
||||||
};
|
|
||||||
log(LogMessage::MqttStayAliveRec, a, 0, "", "");
|
|
||||||
MQTT_STAY_ALIVE.store(value, Ordering::Relaxed);
|
|
||||||
} else {
|
|
||||||
log(LogMessage::UnknownTopic, 0, 0, "", &topic);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
MqttMessage::Disconnected => {
|
|
||||||
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
|
|
||||||
info!("Mqtt disconnected");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[embassy_executor::task(pool_size = 2)]
|
#[embassy_executor::task(pool_size = 2)]
|
||||||
|
|||||||
Reference in New Issue
Block a user