Skip to content

Commit

Permalink
set bounds on {Future,Stream}Group::insert
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Aug 13, 2023
1 parent b90a83e commit 74b2611
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 62 deletions.
62 changes: 31 additions & 31 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,37 +139,6 @@ impl<F> FutureGroup<F> {
self.futures.is_empty()
}

/// Insert a new future into the group.
///
/// # Example
///
/// ```rust
/// use futures_concurrency::future::FutureGroup;
/// use std::future;
///
/// let mut group = FutureGroup::with_capacity(2);
/// group.insert(future::ready(12));
/// ```
pub fn insert(&mut self, stream: F) -> Key
where
F: Future,
{
let index = self.futures.insert(stream);
self.keys.insert(index);
let key = Key(index);

// If our slab allocated more space we need to
// update our tracking structures along with it.
let max_len = self.capacity().max(index);
self.wakers.resize(max_len);
self.states.resize(max_len);

// Set the corresponding state
self.states[index].set_pending();

key
}

/// Removes a stream from the group. Returns whether the value was present in
/// the group.
///
Expand Down Expand Up @@ -218,6 +187,37 @@ impl<F> FutureGroup<F> {
}

impl<F: Future> FutureGroup<F> {
/// Insert a new future into the group.
///
/// # Example
///
/// ```rust
/// use futures_concurrency::future::FutureGroup;
/// use std::future;
///
/// let mut group = FutureGroup::with_capacity(2);
/// group.insert(future::ready(12));
/// ```
pub fn insert(&mut self, stream: F) -> Key
where
F: Future,
{
let index = self.futures.insert(stream);
self.keys.insert(index);
let key = Key(index);

// If our slab allocated more space we need to
// update our tracking structures along with it.
let max_len = self.capacity().max(index);
self.wakers.resize(max_len);
self.states.resize(max_len);

// Set the corresponding state
self.states[index].set_pending();

key
}

/// Create a stream which also yields the key of each item.
///
/// # Example
Expand Down
62 changes: 31 additions & 31 deletions src/stream/stream_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,37 +139,6 @@ impl<S> StreamGroup<S> {
self.streams.is_empty()
}

/// Insert a new future into the group.
///
/// # Example
///
/// ```rust
/// use futures_concurrency::stream::StreamGroup;
/// use futures_lite::stream;
///
/// let mut group = StreamGroup::with_capacity(2);
/// group.insert(stream::once(12));
/// ```
pub fn insert(&mut self, stream: S) -> Key
where
S: Stream,
{
let index = self.streams.insert(stream);
self.keys.insert(index);
let key = Key(index);

// If our slab allocated more space we need to
// update our tracking structures along with it.
let max_len = self.capacity().max(index);
self.wakers.resize(max_len);
self.states.resize(max_len);

// Set the corresponding state
self.states[index].set_pending();

key
}

/// Removes a stream from the group. Returns whether the value was present in
/// the group.
///
Expand Down Expand Up @@ -218,6 +187,37 @@ impl<S> StreamGroup<S> {
}

impl<S: Stream> StreamGroup<S> {
/// Insert a new future into the group.
///
/// # Example
///
/// ```rust
/// use futures_concurrency::stream::StreamGroup;
/// use futures_lite::stream;
///
/// let mut group = StreamGroup::with_capacity(2);
/// group.insert(stream::once(12));
/// ```
pub fn insert(&mut self, stream: S) -> Key
where
S: Stream,
{
let index = self.streams.insert(stream);
self.keys.insert(index);
let key = Key(index);

// If our slab allocated more space we need to
// update our tracking structures along with it.
let max_len = self.capacity().max(index);
self.wakers.resize(max_len);
self.states.resize(max_len);

// Set the corresponding state
self.states[index].set_pending();

key
}

/// Create a stream which also yields the key of each item.
///
/// # Example
Expand Down

0 comments on commit 74b2611

Please sign in to comment.