Skip to content

Commit

Permalink
Merge pull request #91 from matheus-consoli/move-test-channel-to-utils
Browse files Browse the repository at this point in the history
Move test channel to utils
  • Loading branch information
yoshuawuyts authored Nov 17, 2022
2 parents 330c8ee + c679d66 commit 1a73a7c
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 138 deletions.
70 changes: 1 addition & 69 deletions src/stream/merge/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,10 @@ where
#[cfg(test)]
mod tests {
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
use std::task::Waker;

use super::*;
use crate::utils::channel::local_channel;
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use futures_lite::future::block_on;
Expand Down Expand Up @@ -174,73 +173,6 @@ mod tests {
/// The purpose of this test is to make sure we have the waking logic working.
#[test]
fn merge_channels() {
struct LocalChannel<T> {
queue: VecDeque<T>,
waker: Option<Waker>,
closed: bool,
}

struct LocalReceiver<T> {
channel: Rc<RefCell<LocalChannel<T>>>,
}

impl<T> Stream for LocalReceiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = self.channel.borrow_mut();

match channel.queue.pop_front() {
Some(item) => Poll::Ready(Some(item)),
None => {
if channel.closed {
Poll::Ready(None)
} else {
channel.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
}
}

struct LocalSender<T> {
channel: Rc<RefCell<LocalChannel<T>>>,
}

impl<T> LocalSender<T> {
fn send(&self, item: T) {
let mut channel = self.channel.borrow_mut();

channel.queue.push_back(item);

let _ = channel.waker.take().map(Waker::wake);
}
}

impl<T> Drop for LocalSender<T> {
fn drop(&mut self) {
let mut channel = self.channel.borrow_mut();
channel.closed = true;
let _ = channel.waker.take().map(Waker::wake);
}
}

fn local_channel<T>() -> (LocalSender<T>, LocalReceiver<T>) {
let channel = Rc::new(RefCell::new(LocalChannel {
queue: VecDeque::new(),
waker: None,
closed: false,
}));

(
LocalSender {
channel: channel.clone(),
},
LocalReceiver { channel },
)
}

let mut pool = LocalPool::new();

let done = Rc::new(RefCell::new(false));
Expand Down
70 changes: 1 addition & 69 deletions src/stream/merge/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,10 @@ where
#[cfg(test)]
mod tests {
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
use std::task::Waker;

use super::*;
use crate::utils::channel::local_channel;
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use futures_lite::future::block_on;
Expand Down Expand Up @@ -175,73 +174,6 @@ mod tests {
/// The purpose of this test is to make sure we have the waking logic working.
#[test]
fn merge_channels() {
struct LocalChannel<T> {
queue: VecDeque<T>,
waker: Option<Waker>,
closed: bool,
}

struct LocalReceiver<T> {
channel: Rc<RefCell<LocalChannel<T>>>,
}

impl<T> Stream for LocalReceiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = self.channel.borrow_mut();

match channel.queue.pop_front() {
Some(item) => Poll::Ready(Some(item)),
None => {
if channel.closed {
Poll::Ready(None)
} else {
channel.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
}
}

struct LocalSender<T> {
channel: Rc<RefCell<LocalChannel<T>>>,
}

impl<T> LocalSender<T> {
fn send(&self, item: T) {
let mut channel = self.channel.borrow_mut();

channel.queue.push_back(item);

let _ = channel.waker.take().map(Waker::wake);
}
}

impl<T> Drop for LocalSender<T> {
fn drop(&mut self) {
let mut channel = self.channel.borrow_mut();
channel.closed = true;
let _ = channel.waker.take().map(Waker::wake);
}
}

fn local_channel<T>() -> (LocalSender<T>, LocalReceiver<T>) {
let channel = Rc::new(RefCell::new(LocalChannel {
queue: VecDeque::new(),
waker: None,
closed: false,
}));

(
LocalSender {
channel: channel.clone(),
},
LocalReceiver { channel },
)
}

let mut pool = LocalPool::new();

let done = Rc::new(RefCell::new(false));
Expand Down
76 changes: 76 additions & 0 deletions src/utils/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::{
cell::RefCell,
collections::VecDeque,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
};

use futures::Stream;

pub(crate) struct LocalChannel<T> {
queue: VecDeque<T>,
waker: Option<Waker>,
closed: bool,
}

pub(crate) struct LocalReceiver<T> {
channel: Rc<RefCell<LocalChannel<T>>>,
}

impl<T> Stream for LocalReceiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = self.channel.borrow_mut();

match channel.queue.pop_front() {
Some(item) => Poll::Ready(Some(item)),
None => {
if channel.closed {
Poll::Ready(None)
} else {
channel.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
}
}

pub(crate) struct LocalSender<T> {
channel: Rc<RefCell<LocalChannel<T>>>,
}

impl<T> LocalSender<T> {
pub(crate) fn send(&self, item: T) {
let mut channel = self.channel.borrow_mut();

channel.queue.push_back(item);

let _ = channel.waker.take().map(Waker::wake);
}
}

impl<T> Drop for LocalSender<T> {
fn drop(&mut self) {
let mut channel = self.channel.borrow_mut();
channel.closed = true;
let _ = channel.waker.take().map(Waker::wake);
}
}

pub(crate) fn local_channel<T>() -> (LocalSender<T>, LocalReceiver<T>) {
let channel = Rc::new(RefCell::new(LocalChannel {
queue: VecDeque::new(),
waker: None,
closed: false,
}));

(
LocalSender {
channel: channel.clone(),
},
LocalReceiver { channel },
)
}
3 changes: 3 additions & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ pub(crate) use wakers::{InlineWakerArray, WakerArray, WakerVec};

#[cfg(test)]
pub(crate) use wakers::DummyWaker;

#[cfg(test)]
pub(crate) mod channel;

0 comments on commit 1a73a7c

Please sign in to comment.