get most stuff working again, by upgrading to newer esp-hal version

this involved adding a lot of code from the develop branch step by step
there are still some bugs, but at least i can get into the web interface
and configure stuff again \o/ … measuring and pumping is working as well
This commit is contained in:
2026-05-04 23:46:27 +02:00
parent ecb7707357
commit db401aac55
23 changed files with 2029 additions and 1292 deletions

View File

@@ -1,8 +1,11 @@
use crate::bail;
use crate::config::{NetworkConfig, PlantControllerConfig};
use crate::hal::{PLANT_COUNT, TIME_ACCESS};
use crate::log::{LogMessage, LOG_ACCESS};
use crate::hal::PLANT_COUNT;
use crate::log::{log, LogMessage};
use alloc::vec;
use chrono::{DateTime, Utc};
use esp_hal::Blocking;
use esp_hal::uart::Uart;
use serde::Serialize;
use crate::fat_error::{ContextExt, FatError, FatResult};
@@ -14,15 +17,17 @@ use core::net::{IpAddr, Ipv4Addr, SocketAddr};
use core::str::FromStr;
use core::sync::atomic::Ordering;
use embassy_executor::Spawner;
use embassy_net::udp::UdpSocket;
use embassy_net::{DhcpConfig, Ipv4Cidr, Runner, Stack, StackResources, StaticConfigV4};
use embassy_net::dns::DnsQueryType;
use embassy_net::udp::{PacketMetadata, UdpSocket};
use embassy_net::{DhcpConfig, IpAddress, Ipv4Cidr, Runner, Stack, StackResources, StaticConfigV4};
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::mutex::{Mutex, MutexGuard};
use embassy_sync::once_lock::OnceLock;
use embassy_time::{Duration, Timer};
use embedded_storage::nor_flash::ReadNorFlash;
use embassy_time::{Duration, Timer, WithTimeout};
use embedded_storage::nor_flash::{check_erase, NorFlash, ReadNorFlash, RmwNorFlashStorage};
use esp_bootloader_esp_idf::ota::OtaImageState::Valid;
use esp_bootloader_esp_idf::ota::{Ota, OtaImageState};
use esp_bootloader_esp_idf::partitions::FlashRegion;
use esp_bootloader_esp_idf::partitions::{AppPartitionSubType, FlashRegion};
use esp_hal::gpio::{Input, RtcPinWithResistors};
use esp_hal::rng::Rng;
use esp_hal::rtc_cntl::{
@@ -31,31 +36,34 @@ use esp_hal::rtc_cntl::{
};
use esp_hal::system::software_reset;
use esp_println::println;
use esp_radio::wifi::ap::{AccessPointConfig, AccessPointInfo};
use esp_radio::wifi::scan::{ScanConfig, ScanTypeConfig};
use esp_radio::wifi::sta::StationConfig;
use esp_radio::wifi::{AuthenticationMethod, Config, Interface, WifiController};
use esp_storage::FlashStorage;
use esp_wifi::wifi::{
AccessPointConfiguration, AccessPointInfo, AuthMethod, ClientConfiguration, Configuration,
ScanConfig, ScanTypeConfig, WifiController, WifiDevice, WifiState,
};
use littlefs2::fs::Filesystem;
use littlefs2_core::{FileType, PathBuf, SeekFrom};
use log::{info, warn};
use log::{info, warn, error};
use mcutie::{
Error, McutieBuilder, McutieReceiver, McutieTask, MqttMessage, PublishDisplay, Publishable,
QoS, Topic,
};
use portable_atomic::AtomicBool;
use smoltcp::socket::udp::PacketMetadata;
use smoltcp::wire::DnsQueryType;
use sntpc::{get_time, NtpContext, NtpTimestampGenerator};
use sntpc::{NtpContext, NtpTimestampGenerator, NtpUdpSocket, get_time};
#[esp_hal::ram(rtc_fast, persistent)]
use super::shared_flash::MutexFlashStorage;
#[esp_hal::ram(unstable(rtc_fast), unstable(persistent))]
static mut LAST_WATERING_TIMESTAMP: [i64; PLANT_COUNT] = [0; PLANT_COUNT];
#[esp_hal::ram(rtc_fast, persistent)]
#[esp_hal::ram(unstable(rtc_fast), unstable(persistent))]
static mut CONSECUTIVE_WATERING_PLANT: [u32; PLANT_COUNT] = [0; PLANT_COUNT];
#[esp_hal::ram(rtc_fast, persistent)]
#[esp_hal::ram(unstable(rtc_fast), unstable(persistent))]
static mut LOW_VOLTAGE_DETECTED: i8 = 0;
#[esp_hal::ram(rtc_fast, persistent)]
#[esp_hal::ram(unstable(rtc_fast), unstable(persistent))]
static mut RESTART_TO_CONF: i8 = 0;
#[esp_hal::ram(unstable(rtc_fast), unstable(persistent))]
static mut LAST_CORROSION_PROTECTION_CHECK_DAY: i8 = -1;
const CONFIG_FILE: &str = "config.json";
const NTP_SERVER: &str = "pool.ntp.org";
@@ -112,22 +120,61 @@ impl NtpTimestampGenerator for Timestamp {
self.stamp.timestamp_subsec_micros()
}
}
struct EmbassyNtpSocket<'a, 'b> {
socket: &'a UdpSocket<'b>,
}
impl<'a, 'b> EmbassyNtpSocket<'a, 'b> {
fn new(socket: &'a UdpSocket<'b>) -> Self {
Self { socket }
}
}
impl NtpUdpSocket for EmbassyNtpSocket<'_, '_> {
async fn send_to(&self, buf: &[u8], addr: SocketAddr) -> sntpc::Result<usize> {
self.socket
.send_to(buf, addr)
.await
.map_err(|_| sntpc::Error::Network)?;
Ok(buf.len())
}
async fn recv_from(&self, buf: &mut [u8]) -> sntpc::Result<(usize, SocketAddr)> {
let (len, metadata) = self
.socket
.recv_from(buf)
.await
.map_err(|_| sntpc::Error::Network)?;
let addr = match metadata.endpoint.addr {
IpAddress::Ipv4(ip) => IpAddr::V4(ip),
IpAddress::Ipv6(ip) => IpAddr::V6(ip),
};
Ok((len, SocketAddr::new(addr, metadata.endpoint.port)))
}
}
pub struct Esp<'a> {
pub fs: Arc<Mutex<CriticalSectionRawMutex, Filesystem<'static, LittleFs2Filesystem>>>,
pub rng: Rng,
//first starter (ap or sta will take these)
pub interface_sta: Option<WifiDevice<'static>>,
pub interface_ap: Option<WifiDevice<'static>>,
pub interface_sta: Option<Interface<'static>>,
pub interface_ap: Option<Interface<'static>>,
pub controller: Arc<Mutex<CriticalSectionRawMutex, WifiController<'static>>>,
pub boot_button: Input<'a>,
// RTC-capable GPIO used as external wake source (store the raw peripheral)
pub wake_gpio1: esp_hal::peripherals::GPIO1<'static>,
pub uart0: Uart<'a, Blocking>,
pub ota: Ota<'static, FlashStorage>,
pub ota_next: &'static mut FlashRegion<'static, FlashStorage>,
pub rtc: Rtc<'a>,
pub ota: Ota<'static, RmwNorFlashStorage<'static, &'static mut MutexFlashStorage>>,
pub ota_target: &'static mut FlashRegion<'static, MutexFlashStorage>,
pub current: AppPartitionSubType,
pub slot0_state: OtaImageState,
pub slot1_state: OtaImageState,
}
// SAFETY: On this target we never move Esp across OS threads; the firmware runs single-core
@@ -148,6 +195,47 @@ macro_rules! mk_static {
}
impl Esp<'_> {
pub fn get_time(&self) -> DateTime<Utc> {
DateTime::from_timestamp_micros(self.rtc.current_time_us() as i64)
.unwrap_or(DateTime::UNIX_EPOCH)
}
pub fn set_time(&mut self, time: DateTime<Utc>) {
self.rtc.set_current_time_us(time.timestamp_micros() as u64);
}
pub(crate) async fn read_serial_line(&mut self) -> FatResult<Option<alloc::string::String>> {
let mut buf = [0u8; 1];
let mut line = String::new();
loop {
match self.uart0.read_buffered(&mut buf) {
Ok(read) => {
if read == 0 {
return Ok(None);
}
let c = buf[0] as char;
if c == '\n' {
return Ok(Some(line));
}
line.push(c);
}
Err(error) => {
if line.is_empty() {
return Ok(None);
} else {
error!("Error reading serial line: {error:?}");
// If we already have some data, we should probably wait a bit or just return what we have?
// But the protocol expects a full line or message.
// For simplicity in config mode, we can block here or just return None if nothing is there yet.
// However, if we started receiving, we should probably finish or timeout.
continue;
}
}
}
}
}
pub(crate) async fn delete_file(&self, filename: String) -> FatResult<()> {
let file = PathBuf::try_from(filename.as_str())?;
let access = self.fs.lock().await;
@@ -212,26 +300,47 @@ impl Esp<'_> {
Ok((buf, read))
}
pub(crate) fn get_ota_slot(&mut self) -> String {
match self.ota.current_slot() {
Ok(slot) => {
format!("{:?}", slot)
}
Err(err) => {
format!("{:?}", err)
}
pub(crate) async fn write_ota(&mut self, offset: u32, buf: &[u8]) -> Result<(), FatError> {
let _ = check_erase(self.ota_target, offset, offset + 4096);
info!("erasing and writing block 0x{offset:x}");
self.ota_target.erase(offset, offset + 4096)?;
let mut temp = vec![0; buf.len()];
let read_back = temp.as_mut_slice();
//change to nor flash, align writes!
self.ota_target.write(offset, buf)?;
self.ota_target.read(offset, read_back)?;
if buf != read_back {
info!("Expected {buf:?} but got {read_back:?}");
bail!(
"Flash error, read back does not match write buffer at offset {:x}",
offset
)
}
Ok(())
}
pub(crate) fn get_ota_state(&mut self) -> String {
match self.ota.current_ota_state() {
Ok(state) => {
format!("{:?}", state)
}
Err(err) => {
format!("{:?}", err)
}
pub(crate) async fn finalize_ota(&mut self) -> Result<(), FatError> {
let current = self.ota.current_app_partition()?;
if self.ota.current_ota_state()? != Valid {
info!("Validating current slot {current:?} as it was able to ota");
self.ota.set_current_ota_state(Valid)?;
}
let next = match current {
AppPartitionSubType::Ota0 => AppPartitionSubType::Ota1,
AppPartitionSubType::Ota1 => AppPartitionSubType::Ota0,
_ => {
bail!("Invalid current slot {current:?} for ota");
}
};
self.ota.set_current_app_partition(next)?;
info!("switched slot");
self.ota.set_current_ota_state(OtaImageState::New)?;
info!("switched state for new partition");
let state_new = self.ota.current_ota_state()?;
info!("state on new partition now {state_new:?}");
self.set_restart_to_conf(true);
Ok(())
}
// let current = ota.current_slot()?;
@@ -264,31 +373,38 @@ impl Esp<'_> {
&mut tx_meta,
&mut tx_buffer,
);
socket.bind(123).unwrap();
socket.bind(123).context("Could not bind UDP socket")?;
let context = NtpContext::new(Timestamp::default());
let ntp_socket = EmbassyNtpSocket::new(&socket);
let ntp_addrs = stack
.dns_query(NTP_SERVER, DnsQueryType::A)
.await
.expect("Failed to resolve DNS");
.context("Failed to resolve DNS")?;
if ntp_addrs.is_empty() {
bail!("Failed to resolve DNS");
bail!("No IP addresses found for NTP server");
}
let ntp = ntp_addrs[0];
info!("NTP server: {ntp:?}");
let mut counter = 0;
loop {
let addr: IpAddr = ntp_addrs[0].into();
let result = get_time(SocketAddr::from((addr, 123)), &socket, context).await;
let addr: IpAddr = ntp.into();
let timeout = get_time(SocketAddr::from((addr, 123)), &ntp_socket, context)
.with_timeout(Duration::from_millis((_max_wait_ms / 10) as u64))
.await;
match result {
Ok(time) => {
info!("Time: {:?}", time);
match timeout {
Ok(result) => {
let time = result?;
info!("Time: {time:?}");
return DateTime::from_timestamp(time.seconds as i64, 0)
.context("Could not convert Sntp result");
}
Err(e) => {
warn!("Error: {:?}", e);
Err(err) => {
warn!("sntp timeout, retry: {err:?}");
counter += 1;
if counter > 10 {
bail!("Failed to get time from NTP server");
@@ -299,27 +415,15 @@ impl Esp<'_> {
}
}
pub async fn flash_ota(&mut self) -> FatResult<()> {
let capacity = self.ota_next.capacity();
bail!("not implemented")
}
pub(crate) async fn wifi_scan(&mut self) -> FatResult<Vec<AccessPointInfo>> {
info!("start wifi scan");
let mut lock = self.controller.try_lock()?;
info!("start wifi scan lock");
let scan_config = ScanConfig {
ssid: None,
bssid: None,
channel: None,
show_hidden: false,
scan_type: ScanTypeConfig::Active {
min: Default::default(),
max: Default::default(),
},
};
let rv = lock.scan_with_config_async(scan_config).await?;
let scan_config = ScanConfig::default().with_scan_type(ScanTypeConfig::Active {
min: esp_hal::time::Duration::from_millis(0),
max: esp_hal::time::Duration::from_millis(0),
});
let rv = lock.scan_async(&scan_config).await?;
info!("end wifi scan lock");
Ok(rv)
}
@@ -368,17 +472,17 @@ impl Esp<'_> {
}
}
pub(crate) async fn wifi_ap(&mut self) -> FatResult<Stack<'static>> {
pub(crate) async fn wifi_ap(&mut self, spawner: Spawner) -> FatResult<Stack<'static>> {
let ssid = match self.load_config().await {
Ok(config) => config.network.ap_ssid.as_str().to_string(),
Err(_) => "PlantCtrl Emergency Mode".to_string(),
};
let spawner = Spawner::for_current_executor().await;
let device = self.interface_ap.take().unwrap();
let gw_ip_addr_str = "192.168.71.1";
let gw_ip_addr = Ipv4Addr::from_str(gw_ip_addr_str).expect("failed to parse gateway ip");
let device = self
.interface_ap
.take()
.context("AP interface already taken")?;
let gw_ip_addr = Ipv4Addr::new(192, 168, 71, 1);
let config = embassy_net::Config::ipv4_static(StaticConfigV4 {
address: Ipv4Cidr::new(gw_ip_addr, 24),
@@ -398,22 +502,14 @@ impl Esp<'_> {
);
let stack = mk_static!(Stack, stack);
let client_config = Configuration::AccessPoint(AccessPointConfiguration {
ssid: ssid.clone(),
..Default::default()
});
let client_config =
Config::AccessPoint(AccessPointConfig::default().with_ssid(ssid.clone()));
self.controller.lock().await.set_config(&client_config)?;
self.controller
.lock()
.await
.set_configuration(&client_config)?;
println!("start new");
self.controller.lock().await.start()?;
println!("start net task");
spawner.spawn(net_task(runner)).ok();
spawner.spawn(net_task(runner)?);
println!("run dhcp");
spawner.spawn(run_dhcp(stack.clone(), gw_ip_addr_str)).ok();
spawner.spawn(run_dhcp(*stack, gw_ip_addr)?);
loop {
if stack.is_link_up() {
@@ -424,31 +520,31 @@ impl Esp<'_> {
while !stack.is_config_up() {
Timer::after(Duration::from_millis(100)).await
}
println!("Connect to the AP `${ssid}` and point your browser to http://{gw_ip_addr_str}/");
println!("Connect to the AP `${ssid}` and point your browser to http://{gw_ip_addr}/");
stack
.config_v4()
.inspect(|c| println!("ipv4 config: {c:?}"));
Ok(stack.clone())
Ok(*stack)
}
pub(crate) async fn wifi(
&mut self,
network_config: &NetworkConfig,
spawner: Spawner,
) -> FatResult<Stack<'static>> {
esp_wifi::wifi_set_log_verbose();
let ssid = network_config.ssid.clone();
match &ssid {
esp_radio::wifi_set_log_verbose();
let ssid = match &network_config.ssid {
Some(ssid) => {
if ssid.is_empty() {
bail!("Wifi ssid was empty")
}
ssid.to_string()
}
None => {
bail!("Wifi ssid was empty")
}
}
let ssid = ssid.unwrap().to_string();
};
info!("attempting to connect wifi {ssid}");
let password = match network_config.password {
Some(ref password) => password.to_string(),
@@ -456,9 +552,10 @@ impl Esp<'_> {
};
let max_wait = network_config.max_wait;
let spawner = Spawner::for_current_executor().await;
let device = self.interface_sta.take().unwrap();
let device = self
.interface_sta
.take()
.context("STA interface already taken")?;
let config = embassy_net::Config::dhcpv4(DhcpConfig::default());
let seed = (self.rng.random() as u64) << 32 | self.rng.random() as u64;
@@ -472,122 +569,80 @@ impl Esp<'_> {
);
let stack = mk_static!(Stack, stack);
let client_config = Configuration::Client(ClientConfiguration {
ssid,
bssid: None,
auth_method: AuthMethod::WPA2Personal, //FIXME read from config, fill via scan
password,
channel: None,
});
let auth_method = if password.is_empty() {
AuthenticationMethod::None
} else {
AuthenticationMethod::Wpa2Personal
};
let client_config = StationConfig::default()
.with_ssid(ssid)
.with_auth_method(auth_method)
.with_password(password);
self.controller
.lock()
.await
.set_configuration(&client_config)?;
spawner.spawn(net_task(runner)).ok();
self.controller.lock().await.start_async().await?;
.set_config(&Config::Station(client_config))?;
spawner.spawn(net_task(runner)?);
self.controller
.lock()
.await
.connect_async()
.with_timeout(Duration::from_millis(max_wait as u64 * 1000))
.await
.context("Timeout waiting for wifi sta connected")??;
let timeout = {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} + max_wait as u64 * 1000;
loop {
let state = esp_wifi::wifi::sta_state();
match state {
WifiState::StaStarted => {
self.controller.lock().await.connect()?;
break;
}
_ => {}
let res = async {
while !stack.is_link_up() {
Timer::after(Duration::from_millis(500)).await;
}
if {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} > timeout
{
bail!("Timeout waiting for wifi sta ready")
}
Timer::after(Duration::from_millis(500)).await;
Ok::<(), FatError>(())
}
let timeout = {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} + max_wait as u64 * 1000;
loop {
let state = esp_wifi::wifi::sta_state();
match state {
WifiState::StaConnected => {
break;
}
_ => {}
}
if {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} > timeout
{
bail!("Timeout waiting for wifi sta connected")
}
Timer::after(Duration::from_millis(500)).await;
.with_timeout(Duration::from_millis(max_wait as u64 * 1000))
.await;
if res.is_err() {
bail!("Timeout waiting for wifi link up")
}
let timeout = {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} + max_wait as u64 * 1000;
while !stack.is_link_up() {
if {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} > timeout
{
bail!("Timeout waiting for wifi link up")
let res = async {
while !stack.is_config_up() {
Timer::after(Duration::from_millis(100)).await
}
Timer::after(Duration::from_millis(500)).await;
Ok::<(), FatError>(())
}
let timeout = {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} + max_wait as u64 * 1000;
while !stack.is_config_up() {
if {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} > timeout
{
bail!("Timeout waiting for wifi config up")
}
Timer::after(Duration::from_millis(100)).await
.with_timeout(Duration::from_millis(max_wait as u64 * 1000))
.await;
if res.is_err() {
bail!("Timeout waiting for wifi config up")
}
info!("Connected WIFI, dhcp: {:?}", stack.config_v4());
Ok(stack.clone())
Ok(*stack)
}
pub fn deep_sleep(
&mut self,
duration_in_ms: u64,
mut rtc: MutexGuard<CriticalSectionRawMutex, Rtc>,
) -> ! {
// Configure and enter deep sleep using esp-hal. Also keep prior behavior where
// duration_in_ms == 0 triggers an immediate reset.
pub fn deep_sleep(&mut self, duration_in_ms: u64) -> ! {
// Mark the current OTA image as valid if we reached here while in pending verify.
if let Ok(cur) = self.ota.current_ota_state() {
if cur == OtaImageState::PendingVerify {
self.ota
.set_current_ota_state(OtaImageState::Valid)
.expect("Could not set image to valid");
info!("Marking OTA image as valid");
if let Err(err) = self.ota.set_current_ota_state(Valid) {
error!("Could not set image to valid: {:?}", err);
}
}
} else {
info!("No OTA image to mark as valid");
}
if duration_in_ms == 0 {
software_reset();
} else {
///let timer = TimerWakeupSource::new(core::time::Duration::from_millis(duration_in_ms));
let timer = TimerWakeupSource::new(core::time::Duration::from_millis(5000));
let timer = TimerWakeupSource::new(core::time::Duration::from_millis(duration_in_ms));
let mut wake_pins: [(&mut dyn RtcPinWithResistors, WakeupLevel); 1] =
[(&mut self.wake_gpio1, WakeupLevel::Low)];
let ext1 = esp_hal::rtc_cntl::sleep::Ext1WakeupSource::new(&mut wake_pins);
rtc.sleep_deep(&[&timer, &ext1]);
self.rtc.sleep_deep(&[&timer, &ext1]);
}
}
@@ -646,47 +701,36 @@ impl Esp<'_> {
} else {
RESTART_TO_CONF = 0;
}
LAST_CORROSION_PROTECTION_CHECK_DAY = -1;
};
} else {
unsafe {
if to_config_mode {
RESTART_TO_CONF = 1;
}
LOG_ACCESS
.lock()
.await
.log(
LogMessage::RestartToConfig,
RESTART_TO_CONF as u32,
0,
"",
"",
)
.await;
LOG_ACCESS
.lock()
.await
.log(
LogMessage::LowVoltage,
LOW_VOLTAGE_DETECTED as u32,
0,
"",
"",
)
.await;
for i in 0..PLANT_COUNT {
log::info!(
"LAST_WATERING_TIMESTAMP[{}] = UTC {}",
i,
LAST_WATERING_TIMESTAMP[i]
);
log(
LogMessage::RestartToConfig,
RESTART_TO_CONF as u32,
0,
"",
"",
);
log(
LogMessage::LowVoltage,
LOW_VOLTAGE_DETECTED as u32,
0,
"",
"",
);
// is executed before main, no other code will alter these values during printing
#[allow(static_mut_refs)]
for (i, time) in LAST_WATERING_TIMESTAMP.iter().enumerate() {
info!("LAST_WATERING_TIMESTAMP[{i}] = UTC {time}");
}
for i in 0..PLANT_COUNT {
log::info!(
"CONSECUTIVE_WATERING_PLANT[{}] = {}",
i,
CONSECUTIVE_WATERING_PLANT[i]
);
// is executed before main, no other code will alter these values during printing
#[allow(static_mut_refs)]
for (i, item) in CONSECUTIVE_WATERING_PLANT.iter().enumerate() {
info!("CONSECUTIVE_WATERING_PLANT[{i}] = {item}");
}
}
}
@@ -696,6 +740,7 @@ impl Esp<'_> {
&mut self,
network_config: &'static NetworkConfig,
stack: Stack<'static>,
spawner: Spawner,
) -> FatResult<()> {
let base_topic = network_config
.base_topic
@@ -718,17 +763,17 @@ impl Esp<'_> {
bail!("Mqtt url was empty")
}
let last_will_topic = format!("{}/state", base_topic);
let round_trip_topic = format!("{}/internal/roundtrip", base_topic);
let stay_alive_topic = format!("{}/stay_alive", base_topic);
let last_will_topic = format!("{base_topic}/state");
let round_trip_topic = format!("{base_topic}/internal/roundtrip");
let stay_alive_topic = format!("{base_topic}/stay_alive");
let mut builder: McutieBuilder<'_, String, PublishDisplay<String, &str>, 0> =
McutieBuilder::new(stack, "plant ctrl", mqtt_url);
if network_config.mqtt_user.is_some() && network_config.mqtt_password.is_some() {
builder = builder.with_authentication(
network_config.mqtt_user.as_ref().unwrap().as_str(),
network_config.mqtt_password.as_ref().unwrap().as_str(),
);
if let (Some(mqtt_user), Some(mqtt_password)) = (
network_config.mqtt_user.as_ref(),
network_config.mqtt_password.as_ref(),
) {
builder = builder.with_authentication(mqtt_user, mqtt_password);
info!("With authentification");
}
@@ -748,57 +793,51 @@ impl Esp<'_> {
let keep_alive = Duration::from_secs(60 * 60 * 2).as_secs() as u16;
let (receiver, task) = builder.build(keep_alive);
let spawner = Spawner::for_current_executor().await;
spawner.spawn(mqtt_incoming_task(
receiver,
round_trip_topic.clone(),
stay_alive_topic.clone(),
))?;
spawner.spawn(mqtt_runner(task))?;
)?);
spawner.spawn(mqtt_runner(task)?);
LOG_ACCESS
.lock()
.await
.log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic)
.await;
log(LogMessage::StayAlive, 0, 0, "", &stay_alive_topic);
LOG_ACCESS
.lock()
.await
.log(LogMessage::MqttInfo, 0, 0, "", mqtt_url)
.await;
log(LogMessage::MqttInfo, 0, 0, "", mqtt_url);
let mqtt_timeout = 15000;
let timeout = {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} + mqtt_timeout as u64 * 1000;
while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) {
let cur = TIME_ACCESS.get().await.lock().await.current_time_us();
if cur > timeout {
bail!("Timeout waiting MQTT connect event")
let res = async {
while !MQTT_CONNECTED_EVENT_RECEIVED.load(Ordering::Relaxed) {
crate::hal::PlantHal::feed_watchdog();
Timer::after(Duration::from_millis(100)).await;
}
Timer::after(Duration::from_millis(100)).await;
Ok::<(), FatError>(())
}
.with_timeout(Duration::from_millis(mqtt_timeout as u64))
.await;
if res.is_err() {
bail!("Timeout waiting MQTT connect event")
}
Topic::General(round_trip_topic.clone())
let _ = Topic::General(round_trip_topic.clone())
.with_display("online_text")
.publish()
.await
.unwrap();
.await;
let timeout = {
let guard = TIME_ACCESS.get().await.lock().await;
guard.current_time_us()
} + mqtt_timeout as u64 * 1000;
while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) {
let cur = TIME_ACCESS.get().await.lock().await.current_time_us();
if cur > timeout {
//ensure we do not further try to publish
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
bail!("Timeout waiting MQTT roundtrip")
let res = async {
while !MQTT_ROUND_TRIP_RECEIVED.load(Ordering::Relaxed) {
crate::hal::PlantHal::feed_watchdog();
Timer::after(Duration::from_millis(100)).await;
}
Timer::after(Duration::from_millis(100)).await;
Ok::<(), FatError>(())
}
.with_timeout(Duration::from_millis(mqtt_timeout as u64))
.await;
if res.is_err() {
//ensure we do not further try to publish
MQTT_CONNECTED_EVENT_RECEIVED.store(false, Ordering::Relaxed);
bail!("Timeout waiting MQTT roundtrip")
}
Ok(())
}
@@ -864,8 +903,7 @@ impl Esp<'_> {
Ok(()) => {}
Err(err) => {
info!(
"Error during mqtt send on topic {} with message {:#?} error is {:?}",
subtopic, message, err
"Error during mqtt send on topic {subtopic} with message {message:#?} error is {err:?}"
);
}
};
@@ -893,8 +931,8 @@ async fn mqtt_incoming_task(
MQTT_CONNECTED_EVENT_RECEIVED.store(true, Ordering::Relaxed);
}
MqttMessage::Publish(topic, payload) => match topic {
Topic::DeviceType(type_topic) => {}
Topic::Device(device_topic) => {}
Topic::DeviceType(_type_topic) => {}
Topic::Device(_device_topic) => {}
Topic::General(topic) => {
let subtopic = topic.as_str();
@@ -907,18 +945,10 @@ async fn mqtt_incoming_task(
true => 1,
false => 0,
};
LOG_ACCESS
.lock()
.await
.log(LogMessage::MqttStayAliveRec, a, 0, "", "")
.await;
log(LogMessage::MqttStayAliveRec, a, 0, "", "");
MQTT_STAY_ALIVE.store(value, Ordering::Relaxed);
} else {
LOG_ACCESS
.lock()
.await
.log(LogMessage::UnknownTopic, 0, 0, "", &*topic)
.await;
log(LogMessage::UnknownTopic, 0, 0, "", &topic);
}
}
},
@@ -931,13 +961,13 @@ async fn mqtt_incoming_task(
}
#[embassy_executor::task(pool_size = 2)]
async fn net_task(mut runner: Runner<'static, WifiDevice<'static>>) {
async fn net_task(mut runner: Runner<'static, Interface<'static>>) {
runner.run().await;
}
#[embassy_executor::task]
async fn run_dhcp(stack: Stack<'static>, gw_ip_addr: &'static str) {
use core::net::{Ipv4Addr, SocketAddrV4};
async fn run_dhcp(stack: Stack<'static>, ip: Ipv4Addr) {
use core::net::SocketAddrV4;
use edge_dhcp::{
io::{self, DEFAULT_SERVER_PORT},
@@ -946,21 +976,25 @@ async fn run_dhcp(stack: Stack<'static>, gw_ip_addr: &'static str) {
use edge_nal::UdpBind;
use edge_nal_embassy::{Udp, UdpBuffers};
let ip = Ipv4Addr::from_str(gw_ip_addr).expect("dhcp task failed to parse gw ip");
let mut buf = [0u8; 1500];
let mut gw_buf = [Ipv4Addr::UNSPECIFIED];
let buffers = UdpBuffers::<3, 1024, 1024, 10>::new();
let unbound_socket = Udp::new(stack, &buffers);
let mut bound_socket = unbound_socket
let mut bound_socket = match unbound_socket
.bind(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
DEFAULT_SERVER_PORT,
)))
.await
.unwrap();
{
Ok(s) => s,
Err(e) => {
error!("dhcp task failed to bind socket: {:?}", e);
return;
}
};
loop {
_ = io::server::run(
@@ -970,7 +1004,7 @@ async fn run_dhcp(stack: Stack<'static>, gw_ip_addr: &'static str) {
&mut buf,
)
.await
.inspect_err(|e| log::warn!("DHCP server error: {e:?}"));
.inspect_err(|e| warn!("DHCP server error: {e:?}"));
Timer::after(Duration::from_millis(500)).await;
}
}