Skip to content

Commit

Permalink
chore: remove hyper from Cargo dependencies (#3759)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 authored Nov 28, 2023
1 parent 19ffaa8 commit fef86c4
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 63 deletions.
26 changes: 14 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ http = "1.0"
http0 = { package = "http", version = "0.2" }
human-repr = "1.0"
humantime = "2.1.0"
hyper = "0.14"
indexmap = { version = "2.1", features = ["serde"] }
indicatif = { version = "0.17.6", features = ["tokio"] }
integer-encoding = "4.0"
Expand Down Expand Up @@ -119,7 +118,7 @@ mimalloc = { version = "0.1.39", optional = true, default-features = false }
multiaddr = "0.18"
multimap = "0.9.0"
nom = "7.1.3"
nonempty = "0.8.0"
nonempty = "0.9.0"
nonzero_ext = "0.3.0"
num = "0.4.0"
num-bigint = "0.4"
Expand Down Expand Up @@ -184,7 +183,7 @@ tracing-appender = "0.2"
tracing-chrome = "0.7.1"
tracing-loki = { version = "0.2", default-features = false, features = ["compat-0-2-1", "rustls"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
unsigned-varint = { version = "0.7", features = ["codec"] }
unsigned-varint = { version = "0.8", features = ["codec"] }
url = { version = "2.3", features = ["serde"] }
walkdir = "2"
zstd = "0.13"
Expand Down
11 changes: 5 additions & 6 deletions src/utils/reqwest_resume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use bytes::Bytes;
use futures::{ready, FutureExt as _, Stream, TryFutureExt as _};
use hyper::header::{self, HeaderMap, HeaderValue};
use std::{
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -74,8 +73,8 @@ impl RequestBuilder {
};
let accept_byte_ranges = response
.headers()
.get(header::ACCEPT_RANGES)
.map(HeaderValue::as_bytes)
.get(http0::header::ACCEPT_RANGES)
.map(http0::HeaderValue::as_bytes)
== Some(b"bytes");
let resp = Response {
client,
Expand Down Expand Up @@ -140,10 +139,10 @@ impl Stream for Decoder {
break Poll::Ready(Some(Err(err)));
}
let builder = self.client.request(self.method.clone(), self.url.clone());
let mut headers = HeaderMap::new();
let value = HeaderValue::from_str(&std::format!("bytes={}-", self.pos))
let mut headers = http0::HeaderMap::new();
let value = http0::HeaderValue::from_str(&std::format!("bytes={}-", self.pos))
.expect("unreachable");
headers.insert(header::RANGE, value);
headers.insert(http0::header::RANGE, value);
let builder = builder.headers(headers);
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
self.decoder = Box::pin(
Expand Down
88 changes: 46 additions & 42 deletions src/utils/reqwest_resume/tests.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
// Copyright 2019-2023 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::utils::reqwest_resume::get;
use axum::body::Body;
use axum::response::IntoResponse;
use bytes::Bytes;
use const_random::const_random;
use futures::stream::StreamExt;
use futures::stream;
use http_range_header::parse_range_header;
use hyper::header::{self, HeaderValue};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
use std::net::{Ipv4Addr, SocketAddr};
use std::ops::Range;
use std::time::Duration;
use tokio::time::sleep;
use tokio::net::TcpListener;
use tokio_stream::StreamExt as _;

const CHUNK_LEN: usize = 2048;
// `RANDOM_BYTES` size is arbitrarily chosen. We could use something smaller or bigger here.
// The only constraint is that `CHUNK_LEN < RANDOM_BYTES.len()`.
const RANDOM_BYTES: [u8; 8192] = const_random!([u8; 8192]);

fn get_range(value: &HeaderValue) -> Range<usize> {
fn get_range(value: &http::HeaderValue) -> Range<usize> {
let s = std::str::from_utf8(value.as_bytes()).unwrap();
let parse_ranges = parse_range_header(s).unwrap();
parse_ranges
Expand All @@ -35,50 +35,52 @@ fn get_range(value: &HeaderValue) -> Range<usize> {

/// Sends a subset of `RANDOM_BYTES` data on each request. This function will introduce an error
/// to simulate a flaky server by aborting the connection after sending the data.
async fn handle_request(req: Request<Body>) -> Result<Response<Body>, Infallible> {
let (mut sender, body) = Body::channel();

let range = req
.headers()
.get(header::RANGE)
async fn handle_request(headers: http::HeaderMap) -> impl IntoResponse {
let range = headers
.get(http::header::RANGE)
.map_or(0..CHUNK_LEN, get_range);
tokio::task::spawn(async move {

let (status_code, body) = if range.is_empty() {
(http::StatusCode::RANGE_NOT_SATISFIABLE, Body::empty())
} else {
let mut subset: Bytes = RANDOM_BYTES[range.clone()].into();
subset.truncate(CHUNK_LEN);
sender.send_data(subset).await.unwrap();
// Abort only if we don't have sent all the data. This will be signaled by an empty range.
if !range.is_empty() {
// Sleep to ensure the data is sent before the connection is closed.
sleep(Duration::from_millis(100)).await;
// `abort` will close the connection with an error so we can test the
// resume functionality.
sender.abort();
}
});

let mut response: Response<_> = Response::new(body);
response
.headers_mut()
.insert(header::ACCEPT_RANGES, HeaderValue::from_static("bytes"));
Ok(response)
(
http::StatusCode::PARTIAL_CONTENT,
Body::from_stream(
stream::iter([anyhow::Ok(subset), Err(anyhow::anyhow!("Unexpected EOF"))])
.throttle(Duration::from_millis(100)),
),
)
};

let response_headers = [(http::header::ACCEPT_RANGES, "bytes")];
(status_code, response_headers, body)
}

async fn create_flaky_server() -> SocketAddr {
let make_svc =
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle_request)) });

let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0 /* OS-assigned */);

let server = Server::bind(&addr).serve(make_svc);
let addr = server.local_addr();
async fn create_listener() -> TcpListener {
TcpListener::bind(SocketAddr::new(
Ipv4Addr::LOCALHOST.into(),
0, /* OS-assigned */
))
.await
.unwrap()
}

tokio::task::spawn(server);
addr
fn create_flaky_server(listener: TcpListener) {
tokio::task::spawn(async move {
let app = axum::Router::new().route("/", axum::routing::get(handle_request));
axum::serve(listener, app.into_make_service())
.await
.unwrap()
});
}

#[tokio::test]
async fn test_resumable_get() {
let addr = create_flaky_server().await;
let listener = create_listener().await;
let addr = listener.local_addr().unwrap();
create_flaky_server(listener);

let resp = get(reqwest::Url::parse(&format!("http://{addr}")).unwrap())
.await
Expand All @@ -95,7 +97,9 @@ async fn test_resumable_get() {

#[tokio::test]
async fn test_non_resumable_get() {
let addr = create_flaky_server().await;
let listener = create_listener().await;
let addr = listener.local_addr().unwrap();
create_flaky_server(listener);

let resp = reqwest::get(reqwest::Url::parse(&format!("http://{addr}")).unwrap())
.await
Expand Down

0 comments on commit fef86c4

Please sign in to comment.