it's alive
This commit is contained in:
@@ -10,6 +10,10 @@ use alloc::{
|
||||
string::{String, ToString},
|
||||
vec::Vec,
|
||||
};
|
||||
use core::marker::PhantomData;
|
||||
use core::net::IpAddr;
|
||||
use core::str::FromStr;
|
||||
use embassy_time::Timer;
|
||||
|
||||
#[link_section = ".rtc.data"]
|
||||
static mut LAST_WATERING_TIMESTAMP: [i64; PLANT_COUNT] = [0; PLANT_COUNT];
|
||||
@@ -42,15 +46,23 @@ pub struct FileSystemSizeInfo {
|
||||
}
|
||||
|
||||
pub struct MqttClient<'a> {
|
||||
dummy: PhantomData<&'a ()>,
|
||||
//mqtt_client: EspMqttClient<'a>,
|
||||
base_topic: heapless::String<64>,
|
||||
}
|
||||
pub struct Esp<'a> {
|
||||
pub(crate) mqtt_client: Option<MqttClient<'a>>,
|
||||
pub(crate) dummy: PhantomData<&'a ()>,
|
||||
//pub(crate) wifi_driver: EspWifi<'a>,
|
||||
//pub(crate) boot_button: PinDriver<'a, esp_idf_hal::gpio::AnyIOPin, esp_idf_hal::gpio::Input>,
|
||||
}
|
||||
|
||||
pub struct IpInfo {
|
||||
pub(crate) ip: IpAddr,
|
||||
netmask: IpAddr,
|
||||
gateway: IpAddr,
|
||||
}
|
||||
|
||||
struct AccessPointInfo {}
|
||||
|
||||
impl Esp<'_> {
|
||||
@@ -76,23 +88,25 @@ impl Esp<'_> {
|
||||
todo!();
|
||||
}
|
||||
pub(crate) fn time(&mut self) -> anyhow::Result<DateTime<Utc>> {
|
||||
let time = EspSystemTime {}.now().as_millis();
|
||||
let smaller_time = time as i64;
|
||||
let local_time = DateTime::from_timestamp_millis(smaller_time)
|
||||
.ok_or(anyhow!("could not convert timestamp"))?;
|
||||
anyhow::Ok(local_time)
|
||||
bail!("todo");
|
||||
// let time = EspSystemTime {}.now().as_millis();
|
||||
// let smaller_time = time as i64;
|
||||
// let local_time = DateTime::from_timestamp_millis(smaller_time)
|
||||
// .ok_or(anyhow!("could not convert timestamp"))?;
|
||||
// anyhow::Ok(local_time)
|
||||
}
|
||||
|
||||
pub(crate) async fn wifi_scan(&mut self) -> anyhow::Result<Vec<AccessPointInfo>> {
|
||||
self.wifi_driver.start_scan(
|
||||
&ScanConfig {
|
||||
scan_type: ScanType::Passive(Duration::from_secs(5)),
|
||||
show_hidden: false,
|
||||
..Default::default()
|
||||
},
|
||||
true,
|
||||
)?;
|
||||
anyhow::Ok(self.wifi_driver.get_scan_result()?)
|
||||
bail!("todo");
|
||||
// self.wifi_driver.start_scan(
|
||||
// &ScanConfig {
|
||||
// scan_type: ScanType::Passive(Duration::from_secs(5)),
|
||||
// show_hidden: false,
|
||||
// ..Default::default()
|
||||
// },
|
||||
// true,
|
||||
// )?;
|
||||
// anyhow::Ok(self.wifi_driver.get_scan_result()?)
|
||||
}
|
||||
|
||||
pub(crate) fn last_pump_time(&self, plant: usize) -> Option<DateTime<Utc>> {
|
||||
@@ -140,19 +154,22 @@ impl Esp<'_> {
|
||||
Ok(config) => config.network.ap_ssid.clone(),
|
||||
Err(_) => heapless::String::from_str("PlantCtrl Emergency Mode").unwrap(),
|
||||
};
|
||||
|
||||
let apconfig = AccessPointConfiguration {
|
||||
ssid,
|
||||
auth_method: AuthMethod::None,
|
||||
ssid_hidden: false,
|
||||
..Default::default()
|
||||
};
|
||||
self.wifi_driver
|
||||
.set_configuration(&Configuration::AccessPoint(apconfig))?;
|
||||
self.wifi_driver.start()?;
|
||||
anyhow::Ok(())
|
||||
todo!("todo");
|
||||
//
|
||||
// let apconfig = AccessPointConfiguration {
|
||||
// ssid,
|
||||
// auth_method: AuthMethod::None,
|
||||
// ssid_hidden: false,
|
||||
// ..Default::default()
|
||||
// };
|
||||
// self.wifi_driver
|
||||
// .set_configuration(&Configuration::AccessPoint(apconfig))?;
|
||||
// self.wifi_driver.start()?;
|
||||
// anyhow::Ok(())
|
||||
}
|
||||
|
||||
|
||||
|
||||
pub(crate) async fn wifi(&mut self, network_config: &NetworkConfig) -> anyhow::Result<IpInfo> {
|
||||
let ssid = network_config
|
||||
.ssid
|
||||
@@ -160,80 +177,83 @@ impl Esp<'_> {
|
||||
.ok_or(anyhow!("No ssid configured"))?;
|
||||
let password = network_config.password.clone();
|
||||
let max_wait = network_config.max_wait;
|
||||
|
||||
match password {
|
||||
Some(pw) => {
|
||||
//TODO expect error due to invalid pw or similar! //call this during configuration and check if works, revert to config mode if not
|
||||
self.wifi_driver.set_configuration(&Configuration::Client(
|
||||
ClientConfiguration {
|
||||
ssid,
|
||||
password: pw,
|
||||
..Default::default()
|
||||
},
|
||||
))?;
|
||||
}
|
||||
None => {
|
||||
self.wifi_driver.set_configuration(&Configuration::Client(
|
||||
ClientConfiguration {
|
||||
ssid,
|
||||
auth_method: AuthMethod::None,
|
||||
..Default::default()
|
||||
},
|
||||
))?;
|
||||
}
|
||||
}
|
||||
|
||||
self.wifi_driver.start()?;
|
||||
self.wifi_driver.connect()?;
|
||||
|
||||
let delay = Delay::new_default();
|
||||
let mut counter = 0_u32;
|
||||
while !self.wifi_driver.is_connected()? {
|
||||
delay.delay_ms(250);
|
||||
counter += 250;
|
||||
if counter > max_wait {
|
||||
//ignore these errors, Wi-Fi will not be used this
|
||||
self.wifi_driver.disconnect().unwrap_or(());
|
||||
self.wifi_driver.stop().unwrap_or(());
|
||||
bail!("Did not manage wifi connection within timeout");
|
||||
}
|
||||
}
|
||||
log::info!("Should be connected now, waiting for link to be up");
|
||||
|
||||
while !self.wifi_driver.is_up()? {
|
||||
delay.delay_ms(250);
|
||||
counter += 250;
|
||||
if counter > max_wait {
|
||||
//ignore these errors, Wi-Fi will not be used this
|
||||
self.wifi_driver.disconnect().unwrap_or(());
|
||||
self.wifi_driver.stop().unwrap_or(());
|
||||
bail!("Did not manage wifi connection within timeout");
|
||||
}
|
||||
}
|
||||
//update freertos registers ;)
|
||||
let address = self.wifi_driver.sta_netif().get_ip_info()?;
|
||||
log(LogMessage::WifiInfo, 0, 0, "", &format!("{address:?}"));
|
||||
anyhow::Ok(address)
|
||||
bail!("todo")
|
||||
// match password {
|
||||
// Some(pw) => {
|
||||
// //TODO expect error due to invalid pw or similar! //call this during configuration and check if works, revert to config mode if not
|
||||
// self.wifi_driver.set_configuration(&Configuration::Client(
|
||||
// ClientConfiguration {
|
||||
// ssid,
|
||||
// password: pw,
|
||||
// ..Default::default()
|
||||
// },
|
||||
// ))?;
|
||||
// }
|
||||
// None => {
|
||||
// self.wifi_driver.set_configuration(&Configuration::Client(
|
||||
// ClientConfiguration {
|
||||
// ssid,
|
||||
// auth_method: AuthMethod::None,
|
||||
// ..Default::default()
|
||||
// },
|
||||
// ))?;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// self.wifi_driver.start()?;
|
||||
// self.wifi_driver.connect()?;
|
||||
//
|
||||
// let delay = Delay::new_default();
|
||||
// let mut counter = 0_u32;
|
||||
// while !self.wifi_driver.is_connected()? {
|
||||
// delay.delay_ms(250);
|
||||
// counter += 250;
|
||||
// if counter > max_wait {
|
||||
// //ignore these errors, Wi-Fi will not be used this
|
||||
// self.wifi_driver.disconnect().unwrap_or(());
|
||||
// self.wifi_driver.stop().unwrap_or(());
|
||||
// bail!("Did not manage wifi connection within timeout");
|
||||
// }
|
||||
// }
|
||||
// log::info!("Should be connected now, waiting for link to be up");
|
||||
//
|
||||
// while !self.wifi_driver.is_up()? {
|
||||
// delay.delay_ms(250);
|
||||
// counter += 250;
|
||||
// if counter > max_wait {
|
||||
// //ignore these errors, Wi-Fi will not be used this
|
||||
// self.wifi_driver.disconnect().unwrap_or(());
|
||||
// self.wifi_driver.stop().unwrap_or(());
|
||||
// bail!("Did not manage wifi connection within timeout");
|
||||
// }
|
||||
// }
|
||||
// //update freertos registers ;)
|
||||
// let address = self.wifi_driver.sta_netif().get_ip_info()?;
|
||||
// log(LogMessage::WifiInfo, 0, 0, "", &format!("{address:?}"));
|
||||
// anyhow::Ok(address)
|
||||
}
|
||||
pub(crate) async fn load_config(&mut self) -> anyhow::Result<PlantControllerConfig> {
|
||||
let cfg = File::open(Self::CONFIG_FILE)?;
|
||||
let config: PlantControllerConfig = serde_json::from_reader(cfg)?;
|
||||
anyhow::Ok(config)
|
||||
pub(crate) fn load_config(&mut self) -> anyhow::Result<PlantControllerConfig> {
|
||||
bail!("todo");
|
||||
// let cfg = File::open(Self::CONFIG_FILE)?;
|
||||
// let config: PlantControllerConfig = serde_json::from_reader(cfg)?;
|
||||
// anyhow::Ok(config)
|
||||
}
|
||||
pub(crate) async fn save_config(
|
||||
&mut self,
|
||||
config: &PlantControllerConfig,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut cfg = File::create(Self::CONFIG_FILE)?;
|
||||
serde_json::to_writer(&mut cfg, &config)?;
|
||||
log::info!("Wrote config config {:?}", config);
|
||||
anyhow::Ok(())
|
||||
bail!("todo");
|
||||
// let mut cfg = File::create(Self::CONFIG_FILE)?;
|
||||
// serde_json::to_writer(&mut cfg, &config)?;
|
||||
// log::info!("Wrote config config {:?}", config);
|
||||
// anyhow::Ok(())
|
||||
}
|
||||
pub(crate) async fn mount_file_system(&mut self) -> anyhow::Result<()> {
|
||||
pub(crate) fn mount_file_system(&mut self) -> anyhow::Result<()> {
|
||||
bail!("fail");
|
||||
log(LogMessage::MountingFilesystem, 0, 0, "", "");
|
||||
let base_path = String::try_from("/spiffs")?;
|
||||
let storage = String::try_from(Self::SPIFFS_PARTITION_NAME)?;
|
||||
let conf = todo!();
|
||||
//let conf = todo!();
|
||||
|
||||
//let conf = esp_idf_sys::esp_vfs_spiffs_conf_t {
|
||||
//base_path: base_path.as_ptr(),
|
||||
@@ -247,102 +267,112 @@ impl Esp<'_> {
|
||||
//esp_idf_sys::esp!(esp_idf_sys::esp_vfs_spiffs_register(&conf))?;
|
||||
//}
|
||||
|
||||
let free_space = self.file_system_size()?;
|
||||
log(
|
||||
LogMessage::FilesystemMount,
|
||||
free_space.free_size as u32,
|
||||
free_space.total_size as u32,
|
||||
&free_space.used_size.to_string(),
|
||||
"",
|
||||
);
|
||||
// let free_space = self.file_system_size()?;
|
||||
// log(
|
||||
// LogMessage::FilesystemMount,
|
||||
// free_space.free_size as u32,
|
||||
// free_space.total_size as u32,
|
||||
// &free_space.used_size.to_string(),
|
||||
// "",
|
||||
// );
|
||||
anyhow::Ok(())
|
||||
}
|
||||
async fn file_system_size(&mut self) -> anyhow::Result<FileSystemSizeInfo> {
|
||||
let storage = CString::new(Self::SPIFFS_PARTITION_NAME)?;
|
||||
let mut total_size = 0;
|
||||
let mut used_size = 0;
|
||||
unsafe {
|
||||
esp_idf_sys::esp!(esp_spiffs_info(
|
||||
storage.as_ptr(),
|
||||
&mut total_size,
|
||||
&mut used_size
|
||||
))?;
|
||||
}
|
||||
anyhow::Ok(FileSystemSizeInfo {
|
||||
total_size,
|
||||
used_size,
|
||||
free_size: total_size - used_size,
|
||||
})
|
||||
bail!("fail");
|
||||
// let storage = CString::new(Self::SPIFFS_PARTITION_NAME)?;
|
||||
// let mut total_size = 0;
|
||||
// let mut used_size = 0;
|
||||
// unsafe {
|
||||
// esp_idf_sys::esp!(esp_spiffs_info(
|
||||
// storage.as_ptr(),
|
||||
// &mut total_size,
|
||||
// &mut used_size
|
||||
// ))?;
|
||||
// }
|
||||
// anyhow::Ok(FileSystemSizeInfo {
|
||||
// total_size,
|
||||
// used_size,
|
||||
// free_size: total_size - used_size,
|
||||
// })
|
||||
}
|
||||
|
||||
pub(crate) async fn list_files(&self) -> FileList {
|
||||
let storage = CString::new(Self::SPIFFS_PARTITION_NAME).unwrap();
|
||||
|
||||
let mut file_system_corrupt = None;
|
||||
|
||||
let mut iter_error = None;
|
||||
let mut result = Vec::new();
|
||||
|
||||
let filepath = Path::new(Self::BASE_PATH);
|
||||
let read_dir = fs::read_dir(filepath);
|
||||
match read_dir {
|
||||
OkStd(read_dir) => {
|
||||
for item in read_dir {
|
||||
match item {
|
||||
OkStd(file) => {
|
||||
let f = FileInfo {
|
||||
filename: file.file_name().into_string().unwrap(),
|
||||
size: file.metadata().map(|it| it.len()).unwrap_or_default()
|
||||
as usize,
|
||||
};
|
||||
result.push(f);
|
||||
}
|
||||
Err(err) => {
|
||||
iter_error = Some(format!("{err:?}"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
file_system_corrupt = Some(format!("{err:?}"));
|
||||
}
|
||||
}
|
||||
let mut total: usize = 0;
|
||||
let mut used: usize = 0;
|
||||
unsafe {
|
||||
esp_spiffs_info(storage.as_ptr(), &mut total, &mut used);
|
||||
}
|
||||
|
||||
FileList {
|
||||
total,
|
||||
used,
|
||||
file_system_corrupt,
|
||||
files: result,
|
||||
iter_error,
|
||||
}
|
||||
return FileList {
|
||||
total: 0,
|
||||
used: 0,
|
||||
file_system_corrupt: None,
|
||||
files: Vec::new(),
|
||||
iter_error: None,
|
||||
};
|
||||
//
|
||||
// let storage = CString::new(Self::SPIFFS_PARTITION_NAME).unwrap();
|
||||
//
|
||||
// let mut file_system_corrupt = None;
|
||||
//
|
||||
// let mut iter_error = None;
|
||||
// let mut result = Vec::new();
|
||||
//
|
||||
// let filepath = Path::new(Self::BASE_PATH);
|
||||
// let read_dir = fs::read_dir(filepath);
|
||||
// match read_dir {
|
||||
// OkStd(read_dir) => {
|
||||
// for item in read_dir {
|
||||
// match item {
|
||||
// OkStd(file) => {
|
||||
// let f = FileInfo {
|
||||
// filename: file.file_name().into_string().unwrap(),
|
||||
// size: file.metadata().map(|it| it.len()).unwrap_or_default()
|
||||
// as usize,
|
||||
// };
|
||||
// result.push(f);
|
||||
// }
|
||||
// Err(err) => {
|
||||
// iter_error = Some(format!("{err:?}"));
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// Err(err) => {
|
||||
// file_system_corrupt = Some(format!("{err:?}"));
|
||||
// }
|
||||
// }
|
||||
// let mut total: usize = 0;
|
||||
// let mut used: usize = 0;
|
||||
// unsafe {
|
||||
// esp_spiffs_info(storage.as_ptr(), &mut total, &mut used);
|
||||
// }
|
||||
//
|
||||
// FileList {
|
||||
// total,
|
||||
// used,
|
||||
// file_system_corrupt,
|
||||
// files: result,
|
||||
// iter_error,
|
||||
// }
|
||||
}
|
||||
pub(crate) async fn delete_file(&self, filename: &str) -> anyhow::Result<()> {
|
||||
let filepath = Path::new(Self::BASE_PATH).join(Path::new(filename));
|
||||
match fs::remove_file(filepath) {
|
||||
OkStd(_) => anyhow::Ok(()),
|
||||
Err(err) => {
|
||||
bail!(format!("{err:?}"))
|
||||
}
|
||||
}
|
||||
}
|
||||
pub(crate) async fn get_file_handle(
|
||||
&self,
|
||||
filename: &str,
|
||||
write: bool,
|
||||
) -> anyhow::Result<File> {
|
||||
let filepath = Path::new(Self::BASE_PATH).join(Path::new(filename));
|
||||
anyhow::Ok(if write {
|
||||
File::create(filepath)?
|
||||
} else {
|
||||
File::open(filepath)?
|
||||
})
|
||||
bail!("todo");
|
||||
// let filepath = Path::new(Self::BASE_PATH).join(Path::new(filename));
|
||||
// match fs::remove_file(filepath) {
|
||||
// OkStd(_) => anyhow::Ok(()),
|
||||
// Err(err) => {
|
||||
// bail!(format!("{err:?}"))
|
||||
// }
|
||||
// }
|
||||
}
|
||||
// pub(crate) async fn get_file_handle(
|
||||
// &self,
|
||||
// filename: &str,
|
||||
// write: bool,
|
||||
// ) -> anyhow::Result<File> {
|
||||
// let filepath = Path::new(Self::BASE_PATH).join(Path::new(filename));
|
||||
// anyhow::Ok(if write {
|
||||
// File::create(filepath)?
|
||||
// } else {
|
||||
// File::open(filepath)?
|
||||
// })
|
||||
// }
|
||||
|
||||
pub(crate) fn init_rtc_deepsleep_memory(&self, init_rtc_store: bool, to_config_mode: bool) {
|
||||
if init_rtc_store {
|
||||
@@ -407,200 +437,204 @@ impl Esp<'_> {
|
||||
bail!("Mqtt url was empty")
|
||||
}
|
||||
|
||||
let last_will_topic = format!("{}/state", base_topic);
|
||||
let mqtt_client_config = MqttClientConfiguration {
|
||||
lwt: Some(LwtConfiguration {
|
||||
topic: &last_will_topic,
|
||||
payload: "lost".as_bytes(),
|
||||
qos: AtLeastOnce,
|
||||
retain: true,
|
||||
}),
|
||||
client_id: Some("plantctrl"),
|
||||
keep_alive_interval: Some(Duration::from_secs(60 * 60 * 2)),
|
||||
username: network_config.mqtt_user.as_ref().map(|v| &**v),
|
||||
password: network_config.mqtt_password.as_ref().map(|v| &**v),
|
||||
//room for improvement
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mqtt_connected_event_received = Arc::new(AtomicBool::new(false));
|
||||
let mqtt_connected_event_ok = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let round_trip_ok = Arc::new(AtomicBool::new(false));
|
||||
let round_trip_topic = format!("{}/internal/roundtrip", base_topic);
|
||||
let stay_alive_topic = format!("{}/stay_alive", base_topic);
|
||||
log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic);
|
||||
|
||||
let mqtt_connected_event_received_copy = mqtt_connected_event_received.clone();
|
||||
let mqtt_connected_event_ok_copy = mqtt_connected_event_ok.clone();
|
||||
let stay_alive_topic_copy = stay_alive_topic.clone();
|
||||
let round_trip_topic_copy = round_trip_topic.clone();
|
||||
let round_trip_ok_copy = round_trip_ok.clone();
|
||||
let client_id = mqtt_client_config.client_id.unwrap_or("not set");
|
||||
log(LogMessage::MqttInfo, 0, 0, client_id, mqtt_url);
|
||||
let mut client = EspMqttClient::new_cb(mqtt_url, &mqtt_client_config, move |event| {
|
||||
let payload = event.payload();
|
||||
match payload {
|
||||
embedded_svc::mqtt::client::EventPayload::Received {
|
||||
id: _,
|
||||
topic,
|
||||
data,
|
||||
details: _,
|
||||
} => {
|
||||
let data = String::from_utf8_lossy(data);
|
||||
if let Some(topic) = topic {
|
||||
//todo use enums
|
||||
if topic.eq(round_trip_topic_copy.as_str()) {
|
||||
round_trip_ok_copy.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
} else if topic.eq(stay_alive_topic_copy.as_str()) {
|
||||
let value =
|
||||
data.eq_ignore_ascii_case("true") || data.eq_ignore_ascii_case("1");
|
||||
log(LogMessage::MqttStayAliveRec, 0, 0, &data, "");
|
||||
STAY_ALIVE.store(value, std::sync::atomic::Ordering::Relaxed);
|
||||
} else {
|
||||
log(LogMessage::UnknownTopic, 0, 0, "", topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
esp_idf_svc::mqtt::client::EventPayload::Connected(_) => {
|
||||
mqtt_connected_event_received_copy
|
||||
.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
mqtt_connected_event_ok_copy.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
log::info!("Mqtt connected");
|
||||
}
|
||||
esp_idf_svc::mqtt::client::EventPayload::Disconnected => {
|
||||
mqtt_connected_event_received_copy
|
||||
.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
mqtt_connected_event_ok_copy.store(false, std::sync::atomic::Ordering::Relaxed);
|
||||
log::info!("Mqtt disconnected");
|
||||
}
|
||||
esp_idf_svc::mqtt::client::EventPayload::Error(esp_error) => {
|
||||
log::info!("EspMqttError reported {:?}", esp_error);
|
||||
mqtt_connected_event_received_copy
|
||||
.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
mqtt_connected_event_ok_copy.store(false, std::sync::atomic::Ordering::Relaxed);
|
||||
log::info!("Mqtt error");
|
||||
}
|
||||
esp_idf_svc::mqtt::client::EventPayload::BeforeConnect => {
|
||||
log::info!("Mqtt before connect")
|
||||
}
|
||||
esp_idf_svc::mqtt::client::EventPayload::Subscribed(_) => {
|
||||
log::info!("Mqtt subscribed")
|
||||
}
|
||||
esp_idf_svc::mqtt::client::EventPayload::Unsubscribed(_) => {
|
||||
log::info!("Mqtt unsubscribed")
|
||||
}
|
||||
esp_idf_svc::mqtt::client::EventPayload::Published(_) => {
|
||||
log::info!("Mqtt published")
|
||||
}
|
||||
esp_idf_svc::mqtt::client::EventPayload::Deleted(_) => {
|
||||
log::info!("Mqtt deleted")
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
let mut wait_for_connections_event = 0;
|
||||
while wait_for_connections_event < 100 {
|
||||
wait_for_connections_event += 1;
|
||||
match mqtt_connected_event_received.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
true => {
|
||||
log::info!("Mqtt connection callback received, progressing");
|
||||
match mqtt_connected_event_ok.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
true => {
|
||||
log::info!(
|
||||
"Mqtt did callback as connected, testing with roundtrip now"
|
||||
);
|
||||
//subscribe to roundtrip
|
||||
client.subscribe(round_trip_topic.as_str(), ExactlyOnce)?;
|
||||
client.subscribe(stay_alive_topic.as_str(), ExactlyOnce)?;
|
||||
//publish to roundtrip
|
||||
client.publish(
|
||||
round_trip_topic.as_str(),
|
||||
ExactlyOnce,
|
||||
false,
|
||||
"online_test".as_bytes(),
|
||||
)?;
|
||||
|
||||
let mut wait_for_roundtrip = 0;
|
||||
while wait_for_roundtrip < 100 {
|
||||
wait_for_roundtrip += 1;
|
||||
match round_trip_ok.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
true => {
|
||||
log::info!("Round trip registered, proceeding");
|
||||
self.mqtt_client = Some(MqttClient {
|
||||
mqtt_client: client,
|
||||
base_topic: base_topic_copy,
|
||||
});
|
||||
return anyhow::Ok(());
|
||||
}
|
||||
false => {
|
||||
unsafe { vTaskDelay(10) };
|
||||
}
|
||||
}
|
||||
}
|
||||
bail!("Mqtt did not complete roundtrip in time");
|
||||
}
|
||||
false => {
|
||||
bail!("Mqtt did respond but with failure")
|
||||
}
|
||||
}
|
||||
}
|
||||
false => {
|
||||
unsafe { vTaskDelay(10) };
|
||||
}
|
||||
}
|
||||
}
|
||||
bail!("Mqtt did not fire connection callback in time");
|
||||
bail!("todo");
|
||||
//
|
||||
// let last_will_topic = format!("{}/state", base_topic);
|
||||
// let mqtt_client_config = MqttClientConfiguration {
|
||||
// lwt: Some(LwtConfiguration {
|
||||
// topic: &last_will_topic,
|
||||
// payload: "lost".as_bytes(),
|
||||
// qos: AtLeastOnce,
|
||||
// retain: true,
|
||||
// }),
|
||||
// client_id: Some("plantctrl"),
|
||||
// keep_alive_interval: Some(Duration::from_secs(60 * 60 * 2)),
|
||||
// username: network_config.mqtt_user.as_ref().map(|v| &**v),
|
||||
// password: network_config.mqtt_password.as_ref().map(|v| &**v),
|
||||
// //room for improvement
|
||||
// ..Default::default()
|
||||
// };
|
||||
//
|
||||
// let mqtt_connected_event_received = Arc::new(AtomicBool::new(false));
|
||||
// let mqtt_connected_event_ok = Arc::new(AtomicBool::new(false));
|
||||
//
|
||||
// let round_trip_ok = Arc::new(AtomicBool::new(false));
|
||||
// let round_trip_topic = format!("{}/internal/roundtrip", base_topic);
|
||||
// let stay_alive_topic = format!("{}/stay_alive", base_topic);
|
||||
// log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic);
|
||||
//
|
||||
// let mqtt_connected_event_received_copy = mqtt_connected_event_received.clone();
|
||||
// let mqtt_connected_event_ok_copy = mqtt_connected_event_ok.clone();
|
||||
// let stay_alive_topic_copy = stay_alive_topic.clone();
|
||||
// let round_trip_topic_copy = round_trip_topic.clone();
|
||||
// let round_trip_ok_copy = round_trip_ok.clone();
|
||||
// let client_id = mqtt_client_config.client_id.unwrap_or("not set");
|
||||
// log(LogMessage::MqttInfo, 0, 0, client_id, mqtt_url);
|
||||
// let mut client = EspMqttClient::new_cb(mqtt_url, &mqtt_client_config, move |event| {
|
||||
// let payload = event.payload();
|
||||
// match payload {
|
||||
// embedded_svc::mqtt::client::EventPayload::Received {
|
||||
// id: _,
|
||||
// topic,
|
||||
// data,
|
||||
// details: _,
|
||||
// } => {
|
||||
// let data = String::from_utf8_lossy(data);
|
||||
// if let Some(topic) = topic {
|
||||
// //todo use enums
|
||||
// if topic.eq(round_trip_topic_copy.as_str()) {
|
||||
// round_trip_ok_copy.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
// } else if topic.eq(stay_alive_topic_copy.as_str()) {
|
||||
// let value =
|
||||
// data.eq_ignore_ascii_case("true") || data.eq_ignore_ascii_case("1");
|
||||
// log(LogMessage::MqttStayAliveRec, 0, 0, &data, "");
|
||||
// STAY_ALIVE.store(value, std::sync::atomic::Ordering::Relaxed);
|
||||
// } else {
|
||||
// log(LogMessage::UnknownTopic, 0, 0, "", topic);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// esp_idf_svc::mqtt::client::EventPayload::Connected(_) => {
|
||||
// mqtt_connected_event_received_copy
|
||||
// .store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
// mqtt_connected_event_ok_copy.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
// log::info!("Mqtt connected");
|
||||
// }
|
||||
// esp_idf_svc::mqtt::client::EventPayload::Disconnected => {
|
||||
// mqtt_connected_event_received_copy
|
||||
// .store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
// mqtt_connected_event_ok_copy.store(false, std::sync::atomic::Ordering::Relaxed);
|
||||
// log::info!("Mqtt disconnected");
|
||||
// }
|
||||
// esp_idf_svc::mqtt::client::EventPayload::Error(esp_error) => {
|
||||
// log::info!("EspMqttError reported {:?}", esp_error);
|
||||
// mqtt_connected_event_received_copy
|
||||
// .store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
// mqtt_connected_event_ok_copy.store(false, std::sync::atomic::Ordering::Relaxed);
|
||||
// log::info!("Mqtt error");
|
||||
// }
|
||||
// esp_idf_svc::mqtt::client::EventPayload::BeforeConnect => {
|
||||
// log::info!("Mqtt before connect")
|
||||
// }
|
||||
// esp_idf_svc::mqtt::client::EventPayload::Subscribed(_) => {
|
||||
// log::info!("Mqtt subscribed")
|
||||
// }
|
||||
// esp_idf_svc::mqtt::client::EventPayload::Unsubscribed(_) => {
|
||||
// log::info!("Mqtt unsubscribed")
|
||||
// }
|
||||
// esp_idf_svc::mqtt::client::EventPayload::Published(_) => {
|
||||
// log::info!("Mqtt published")
|
||||
// }
|
||||
// esp_idf_svc::mqtt::client::EventPayload::Deleted(_) => {
|
||||
// log::info!("Mqtt deleted")
|
||||
// }
|
||||
// }
|
||||
// })?;
|
||||
//
|
||||
// let mut wait_for_connections_event = 0;
|
||||
// while wait_for_connections_event < 100 {
|
||||
// wait_for_connections_event += 1;
|
||||
// match mqtt_connected_event_received.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
// true => {
|
||||
// log::info!("Mqtt connection callback received, progressing");
|
||||
// match mqtt_connected_event_ok.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
// true => {
|
||||
// log::info!(
|
||||
// "Mqtt did callback as connected, testing with roundtrip now"
|
||||
// );
|
||||
// //subscribe to roundtrip
|
||||
// client.subscribe(round_trip_topic.as_str(), ExactlyOnce)?;
|
||||
// client.subscribe(stay_alive_topic.as_str(), ExactlyOnce)?;
|
||||
// //publish to roundtrip
|
||||
// client.publish(
|
||||
// round_trip_topic.as_str(),
|
||||
// ExactlyOnce,
|
||||
// false,
|
||||
// "online_test".as_bytes(),
|
||||
// )?;
|
||||
//
|
||||
// let mut wait_for_roundtrip = 0;
|
||||
// while wait_for_roundtrip < 100 {
|
||||
// wait_for_roundtrip += 1;
|
||||
// match round_trip_ok.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
// true => {
|
||||
// log::info!("Round trip registered, proceeding");
|
||||
// self.mqtt_client = Some(MqttClient {
|
||||
// mqtt_client: client,
|
||||
// base_topic: base_topic_copy,
|
||||
// });
|
||||
// return anyhow::Ok(());
|
||||
// }
|
||||
// false => {
|
||||
// unsafe { vTaskDelay(10) };
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// bail!("Mqtt did not complete roundtrip in time");
|
||||
// }
|
||||
// false => {
|
||||
// bail!("Mqtt did respond but with failure")
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// false => {
|
||||
// unsafe { vTaskDelay(10) };
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// bail!("Mqtt did not fire connection callback in time");
|
||||
}
|
||||
pub(crate) async fn mqtt_publish(
|
||||
&mut self,
|
||||
subtopic: &str,
|
||||
message: &[u8],
|
||||
) -> anyhow::Result<()> {
|
||||
if self.mqtt_client.is_none() {
|
||||
return anyhow::Ok(());
|
||||
}
|
||||
if !subtopic.starts_with("/") {
|
||||
log::info!("Subtopic without / at start {}", subtopic);
|
||||
bail!("Subtopic without / at start {}", subtopic);
|
||||
}
|
||||
if subtopic.len() > 192 {
|
||||
log::info!("Subtopic exceeds 192 chars {}", subtopic);
|
||||
bail!("Subtopic exceeds 192 chars {}", subtopic);
|
||||
}
|
||||
let client = self.mqtt_client.as_mut().unwrap();
|
||||
let mut full_topic: heapless::String<256> = heapless::String::new();
|
||||
if full_topic.push_str(client.base_topic.as_str()).is_err() {
|
||||
log::info!("Some error assembling full_topic 1");
|
||||
bail!("Some error assembling full_topic 1")
|
||||
};
|
||||
if full_topic.push_str(subtopic).is_err() {
|
||||
log::info!("Some error assembling full_topic 2");
|
||||
bail!("Some error assembling full_topic 2")
|
||||
};
|
||||
let publish = client
|
||||
.mqtt_client
|
||||
.publish(&full_topic, ExactlyOnce, true, message);
|
||||
Delay::new(10).delay_ms(50);
|
||||
match publish {
|
||||
OkStd(message_id) => {
|
||||
log::info!(
|
||||
"Published mqtt topic {} with message {:#?} msgid is {:?}",
|
||||
full_topic,
|
||||
String::from_utf8_lossy(message),
|
||||
message_id
|
||||
);
|
||||
anyhow::Ok(())
|
||||
}
|
||||
Err(err) => {
|
||||
log::info!(
|
||||
"Error during mqtt send on topic {} with message {:#?} error is {:?}",
|
||||
full_topic,
|
||||
String::from_utf8_lossy(message),
|
||||
err
|
||||
);
|
||||
Err(err)?
|
||||
}
|
||||
}
|
||||
bail!("todo");
|
||||
//
|
||||
// if self.mqtt_client.is_none() {
|
||||
// return anyhow::Ok(());
|
||||
// }
|
||||
// if !subtopic.starts_with("/") {
|
||||
// log::info!("Subtopic without / at start {}", subtopic);
|
||||
// bail!("Subtopic without / at start {}", subtopic);
|
||||
// }
|
||||
// if subtopic.len() > 192 {
|
||||
// log::info!("Subtopic exceeds 192 chars {}", subtopic);
|
||||
// bail!("Subtopic exceeds 192 chars {}", subtopic);
|
||||
// }
|
||||
// let client = self.mqtt_client.as_mut().unwrap();
|
||||
// let mut full_topic: heapless::String<256> = heapless::String::new();
|
||||
// if full_topic.push_str(client.base_topic.as_str()).is_err() {
|
||||
// log::info!("Some error assembling full_topic 1");
|
||||
// bail!("Some error assembling full_topic 1")
|
||||
// };
|
||||
// if full_topic.push_str(subtopic).is_err() {
|
||||
// log::info!("Some error assembling full_topic 2");
|
||||
// bail!("Some error assembling full_topic 2")
|
||||
// };
|
||||
// let publish = client
|
||||
// .mqtt_client
|
||||
// .publish(&full_topic, ExactlyOnce, true, message);
|
||||
// Timer::after_millis(10).await;
|
||||
// match publish {
|
||||
// OkStd(message_id) => {
|
||||
// log::info!(
|
||||
// "Published mqtt topic {} with message {:#?} msgid is {:?}",
|
||||
// full_topic,
|
||||
// String::from_utf8_lossy(message),
|
||||
// message_id
|
||||
// );
|
||||
// anyhow::Ok(())
|
||||
// }
|
||||
// Err(err) => {
|
||||
// log::info!(
|
||||
// "Error during mqtt send on topic {} with message {:#?} error is {:?}",
|
||||
// full_topic,
|
||||
// String::from_utf8_lossy(message),
|
||||
// err
|
||||
// );
|
||||
// Err(err)?
|
||||
// }
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user