From 5102d7e45ef0dd55b8ecf0bc9014cada39354a35 Mon Sep 17 00:00:00 2001 From: Alex Rudy Date: Sun, 6 Oct 2024 03:07:47 +0000 Subject: [PATCH] feat: Client pool can delay drop for checkout When a checkout gets interrupted by a ready / idle connection, the checkout may have done significant work to create the connection. In delayed drop mode, instead of cancelling the connection attempt, the connection will continue in the background and then be added to the pool as a new idle connection. This can save significant time when future connections begin. The downside is that it requires heap-allocating the future for the connector, but this penalty should be small compared to the typical network latencies observed and the potential savings (100s of ms) by not throwing away completed connection work. --- examples/client/profile.rs | 54 +++++++++---------- src/client/pool/checkout.rs | 101 +++++++++++++++++++++++++++++++----- src/client/pool/mod.rs | 77 +++++++++++++++++++++------ src/client/pool/service.rs | 2 +- src/info/mod.rs | 2 +- 5 files changed, 181 insertions(+), 55 deletions(-) diff --git a/examples/client/profile.rs b/examples/client/profile.rs index aaee52e..1c329c9 100644 --- a/examples/client/profile.rs +++ b/examples/client/profile.rs @@ -7,6 +7,7 @@ use std::time::Duration; use clap::{arg, value_parser, ArgMatches}; +use futures_util::{stream::FuturesUnordered, TryStreamExt}; use http::{HeaderName, HeaderValue, Uri}; use http_body_util::BodyExt as _; use hyperdriver::{client::Client, Body}; @@ -95,14 +96,6 @@ async fn build_request(args: &ArgMatches, uri: Uri) -> Result, -) -> Result, BoxError> { - Ok(client.request(request).await?) -} - #[instrument(level = "info", skip_all)] async fn demonstrate_requests(args: &ArgMatches) -> Result<(), BoxError> { let timeout = Duration::from_secs(*args.get_one::("timeout").unwrap()); @@ -126,35 +119,44 @@ async fn demonstrate_requests(args: &ArgMatches) -> Result<(), BoxError> { let mut client = Client::build_tcp_http().with_timeout(timeout).build(); - let mut res = None; + let mut fut = FuturesUnordered::new(); for _ in 1..=repeat { let req = build_request(args, uri.clone()).await?; - res = Some(do_one_request(&mut client, req).await?); + let res = client.request(req); + let span = tracing::info_span!("request"); + fut.push(res.instrument(span)) } - let res = res.unwrap(); - println!("Response: {} - {:?}", res.status(), res.version()); + { + let mut res = None; + while let Some(item) = fut.try_next().await? { + res = Some(item); + } - for (name, value) in res.headers() { - if let Ok(value) = value.to_str() { - println!(" {}: {}", name, value); + let res = res.unwrap(); + println!("Response: {} - {:?}", res.status(), res.version()); + + for (name, value) in res.headers() { + if let Ok(value) = value.to_str() { + println!(" {}: {}", name, value); + } } - } - let span = tracing::debug_span!("read body"); - async { - let mut stdout = tokio::io::stdout(); + let span = tracing::debug_span!("read body"); + async { + let mut stdout = tokio::io::stdout(); - let mut body = res.into_body(); - while let Some(chunk) = body.frame().await { - if let Some(chunk) = chunk?.data_ref() { - stdout.write_all(chunk).await?; + let mut body = res.into_body(); + while let Some(chunk) = body.frame().await { + if let Some(chunk) = chunk?.data_ref() { + stdout.write_all(chunk).await?; + } } + Ok::<_, BoxError>(()) } - Ok::<_, BoxError>(()) + .instrument(span) + .await?; } - .instrument(span) - .await?; tracing::info!("finished"); diff --git a/src/client/pool/checkout.rs b/src/client/pool/checkout.rs index 1c86956..16f20c9 100644 --- a/src/client/pool/checkout.rs +++ b/src/client/pool/checkout.rs @@ -20,6 +20,7 @@ use crate::client::conn::Transport; use crate::info::ConnectionInfo; use crate::info::HasConnectionInfo; +use super::Config; use super::Key; use super::Pool; use super::PoolRef; @@ -358,6 +359,7 @@ where Waiting, Connected, Connecting(#[pin] Connector), + ConnectingWithDelayDrop(Option>>>), } impl fmt::Debug for InnerCheckoutConnecting @@ -373,6 +375,10 @@ where InnerCheckoutConnecting::Connecting(connector) => { f.debug_tuple("Connecting").field(connector).finish() } + InnerCheckoutConnecting::ConnectingWithDelayDrop(connector) => f + .debug_tuple("ConnectingWithDelayDrop") + .field(connector) + .finish(), } } } @@ -408,9 +414,10 @@ impl CheckoutMeta { #[pin_project(PinnedDrop)] pub(crate) struct Checkout where - T: Transport, - P: Protocol, + T: Transport + 'static, + P: Protocol + Send + 'static, P::Connection: PoolableConnection, + B: 'static, { key: Key, pool: PoolRef, @@ -419,6 +426,7 @@ where #[pin] inner: InnerCheckoutConnecting, connection: Option, + is_delayed_drop: bool, meta: CheckoutMeta, #[cfg(debug_assertions)] id: CheckoutId, @@ -443,10 +451,38 @@ where impl Checkout where - T: Transport, - P: Protocol, + T: Transport + 'static, + P: Protocol + Send + 'static, P::Connection: PoolableConnection, + B: 'static, { + /// Converts this checkout into a "delayed drop" checkout. + fn as_delayed(self: Pin<&mut Self>) -> Option { + let mut this = self.project(); + + if *this.is_delayed_drop { + return None; + } + + match this.inner.as_mut().project() { + CheckoutConnectingProj::ConnectingWithDelayDrop(connector) if connector.is_some() => { + tracing::trace!("converting checkout to delayed drop"); + Some(Checkout { + key: this.key.clone(), + pool: this.pool.clone(), + waiter: Waiting::NoPool, + inner: InnerCheckoutConnecting::ConnectingWithDelayDrop(connector.take()), + connection: None, + is_delayed_drop: true, + meta: CheckoutMeta::new(), // New meta to avoid holding spans in the spawned task + #[cfg(debug_assertions)] + id: *this.id, + }) + } + _ => None, + } + } + /// Constructs a checkout which does not hold a reference to the pool /// and so is only waiting on the connector. /// @@ -470,6 +506,7 @@ where waiter: Waiting::NoPool, inner: InnerCheckoutConnecting::Connecting(connector), connection: None, + is_delayed_drop: false, meta: CheckoutMeta::new(), #[cfg(debug_assertions)] id, @@ -482,6 +519,7 @@ where waiter: Receiver>, connect: Option>, connection: Option, + config: &Config, ) -> Self { #[cfg(debug_assertions)] let id = CheckoutId::new(); @@ -498,18 +536,27 @@ where waiter: Waiting::Idle(waiter), inner: InnerCheckoutConnecting::Connected, connection, + is_delayed_drop: false, meta, #[cfg(debug_assertions)] id, } } else if let Some(connector) = connect { tracing::trace!(%key, "connecting to pool"); + + let inner = if config.continue_after_preemption { + InnerCheckoutConnecting::ConnectingWithDelayDrop(Some(Box::pin(connector))) + } else { + InnerCheckoutConnecting::Connecting(connector) + }; + Self { key, pool: pool.as_ref(), waiter: Waiting::Idle(waiter), - inner: InnerCheckoutConnecting::Connecting(connector), + inner, connection, + is_delayed_drop: false, meta, #[cfg(debug_assertions)] id, @@ -522,6 +569,7 @@ where waiter: Waiting::Connecting(waiter), inner: InnerCheckoutConnecting::Waiting, connection, + is_delayed_drop: false, meta, #[cfg(debug_assertions)] id, @@ -532,9 +580,10 @@ where impl Future for Checkout where - T: Transport, - P: Protocol, + T: Transport + 'static, + P: Protocol + Send + 'static, P::Connection: PoolableConnection, + B: 'static, { type Output = Result< Pooled, @@ -585,15 +634,35 @@ where CheckoutConnectingProj::Connecting(connector) => { let result = ready!(connector.poll_connector(this.pool, this.key, this.meta, cx)); + this.waiter.close(); + this.inner.set(InnerCheckoutConnecting::Connected); + match result { Ok(connection) => { - this.waiter.close(); - this.inner.set(InnerCheckoutConnecting::Connected); Poll::Ready(Ok(register_connected(this.pool, this.key, connection))) } Err(e) => Poll::Ready(Err(e)), } } + CheckoutConnectingProj::ConnectingWithDelayDrop(Some(connector)) => { + let result = ready!(connector + .as_mut() + .poll_connector(this.pool, this.key, this.meta, cx)); + + this.waiter.close(); + this.inner.set(InnerCheckoutConnecting::Connected); + + match result { + Ok(connection) => { + Poll::Ready(Ok(register_connected(this.pool, this.key, connection))) + } + Err(e) => Poll::Ready(Err(e)), + } + } + CheckoutConnectingProj::ConnectingWithDelayDrop(None) => { + // Something stole our connection, this is an error state. + panic!("connection was stolen from checkout") + } } } } @@ -634,15 +703,23 @@ where #[pinned_drop] impl PinnedDrop for Checkout where - T: Transport, - P: Protocol, + T: Transport + 'static, + P: Protocol + Send + 'static, P::Connection: PoolableConnection, + B: 'static, { fn drop(mut self: Pin<&mut Self>) { #[cfg(debug_assertions)] tracing::trace!(id=%self.id, "drop for checkout"); - if let Some(mut pool) = self.pool.lock() { + if let Some(checkout) = self.as_mut().as_delayed() { + tokio::task::spawn(async move { + if let Err(err) = checkout.await { + tracing::error!(error=%err, "error during delayed drop"); + } + }); + } else if let Some(mut pool) = self.pool.lock() { + // Connection is only cancled when no delayed drop occurs. pool.cancel_connection(&self.key); } } diff --git a/src/client/pool/mod.rs b/src/client/pool/mod.rs index f332f3f..67c992d 100644 --- a/src/client/pool/mod.rs +++ b/src/client/pool/mod.rs @@ -110,8 +110,9 @@ impl Pool { ) -> Checkout where T: Transport, - P: Protocol, + P: Protocol + Send + 'static, C: PoolableConnection, + B: 'static, { let mut inner = self.inner.lock(); let (tx, rx) = tokio::sync::oneshot::channel(); @@ -120,7 +121,7 @@ impl Pool { if let Some(connection) = inner.pop(&key) { trace!("connection found in pool"); connector = None; - return Checkout::new(key, self, rx, connector, Some(connection)); + return Checkout::new(key, self, rx, connector, Some(connection), &inner.config); } trace!("checkout interested in pooled connections"); @@ -129,7 +130,7 @@ impl Pool { if inner.connecting.contains(&key) { trace!("connection in progress elsewhere, will wait"); connector = None; - Checkout::new(key, self, rx, connector, None) + Checkout::new(key, self, rx, connector, None, &inner.config) } else { if multiplex { // Only block new connection attempts if we can multiplex on this one. @@ -137,7 +138,7 @@ impl Pool { inner.connecting.insert(key.clone()); } trace!("connecting to host"); - Checkout::new(key, self, rx, connector, None) + Checkout::new(key, self, rx, connector, None, &inner.config) } } } @@ -335,7 +336,7 @@ pub struct Config { pub max_idle_per_host: usize, /// Should in-progress connections continue after they get pre-empted by a new connection? - pub continue_after_premeption: bool, + pub continue_after_preemption: bool, } impl Default for Config { @@ -343,7 +344,7 @@ impl Default for Config { Self { idle_timeout: Some(Duration::from_secs(90)), max_idle_per_host: 32, - continue_after_premeption: true, + continue_after_preemption: true, } } } @@ -480,7 +481,7 @@ mod tests { let pool = Pool::new(Config { idle_timeout: Some(Duration::from_secs(10)), max_idle_per_host: 5, - continue_after_premeption: false, + continue_after_preemption: false, }); let key: key::Key = ( @@ -539,7 +540,7 @@ mod tests { let pool = Pool::new(Config { idle_timeout: Some(Duration::from_secs(10)), max_idle_per_host: 5, - continue_after_premeption: false, + continue_after_preemption: false, }); let key: key::Key = ( @@ -597,7 +598,7 @@ mod tests { let pool = Pool::new(Config { idle_timeout: Some(Duration::from_secs(10)), max_idle_per_host: 5, - continue_after_premeption: false, + continue_after_preemption: false, }); let key: key::Key = ( @@ -643,7 +644,7 @@ mod tests { let pool = Pool::new(Config { idle_timeout: Some(Duration::from_secs(10)), max_idle_per_host: 5, - continue_after_premeption: false, + continue_after_preemption: false, }); let key: key::Key = ( @@ -681,7 +682,7 @@ mod tests { let pool = Pool::new(Config { idle_timeout: Some(Duration::from_secs(10)), max_idle_per_host: 5, - continue_after_premeption: false, + continue_after_preemption: false, }); let key: key::Key = ( @@ -736,7 +737,7 @@ mod tests { let pool = Pool::new(Config { idle_timeout: Some(Duration::from_secs(10)), max_idle_per_host: 5, - continue_after_premeption: false, + continue_after_preemption: false, }); let key: key::Key = ( @@ -772,7 +773,7 @@ mod tests { let pool = Pool::new(Config { idle_timeout: Some(Duration::from_secs(10)), max_idle_per_host: 5, - continue_after_premeption: false, + continue_after_preemption: false, }); let key: key::Key = ( @@ -800,7 +801,7 @@ mod tests { let pool = Pool::new(Config { idle_timeout: Some(Duration::from_secs(10)), max_idle_per_host: 5, - continue_after_premeption: false, + continue_after_preemption: false, }); let key: key::Key = ( @@ -828,7 +829,7 @@ mod tests { let pool = Pool::new(Config { idle_timeout: Some(Duration::from_secs(10)), max_idle_per_host: 5, - continue_after_premeption: false, + continue_after_preemption: false, }); let other = pool.clone(); @@ -880,4 +881,50 @@ mod tests { assert!(c2.is_open()); assert_ne!(c2.id(), cid, "connection should not be re-used"); } + + #[tokio::test] + async fn checkout_delayed_drop() { + let _ = tracing_subscriber::fmt::try_init(); + + let pool = Pool::new(Config { + idle_timeout: Some(Duration::from_secs(10)), + max_idle_per_host: 5, + continue_after_preemption: true, + }); + + let key: key::Key = ( + http::uri::Scheme::HTTP, + http::uri::Authority::from_static("localhost:8080"), + ) + .into(); + + let conn = pool + .checkout( + key.clone(), + false, + MockTransport::single() + .connector("mock://address".parse().unwrap(), HttpProtocol::Http1), + ) + .await + .unwrap(); + + assert!(conn.is_open()); + let cid = conn.id(); + + let checkout = pool.checkout( + key.clone(), + false, + MockTransport::single() + .connector("mock://address".parse().unwrap(), HttpProtocol::Http1), + ); + + drop(conn); + let conn = checkout.await.unwrap(); + assert!(conn.is_open()); + assert_eq!(cid, conn.id()); + + let inner = pool.inner.lock(); + let idles = inner.idle.get(&key).unwrap(); + assert_eq!(idles.len(), 1); + } } diff --git a/src/client/pool/service.rs b/src/client/pool/service.rs index b28e1fb..55fcef4 100644 --- a/src/client/pool/service.rs +++ b/src/client/pool/service.rs @@ -177,7 +177,7 @@ impl pool: Some(pool::Pool::new(pool::Config { idle_timeout: Some(std::time::Duration::from_secs(90)), max_idle_per_host: 32, - continue_after_premeption: true, + continue_after_preemption: true, })), transport: Default::default(), diff --git a/src/info/mod.rs b/src/info/mod.rs index edee805..65212ef 100644 --- a/src/info/mod.rs +++ b/src/info/mod.rs @@ -294,7 +294,7 @@ impl ConnectionInfo { /// Trait for types which can provide connection information. pub trait HasConnectionInfo { /// The address type for this connection. - type Addr: fmt::Display + fmt::Debug; + type Addr: fmt::Display + fmt::Debug + Send; /// Get the connection information for this stream. fn info(&self) -> ConnectionInfo;