Skip to content

Commit

Permalink
track capacity locally
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Mar 19, 2024
1 parent 3e0e10b commit c14400c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
17 changes: 15 additions & 2 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct FutureGroup<F> {
wakers: WakerVec,
states: PollVec,
keys: BTreeSet<usize>,
capacity: usize,
}

impl<T: Debug> Debug for FutureGroup<T> {
Expand Down Expand Up @@ -108,6 +109,7 @@ impl<F> FutureGroup<F> {
wakers: WakerVec::new(capacity),
states: PollVec::new(capacity),
keys: BTreeSet::new(),
capacity,
}
}

Expand Down Expand Up @@ -142,7 +144,7 @@ impl<F> FutureGroup<F> {
/// # let group: FutureGroup<usize> = group;
/// ```
pub fn capacity(&self) -> usize {
self.futures.capacity()
self.capacity
}

/// Returns true if there are no futures currently active in the group.
Expand Down Expand Up @@ -232,10 +234,11 @@ impl<F: Future> FutureGroup<F> {
// If our slab allocated more space we need to
// update our tracking structures along with it.
let capacity = self.capacity();
let max_len = capacity.max(index);
let max_len = capacity.max(index + 1);
if max_len > capacity {
self.wakers.resize(max_len);
self.states.resize(max_len);
self.capacity = max_len;
}

// Set the corresponding state
Expand Down Expand Up @@ -424,4 +427,14 @@ mod test {
assert!(group.is_empty());
});
}

#[test]
fn insert_many() {
futures_lite::future::block_on(async {
let mut group = FutureGroup::new();
for _ in 0..100 {
group.insert(future::ready(2));
}
});
}
}
20 changes: 17 additions & 3 deletions src/stream/stream_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub struct StreamGroup<S> {
states: PollVec,
keys: BTreeSet<usize>,
key_removal_queue: SmallVec<[usize; 10]>,
capacity: usize,
}

impl<T: Debug> Debug for StreamGroup<T> {
Expand Down Expand Up @@ -108,6 +109,7 @@ impl<S> StreamGroup<S> {
states: PollVec::new(capacity),
keys: BTreeSet::new(),
key_removal_queue: smallvec![],
capacity,
}
}

Expand Down Expand Up @@ -141,7 +143,7 @@ impl<S> StreamGroup<S> {
/// # let group: StreamGroup<usize> = group;
/// ```
pub fn capacity(&self) -> usize {
self.streams.capacity()
self.capacity
}

/// Returns true if there are no futures currently active in the group.
Expand Down Expand Up @@ -224,17 +226,19 @@ impl<S: Stream> StreamGroup<S> {
where
S: Stream,
{
dbg!();

Check failure on line 229 in src/stream/stream_group.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

cannot find macro `dbg` in this scope
let index = self.streams.insert(stream);
self.keys.insert(index);
self.keys.insert(dbg!(index));

Check failure on line 231 in src/stream/stream_group.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

cannot find macro `dbg` in this scope
let key = Key(index);

// If our slab allocated more space we need to
// update our tracking structures along with it.
let capacity = self.capacity();
let max_len = capacity.max(index);
let max_len = capacity.max(index + 1);
if max_len > capacity {
self.wakers.resize(max_len);
self.states.resize(max_len);
self.capacity = max_len;
}

// Set the corresponding state
Expand Down Expand Up @@ -443,4 +447,14 @@ mod test {
assert!(group.is_empty());
});
}

#[test]
fn insert_many() {
futures_lite::future::block_on(async {
let mut group = StreamGroup::new();
for _ in 0..100 {
group.insert(stream::once(2));
}
});
}
}

0 comments on commit c14400c

Please sign in to comment.