diff --git a/src/future/future_group.rs b/src/future/future_group.rs index a64a418..df18a0a 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -67,6 +67,7 @@ pub struct FutureGroup { wakers: WakerVec, states: PollVec, keys: BTreeSet, + capacity: usize, } impl Debug for FutureGroup { @@ -108,6 +109,7 @@ impl FutureGroup { wakers: WakerVec::new(capacity), states: PollVec::new(capacity), keys: BTreeSet::new(), + capacity, } } @@ -142,7 +144,7 @@ impl FutureGroup { /// # let group: FutureGroup = group; /// ``` pub fn capacity(&self) -> usize { - self.futures.capacity() + self.capacity } /// Returns true if there are no futures currently active in the group. @@ -232,10 +234,11 @@ impl FutureGroup { // If our slab allocated more space we need to // update our tracking structures along with it. let capacity = self.capacity(); - let max_len = capacity.max(index); + let max_len = capacity.max(index + 1); if max_len > capacity { self.wakers.resize(max_len); self.states.resize(max_len); + self.capacity = max_len; } // Set the corresponding state @@ -424,4 +427,14 @@ mod test { assert!(group.is_empty()); }); } + + #[test] + fn insert_many() { + futures_lite::future::block_on(async { + let mut group = FutureGroup::new(); + for _ in 0..100 { + group.insert(future::ready(2)); + } + }); + } } diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index 02ffe42..e5d8f0d 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -66,6 +66,7 @@ pub struct StreamGroup { states: PollVec, keys: BTreeSet, key_removal_queue: SmallVec<[usize; 10]>, + capacity: usize, } impl Debug for StreamGroup { @@ -108,6 +109,7 @@ impl StreamGroup { states: PollVec::new(capacity), keys: BTreeSet::new(), key_removal_queue: smallvec![], + capacity, } } @@ -141,7 +143,7 @@ impl StreamGroup { /// # let group: StreamGroup = group; /// ``` pub fn capacity(&self) -> usize { - self.streams.capacity() + self.capacity } /// Returns true if there are no futures currently active in the group. @@ -224,17 +226,19 @@ impl StreamGroup { where S: Stream, { + dbg!(); let index = self.streams.insert(stream); - self.keys.insert(index); + self.keys.insert(dbg!(index)); let key = Key(index); // If our slab allocated more space we need to // update our tracking structures along with it. let capacity = self.capacity(); - let max_len = capacity.max(index); + let max_len = capacity.max(index + 1); if max_len > capacity { self.wakers.resize(max_len); self.states.resize(max_len); + self.capacity = max_len; } // Set the corresponding state @@ -443,4 +447,14 @@ mod test { assert!(group.is_empty()); }); } + + #[test] + fn insert_many() { + futures_lite::future::block_on(async { + let mut group = StreamGroup::new(); + for _ in 0..100 { + group.insert(stream::once(2)); + } + }); + } }