Skip to content

Commit

Permalink
impl on http2
Browse files Browse the repository at this point in the history
  • Loading branch information
hatoo committed Jan 30, 2024
1 parent 0d68cf8 commit 850ea9e
Showing 1 changed file with 43 additions and 32 deletions.
75 changes: 43 additions & 32 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use futures::future::FutureExt;
use futures::{future::FutureExt, Future};
use http_body_util::Full;
use hyper::{
body::{Body, Incoming},
Expand Down Expand Up @@ -413,14 +413,13 @@ impl Client {
}
}

async fn work_http1(
fn timeout_future(
&self,
client_state: &mut ClientStateHttp1,
dead_line: Option<std::time::Instant>,
) -> Result<RequestResult, ClientError> {
let timeout = match (dead_line, self.timeout) {
) -> Pin<Box<dyn Future<Output = ClientError> + Send>> {
match (dead_line, self.timeout) {
(Some(dead_line), Some(timeout)) => {
if std::time::Instant::now() + timeout > dead_line {
if tokio::time::Instant::now() + timeout > dead_line.into() {
async move {
tokio::time::sleep_until(dead_line.into()).await;
ClientError::Deadline
Expand All @@ -445,7 +444,15 @@ impl Client {
}
.boxed(),
(None, None) => std::future::pending().boxed(),
};
}
}

async fn work_http1(
&self,
client_state: &mut ClientStateHttp1,
dead_line: Option<std::time::Instant>,
) -> Result<RequestResult, ClientError> {
let timeout = self.timeout_future(dead_line);

let do_req = async {
let url = self.url_generator.generate(&mut client_state.rng)?;
Expand Down Expand Up @@ -558,12 +565,9 @@ impl Client {
async fn work_http2(
&self,
client_state: &mut ClientStateHttp2,
dead_line: Option<std::time::Instant>,
) -> Result<RequestResult, ClientError> {
let timeout = if let Some(timeout) = self.timeout {
tokio::time::sleep(timeout).boxed()
} else {
std::future::pending().boxed()
};
let timeout = self.timeout_future(dead_line);

let do_req = async {
let url = self.url_generator.generate(&mut client_state.rng)?;
Expand Down Expand Up @@ -606,8 +610,8 @@ impl Client {
res = do_req => {
res
}
_ = timeout => {
Err(ClientError::Timeout)
client_error = timeout => {
Err(client_error)
}
}
}
Expand Down Expand Up @@ -821,8 +825,9 @@ pub async fn work(
tokio::spawn(async move {
while counter.fetch_add(1, Ordering::Relaxed) < n_tasks
{
let mut res =
client.work_http2(&mut client_state).await;
let mut res = client
.work_http2(&mut client_state, None)
.await;
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
Expand Down Expand Up @@ -964,8 +969,9 @@ pub async fn work_with_qps(
let mut client_state = client_state.clone();
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
let mut res =
client.work_http2(&mut client_state).await;
let mut res = client
.work_http2(&mut client_state, None)
.await;
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
Expand Down Expand Up @@ -1107,8 +1113,9 @@ pub async fn work_with_qps_latency_correction(
let mut client_state = client_state.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
let mut res =
client.work_http2(&mut client_state).await;
let mut res = client
.work_http2(&mut client_state, None)
.await;
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
Expand Down Expand Up @@ -1210,8 +1217,9 @@ pub async fn work_until(
tokio::spawn(async move {
// This is where HTTP2 loops to make all the requests for a given client and worker
loop {
let mut res =
client.work_http2(&mut client_state).await;
let mut res = client
.work_http2(&mut client_state, Some(dead_line))
.await;
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
Expand All @@ -1222,24 +1230,25 @@ pub async fn work_until(
}
})
})
.chain(std::iter::once(tokio::spawn(async move {
tokio::time::sleep_until(dead_line.into()).await;
true
})))
.collect::<Vec<_>>();

let (is_cancel, _, rest) =
futures::future::select_all(futures).await;
for f in rest {
f.abort();
let _ = f.await;
}

if matches!(is_cancel, Ok(true)) {
break;
}
}

Err(err) => report_tx.send_async(Err(err)).await.unwrap(),
Err(err) => {
report_tx.send_async(Err(err)).await.unwrap();
if tokio::time::Instant::now() >= dead_line.into() {
break;
}
}
}
}
})
Expand Down Expand Up @@ -1343,8 +1352,9 @@ pub async fn work_until_with_qps(
let mut client_state = client_state.clone();
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
let mut res =
client.work_http2(&mut client_state).await;
let mut res = client
.work_http2(&mut client_state, Some(dead_line))
.await;
let is_cancel = is_cancel_error(&res);
let is_reconnect = is_hyper_error(&res);
set_connection_time(&mut res, connection_time);
Expand Down Expand Up @@ -1487,8 +1497,9 @@ pub async fn work_until_with_qps_latency_correction(
let mut client_state = client_state.clone();
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
let mut res =
client.work_http2(&mut client_state).await;
let mut res = client
.work_http2(&mut client_state, Some(dead_line))
.await;
set_start_latency_correction(&mut res, start);
set_connection_time(&mut res, connection_time);
let is_cancel = is_cancel_error(&res);
Expand Down

0 comments on commit 850ea9e

Please sign in to comment.