From 00601b9421226382d486d1674fe6727f03bcf3e3 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sat, 18 May 2024 10:30:12 +0100 Subject: [PATCH] revert function deletion --- src/future/future_group.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 7e75934..70528d9 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -274,6 +274,38 @@ impl FutureGroup { Key(index) } + #[allow(unused)] + /// Insert a value into a pinned `FutureGroup` + /// + /// This method is private because it serves as an implementation detail for + /// `ConcurrentStream`. We should never expose this publicly, as the entire + /// point of this crate is that we abstract the futures poll machinery away + /// from end-users. + pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key + where + F: Future, + { + let mut this = self.project(); + // SAFETY: inserting a value into the futures slab does not ever move + // any of the existing values. + let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(future); + this.keys.insert(index); + let key = Key(index); + + // If our slab allocated more space we need to + // update our tracking structures along with it. + let max_len = this.futures.as_ref().capacity().max(index); + this.wakers.resize(max_len); + this.states.resize(max_len); + + // Set the corresponding state + this.states[index].set_pending(); + let mut readiness = this.wakers.readiness(); + readiness.set_ready(index); + + key + } + /// Create a stream which also yields the key of each item. /// /// # Example