From 7ecdb0cac5ebab2e33d7877d9dfaba80d1ad9b9e Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 27 Nov 2023 22:58:59 +0900 Subject: [PATCH 1/3] cargo fmt -- --config imports_granularity=Crate --- src/client.rs | 17 ++++++++++------- src/monitor.rs | 33 +++++++++++++++++++-------------- src/printer.rs | 19 ++++++++++--------- src/timescale.rs | 3 +-- 4 files changed, 40 insertions(+), 32 deletions(-) diff --git a/src/client.rs b/src/client.rs index 28ebae3a..d6a2a0c1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,17 +1,20 @@ use futures::future::FutureExt; use http_body_util::Full; -use hyper::body::{Body, Incoming}; -use hyper::http; +use hyper::{ + body::{Body, Incoming}, + http, +}; use rand::prelude::*; -use std::pin::Pin; -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; use thiserror::Error; use tokio::net::TcpStream; use url::{ParseError, Url}; -use crate::tokiort::{TokioExecutor, TokioIo}; -use crate::url_generator::{UrlGenerator, UrlGeneratorError}; -use crate::ConnectToEntry; +use crate::{ + tokiort::{TokioExecutor, TokioIo}, + url_generator::{UrlGenerator, UrlGeneratorError}, + ConnectToEntry, +}; type SendRequestHttp1 = hyper::client::conn::http1::SendRequest>; type SendRequestHttp2 = hyper::client::conn::http2::SendRequest>; diff --git a/src/monitor.rs b/src/monitor.rs index 697f4010..08c26aec 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -1,20 +1,25 @@ use byte_unit::Byte; -use crossterm::event::{Event, KeyCode, KeyEvent, KeyModifiers}; -use crossterm::ExecutableCommand; +use crossterm::{ + event::{Event, KeyCode, KeyEvent, KeyModifiers}, + ExecutableCommand, +}; use flume::TryRecvError; use hyper::http; -use ratatui::backend::CrosstermBackend; -use ratatui::layout::{Constraint, Direction, Layout}; -use ratatui::style::{Color, Style}; -use ratatui::text::{Line, Span}; -use ratatui::widgets::{BarChart, Block, Borders, Gauge, Paragraph}; -use ratatui::Terminal; -use std::collections::BTreeMap; -use std::io; - -use crate::client::{ClientError, RequestResult}; -use crate::printer::PrintMode; -use crate::timescale::{TimeLabel, TimeScale}; +use ratatui::{ + backend::CrosstermBackend, + layout::{Constraint, Direction, Layout}, + style::{Color, Style}, + text::{Line, Span}, + widgets::{BarChart, Block, Borders, Gauge, Paragraph}, + Terminal, +}; +use std::{collections::BTreeMap, io}; + +use crate::{ + client::{ClientError, RequestResult}, + printer::PrintMode, + timescale::{TimeLabel, TimeScale}, +}; /// When the monitor ends pub enum EndLine { diff --git a/src/printer.rs b/src/printer.rs index 13c27b90..2789e811 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -1,15 +1,16 @@ -use crate::client::ConnectionTime; -use crate::client::RequestResult; -use crate::histogram::histogram; -use average::Max; -use average::Variance; +use crate::{ + client::{ConnectionTime, RequestResult}, + histogram::histogram, +}; +use average::{Max, Variance}; use byte_unit::Byte; use crossterm::style::{StyledContent, Stylize}; use hyper::http::{self, StatusCode}; -use std::collections::BTreeMap; -use std::io::Write; -use std::time::Duration; -use std::time::Instant; +use std::{ + collections::BTreeMap, + io::Write, + time::{Duration, Instant}, +}; #[derive(Clone, Copy)] struct StyleScheme { diff --git a/src/timescale.rs b/src/timescale.rs index a06848c4..168634db 100644 --- a/src/timescale.rs +++ b/src/timescale.rs @@ -1,5 +1,4 @@ -use std::fmt; -use std::time::Duration; +use std::{fmt, time::Duration}; #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum TimeScale { From 3a742c3c32b15f0a7803e7556bc9766327406946 Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 27 Nov 2023 23:41:52 +0900 Subject: [PATCH 2/3] update axum --- Cargo.lock | 124 ++++++++++++--------------------------- Cargo.toml | 4 +- tests/tests.rs | 155 ++++++++++++++++++++----------------------------- 3 files changed, 103 insertions(+), 180 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba5f27f2..bddc2826 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -143,18 +143,19 @@ dependencies = [ [[package]] name = "axum" -version = "0.6.20" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +checksum = "810a80b128d70e6ed2bdf3fe8ed72c0ae56f5f5948d01c2753282dd92a84fce8" dependencies = [ "async-trait", "axum-core", - "bitflags 1.3.2", "bytes", "futures-util", - "http 0.2.11", - "http-body 0.4.5", - "hyper 0.14.27", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", "itoa", "matchit", "memchr", @@ -175,17 +176,20 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.3.4" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +checksum = "de0ddc355eab88f4955090a823715df47acf0b7660aab7a69ad5ce6301ee3b73" dependencies = [ "async-trait", "bytes", "futures-util", - "http 0.2.11", - "http-body 0.4.5", + "http", + "http-body", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper", "tower-layer", "tower-service", ] @@ -587,25 +591,6 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" -[[package]] -name = "h2" -version = "0.3.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.11", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "h2" version = "0.4.0" @@ -617,7 +602,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 1.0.0", + "http", "indexmap", "slab", "tokio", @@ -703,17 +688,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "http" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http" version = "1.0.0" @@ -725,17 +699,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" -dependencies = [ - "bytes", - "http 0.2.11", - "pin-project-lite", -] - [[package]] name = "http-body" version = "1.0.0" @@ -743,7 +706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http 1.0.0", + "http", ] [[package]] @@ -754,8 +717,8 @@ checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" dependencies = [ "bytes", "futures-util", - "http 1.0.0", - "http-body 1.0.0", + "http", + "http-body", "pin-project-lite", ] @@ -779,45 +742,42 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.27" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "24590385be94998c5def4cf53d34edc5381144c805126f00efb954d986f9a7b2" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", - "h2 0.3.22", - "http 0.2.11", - "http-body 0.4.5", + "h2", + "http", + "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", "tokio", - "tower-service", - "tracing", "want", ] [[package]] -name = "hyper" -version = "1.0.0" +name = "hyper-util" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24590385be94998c5def4cf53d34edc5381144c805126f00efb954d986f9a7b2" +checksum = "9ca339002caeb0d159cc6e023dff48e199f081e42fa039895c7c6f38b37f2e9d" dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.0", - "http 1.0.0", - "http-body 1.0.0", - "httparse", - "itoa", + "http", + "http-body", + "hyper", "pin-project-lite", + "socket2", "tokio", - "want", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -852,7 +812,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2 0.5.5", + "socket2", "widestring", "windows-sys", "winreg", @@ -1092,8 +1052,8 @@ dependencies = [ "hickory-resolver", "http-body-util", "humantime", - "hyper 0.14.27", - "hyper 1.0.0", + "hyper", + "hyper-util", "jemallocator", "lazy_static", "libc", @@ -1652,16 +1612,6 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.5" @@ -1794,7 +1744,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.5", + "socket2", "tokio-macros", "windows-sys", ] diff --git a/Cargo.toml b/Cargo.toml index efe25cc9..663ae08c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,9 +68,9 @@ rlimit = "0.10.0" jemallocator = "0.5.0" [dev-dependencies] -hyper14 = { package = "hyper", version = "0.14", features = ["full"] } assert_cmd = "2.0.2" -axum = { version = "0.6.20", features = ["http2"] } +axum = { version = "0.7", features = ["http2"] } bytes = "1.0" +hyper-util = "0.1.1" lazy_static = "1.4.0" regex = "1.9.6" diff --git a/tests/tests.rs b/tests/tests.rs index df51b197..654bbd42 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -6,35 +6,32 @@ use std::{ use assert_cmd::Command; use axum::{ - extract::{Path, RawBody}, + extract::Path, response::Redirect, routing::{any, get}, Router, }; +use bytes::Bytes; +use futures::StreamExt; use http::{HeaderMap, Request, Response}; -use hyper14::{ - body::to_bytes, - http, - server::conn::AddrIncoming, - service::{make_service_fn, service_fn}, - Body, -}; +use hyper::{http, service::service_fn}; +use hyper_util::rt::{TokioExecutor, TokioIo}; // Port 5111- is reserved for testing static PORT: AtomicU16 = AtomicU16::new(5111); -fn bind_port() -> (hyper14::server::Builder, u16) { +async fn bind_port() -> (tokio::net::TcpListener, u16) { let port = PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let addr = SocketAddr::new("127.0.0.1".parse().unwrap(), port); - (axum::Server::bind(&addr), port) + (tokio::net::TcpListener::bind(addr).await.unwrap(), port) } -fn bind_port_ipv6() -> (hyper14::server::Builder, u16) { +async fn bind_port_ipv6() -> (tokio::net::TcpListener, u16) { let port = PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let addr = SocketAddr::new(std::net::IpAddr::V6(Ipv6Addr::LOCALHOST), port); - (axum::Server::bind(&addr), port) + (tokio::net::TcpListener::bind(addr).await.unwrap(), port) } async fn get_header_body(args: &[&str]) -> (HeaderMap, bytes::Bytes) { @@ -42,17 +39,21 @@ async fn get_header_body(args: &[&str]) -> (HeaderMap, bytes::Bytes) { let app = Router::new().route( "/", - get(|header: HeaderMap, RawBody(body): RawBody| async move { - tx.send((header, to_bytes(body).await.unwrap())).unwrap(); - "Hello World" - }), + get( + |header: HeaderMap, req: axum::extract::Request| async move { + let mut body = Vec::new(); + let mut stream = req.into_body().into_data_stream(); + while let Some(data) = stream.next().await { + body.extend_from_slice(data.unwrap().as_ref()); + } + tx.send((header, Bytes::from(body))).unwrap(); + "Hello World" + }, + ), ); - let (server, port) = bind_port(); - - tokio::spawn(async { - server.serve(app.into_make_service()).await.unwrap(); - }); + let (listener, port) = bind_port().await; + tokio::spawn(async { axum::serve(listener, app).await }); let args: Vec = args.iter().map(|s| s.to_string()).collect(); tokio::task::spawn_blocking(move || { @@ -80,11 +81,8 @@ async fn get_method(args: &[&str]) -> http::method::Method { }), ); - let (server, port) = bind_port(); - - tokio::spawn(async { - server.serve(app.into_make_service()).await.unwrap(); - }); + let (listener, port) = bind_port().await; + tokio::spawn(async { axum::serve(listener, app).await }); let args: Vec = args.iter().map(|s| s.to_string()).collect(); @@ -104,28 +102,18 @@ async fn get_method(args: &[&str]) -> http::method::Method { } async fn get_query(p: &'static str, args: &[&str]) -> String { - use hyper::Error; let (tx, rx) = flume::unbounded(); - let make_svc = make_service_fn(move |_| { - let tx = tx.clone(); - async move { - Ok::<_, Error>(service_fn(move |req| { - let tx = tx.clone(); - async move { - let (parts, _) = req.into_parts(); - tx.send(parts.uri.to_string()).unwrap(); - Ok::<_, Error>(Response::new(Body::from("Hello World"))) - } - })) - } - }); - - let (server, port) = bind_port(); + let app = Router::new().route( + "/index", + get(move |req: Request| async move { + tx.send(req.uri().to_string()).unwrap(); + "Hello World" + }), + ); - tokio::spawn(async move { - server.serve(make_svc).await.unwrap(); - }); + let (listener, port) = bind_port().await; + tokio::spawn(async { axum::serve(listener, app).await }); let args: Vec = args.iter().map(|s| s.to_string()).collect(); @@ -148,26 +136,28 @@ async fn get_path_rand_regex(p: &'static str, args: &[&str]) -> String { use hyper::Error; let (tx, rx) = flume::unbounded(); - let make_svc = make_service_fn(move |_| { - let tx = tx.clone(); - async move { - Ok::<_, Error>(service_fn(move |req| { + + let (listener, port) = bind_port().await; + tokio::spawn(async move { + loop { + let (socket, _remote_addr) = listener.accept().await.unwrap(); + let tx = tx.clone(); + let service = service_fn(move |req| { let tx = tx.clone(); async move { let (parts, _) = req.into_parts(); tx.send(parts.uri.to_string()).unwrap(); - Ok::<_, Error>(Response::new(Body::from("Hello World"))) + Ok::<_, Error>(Response::new("Hello World".to_string())) } - })) + }); + tokio::spawn(async move { + hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()) + .serve_connection(TokioIo::new(socket), service) + .await + }); } }); - let (server, port) = bind_port(); - - tokio::spawn(async move { - server.serve(make_svc).await.unwrap(); - }); - let args: Vec = args.iter().map(|s| s.to_string()).collect(); tokio::task::spawn_blocking(move || { Command::cargo_bin("oha") @@ -187,7 +177,7 @@ async fn get_path_rand_regex(p: &'static str, args: &[&str]) -> String { async fn redirect(n: usize, is_relative: bool, limit: usize) -> bool { let (tx, rx) = flume::unbounded(); - let (server, port) = bind_port(); + let (listener, port) = bind_port().await; let app = Router::new().route( "/:n", @@ -203,9 +193,7 @@ async fn redirect(n: usize, is_relative: bool, limit: usize) -> bool { }), ); - tokio::spawn(async move { - server.serve(app.into_make_service()).await.unwrap(); - }); + tokio::spawn(async { axum::serve(listener, app).await }); tokio::task::spawn_blocking(move || { Command::cargo_bin("oha") @@ -234,10 +222,8 @@ async fn get_host_with_connect_to(host: &'static str) -> String { }), ); - let (server, port) = bind_port(); - tokio::spawn(async move { - server.serve(app.into_make_service()).await.unwrap(); - }); + let (listener, port) = bind_port().await; + tokio::spawn(async { axum::serve(listener, app).await }); tokio::task::spawn_blocking(move || { Command::cargo_bin("oha") @@ -266,10 +252,8 @@ async fn get_host_with_connect_to_ipv6_target(host: &'static str) -> String { }), ); - let (server, port) = bind_port_ipv6(); - tokio::spawn(async move { - server.serve(app.into_make_service()).await.unwrap(); - }); + let (listener, port) = bind_port_ipv6().await; + tokio::spawn(async { axum::serve(listener, app).await }); tokio::task::spawn_blocking(move || { Command::cargo_bin("oha") @@ -298,10 +282,8 @@ async fn get_host_with_connect_to_ipv6_requested() -> String { }), ); - let (server, port) = bind_port(); - tokio::spawn(async move { - server.serve(app.into_make_service()).await.unwrap(); - }); + let (listener, port) = bind_port().await; + tokio::spawn(async { axum::serve(listener, app).await }); tokio::task::spawn_blocking(move || { Command::cargo_bin("oha") @@ -335,10 +317,8 @@ async fn get_host_with_connect_to_redirect(host: &'static str) -> String { }), ); - let (server, port) = bind_port(); - tokio::spawn(async move { - server.serve(app.into_make_service()).await.unwrap(); - }); + let (listener, port) = bind_port().await; + tokio::spawn(async { axum::serve(listener, app).await }); tokio::task::spawn_blocking(move || { Command::cargo_bin("oha") @@ -367,10 +347,8 @@ async fn burst_10_req_delay_2s_rate_4(iteration: u8, args: &[&str]) -> usize { }), ); - let (service, port) = bind_port(); - tokio::spawn(async move { - service.serve(app.into_make_service()).await.unwrap(); - }); + let (listener, port) = bind_port().await; + tokio::spawn(async { axum::serve(listener, app).await }); let args: Vec = args.iter().map(|s| s.to_string()).collect(); tokio::task::spawn_blocking(move || { @@ -432,17 +410,14 @@ async fn get_http_version(args: &[&str]) -> http::Version { let app = Router::new().route( "/", - get(|req: Request| async move { + get(|req: Request| async move { tx.send(req.version()).unwrap(); "Hello World" }), ); - let (server, port) = bind_port(); - - tokio::spawn(async { - server.serve(app.into_make_service()).await.unwrap(); - }); + let (listener, port) = bind_port().await; + tokio::spawn(async { axum::serve(listener, app).await }); let args: Vec = args.iter().map(|s| s.to_string()).collect(); tokio::task::spawn_blocking(move || { @@ -720,10 +695,8 @@ async fn test_ipv6() { }), ); - let (service, port) = bind_port_ipv6(); - tokio::spawn(async move { - service.serve(app.into_make_service()).await.unwrap(); - }); + let (listener, port) = bind_port_ipv6().await; + tokio::spawn(async { axum::serve(listener, app).await }); tokio::task::spawn_blocking(move || { Command::cargo_bin("oha") From bc0379e34808200235f0a6d93f95024898e4c36b Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 27 Nov 2023 23:43:51 +0900 Subject: [PATCH 3/3] hyper-util --- Cargo.toml | 2 +- src/client.rs | 4 +- src/main.rs | 1 - src/tokiort.rs | 259 ------------------------------------------------- 4 files changed, 3 insertions(+), 263 deletions(-) delete mode 100644 src/tokiort.rs diff --git a/Cargo.toml b/Cargo.toml index 663ae08c..2408ed08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ regex-syntax = "0.7.4" url = "2.4.0" pin-project-lite = "0.2.13" http-body-util = "0.1.0" +hyper-util = { version = "0.1.1", features = ["tokio"] } [target.'cfg(unix)'.dependencies] rlimit = "0.10.0" @@ -71,6 +72,5 @@ jemallocator = "0.5.0" assert_cmd = "2.0.2" axum = { version = "0.7", features = ["http2"] } bytes = "1.0" -hyper-util = "0.1.1" lazy_static = "1.4.0" regex = "1.9.6" diff --git a/src/client.rs b/src/client.rs index d6a2a0c1..3a62afd6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,6 +4,7 @@ use hyper::{ body::{Body, Incoming}, http, }; +use hyper_util::rt::{TokioExecutor, TokioIo}; use rand::prelude::*; use std::{pin::Pin, sync::Arc}; use thiserror::Error; @@ -11,7 +12,6 @@ use tokio::net::TcpStream; use url::{ParseError, Url}; use crate::{ - tokiort::{TokioExecutor, TokioIo}, url_generator::{UrlGenerator, UrlGeneratorError}, ConnectToEntry, }; @@ -238,7 +238,7 @@ impl Stream { } } async fn handshake_http2(self) -> Result { - let builder = hyper::client::conn::http2::Builder::new(TokioExecutor); + let builder = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); match self { Stream::Tcp(stream) => { diff --git a/src/main.rs b/src/main.rs index 15f55d1b..45440052 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,6 @@ mod histogram; mod monitor; mod printer; mod timescale; -mod tokiort; mod url_generator; #[cfg(unix)] diff --git a/src/tokiort.rs b/src/tokiort.rs deleted file mode 100644 index 799795ea..00000000 --- a/src/tokiort.rs +++ /dev/null @@ -1,259 +0,0 @@ -// This entire codes are copied from https://github.com/hyperium/hyper/blob/f9f65b7aa67fa3ec0267fe015945973726285bc2/benches/support/tokiort.rs -/* -Copyright (c) 2014-2021 Sean McArthur - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -*/ - -#![allow(dead_code)] -//! Various runtimes for hyper -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -use hyper::rt::{Sleep, Timer}; -use pin_project_lite::pin_project; - -#[derive(Clone)] -/// An Executor that uses the tokio runtime. -pub struct TokioExecutor; - -impl hyper::rt::Executor for TokioExecutor -where - F: std::future::Future + Send + 'static, - F::Output: Send + 'static, -{ - fn execute(&self, fut: F) { - tokio::task::spawn(fut); - } -} - -/// A Timer that uses the tokio runtime. - -#[derive(Clone, Debug)] -pub struct TokioTimer; - -impl Timer for TokioTimer { - fn sleep(&self, duration: Duration) -> Pin> { - Box::pin(TokioSleep { - inner: tokio::time::sleep(duration), - }) - } - - fn sleep_until(&self, deadline: Instant) -> Pin> { - Box::pin(TokioSleep { - inner: tokio::time::sleep_until(deadline.into()), - }) - } - - fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { - if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { - sleep.reset(new_deadline) - } - } -} - -struct TokioTimeout { - inner: Pin>>, -} - -impl Future for TokioTimeout -where - T: Future, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll { - self.inner.as_mut().poll(context) - } -} - -// Use TokioSleep to get tokio::time::Sleep to implement Unpin. -// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html -pin_project! { - pub(crate) struct TokioSleep { - #[pin] - pub(crate) inner: tokio::time::Sleep, - } -} - -impl Future for TokioSleep { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx) - } -} - -impl Sleep for TokioSleep {} - -impl TokioSleep { - pub fn reset(self: Pin<&mut Self>, deadline: Instant) { - self.project().inner.as_mut().reset(deadline.into()); - } -} - -pin_project! { - #[derive(Debug)] - pub struct TokioIo { - #[pin] - inner: T, - } -} - -impl TokioIo { - pub fn new(inner: T) -> Self { - Self { inner } - } - - pub fn inner(self) -> T { - self.inner - } -} - -impl hyper::rt::Read for TokioIo -where - T: tokio::io::AsyncRead, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut buf: hyper::rt::ReadBufCursor<'_>, - ) -> Poll> { - let n = unsafe { - let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); - match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { - Poll::Ready(Ok(())) => tbuf.filled().len(), - other => return other, - } - }; - - unsafe { - buf.advance(n); - } - Poll::Ready(Ok(())) - } -} - -impl hyper::rt::Write for TokioIo -where - T: tokio::io::AsyncWrite, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) - } - - fn is_write_vectored(&self) -> bool { - tokio::io::AsyncWrite::is_write_vectored(&self.inner) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) - } -} - -impl tokio::io::AsyncRead for TokioIo -where - T: hyper::rt::Read, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - tbuf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - //let init = tbuf.initialized().len(); - let filled = tbuf.filled().len(); - let sub_filled = unsafe { - let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); - - match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { - Poll::Ready(Ok(())) => buf.filled().len(), - other => return other, - } - }; - - let n_filled = filled + sub_filled; - // At least sub_filled bytes had to have been initialized. - let n_init = sub_filled; - unsafe { - tbuf.assume_init(n_init); - tbuf.set_filled(n_filled); - } - - Poll::Ready(Ok(())) - } -} - -impl tokio::io::AsyncWrite for TokioIo -where - T: hyper::rt::Write, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - hyper::rt::Write::poll_write(self.project().inner, cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - hyper::rt::Write::poll_flush(self.project().inner, cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - hyper::rt::Write::poll_shutdown(self.project().inner, cx) - } - - fn is_write_vectored(&self) -> bool { - hyper::rt::Write::is_write_vectored(&self.inner) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) - } -}