Skip to content

Commit

Permalink
Add reserve method to FutureGroup and StreamGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus-consoli committed Mar 23, 2024
1 parent d9715ff commit a6313c5
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 10 deletions.
24 changes: 14 additions & 10 deletions src/collections/inner_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use smallvec::{smallvec, SmallVec};

use crate::utils::{PollVec, WakerVec};

const GROUP_GROWTH_FACTOR: usize = 2;
const fn grow_group_capacity(cap: usize) -> usize {
cap * 2 + 1
}

#[pin_project::pin_project]
pub struct InnerGroup<A, B> {
Expand Down Expand Up @@ -56,7 +58,7 @@ impl<A, B> InnerGroup<A, B> {

pub fn insert(&mut self, item: A) -> Key {
if !self.has_capacity() {
self.resize((self.cap + 1) * GROUP_GROWTH_FACTOR);
self.reserve(grow_group_capacity(self.cap));
}

let index = self.items.insert(item);
Expand All @@ -75,7 +77,7 @@ impl<A, B> InnerGroup<A, B> {

if !self.has_capacity() {
let r = unsafe { &mut self.as_mut().get_unchecked_mut() };
r.resize((r.cap + 1) * GROUP_GROWTH_FACTOR);
r.reserve(grow_group_capacity(r.cap));
}

let mut this = self.project();
Expand All @@ -102,15 +104,17 @@ impl<A, B> InnerGroup<A, B> {
Some(item)
}

// todo: rename to reserve
pub fn resize(&mut self, cap: usize) {
if self.len + cap < self.cap {
/// Reserve `additional` capacity for new items
/// Does nothing if the capacity is already sufficient
pub fn reserve(&mut self, additional: usize) {
if self.len + additional < self.cap {
return;
}
self.wakers.resize(cap);
self.states.resize(cap);
self.items.reserve_exact(cap);
self.cap = cap;
let new_cap = self.cap + additional;
self.wakers.resize(new_cap);
self.states.resize(new_cap);
self.items.reserve_exact(new_cap);
self.cap = new_cap;
}

pub fn set_top_waker(&mut self, waker: &Waker) {
Expand Down
19 changes: 19 additions & 0 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,25 @@ impl<F> FutureGroup<F> {
pub fn contains_key(&mut self, key: Key) -> bool {
self.inner.contains_key(key)
}

/// Reserves capacity for `additional` more futures to be inserted.
/// Does nothing if the capacity is already sufficient.
///
/// # Example
///
/// ```rust
/// use futures_concurrency::future::FutureGroup;
/// use std::future;
/// # futures_lite::future::block_on(async {
/// let mut group = FutureGroup::with_capacity(0);
/// assert_eq!(group.capacity(), 0);

Check failure on line 200 in src/future/future_group.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

type annotations needed for `FutureGroup<F>`
/// group.reserve(10);
/// assert_eq!(group.capacity(), 10);
/// # })
/// ```
pub fn reserve(&mut self, additional: usize) {
self.inner.reserve(additional);
}
}

impl<F> Default for FutureGroup<F> {
Expand Down
19 changes: 19 additions & 0 deletions src/stream/stream_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,25 @@ impl<S> StreamGroup<S> {
pub fn contains_key(&mut self, key: Key) -> bool {
self.inner.contains_key(key)
}

/// Reserves capacity for `additional` more streams to be inserted.
/// Does nothing if the capacity is already sufficient.
///
/// # Example
///
/// ```rust
/// use futures_concurrency::future::StreamGroup;
/// use std::future;

Check failure on line 194 in src/stream/stream_group.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unresolved import `futures_concurrency::future::StreamGroup`
/// # futures_lite::future::block_on(async {
/// let mut group = StreamGroup::with_capacity(0);
/// assert_eq!(group.capacity(), 0);
/// group.reserve(10);
/// assert_eq!(group.capacity(), 10);
/// # })
/// ```
pub fn reserve(&mut self, additional: usize) {
self.inner.reserve(additional);
}
}

impl<S: Stream> StreamGroup<S> {
Expand Down

0 comments on commit a6313c5

Please sign in to comment.