Skip to content

Commit

Permalink
remove unneccessary pin_project in countdown future/stream
Browse files Browse the repository at this point in the history
  • Loading branch information
wishawa committed Feb 2, 2023
1 parent b29307d commit c777f28
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 34 deletions.
30 changes: 13 additions & 17 deletions benches/utils/countdown_futures.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use futures_core::Future;
use pin_project::pin_project;

use std::cell::{Cell, RefCell};
use std::collections::BinaryHeap;
Expand Down Expand Up @@ -45,7 +44,6 @@ pub fn futures_tuple() -> (
}

/// A future which will _eventually_ be ready, but needs to be polled N times before it is.
#[pin_project]
pub struct CountdownFuture {
state: State,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
Expand Down Expand Up @@ -73,40 +71,38 @@ impl CountdownFuture {
impl Future for CountdownFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// If we are the last stream to be polled, skip strait to the Polled state.
if this.wakers.borrow().len() + 1 == *this.max_count {
*this.state = State::Polled;
if self.wakers.borrow().len() + 1 == self.max_count {
self.state = State::Polled;
}

match this.state {
match self.state {
State::Init => {
// Push our waker onto the stack so we get woken again someday.
this.wakers
self.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
*this.state = State::Polled;
.push(PrioritizedWaker(self.index, cx.waker().clone()));
self.state = State::Polled;
Poll::Pending
}
State::Polled => {
// Wake up the next one
let _ = this
let _ = self
.wakers
.borrow_mut()
.pop()
.map(|PrioritizedWaker(_, waker)| waker.wake());

if this.completed_count.get() == *this.index {
*this.state = State::Done;
this.completed_count.set(this.completed_count.get() + 1);
if self.completed_count.get() == self.index {
self.state = State::Done;
self.completed_count.set(self.completed_count.get() + 1);
Poll::Ready(())
} else {
// We're not done yet, so schedule another wakeup
this.wakers
self.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
.push(PrioritizedWaker(self.index, cx.waker().clone()));
Poll::Pending
}
}
Expand Down
30 changes: 13 additions & 17 deletions benches/utils/countdown_streams.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use futures_core::Stream;
use pin_project::pin_project;

use std::cell::{Cell, RefCell};
use std::collections::BinaryHeap;
Expand Down Expand Up @@ -45,7 +44,6 @@ pub fn streams_tuple() -> (
}

/// A stream which will _eventually_ be ready, but needs to be polled N times before it is.
#[pin_project]
pub struct CountdownStream {
state: State,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
Expand Down Expand Up @@ -73,40 +71,38 @@ impl CountdownStream {
impl Stream for CountdownStream {
type Item = ();

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// If we are the last stream to be polled, skip strait to the Polled state.
if this.wakers.borrow().len() + 1 == *this.max_count {
*this.state = State::Polled;
if self.wakers.borrow().len() + 1 == self.max_count {
self.state = State::Polled;
}

match this.state {
match &mut self.state {
State::Init => {
// Push our waker onto the stack so we get woken again someday.
this.wakers
self.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
*this.state = State::Polled;
.push(PrioritizedWaker(self.index, cx.waker().clone()));
self.state = State::Polled;
Poll::Pending
}
State::Polled => {
// Wake up the next one
let _ = this
let _ = self
.wakers
.borrow_mut()
.pop()
.map(|PrioritizedWaker(_, waker)| waker.wake());

if this.completed_count.get() == *this.index {
*this.state = State::Done;
this.completed_count.set(this.completed_count.get() + 1);
if self.completed_count.get() == self.index {
self.state = State::Done;
self.completed_count.set(self.completed_count.get() + 1);
Poll::Ready(Some(()))
} else {
// We're not done yet, so schedule another wakeup
this.wakers
self.wakers
.borrow_mut()
.push(PrioritizedWaker(*this.index, cx.waker().clone()));
.push(PrioritizedWaker(self.index, cx.waker().clone()));
Poll::Pending
}
}
Expand Down

0 comments on commit c777f28

Please sign in to comment.