Skip to content

Commit

Permalink
feat: Upgrade http / hyper to 1.0 with axum 0.7
Browse files Browse the repository at this point in the history
Signed-off-by: Natsuki Ikeguchi <[email protected]>
  • Loading branch information
siketyan committed Jan 14, 2024
1 parent a425ca7 commit f84092d
Show file tree
Hide file tree
Showing 28 changed files with 259 additions and 221 deletions.
16 changes: 9 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ path = "src/lib.rs"

[features]
default = []
hyper = ["dep:tokio","dep:hyper", "dep:hyper-rustls", "dep:tokio-stream","dep:tokio-tungstenite", "dep:tokio-tungstenite", "dep:signal-hook", "dep:signal-hook-tokio"]
hyper = ["dep:tokio", "dep:http-body-util", "dep:hyper", "dep:hyper-rustls", "dep:hyper-util", "dep:tokio-stream","dep:tokio-tungstenite", "dep:tokio-tungstenite", "dep:signal-hook", "dep:signal-hook-tokio"]
axum = ["hyper", "dep:axum", "dep:tower"]

[dependencies]
Expand All @@ -39,20 +39,22 @@ hex = "0.4"
tracing = "0.1"
ring = "0.17"
lazy_static = "1.4"
http = "0.2"
http = "1.0"
async-trait = "0.1"
bytes = "1"
rand = "0.8"
async-recursion="1.0"
mime = "0.3"
chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] }
url = { version = "2.5", features = ["serde"]}
hyper = { version ="0.14", features = ["http2","server", "client", "h2", "stream"], optional = true }
http-body-util = { version = "0.1", optional = true }
hyper = { version ="1.0", features = ["http2","server", "client"], optional = true }
hyper-util = { version = "0.1", features = ["client", "client-legacy", "server"], optional = true }
tokio = { version = "1", features = ["bytes","rt-multi-thread","signal","tracing"], optional = true }
tokio-stream = { version = "0.1", optional = true }
hyper-rustls = { version="0.24", features = ["rustls-native-certs", "http2"], optional = true }
hyper-rustls = { version="0.26", features = ["rustls-native-certs", "http2"], optional = true }
tokio-tungstenite = { version = "0.21.0", features = ["rustls-tls-native-roots"], optional = true }
axum = { version = "0.6", optional = true }
axum = { version = "0.7", optional = true }
tower = { version = "0.4", optional = true }
serde_urlencoded = "0.7.1"

Expand All @@ -67,8 +69,8 @@ ctrlc = { version = "3.4", features = ["termination"] }
cargo-husky = { version = "1", default-features = false, features = ["run-for-all", "prepush-hook", "run-cargo-fmt"] }
cargo-audit = "0.18"
tracing-subscriber = { version ="0.3", features = ["env-filter"] }
hyper-proxy = "0.9"
hyper = { version ="0.14", features = ["full"] }
hyper-proxy2 = "0.1"
hyper = { version ="1.0", features = ["full"] }
tokio = { version = "1", features = ["full"] }

[package.metadata.release]
Expand Down
20 changes: 13 additions & 7 deletions examples/axum_events_api_server.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use slack_morphism::prelude::*;

use hyper::{Body, Response};
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Empty, Full};
use hyper::Response;
use tracing::*;

use axum::Extension;
use std::convert::Infallible;
use std::sync::Arc;
use tokio::net::TcpListener;

async fn test_oauth_install_function(
resp: SlackOAuthV2AccessTokenResponse,
Expand All @@ -29,12 +34,14 @@ async fn test_error_install() -> String {
async fn test_push_event(
Extension(_environment): Extension<Arc<SlackHyperListenerEnvironment>>,
Extension(event): Extension<SlackPushEvent>,
) -> Response<Body> {
) -> Response<BoxBody<Bytes, Infallible>> {
println!("Received push event: {:?}", event);

match event {
SlackPushEvent::UrlVerification(url_ver) => Response::new(Body::from(url_ver.challenge)),
_ => Response::new(Body::empty()),
SlackPushEvent::UrlVerification(url_ver) => {
Response::new(Full::new(url_ver.challenge.into()).boxed())
}
_ => Response::new(Empty::new().boxed()),
}
}

Expand Down Expand Up @@ -68,7 +75,7 @@ fn test_error_handler(

async fn test_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client: Arc<SlackHyperClient> =
Arc::new(SlackClient::new(SlackClientHyperConnector::new()));
Arc::new(SlackClient::new(SlackClientHyperConnector::new()?));

let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 8080));
info!("Loading server: {}", addr);
Expand Down Expand Up @@ -123,8 +130,7 @@ async fn test_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
),
);

axum::Server::bind(&addr)
.serve(app.into_make_service())
axum::serve(TcpListener::bind(&addr).await.unwrap(), app)
.await
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::stream::BoxStream;
use futures::TryStreamExt;

async fn test_simple_api_calls() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = SlackClient::new(SlackClientHyperConnector::new());
let client = SlackClient::new(SlackClientHyperConnector::new()?);
let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into();
let token: SlackApiToken = SlackApiToken::new(token_value);

Expand All @@ -31,7 +31,7 @@ async fn test_simple_api_calls() -> Result<(), Box<dyn std::error::Error + Send
}

async fn test_post_message() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = SlackClient::new(SlackClientHyperConnector::new());
let client = SlackClient::new(SlackClientHyperConnector::new()?);
let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into();
let token: SlackApiToken = SlackApiToken::new(token_value);
let session = client.open_session(&token);
Expand All @@ -48,7 +48,7 @@ async fn test_post_message() -> Result<(), Box<dyn std::error::Error + Send + Sy
}

async fn test_scrolling_user_list() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = SlackClient::new(SlackClientHyperConnector::new());
let client = SlackClient::new(SlackClientHyperConnector::new()?);
let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into();
let token: SlackApiToken = SlackApiToken::new(token_value);
let session = client.open_session(&token);
Expand Down
2 changes: 1 addition & 1 deletion examples/client_with_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use tracing::*;

async fn test_simple_api_calls_as_predicate() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
{
let client = SlackClient::new(SlackClientHyperConnector::new());
let client = SlackClient::new(SlackClientHyperConnector::new()?);
let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into();
let token: SlackApiToken =
SlackApiToken::new(token_value).with_team_id(config_env_var("SLACK_TEST_TEAM_ID")?.into()); // While Team ID is optional but still useful for tracing and rate control purposes
Expand Down
82 changes: 46 additions & 36 deletions examples/events_api_server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use slack_morphism::prelude::*;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response};
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tracing::*;

use std::convert::Infallible;
use std::sync::Arc;

async fn test_oauth_install_function(
Expand Down Expand Up @@ -82,16 +89,17 @@ struct UserStateExample(u64);

async fn test_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client: Arc<SlackHyperClient> =
Arc::new(SlackClient::new(SlackClientHyperConnector::new()));
Arc::new(SlackClient::new(SlackClientHyperConnector::new()?));

let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 8080));
info!("Loading server: {}", addr);

async fn your_others_routes(
_req: Request<Body>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
_req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, Box<dyn std::error::Error + Send + Sync>>
{
Response::builder()
.body("Hey, this is a default users route handler".into())
.body(Full::new("Hey, this is a default users route handler".into()).boxed())
.map_err(|e| e.into())
}

Expand Down Expand Up @@ -120,47 +128,49 @@ async fn test_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.with_user_state(UserStateExample(0)),
);

let make_svc = make_service_fn(move |_| {
let listener = TcpListener::bind(&addr).await?;

info!("Server is listening on {}", &addr);

loop {
let (tcp, _) = listener.accept().await?;
let io = TokioIo::new(tcp);

let thread_oauth_config = oauth_listener_config.clone();
let thread_push_events_config = push_events_config.clone();
let thread_interaction_events_config = interactions_events_config.clone();
let thread_command_events_config = command_events_config.clone();
let listener = SlackClientEventsHyperListener::new(listener_environment.clone());
async move {
let routes = chain_service_routes_fn(
listener.oauth_service_fn(thread_oauth_config, test_oauth_install_function),
let routes = chain_service_routes_fn(
listener.oauth_service_fn(thread_oauth_config, test_oauth_install_function),
chain_service_routes_fn(
listener
.push_events_service_fn(thread_push_events_config, test_push_events_function),
chain_service_routes_fn(
listener.push_events_service_fn(
thread_push_events_config,
test_push_events_function,
listener.interaction_events_service_fn(
thread_interaction_events_config,
test_interaction_events_function,
),
chain_service_routes_fn(
listener.interaction_events_service_fn(
thread_interaction_events_config,
test_interaction_events_function,
),
chain_service_routes_fn(
listener.command_events_service_fn(
thread_command_events_config,
test_command_events_function,
),
your_others_routes,
listener.command_events_service_fn(
thread_command_events_config,
test_command_events_function,
),
your_others_routes,
),
),
);

Ok::<_, Box<dyn std::error::Error + Send + Sync>>(service_fn(routes))
}
});

info!("Server is listening on {}", &addr);

let server = hyper::server::Server::bind(&addr).serve(make_svc);
server.await.map_err(|e| {
error!("Server error: {}", e);
e.into()
})
),
);

tokio::task::spawn(async move {
if let Err(err) = hyper::server::conn::http1::Builder::new()
.serve_connection(io, service_fn(routes))
.await
{
println!("Error serving connection: {:?}", err);
}
});
}
}

pub fn config_env_var(name: &str) -> Result<String, String> {
Expand Down
4 changes: 2 additions & 2 deletions examples/proxy_client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use slack_morphism::prelude::*;

use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use hyper_proxy2::{Intercept, Proxy, ProxyConnector};

async fn test_proxy_client() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let proxy = {
let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.with_native_roots()?
.https_only()
.enable_http1()
.build();
Expand Down
2 changes: 1 addition & 1 deletion examples/rate_control_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use tracing::*;

async fn test_rate_control_client() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = SlackClient::new(
SlackClientHyperConnector::new()
SlackClientHyperConnector::new()?
.with_rate_control(SlackApiRateControlConfig::new().with_max_retries(5)),
);

Expand Down
2 changes: 1 addition & 1 deletion examples/socket_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn test_error_handler(
}

async fn test_client_with_socket_mode() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new()));
let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new()?));

let socket_mode_callbacks = SlackSocketModeListenerCallbacks::new()
.with_command_events(test_command_events_function)
Expand Down
2 changes: 1 addition & 1 deletion examples/state_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn test_error_handler(
}

async fn test_client_with_socket_mode() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new()));
let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new()?));

let socket_mode_callbacks =
SlackSocketModeListenerCallbacks::new().with_push_events(test_push_events_sm_function);
Expand Down
2 changes: 1 addition & 1 deletion examples/webhook_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use slack_morphism::prelude::*;
use url::Url;

async fn test_webhook_message() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = SlackClient::new(SlackClientHyperConnector::new());
let client = SlackClient::new(SlackClientHyperConnector::new()?);
let webhook_url: Url = Url::parse(config_env_var("SLACK_TEST_WEBHOOK_URL")?.as_str())?;

client
Expand Down
2 changes: 1 addition & 1 deletion src/axum_support/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::hyper_tokio::SlackClientHyperConnector;
use crate::listener::SlackClientEventsListenerEnvironment;
use hyper::client::connect::Connect;
use hyper_util::client::legacy::connect::Connect;
use std::sync::Arc;

mod slack_events_middleware;
Expand Down
13 changes: 6 additions & 7 deletions src/axum_support/slack_events_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use crate::listener::SlackClientEventsListenerEnvironment;
use crate::prelude::hyper_ext::HyperExtensions;
use crate::signature_verifier::SlackEventSignatureVerifier;
use crate::{SlackClientHttpConnector, SlackSigningSecret};
use axum::body::BoxBody;
use axum::response::IntoResponse;
use axum::{body::Body, http::Request, response::Response};
use futures_util::future::BoxFuture;
use hyper::client::connect::Connect;
use hyper_util::client::legacy::connect::Connect;
use std::convert::Infallible;
use std::marker::PhantomData;
use std::sync::Arc;
Expand Down Expand Up @@ -104,7 +103,7 @@ where
);
Ok(Response::builder()
.status(http_status)
.body(BoxBody::default())
.body(Body::default())
.unwrap())
} else {
*verified_request.body_mut() = Body::from(verified_body);
Expand All @@ -126,7 +125,7 @@ where
);
Ok(Response::builder()
.status(http_status)
.body(BoxBody::default())
.body(Body::default())
.unwrap())
}
}
Expand All @@ -141,7 +140,7 @@ where
);
Ok(Response::builder()
.status(http_status)
.body(BoxBody::default())
.body(Body::default())
.unwrap())
}
}
Expand Down Expand Up @@ -212,12 +211,12 @@ where
}

impl<H: 'static + Send + Sync + Connect + Clone> SlackEventsAxumListener<H> {
pub fn events_layer<S, ReqBody, I>(
pub fn events_layer<S, I>(
&self,
slack_signing_secret: &SlackSigningSecret,
) -> SlackEventsApiMiddleware<SlackClientHyperConnector<H>, S, SlackEventsEmptyExtractor>
where
S: Service<Request<ReqBody>, Response = I> + Send + 'static + Clone,
S: Service<Request<Body>, Response = I> + Send + 'static + Clone,
S::Future: Send + 'static,
S::Error: std::error::Error + 'static + Send + Sync,
I: IntoResponse,
Expand Down
Loading

0 comments on commit f84092d

Please sign in to comment.