Add mcutie MQTT client implementation and improve library structure
- Integrated `mcutie` library as a core MQTT client for device communication. - Added support for Home Assistant entities (binary sensor, button) via MQTT. - Implemented buffer management, async operations, and packet encoding/decoding. - Introduced structured error handling and device registration features. - Updated `Cargo.toml` with new dependencies and enabled feature flags for `serde` and `log`. - Enhanced logging macros with configurable options (`defmt` or `log`). - Organized codebase into modules (buffer, components, IO, publish, etc.) for better maintainability. fix legacy dependecencies and compatiblity with mcutie vendored lib fix shit i hate this
This commit is contained in:
267
rust/src/mcutie_3_0_0/pipe.rs
Normal file
267
rust/src/mcutie_3_0_0/pipe.rs
Normal file
@@ -0,0 +1,267 @@
|
||||
use core::{
|
||||
cell::RefCell,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll, Waker},
|
||||
};
|
||||
|
||||
use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex};
|
||||
use pin_project::pin_project;
|
||||
|
||||
struct PipeData<T, const N: usize> {
|
||||
connect_count: usize,
|
||||
receiver_waker: Option<Waker>,
|
||||
sender_waker: Option<Waker>,
|
||||
pending: Option<T>,
|
||||
}
|
||||
|
||||
fn swap_wakers(waker: &mut Option<Waker>, new_waker: &Waker) {
|
||||
if let Some(old_waker) = waker.take() {
|
||||
if old_waker.will_wake(new_waker) {
|
||||
*waker = Some(old_waker)
|
||||
} else {
|
||||
if !new_waker.will_wake(&old_waker) {
|
||||
old_waker.wake();
|
||||
}
|
||||
|
||||
*waker = Some(new_waker.clone());
|
||||
}
|
||||
} else {
|
||||
*waker = Some(new_waker.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ReceiveFuture<'a, M: RawMutex, T, const N: usize> {
|
||||
pipe: &'a ConnectedPipe<M, T, N>,
|
||||
}
|
||||
|
||||
impl<M: RawMutex, T, const N: usize> Future for ReceiveFuture<'_, M, T, N> {
|
||||
type Output = T;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.pipe.inner.lock(|cell| {
|
||||
let mut inner = cell.borrow_mut();
|
||||
|
||||
if let Some(waker) = inner.sender_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
|
||||
if let Some(item) = inner.pending.take() {
|
||||
if let Some(old_waker) = inner.receiver_waker.take() {
|
||||
old_waker.wake();
|
||||
}
|
||||
|
||||
Poll::Ready(item)
|
||||
} else {
|
||||
swap_wakers(&mut inner.receiver_waker, cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct PipeReader<'a, M: RawMutex, T, const N: usize> {
|
||||
pipe: &'a ConnectedPipe<M, T, N>,
|
||||
}
|
||||
|
||||
impl<M: RawMutex, T, const N: usize> PipeReader<'_, M, T, N> {
|
||||
#[must_use]
|
||||
pub(crate) fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
|
||||
ReceiveFuture { pipe: self.pipe }
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: RawMutex, T, const N: usize> Drop for PipeReader<'_, M, T, N> {
|
||||
fn drop(&mut self) {
|
||||
self.pipe.inner.lock(|cell| {
|
||||
let mut inner = cell.borrow_mut();
|
||||
inner.connect_count -= 1;
|
||||
|
||||
if inner.connect_count == 0 {
|
||||
inner.pending = None;
|
||||
}
|
||||
|
||||
if let Some(waker) = inner.sender_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
pub(crate) struct PushFuture<'a, M: RawMutex, T, const N: usize> {
|
||||
data: Option<T>,
|
||||
pipe: &'a ConnectedPipe<M, T, N>,
|
||||
}
|
||||
|
||||
impl<M: RawMutex, T, const N: usize> Future for PushFuture<'_, M, T, N> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.pipe.inner.lock(|cell| {
|
||||
let project = self.project();
|
||||
let mut inner = cell.borrow_mut();
|
||||
|
||||
if let Some(receiver) = inner.receiver_waker.take() {
|
||||
receiver.wake();
|
||||
}
|
||||
|
||||
if project.data.is_none() || inner.connect_count == 0 {
|
||||
trace!("Dropping packet");
|
||||
Poll::Ready(())
|
||||
} else if inner.pending.is_some() {
|
||||
swap_wakers(&mut inner.sender_waker, cx.waker());
|
||||
Poll::Pending
|
||||
} else {
|
||||
inner.pending = project.data.take();
|
||||
|
||||
Poll::Ready(())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A pipe that knows whether a receiver is connected. If so pushing to the
|
||||
/// queue waits until there is space in the queue, otherwise data is simply
|
||||
/// dropped.
|
||||
pub(crate) struct ConnectedPipe<M: RawMutex, T, const N: usize> {
|
||||
inner: Mutex<M, RefCell<PipeData<T, N>>>,
|
||||
}
|
||||
|
||||
impl<M: RawMutex, T, const N: usize> ConnectedPipe<M, T, N> {
|
||||
pub(crate) const fn new() -> Self {
|
||||
Self {
|
||||
inner: Mutex::new(RefCell::new(PipeData {
|
||||
connect_count: 0,
|
||||
receiver_waker: None,
|
||||
sender_waker: None,
|
||||
pending: None,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
/// A future that waits for a new item to be available.
|
||||
pub(crate) fn reader(&self) -> PipeReader<'_, M, T, N> {
|
||||
self.inner.lock(|cell| {
|
||||
let mut inner = cell.borrow_mut();
|
||||
inner.connect_count += 1;
|
||||
|
||||
PipeReader { pipe: self }
|
||||
})
|
||||
}
|
||||
|
||||
/// Pushes an item to the reader, waiting for a slot to become available if
|
||||
/// connected.
|
||||
#[must_use]
|
||||
pub(crate) fn push(&self, data: T) -> PushFuture<'_, M, T, N> {
|
||||
PushFuture {
|
||||
data: Some(data),
|
||||
pipe: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use core::time::Duration;
|
||||
|
||||
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
|
||||
use futures_executor::{LocalPool, ThreadPool};
|
||||
use futures_timer::Delay;
|
||||
use futures_util::{future::select, pin_mut, task::SpawnExt, FutureExt};
|
||||
|
||||
use super::ConnectedPipe;
|
||||
|
||||
async fn wait_milis(milis: u64) {
|
||||
Delay::new(Duration::from_millis(milis)).await;
|
||||
}
|
||||
|
||||
// #[futures_test::test]
|
||||
#[test]
|
||||
fn test_send_receive() {
|
||||
let mut executor = LocalPool::new();
|
||||
let spawner = executor.spawner();
|
||||
|
||||
static PIPE: ConnectedPipe<CriticalSectionRawMutex, usize, 5> = ConnectedPipe::new();
|
||||
|
||||
// Task that sends
|
||||
spawner
|
||||
.spawn(async {
|
||||
wait_milis(10).await;
|
||||
|
||||
PIPE.push(23).await;
|
||||
PIPE.push(56).await;
|
||||
PIPE.push(67).await;
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Task that receives
|
||||
spawner
|
||||
.spawn(async {
|
||||
let reader = PIPE.reader();
|
||||
let value = reader.receive().await;
|
||||
assert_eq!(value, 23);
|
||||
let value = reader.receive().await;
|
||||
assert_eq!(value, 56);
|
||||
let value = reader.receive().await;
|
||||
assert_eq!(value, 67);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
executor.run();
|
||||
}
|
||||
|
||||
#[futures_test::test]
|
||||
async fn test_send_drop() {
|
||||
static PIPE: ConnectedPipe<CriticalSectionRawMutex, usize, 5> = ConnectedPipe::new();
|
||||
|
||||
PIPE.push(23).await;
|
||||
PIPE.push(56).await;
|
||||
PIPE.push(67).await;
|
||||
|
||||
// Create a reader after sending
|
||||
let reader = PIPE.reader();
|
||||
let receive = reader.receive().fuse();
|
||||
pin_mut!(receive);
|
||||
|
||||
let timeout = wait_milis(50).fuse();
|
||||
pin_mut!(timeout);
|
||||
|
||||
let either = select(receive, timeout).await;
|
||||
|
||||
match either {
|
||||
futures_util::future::Either::Left(_) => {
|
||||
panic!("There should be nothing to receive!");
|
||||
}
|
||||
futures_util::future::Either::Right(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
#[futures_test::test]
|
||||
async fn test_bulk_send_publish() {
|
||||
static PIPE: ConnectedPipe<CriticalSectionRawMutex, usize, 5> = ConnectedPipe::new();
|
||||
|
||||
let executor = ThreadPool::new().unwrap();
|
||||
|
||||
executor
|
||||
.spawn(async {
|
||||
for i in 0..1000 {
|
||||
PIPE.push(i).await;
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
executor
|
||||
.spawn(async {
|
||||
for i in 1000..2000 {
|
||||
PIPE.push(i).await;
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let reader = PIPE.reader();
|
||||
for _ in 0..800 {
|
||||
reader.receive().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user