Skip to content

Commit

Permalink
feat: Client pool can delay drop for checkout
Browse files Browse the repository at this point in the history
When a checkout gets interrupted by a ready / idle connection, the checkout may have done significant work to create the connection. In delayed drop mode, instead of cancelling the connection attempt, the connection will continue in the background and then be added to the pool as a new idle connection.

This can save significant time when future connections begin. The downside is that it requires heap-allocating the future for the connector, but this penalty should be small compared to the typical network latencies observed and the potential savings (100s of ms) by not throwing away completed connection work.
  • Loading branch information
alexrudy committed Oct 6, 2024
1 parent 72be686 commit 5102d7e
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 55 deletions.
54 changes: 28 additions & 26 deletions examples/client/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::time::Duration;

use clap::{arg, value_parser, ArgMatches};
use futures_util::{stream::FuturesUnordered, TryStreamExt};
use http::{HeaderName, HeaderValue, Uri};
use http_body_util::BodyExt as _;
use hyperdriver::{client::Client, Body};
Expand Down Expand Up @@ -95,14 +96,6 @@ async fn build_request(args: &ArgMatches, uri: Uri) -> Result<http::Request<Body
Ok(req)
}

#[instrument(level = "info", name = "request", skip_all)]
async fn do_one_request(
client: &mut Client,
request: http::Request<Body>,
) -> Result<http::Response<Body>, BoxError> {
Ok(client.request(request).await?)
}

#[instrument(level = "info", skip_all)]
async fn demonstrate_requests(args: &ArgMatches) -> Result<(), BoxError> {
let timeout = Duration::from_secs(*args.get_one::<u64>("timeout").unwrap());
Expand All @@ -126,35 +119,44 @@ async fn demonstrate_requests(args: &ArgMatches) -> Result<(), BoxError> {

let mut client = Client::build_tcp_http().with_timeout(timeout).build();

let mut res = None;
let mut fut = FuturesUnordered::new();
for _ in 1..=repeat {
let req = build_request(args, uri.clone()).await?;
res = Some(do_one_request(&mut client, req).await?);
let res = client.request(req);
let span = tracing::info_span!("request");
fut.push(res.instrument(span))
}

let res = res.unwrap();
println!("Response: {} - {:?}", res.status(), res.version());
{
let mut res = None;
while let Some(item) = fut.try_next().await? {
res = Some(item);
}

for (name, value) in res.headers() {
if let Ok(value) = value.to_str() {
println!(" {}: {}", name, value);
let res = res.unwrap();
println!("Response: {} - {:?}", res.status(), res.version());

for (name, value) in res.headers() {
if let Ok(value) = value.to_str() {
println!(" {}: {}", name, value);
}
}
}

let span = tracing::debug_span!("read body");
async {
let mut stdout = tokio::io::stdout();
let span = tracing::debug_span!("read body");
async {
let mut stdout = tokio::io::stdout();

let mut body = res.into_body();
while let Some(chunk) = body.frame().await {
if let Some(chunk) = chunk?.data_ref() {
stdout.write_all(chunk).await?;
let mut body = res.into_body();
while let Some(chunk) = body.frame().await {
if let Some(chunk) = chunk?.data_ref() {
stdout.write_all(chunk).await?;
}
}
Ok::<_, BoxError>(())
}
Ok::<_, BoxError>(())
.instrument(span)
.await?;
}
.instrument(span)
.await?;

tracing::info!("finished");

Expand Down
101 changes: 89 additions & 12 deletions src/client/pool/checkout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::client::conn::Transport;
use crate::info::ConnectionInfo;
use crate::info::HasConnectionInfo;

use super::Config;
use super::Key;
use super::Pool;
use super::PoolRef;
Expand Down Expand Up @@ -358,6 +359,7 @@ where
Waiting,
Connected,
Connecting(#[pin] Connector<T, P, B>),
ConnectingWithDelayDrop(Option<Pin<Box<Connector<T, P, B>>>>),
}

impl<T, P, B> fmt::Debug for InnerCheckoutConnecting<T, P, B>
Expand All @@ -373,6 +375,10 @@ where
InnerCheckoutConnecting::Connecting(connector) => {
f.debug_tuple("Connecting").field(connector).finish()
}
InnerCheckoutConnecting::ConnectingWithDelayDrop(connector) => f
.debug_tuple("ConnectingWithDelayDrop")
.field(connector)
.finish(),
}
}
}
Expand Down Expand Up @@ -408,9 +414,10 @@ impl CheckoutMeta {
#[pin_project(PinnedDrop)]
pub(crate) struct Checkout<T, P, B>
where
T: Transport,
P: Protocol<T::IO, B>,
T: Transport + 'static,
P: Protocol<T::IO, B> + Send + 'static,
P::Connection: PoolableConnection,
B: 'static,
{
key: Key,
pool: PoolRef<P::Connection>,
Expand All @@ -419,6 +426,7 @@ where
#[pin]
inner: InnerCheckoutConnecting<T, P, B>,
connection: Option<P::Connection>,
is_delayed_drop: bool,
meta: CheckoutMeta,
#[cfg(debug_assertions)]
id: CheckoutId,
Expand All @@ -443,10 +451,38 @@ where

impl<T, P, B, C> Checkout<T, P, B>
where
T: Transport,
P: Protocol<T::IO, B, Connection = C>,
T: Transport + 'static,
P: Protocol<T::IO, B, Connection = C> + Send + 'static,
P::Connection: PoolableConnection,
B: 'static,
{
/// Converts this checkout into a "delayed drop" checkout.
fn as_delayed(self: Pin<&mut Self>) -> Option<Self> {
let mut this = self.project();

if *this.is_delayed_drop {
return None;
}

match this.inner.as_mut().project() {
CheckoutConnectingProj::ConnectingWithDelayDrop(connector) if connector.is_some() => {
tracing::trace!("converting checkout to delayed drop");
Some(Checkout {
key: this.key.clone(),
pool: this.pool.clone(),
waiter: Waiting::NoPool,
inner: InnerCheckoutConnecting::ConnectingWithDelayDrop(connector.take()),
connection: None,
is_delayed_drop: true,
meta: CheckoutMeta::new(), // New meta to avoid holding spans in the spawned task
#[cfg(debug_assertions)]
id: *this.id,
})
}
_ => None,
}
}

/// Constructs a checkout which does not hold a reference to the pool
/// and so is only waiting on the connector.
///
Expand All @@ -470,6 +506,7 @@ where
waiter: Waiting::NoPool,
inner: InnerCheckoutConnecting::Connecting(connector),
connection: None,
is_delayed_drop: false,
meta: CheckoutMeta::new(),
#[cfg(debug_assertions)]
id,
Expand All @@ -482,6 +519,7 @@ where
waiter: Receiver<Pooled<P::Connection>>,
connect: Option<Connector<T, P, B>>,
connection: Option<P::Connection>,
config: &Config,
) -> Self {
#[cfg(debug_assertions)]
let id = CheckoutId::new();
Expand All @@ -498,18 +536,27 @@ where
waiter: Waiting::Idle(waiter),
inner: InnerCheckoutConnecting::Connected,
connection,
is_delayed_drop: false,
meta,
#[cfg(debug_assertions)]
id,
}
} else if let Some(connector) = connect {
tracing::trace!(%key, "connecting to pool");

let inner = if config.continue_after_preemption {
InnerCheckoutConnecting::ConnectingWithDelayDrop(Some(Box::pin(connector)))
} else {
InnerCheckoutConnecting::Connecting(connector)
};

Self {
key,
pool: pool.as_ref(),
waiter: Waiting::Idle(waiter),
inner: InnerCheckoutConnecting::Connecting(connector),
inner,
connection,
is_delayed_drop: false,
meta,
#[cfg(debug_assertions)]
id,
Expand All @@ -522,6 +569,7 @@ where
waiter: Waiting::Connecting(waiter),
inner: InnerCheckoutConnecting::Waiting,
connection,
is_delayed_drop: false,
meta,
#[cfg(debug_assertions)]
id,
Expand All @@ -532,9 +580,10 @@ where

impl<T, P, B> Future for Checkout<T, P, B>
where
T: Transport,
P: Protocol<T::IO, B>,
T: Transport + 'static,
P: Protocol<T::IO, B> + Send + 'static,
P::Connection: PoolableConnection,
B: 'static,
{
type Output = Result<
Pooled<P::Connection>,
Expand Down Expand Up @@ -585,15 +634,35 @@ where
CheckoutConnectingProj::Connecting(connector) => {
let result = ready!(connector.poll_connector(this.pool, this.key, this.meta, cx));

this.waiter.close();
this.inner.set(InnerCheckoutConnecting::Connected);

match result {
Ok(connection) => {
this.waiter.close();
this.inner.set(InnerCheckoutConnecting::Connected);
Poll::Ready(Ok(register_connected(this.pool, this.key, connection)))
}
Err(e) => Poll::Ready(Err(e)),
}
}
CheckoutConnectingProj::ConnectingWithDelayDrop(Some(connector)) => {
let result = ready!(connector
.as_mut()
.poll_connector(this.pool, this.key, this.meta, cx));

this.waiter.close();
this.inner.set(InnerCheckoutConnecting::Connected);

match result {
Ok(connection) => {
Poll::Ready(Ok(register_connected(this.pool, this.key, connection)))
}
Err(e) => Poll::Ready(Err(e)),
}
}
CheckoutConnectingProj::ConnectingWithDelayDrop(None) => {
// Something stole our connection, this is an error state.
panic!("connection was stolen from checkout")
}
}
}
}
Expand Down Expand Up @@ -634,15 +703,23 @@ where
#[pinned_drop]
impl<T, P, B> PinnedDrop for Checkout<T, P, B>
where
T: Transport,
P: Protocol<T::IO, B>,
T: Transport + 'static,
P: Protocol<T::IO, B> + Send + 'static,
P::Connection: PoolableConnection,
B: 'static,
{
fn drop(mut self: Pin<&mut Self>) {
#[cfg(debug_assertions)]
tracing::trace!(id=%self.id, "drop for checkout");

if let Some(mut pool) = self.pool.lock() {
if let Some(checkout) = self.as_mut().as_delayed() {
tokio::task::spawn(async move {
if let Err(err) = checkout.await {
tracing::error!(error=%err, "error during delayed drop");
}
});
} else if let Some(mut pool) = self.pool.lock() {
// Connection is only cancled when no delayed drop occurs.
pool.cancel_connection(&self.key);
}
}
Expand Down
Loading

0 comments on commit 5102d7e

Please sign in to comment.