diff --git a/src/future/future_group.rs b/src/future/future_group.rs index e327f2b..4d000c2 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -67,14 +67,15 @@ pub struct FutureGroup { wakers: WakerVec, states: PollVec, keys: BTreeSet, + capacity: usize, } impl Debug for FutureGroup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FutureGroup") .field("slab", &"[..]") - .field("len", &self.futures.len()) - .field("capacity", &self.futures.capacity()) + .field("len", &self.len()) + .field("capacity", &self.capacity) .finish() } } @@ -110,6 +111,7 @@ impl FutureGroup { wakers: WakerVec::new(capacity), states: PollVec::new(capacity), keys: BTreeSet::new(), + capacity, } } @@ -127,6 +129,7 @@ impl FutureGroup { /// group.insert(future::ready(12)); /// assert_eq!(group.len(), 1); /// ``` + #[inline(always)] pub fn len(&self) -> usize { self.futures.len() } @@ -144,7 +147,7 @@ impl FutureGroup { /// # let group: FutureGroup = group; /// ``` pub fn capacity(&self) -> usize { - self.futures.capacity() + self.capacity } /// Returns true if there are no futures currently active in the group. @@ -209,6 +212,36 @@ impl FutureGroup { pub fn contains_key(&mut self, key: Key) -> bool { self.keys.contains(&key.0) } + + /// Reserves capacity for `additional` more futures to be inserted. + /// Does nothing if the capacity is already sufficient. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// use std::future::Ready; + /// # futures_lite::future::block_on(async { + /// let mut group: FutureGroup> = FutureGroup::with_capacity(0); + /// assert_eq!(group.capacity(), 0); + /// group.reserve(10); + /// assert_eq!(group.capacity(), 10); + /// + /// // does nothing if capacity is sufficient + /// group.reserve(5); + /// assert_eq!(group.capacity(), 10); + /// # }) + /// ``` + pub fn reserve(&mut self, additional: usize) { + if self.len() + additional < self.capacity { + return; + } + let new_cap = self.capacity + additional; + self.wakers.resize(new_cap); + self.states.resize(new_cap); + self.futures.reserve_exact(additional); + self.capacity = new_cap; + } } impl FutureGroup { @@ -223,26 +256,22 @@ impl FutureGroup { /// let mut group = FutureGroup::with_capacity(2); /// group.insert(future::ready(12)); /// ``` - pub fn insert(&mut self, stream: F) -> Key + pub fn insert(&mut self, future: F) -> Key where F: Future, { - let index = self.futures.insert(stream); - self.keys.insert(index); - let key = Key(index); + if self.capacity <= self.len() { + self.reserve(self.capacity * 2 + 1); + } - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = self.capacity().max(index); - self.wakers.resize(max_len); - self.states.resize(max_len); + let index = self.futures.insert(future); + self.keys.insert(index); // Set the corresponding state self.states[index].set_pending(); - let mut readiness = self.wakers.readiness(); - readiness.set_ready(index); + self.wakers.readiness().set_ready(index); - key + Key(index) } /// Insert a value into a pinned `FutureGroup` @@ -251,14 +280,14 @@ impl FutureGroup { /// `ConcurrentStream`. We should never expose this publicly, as the entire /// point of this crate is that we abstract the futures poll machinery away /// from end-users. - pub(crate) fn insert_pinned(self: Pin<&mut Self>, stream: F) -> Key + pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key where F: Future, { let mut this = self.project(); // SAFETY: inserting a value into the futures slab does not ever move // any of the existing values. - let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(stream); + let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(future); this.keys.insert(index); let key = Key(index); @@ -455,4 +484,16 @@ mod test { assert!(group.is_empty()); }); } + + #[test] + fn capacity_grow_on_insert() { + futures_lite::future::block_on(async { + let mut group = FutureGroup::new(); + let cap = group.capacity(); + + group.insert(future::ready(1)); + + assert!(group.capacity() > cap); + }); + } } diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index f0bcab7..7ca3ab4 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -66,6 +66,7 @@ pub struct StreamGroup { states: PollVec, keys: BTreeSet, key_removal_queue: SmallVec<[usize; 10]>, + capacity: usize, } impl Debug for StreamGroup { @@ -108,6 +109,7 @@ impl StreamGroup { states: PollVec::new(capacity), keys: BTreeSet::new(), key_removal_queue: smallvec![], + capacity, } } @@ -124,6 +126,7 @@ impl StreamGroup { /// group.insert(stream::once(12)); /// assert_eq!(group.len(), 1); /// ``` + #[inline(always)] pub fn len(&self) -> usize { self.streams.len() } @@ -141,7 +144,7 @@ impl StreamGroup { /// # let group: StreamGroup = group; /// ``` pub fn capacity(&self) -> usize { - self.streams.capacity() + self.capacity } /// Returns true if there are no futures currently active in the group. @@ -157,6 +160,7 @@ impl StreamGroup { /// group.insert(stream::once(12)); /// assert!(!group.is_empty()); /// ``` + #[inline(always)] pub fn is_empty(&self) -> bool { self.streams.is_empty() } @@ -206,6 +210,36 @@ impl StreamGroup { pub fn contains_key(&mut self, key: Key) -> bool { self.keys.contains(&key.0) } + + /// Reserves capacity for `additional` more streams to be inserted. + /// Does nothing if the capacity is already sufficient. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::stream::StreamGroup; + /// use futures_lite::stream::Once; + /// # futures_lite::future::block_on(async { + /// let mut group: StreamGroup> = StreamGroup::with_capacity(0); + /// assert_eq!(group.capacity(), 0); + /// group.reserve(10); + /// assert_eq!(group.capacity(), 10); + /// + /// // does nothing if capacity is sufficient + /// group.reserve(5); + /// assert_eq!(group.capacity(), 10); + /// # }) + /// ``` + pub fn reserve(&mut self, additional: usize) { + if self.len() + additional < self.capacity { + return; + } + let new_cap = self.capacity + additional; + self.wakers.resize(new_cap); + self.states.resize(new_cap); + self.streams.reserve_exact(additional); + self.capacity = new_cap; + } } impl StreamGroup { @@ -224,22 +258,18 @@ impl StreamGroup { where S: Stream, { + if self.capacity <= self.len() { + self.reserve(self.capacity * 2 + 1); + } + let index = self.streams.insert(stream); self.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = self.capacity().max(index); - self.wakers.resize(max_len); - self.states.resize(max_len); // Set the corresponding state self.states[index].set_pending(); - let mut readiness = self.wakers.readiness(); - readiness.set_ready(index); + self.wakers.readiness().set_ready(index); - key + Key(index) } /// Create a stream which also yields the key of each item. @@ -270,10 +300,10 @@ impl StreamGroup { impl StreamGroup { fn poll_next_inner( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &Context<'_>, ) -> Poll::Item)>> { - let mut this = self.project(); + let mut this = self.as_mut().project(); // Short-circuit if we have no streams to iterate over if this.streams.is_empty() { @@ -441,4 +471,16 @@ mod test { assert!(group.is_empty()); }); } + + #[test] + fn capacity_grow_on_insert() { + futures_lite::future::block_on(async { + let mut group = StreamGroup::new(); + let cap = group.capacity(); + + group.insert(stream::once(1)); + + assert!(group.capacity() > cap); + }); + } }