From c679d66bca1b07eb65288ba91efb0ebcd1ac413a Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Wed, 16 Nov 2022 20:02:22 -0300 Subject: [PATCH] Move test channel to utils --- src/stream/merge/array.rs | 70 +----------------------------------- src/stream/merge/vec.rs | 70 +----------------------------------- src/utils/channel.rs | 76 +++++++++++++++++++++++++++++++++++++++ src/utils/mod.rs | 3 ++ 4 files changed, 81 insertions(+), 138 deletions(-) create mode 100644 src/utils/channel.rs diff --git a/src/stream/merge/array.rs b/src/stream/merge/array.rs index 1282404..e512b74 100644 --- a/src/stream/merge/array.rs +++ b/src/stream/merge/array.rs @@ -124,11 +124,10 @@ where #[cfg(test)] mod tests { use std::cell::RefCell; - use std::collections::VecDeque; use std::rc::Rc; - use std::task::Waker; use super::*; + use crate::utils::channel::local_channel; use futures::executor::LocalPool; use futures::task::LocalSpawnExt; use futures_lite::future::block_on; @@ -174,73 +173,6 @@ mod tests { /// The purpose of this test is to make sure we have the waking logic working. #[test] fn merge_channels() { - struct LocalChannel { - queue: VecDeque, - waker: Option, - closed: bool, - } - - struct LocalReceiver { - channel: Rc>>, - } - - impl Stream for LocalReceiver { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut channel = self.channel.borrow_mut(); - - match channel.queue.pop_front() { - Some(item) => Poll::Ready(Some(item)), - None => { - if channel.closed { - Poll::Ready(None) - } else { - channel.waker = Some(cx.waker().clone()); - Poll::Pending - } - } - } - } - } - - struct LocalSender { - channel: Rc>>, - } - - impl LocalSender { - fn send(&self, item: T) { - let mut channel = self.channel.borrow_mut(); - - channel.queue.push_back(item); - - let _ = channel.waker.take().map(Waker::wake); - } - } - - impl Drop for LocalSender { - fn drop(&mut self) { - let mut channel = self.channel.borrow_mut(); - channel.closed = true; - let _ = channel.waker.take().map(Waker::wake); - } - } - - fn local_channel() -> (LocalSender, LocalReceiver) { - let channel = Rc::new(RefCell::new(LocalChannel { - queue: VecDeque::new(), - waker: None, - closed: false, - })); - - ( - LocalSender { - channel: channel.clone(), - }, - LocalReceiver { channel }, - ) - } - let mut pool = LocalPool::new(); let done = Rc::new(RefCell::new(false)); diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index b67d70c..82c774c 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -125,11 +125,10 @@ where #[cfg(test)] mod tests { use std::cell::RefCell; - use std::collections::VecDeque; use std::rc::Rc; - use std::task::Waker; use super::*; + use crate::utils::channel::local_channel; use futures::executor::LocalPool; use futures::task::LocalSpawnExt; use futures_lite::future::block_on; @@ -175,73 +174,6 @@ mod tests { /// The purpose of this test is to make sure we have the waking logic working. #[test] fn merge_channels() { - struct LocalChannel { - queue: VecDeque, - waker: Option, - closed: bool, - } - - struct LocalReceiver { - channel: Rc>>, - } - - impl Stream for LocalReceiver { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut channel = self.channel.borrow_mut(); - - match channel.queue.pop_front() { - Some(item) => Poll::Ready(Some(item)), - None => { - if channel.closed { - Poll::Ready(None) - } else { - channel.waker = Some(cx.waker().clone()); - Poll::Pending - } - } - } - } - } - - struct LocalSender { - channel: Rc>>, - } - - impl LocalSender { - fn send(&self, item: T) { - let mut channel = self.channel.borrow_mut(); - - channel.queue.push_back(item); - - let _ = channel.waker.take().map(Waker::wake); - } - } - - impl Drop for LocalSender { - fn drop(&mut self) { - let mut channel = self.channel.borrow_mut(); - channel.closed = true; - let _ = channel.waker.take().map(Waker::wake); - } - } - - fn local_channel() -> (LocalSender, LocalReceiver) { - let channel = Rc::new(RefCell::new(LocalChannel { - queue: VecDeque::new(), - waker: None, - closed: false, - })); - - ( - LocalSender { - channel: channel.clone(), - }, - LocalReceiver { channel }, - ) - } - let mut pool = LocalPool::new(); let done = Rc::new(RefCell::new(false)); diff --git a/src/utils/channel.rs b/src/utils/channel.rs new file mode 100644 index 0000000..d3a6709 --- /dev/null +++ b/src/utils/channel.rs @@ -0,0 +1,76 @@ +use std::{ + cell::RefCell, + collections::VecDeque, + pin::Pin, + rc::Rc, + task::{Context, Poll, Waker}, +}; + +use futures::Stream; + +pub(crate) struct LocalChannel { + queue: VecDeque, + waker: Option, + closed: bool, +} + +pub(crate) struct LocalReceiver { + channel: Rc>>, +} + +impl Stream for LocalReceiver { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut channel = self.channel.borrow_mut(); + + match channel.queue.pop_front() { + Some(item) => Poll::Ready(Some(item)), + None => { + if channel.closed { + Poll::Ready(None) + } else { + channel.waker = Some(cx.waker().clone()); + Poll::Pending + } + } + } + } +} + +pub(crate) struct LocalSender { + channel: Rc>>, +} + +impl LocalSender { + pub(crate) fn send(&self, item: T) { + let mut channel = self.channel.borrow_mut(); + + channel.queue.push_back(item); + + let _ = channel.waker.take().map(Waker::wake); + } +} + +impl Drop for LocalSender { + fn drop(&mut self) { + let mut channel = self.channel.borrow_mut(); + channel.closed = true; + let _ = channel.waker.take().map(Waker::wake); + } +} + +pub(crate) fn local_channel() -> (LocalSender, LocalReceiver) { + let channel = Rc::new(RefCell::new(LocalChannel { + queue: VecDeque::new(), + waker: None, + closed: false, + })); + + ( + LocalSender { + channel: channel.clone(), + }, + LocalReceiver { channel }, + ) +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 5d51564..f4437a8 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -24,3 +24,6 @@ pub(crate) use wakers::{InlineWakerArray, WakerArray, WakerVec}; #[cfg(test)] pub(crate) use wakers::DummyWaker; + +#[cfg(test)] +pub(crate) mod channel;