use core::{ cell::RefCell, future::Future, pin::Pin, task::{Context, Poll, Waker}, }; use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex}; use pin_project::pin_project; struct PipeData { connect_count: usize, receiver_waker: Option, sender_waker: Option, pending: Option, } fn swap_wakers(waker: &mut Option, new_waker: &Waker) { if let Some(old_waker) = waker.take() { if old_waker.will_wake(new_waker) { *waker = Some(old_waker) } else { if !new_waker.will_wake(&old_waker) { old_waker.wake(); } *waker = Some(new_waker.clone()); } } else { *waker = Some(new_waker.clone()) } } pub(crate) struct ReceiveFuture<'a, M: RawMutex, T, const N: usize> { pipe: &'a ConnectedPipe, } impl Future for ReceiveFuture<'_, M, T, N> { type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.pipe.inner.lock(|cell| { let mut inner = cell.borrow_mut(); if let Some(waker) = inner.sender_waker.take() { waker.wake(); } if let Some(item) = inner.pending.take() { if let Some(old_waker) = inner.receiver_waker.take() { old_waker.wake(); } Poll::Ready(item) } else { swap_wakers(&mut inner.receiver_waker, cx.waker()); Poll::Pending } }) } } pub(crate) struct PipeReader<'a, M: RawMutex, T, const N: usize> { pipe: &'a ConnectedPipe, } impl PipeReader<'_, M, T, N> { #[must_use] pub(crate) fn receive(&self) -> ReceiveFuture<'_, M, T, N> { ReceiveFuture { pipe: self.pipe } } } impl Drop for PipeReader<'_, M, T, N> { fn drop(&mut self) { self.pipe.inner.lock(|cell| { let mut inner = cell.borrow_mut(); inner.connect_count -= 1; if inner.connect_count == 0 { inner.pending = None; } if let Some(waker) = inner.sender_waker.take() { waker.wake(); } }) } } #[pin_project] pub(crate) struct PushFuture<'a, M: RawMutex, T, const N: usize> { data: Option, pipe: &'a ConnectedPipe, } impl Future for PushFuture<'_, M, T, N> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.pipe.inner.lock(|cell| { let project = self.project(); let mut inner = cell.borrow_mut(); if let Some(receiver) = inner.receiver_waker.take() { receiver.wake(); } if project.data.is_none() || inner.connect_count == 0 { trace!("Dropping packet"); Poll::Ready(()) } else if inner.pending.is_some() { swap_wakers(&mut inner.sender_waker, cx.waker()); Poll::Pending } else { inner.pending = project.data.take(); Poll::Ready(()) } }) } } /// A pipe that knows whether a receiver is connected. If so pushing to the /// queue waits until there is space in the queue, otherwise data is simply /// dropped. pub(crate) struct ConnectedPipe { inner: Mutex>>, } impl ConnectedPipe { pub(crate) const fn new() -> Self { Self { inner: Mutex::new(RefCell::new(PipeData { connect_count: 0, receiver_waker: None, sender_waker: None, pending: None, })), } } /// A future that waits for a new item to be available. pub(crate) fn reader(&self) -> PipeReader<'_, M, T, N> { self.inner.lock(|cell| { let mut inner = cell.borrow_mut(); inner.connect_count += 1; PipeReader { pipe: self } }) } /// Pushes an item to the reader, waiting for a slot to become available if /// connected. #[must_use] pub(crate) fn push(&self, data: T) -> PushFuture<'_, M, T, N> { PushFuture { data: Some(data), pipe: self, } } } #[cfg(test)] mod tests { use core::time::Duration; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use futures_executor::{LocalPool, ThreadPool}; use futures_timer::Delay; use futures_util::{future::select, pin_mut, task::SpawnExt, FutureExt}; use super::ConnectedPipe; async fn wait_milis(milis: u64) { Delay::new(Duration::from_millis(milis)).await; } // #[futures_test::test] #[test] fn test_send_receive() { let mut executor = LocalPool::new(); let spawner = executor.spawner(); static PIPE: ConnectedPipe = ConnectedPipe::new(); // Task that sends spawner .spawn(async { wait_milis(10).await; PIPE.push(23).await; PIPE.push(56).await; PIPE.push(67).await; }) .unwrap(); // Task that receives spawner .spawn(async { let reader = PIPE.reader(); let value = reader.receive().await; assert_eq!(value, 23); let value = reader.receive().await; assert_eq!(value, 56); let value = reader.receive().await; assert_eq!(value, 67); }) .unwrap(); executor.run(); } #[futures_test::test] async fn test_send_drop() { static PIPE: ConnectedPipe = ConnectedPipe::new(); PIPE.push(23).await; PIPE.push(56).await; PIPE.push(67).await; // Create a reader after sending let reader = PIPE.reader(); let receive = reader.receive().fuse(); pin_mut!(receive); let timeout = wait_milis(50).fuse(); pin_mut!(timeout); let either = select(receive, timeout).await; match either { futures_util::future::Either::Left(_) => { panic!("There should be nothing to receive!"); } futures_util::future::Either::Right(_) => {} } } #[futures_test::test] async fn test_bulk_send_publish() { static PIPE: ConnectedPipe = ConnectedPipe::new(); let executor = ThreadPool::new().unwrap(); executor .spawn(async { for i in 0..1000 { PIPE.push(i).await; } }) .unwrap(); executor .spawn(async { for i in 1000..2000 { PIPE.push(i).await; } }) .unwrap(); let reader = PIPE.reader(); for _ in 0..800 { reader.receive().await; } } }