From 7d1cf6d2719c098fa06d61ff92c88ce1c800b931 Mon Sep 17 00:00:00 2001 From: Natsuki Ikeguchi Date: Sun, 14 Jan 2024 22:19:55 +0900 Subject: [PATCH] feat: Upgrade http / hyper to 1.0 with axum 0.7 (#232) Signed-off-by: Natsuki Ikeguchi --- Cargo.toml | 16 ++-- examples/axum_events_api_server.rs | 20 +++-- examples/client.rs | 8 +- examples/client_with_tracing.rs | 2 +- examples/events_api_server.rs | 82 +++++++++++-------- examples/proxy_client.rs | 4 +- examples/rate_control_client.rs | 2 +- examples/socket_mode.rs | 2 +- examples/state_management.rs | 2 +- examples/webhook_message.rs | 2 +- src/axum_support/mod.rs | 2 +- src/axum_support/slack_events_middleware.rs | 13 ++- src/axum_support/slack_oauth_routes.rs | 23 ++++-- src/client.rs | 6 +- src/hyper_tokio/connector.rs | 42 ++++++---- src/hyper_tokio/hyper_errors.rs | 8 +- src/hyper_tokio/hyper_ext.rs | 30 +++---- src/hyper_tokio/listener/command_events.rs | 40 ++++----- .../listener/interaction_events.rs | 36 +++----- src/hyper_tokio/listener/mod.rs | 34 ++++---- src/hyper_tokio/listener/oauth.rs | 39 ++++----- src/hyper_tokio/listener/push_events.rs | 41 ++++------ src/hyper_tokio/mod.rs | 6 +- src/hyper_tokio/socket_mode/mod.rs | 2 +- .../socket_mode/tokio_clients_manager.rs | 2 +- src/listener.rs | 11 +-- src/socket_mode/callbacks.rs | 4 +- src/socket_mode/clients_manager_listener.rs | 4 +- 28 files changed, 237 insertions(+), 246 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b9a518bd..ce056f99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ path = "src/lib.rs" [features] default = [] -hyper = ["dep:tokio","dep:hyper", "dep:hyper-rustls", "dep:tokio-stream","dep:tokio-tungstenite", "dep:tokio-tungstenite", "dep:signal-hook", "dep:signal-hook-tokio"] +hyper = ["dep:tokio", "dep:http-body-util", "dep:hyper", "dep:hyper-rustls", "dep:hyper-util", "dep:tokio-stream","dep:tokio-tungstenite", "dep:tokio-tungstenite", "dep:signal-hook", "dep:signal-hook-tokio"] axum = ["hyper", "dep:axum", "dep:tower"] [dependencies] @@ -39,7 +39,7 @@ hex = "0.4" tracing = "0.1" ring = "0.17" lazy_static = "1.4" -http = "0.2" +http = "1.0" async-trait = "0.1" bytes = "1" rand = "0.8" @@ -48,12 +48,14 @@ mime = "0.3" mime_guess = "2" chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] } url = { version = "2.5", features = ["serde"]} -hyper = { version ="0.14", features = ["http2","server", "client", "h2", "stream"], optional = true } +http-body-util = { version = "0.1", optional = true } +hyper = { version ="1.0", features = ["http2","server", "client"], optional = true } +hyper-util = { version = "0.1", features = ["client", "client-legacy", "server"], optional = true } tokio = { version = "1", features = ["bytes","rt-multi-thread","signal","tracing"], optional = true } tokio-stream = { version = "0.1", optional = true } -hyper-rustls = { version="0.24", features = ["rustls-native-certs", "http2"], optional = true } +hyper-rustls = { version="0.26", features = ["rustls-native-certs", "http2"], optional = true } tokio-tungstenite = { version = "0.21.0", features = ["rustls-tls-native-roots"], optional = true } -axum = { version = "0.6", optional = true } +axum = { version = "0.7", optional = true } tower = { version = "0.4", optional = true } serde_urlencoded = "0.7.1" @@ -68,8 +70,8 @@ ctrlc = { version = "3.4", features = ["termination"] } cargo-husky = { version = "1", default-features = false, features = ["run-for-all", "prepush-hook", "run-cargo-fmt"] } cargo-audit = "0.18" tracing-subscriber = { version ="0.3", features = ["env-filter"] } -hyper-proxy = "0.9" -hyper = { version ="0.14", features = ["full"] } +hyper-proxy2 = "0.1" +hyper = { version ="1.0", features = ["full"] } tokio = { version = "1", features = ["full"] } [package.metadata.release] diff --git a/examples/axum_events_api_server.rs b/examples/axum_events_api_server.rs index 289366d8..52b7cdbd 100644 --- a/examples/axum_events_api_server.rs +++ b/examples/axum_events_api_server.rs @@ -1,10 +1,15 @@ use slack_morphism::prelude::*; -use hyper::{Body, Response}; +use bytes::Bytes; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Empty, Full}; +use hyper::Response; use tracing::*; use axum::Extension; +use std::convert::Infallible; use std::sync::Arc; +use tokio::net::TcpListener; async fn test_oauth_install_function( resp: SlackOAuthV2AccessTokenResponse, @@ -29,12 +34,14 @@ async fn test_error_install() -> String { async fn test_push_event( Extension(_environment): Extension>, Extension(event): Extension, -) -> Response { +) -> Response> { println!("Received push event: {:?}", event); match event { - SlackPushEvent::UrlVerification(url_ver) => Response::new(Body::from(url_ver.challenge)), - _ => Response::new(Body::empty()), + SlackPushEvent::UrlVerification(url_ver) => { + Response::new(Full::new(url_ver.challenge.into()).boxed()) + } + _ => Response::new(Empty::new().boxed()), } } @@ -68,7 +75,7 @@ fn test_error_handler( async fn test_server() -> Result<(), Box> { let client: Arc = - Arc::new(SlackClient::new(SlackClientHyperConnector::new())); + Arc::new(SlackClient::new(SlackClientHyperConnector::new()?)); let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 8080)); info!("Loading server: {}", addr); @@ -123,8 +130,7 @@ async fn test_server() -> Result<(), Box> { ), ); - axum::Server::bind(&addr) - .serve(app.into_make_service()) + axum::serve(TcpListener::bind(&addr).await.unwrap(), app) .await .unwrap(); diff --git a/examples/client.rs b/examples/client.rs index d456e8c8..c032f03a 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -10,7 +10,7 @@ use futures::stream::BoxStream; use futures::TryStreamExt; async fn test_simple_api_calls() -> Result<(), Box> { - let client = SlackClient::new(SlackClientHyperConnector::new()); + let client = SlackClient::new(SlackClientHyperConnector::new()?); let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into(); let token: SlackApiToken = SlackApiToken::new(token_value); @@ -31,7 +31,7 @@ async fn test_simple_api_calls() -> Result<(), Box Result<(), Box> { - let client = SlackClient::new(SlackClientHyperConnector::new()); + let client = SlackClient::new(SlackClientHyperConnector::new()?); let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into(); let token: SlackApiToken = SlackApiToken::new(token_value); let session = client.open_session(&token); @@ -48,7 +48,7 @@ async fn test_post_message() -> Result<(), Box Result<(), Box> { - let client = SlackClient::new(SlackClientHyperConnector::new()); + let client = SlackClient::new(SlackClientHyperConnector::new()?); let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into(); let token: SlackApiToken = SlackApiToken::new(token_value); let session = client.open_session(&token); @@ -83,7 +83,7 @@ async fn test_scrolling_user_list() -> Result<(), Box Result<(), Box> { - let client = SlackClient::new(SlackClientHyperConnector::new()); + let client = SlackClient::new(SlackClientHyperConnector::new()?); let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into(); let token: SlackApiToken = SlackApiToken::new(token_value); let session = client.open_session(&token); diff --git a/examples/client_with_tracing.rs b/examples/client_with_tracing.rs index 240750c2..3725f132 100644 --- a/examples/client_with_tracing.rs +++ b/examples/client_with_tracing.rs @@ -3,7 +3,7 @@ use tracing::*; async fn test_simple_api_calls_as_predicate() -> Result<(), Box> { - let client = SlackClient::new(SlackClientHyperConnector::new()); + let client = SlackClient::new(SlackClientHyperConnector::new()?); let token_value: SlackApiTokenValue = config_env_var("SLACK_TEST_TOKEN")?.into(); let token: SlackApiToken = SlackApiToken::new(token_value).with_team_id(config_env_var("SLACK_TEST_TEAM_ID")?.into()); // While Team ID is optional but still useful for tracing and rate control purposes diff --git a/examples/events_api_server.rs b/examples/events_api_server.rs index e2c25c6c..5003b138 100644 --- a/examples/events_api_server.rs +++ b/examples/events_api_server.rs @@ -1,9 +1,16 @@ use slack_morphism::prelude::*; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response}; +use bytes::Bytes; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::body::Incoming; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; use tracing::*; +use std::convert::Infallible; use std::sync::Arc; async fn test_oauth_install_function( @@ -82,16 +89,17 @@ struct UserStateExample(u64); async fn test_server() -> Result<(), Box> { let client: Arc = - Arc::new(SlackClient::new(SlackClientHyperConnector::new())); + Arc::new(SlackClient::new(SlackClientHyperConnector::new()?)); let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 8080)); info!("Loading server: {}", addr); async fn your_others_routes( - _req: Request, - ) -> Result, Box> { + _req: Request, + ) -> Result>, Box> + { Response::builder() - .body("Hey, this is a default users route handler".into()) + .body(Full::new("Hey, this is a default users route handler".into()).boxed()) .map_err(|e| e.into()) } @@ -120,47 +128,49 @@ async fn test_server() -> Result<(), Box> { .with_user_state(UserStateExample(0)), ); - let make_svc = make_service_fn(move |_| { + let listener = TcpListener::bind(&addr).await?; + + info!("Server is listening on {}", &addr); + + loop { + let (tcp, _) = listener.accept().await?; + let io = TokioIo::new(tcp); + let thread_oauth_config = oauth_listener_config.clone(); let thread_push_events_config = push_events_config.clone(); let thread_interaction_events_config = interactions_events_config.clone(); let thread_command_events_config = command_events_config.clone(); let listener = SlackClientEventsHyperListener::new(listener_environment.clone()); - async move { - let routes = chain_service_routes_fn( - listener.oauth_service_fn(thread_oauth_config, test_oauth_install_function), + let routes = chain_service_routes_fn( + listener.oauth_service_fn(thread_oauth_config, test_oauth_install_function), + chain_service_routes_fn( + listener + .push_events_service_fn(thread_push_events_config, test_push_events_function), chain_service_routes_fn( - listener.push_events_service_fn( - thread_push_events_config, - test_push_events_function, + listener.interaction_events_service_fn( + thread_interaction_events_config, + test_interaction_events_function, ), chain_service_routes_fn( - listener.interaction_events_service_fn( - thread_interaction_events_config, - test_interaction_events_function, - ), - chain_service_routes_fn( - listener.command_events_service_fn( - thread_command_events_config, - test_command_events_function, - ), - your_others_routes, + listener.command_events_service_fn( + thread_command_events_config, + test_command_events_function, ), + your_others_routes, ), ), - ); - - Ok::<_, Box>(service_fn(routes)) - } - }); - - info!("Server is listening on {}", &addr); - - let server = hyper::server::Server::bind(&addr).serve(make_svc); - server.await.map_err(|e| { - error!("Server error: {}", e); - e.into() - }) + ), + ); + + tokio::task::spawn(async move { + if let Err(err) = hyper::server::conn::http1::Builder::new() + .serve_connection(io, service_fn(routes)) + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } } pub fn config_env_var(name: &str) -> Result { diff --git a/examples/proxy_client.rs b/examples/proxy_client.rs index 4e5accb9..6fc1d2e7 100644 --- a/examples/proxy_client.rs +++ b/examples/proxy_client.rs @@ -1,11 +1,11 @@ use slack_morphism::prelude::*; -use hyper_proxy::{Intercept, Proxy, ProxyConnector}; +use hyper_proxy2::{Intercept, Proxy, ProxyConnector}; async fn test_proxy_client() -> Result<(), Box> { let proxy = { let https_connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() + .with_native_roots()? .https_only() .enable_http1() .build(); diff --git a/examples/rate_control_client.rs b/examples/rate_control_client.rs index b4d53113..bde2c477 100644 --- a/examples/rate_control_client.rs +++ b/examples/rate_control_client.rs @@ -3,7 +3,7 @@ use tracing::*; async fn test_rate_control_client() -> Result<(), Box> { let client = SlackClient::new( - SlackClientHyperConnector::new() + SlackClientHyperConnector::new()? .with_rate_control(SlackApiRateControlConfig::new().with_max_retries(5)), ); diff --git a/examples/socket_mode.rs b/examples/socket_mode.rs index cf7d2322..e3f629a1 100644 --- a/examples/socket_mode.rs +++ b/examples/socket_mode.rs @@ -171,7 +171,7 @@ fn test_error_handler( } async fn test_client_with_socket_mode() -> Result<(), Box> { - let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new())); + let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new()?)); let socket_mode_callbacks = SlackSocketModeListenerCallbacks::new() .with_command_events(test_command_events_function) diff --git a/examples/state_management.rs b/examples/state_management.rs index 5ffb1787..5e7fdafa 100644 --- a/examples/state_management.rs +++ b/examples/state_management.rs @@ -48,7 +48,7 @@ fn test_error_handler( } async fn test_client_with_socket_mode() -> Result<(), Box> { - let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new())); + let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new()?)); let socket_mode_callbacks = SlackSocketModeListenerCallbacks::new().with_push_events(test_push_events_sm_function); diff --git a/examples/webhook_message.rs b/examples/webhook_message.rs index b6ba9c1d..854140ab 100644 --- a/examples/webhook_message.rs +++ b/examples/webhook_message.rs @@ -2,7 +2,7 @@ use slack_morphism::prelude::*; use url::Url; async fn test_webhook_message() -> Result<(), Box> { - let client = SlackClient::new(SlackClientHyperConnector::new()); + let client = SlackClient::new(SlackClientHyperConnector::new()?); let webhook_url: Url = Url::parse(config_env_var("SLACK_TEST_WEBHOOK_URL")?.as_str())?; client diff --git a/src/axum_support/mod.rs b/src/axum_support/mod.rs index 733b4f24..dd21e490 100644 --- a/src/axum_support/mod.rs +++ b/src/axum_support/mod.rs @@ -1,6 +1,6 @@ use crate::hyper_tokio::SlackClientHyperConnector; use crate::listener::SlackClientEventsListenerEnvironment; -use hyper::client::connect::Connect; +use hyper_util::client::legacy::connect::Connect; use std::sync::Arc; mod slack_events_middleware; diff --git a/src/axum_support/slack_events_middleware.rs b/src/axum_support/slack_events_middleware.rs index c0401f80..64509282 100644 --- a/src/axum_support/slack_events_middleware.rs +++ b/src/axum_support/slack_events_middleware.rs @@ -7,11 +7,10 @@ use crate::listener::SlackClientEventsListenerEnvironment; use crate::prelude::hyper_ext::HyperExtensions; use crate::signature_verifier::SlackEventSignatureVerifier; use crate::{SlackClientHttpConnector, SlackSigningSecret}; -use axum::body::BoxBody; use axum::response::IntoResponse; use axum::{body::Body, http::Request, response::Response}; use futures_util::future::BoxFuture; -use hyper::client::connect::Connect; +use hyper_util::client::legacy::connect::Connect; use std::convert::Infallible; use std::marker::PhantomData; use std::sync::Arc; @@ -104,7 +103,7 @@ where ); Ok(Response::builder() .status(http_status) - .body(BoxBody::default()) + .body(Body::default()) .unwrap()) } else { *verified_request.body_mut() = Body::from(verified_body); @@ -126,7 +125,7 @@ where ); Ok(Response::builder() .status(http_status) - .body(BoxBody::default()) + .body(Body::default()) .unwrap()) } } @@ -141,7 +140,7 @@ where ); Ok(Response::builder() .status(http_status) - .body(BoxBody::default()) + .body(Body::default()) .unwrap()) } } @@ -212,12 +211,12 @@ where } impl SlackEventsAxumListener { - pub fn events_layer( + pub fn events_layer( &self, slack_signing_secret: &SlackSigningSecret, ) -> SlackEventsApiMiddleware, S, SlackEventsEmptyExtractor> where - S: Service, Response = I> + Send + 'static + Clone, + S: Service, Response = I> + Send + 'static + Clone, S::Future: Send + 'static, S::Error: std::error::Error + 'static + Send + Sync, I: IntoResponse, diff --git a/src/axum_support/slack_oauth_routes.rs b/src/axum_support/slack_oauth_routes.rs index cba3aff9..f73a2be2 100644 --- a/src/axum_support/slack_oauth_routes.rs +++ b/src/axum_support/slack_oauth_routes.rs @@ -2,12 +2,12 @@ use crate::axum_support::SlackEventsAxumListener; use crate::hyper_tokio::hyper_ext::HyperExtensions; use crate::listener::{SlackClientEventsListenerEnvironment, UserCallbackFunction}; use crate::prelude::SlackOAuthListenerConfig; -use axum::response::Response; +use axum::body::Body; +use axum::response::{IntoResponse, Response}; use futures_util::future::BoxFuture; use futures_util::FutureExt; use http::Request; -use hyper::client::connect::Connect; -use hyper::Body; +use hyper_util::client::legacy::connect::Connect; use rvstruct::ValueStruct; use std::future::Future; use std::sync::Arc; @@ -22,7 +22,7 @@ impl SlackEventsAxumListener { pub fn slack_oauth_install( &self, config: &SlackOAuthListenerConfig, - ) -> impl Fn(Request) -> BoxFuture<'static, Response> + 'static + Send + Clone { + ) -> impl Fn(Request) -> BoxFuture<'static, Response> + 'static + Send + Clone { let environment = self.environment.clone(); let config = config.clone(); move |_| { @@ -41,7 +41,7 @@ impl SlackEventsAxumListener { ], ); debug!("Redirecting to Slack OAuth authorize: {}", &full_uri); - HyperExtensions::hyper_redirect_to(full_uri.as_ref()) + HyperExtensions::hyper_redirect_to(full_uri.as_ref()).map(|r| r.into_response()) } .map(|res| Self::handle_error(environment, res)) .boxed() @@ -66,7 +66,7 @@ impl SlackEventsAxumListener { let err_config = config.clone(); async move { - let params = HyperExtensions::parse_query_params(&req); + let params = HyperExtensions::parse_query_params(req.uri()); debug!("Received Slack OAuth callback: {:?}", ¶ms); match (params.get("code"), params.get("error")) { @@ -105,6 +105,7 @@ impl SlackEventsAxumListener { ) .await; HyperExtensions::hyper_redirect_to(&config.redirect_installed_url) + .map(|r| r.into_response()) } Err(err) => { error!("Slack OAuth error: {}", &err); @@ -116,6 +117,7 @@ impl SlackEventsAxumListener { HyperExtensions::hyper_redirect_to( &config.redirect_error_redirect_url, ) + .map(|r| r.into_response()) } } } @@ -134,6 +136,7 @@ impl SlackEventsAxumListener { req.uri().query().map_or("".into(), |q| format!("?{}", &q)) ); HyperExtensions::hyper_redirect_to(&redirect_error_url) + .map(|r| r.into_response()) } _ => { error!("Slack OAuth cancelled with unknown reason"); @@ -146,6 +149,7 @@ impl SlackEventsAxumListener { environment.user_state.clone(), ); HyperExtensions::hyper_redirect_to(&config.redirect_error_redirect_url) + .map(|r| r.into_response()) } } } @@ -163,6 +167,7 @@ impl SlackEventsAxumListener { ); HyperExtensions::hyper_redirect_to(&err_config.redirect_error_redirect_url) .unwrap() + .into_response() } }) .boxed() @@ -195,8 +200,8 @@ impl SlackEventsAxumListener { fn handle_error( environment: Arc>>, - result: AnyStdResult>, - ) -> Response { + result: AnyStdResult, + ) -> Response { match result { Err(err) => { let http_status = (environment.error_handler)( @@ -206,7 +211,7 @@ impl SlackEventsAxumListener { ); Response::builder() .status(http_status) - .body(hyper::Body::empty()) + .body(Body::empty()) .unwrap() } Ok(result) => result, diff --git a/src/client.rs b/src/client.rs index 1df8c6ec..e396112c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -189,11 +189,13 @@ pub trait SlackClientHttpConnector { } } -pub type UserCallbackResult = std::result::Result>; +pub(crate) type BoxError = Box; + +pub type UserCallbackResult = std::result::Result; pub type ClientResult = std::result::Result; -pub type AnyStdResult = std::result::Result>; +pub type AnyStdResult = std::result::Result; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub struct SlackEnvelopeMessage { diff --git a/src/hyper_tokio/connector.rs b/src/hyper_tokio/connector.rs index 8fb1e926..baf9bc86 100644 --- a/src/hyper_tokio/connector.rs +++ b/src/hyper_tokio/connector.rs @@ -1,13 +1,16 @@ use crate::errors::*; use crate::hyper_tokio::ratectl::SlackTokioRateController; +use crate::hyper_tokio::Body; use crate::models::{SlackClientId, SlackClientSecret}; use crate::*; use async_recursion::async_recursion; use futures::future::{BoxFuture, FutureExt}; -use hyper::client::*; +use http_body_util::{BodyExt, Empty, Full}; use hyper::http::StatusCode; -use hyper::{Body, Request}; +use hyper::Request; use hyper_rustls::HttpsConnector; +use hyper_util::client::legacy::*; +use hyper_util::rt::TokioExecutor; use rvstruct::ValueStruct; use crate::prelude::hyper_ext::HyperExtensions; @@ -21,27 +24,28 @@ use url::Url; #[derive(Clone, Debug)] pub struct SlackClientHyperConnector { - hyper_connector: Client, + hyper_connector: Client, tokio_rate_controller: Option>, } -pub type SlackClientHyperHttpsConnector = SlackClientHyperConnector>; +pub type SlackClientHyperHttpsConnector = + SlackClientHyperConnector>; -impl SlackClientHyperConnector> { - pub fn new() -> Self { +impl SlackClientHyperConnector> { + pub fn new() -> std::io::Result { let https_connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() + .with_native_roots()? .https_only() .enable_http2() .build(); - Self::with_connector(https_connector) + Ok(Self::with_connector(https_connector)) } } -impl From> - for SlackClientHyperConnector> +impl From> + for SlackClientHyperConnector> { - fn from(https_connector: hyper_rustls::HttpsConnector) -> Self { + fn from(https_connector: HttpsConnector) -> Self { Self::with_connector(https_connector) } } @@ -49,7 +53,7 @@ impl From> impl SlackClientHyperConnector { pub fn with_connector(connector: H) -> Self { Self { - hyper_connector: Client::builder().build::<_, hyper::Body>(connector), + hyper_connector: Client::builder(TokioExecutor::new()).build::<_, Body>(connector), tokio_rate_controller: None, } } @@ -282,7 +286,9 @@ impl SlackClientHttpConnect context_token, ); - http_request.body(Body::empty()).map_err(|e| e.into()) + http_request + .body(Empty::new().boxed()) + .map_err(|e| e.into()) }, context, None, @@ -324,7 +330,7 @@ impl SlackClientHttpConnect client_id.value(), client_secret.value(), ) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) }, context, @@ -367,7 +373,7 @@ impl SlackClientHttpConnect ); http_request - .body(post_json.clone().into()) + .body(Full::new(post_json.clone().into()).boxed()) .map_err(|e| e.into()) }, context, @@ -410,7 +416,9 @@ impl SlackClientHttpConnect format!("token={}&", token.token_value.value()) }); let full_body = toke_body_prefix + &post_url_form; - http_request.body(full_body.into()).map_err(|e| e.into()) + http_request + .body(Full::new(full_body.into()).boxed()) + .map_err(|e| e.into()) }, context, None, @@ -449,7 +457,7 @@ impl SlackClientHttpConnect Ok(file_bytes) => self .send_rate_controlled_request( move || { - let http_body = file_bytes.clone().into(); + let http_body = Full::new(file_bytes.clone()).boxed(); let http_base_request = HyperExtensions::create_http_request( full_uri.clone(), diff --git a/src/hyper_tokio/hyper_errors.rs b/src/hyper_tokio/hyper_errors.rs index 4ced22fa..fb42ec88 100644 --- a/src/hyper_tokio/hyper_errors.rs +++ b/src/hyper_tokio/hyper_errors.rs @@ -1,15 +1,15 @@ use crate::errors::*; -impl From for SlackClientError { - fn from(hyper_err: hyper::Error) -> Self { +impl From for SlackClientError { + fn from(hyper_err: http::Error) -> Self { SlackClientError::HttpProtocolError( SlackClientHttpProtocolError::new().with_cause(Box::new(hyper_err)), ) } } -impl From for SlackClientError { - fn from(hyper_err: hyper::http::Error) -> Self { +impl From for SlackClientError { + fn from(hyper_err: hyper_util::client::legacy::Error) -> Self { SlackClientError::HttpProtocolError( SlackClientHttpProtocolError::new().with_cause(Box::new(hyper_err)), ) diff --git a/src/hyper_tokio/hyper_ext.rs b/src/hyper_tokio/hyper_ext.rs index dcf50a1b..2a051a57 100644 --- a/src/hyper_tokio/hyper_ext.rs +++ b/src/hyper_tokio/hyper_ext.rs @@ -1,3 +1,4 @@ +use crate::hyper_tokio::Body; use crate::signature_verifier::*; use crate::{AnyStdResult, SlackApiToken}; use base64::prelude::*; @@ -5,8 +6,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures_util::TryFutureExt; use http::request::Parts; use http::{Request, Response, Uri}; -use hyper::body::HttpBody; -use hyper::Body; +use http_body_util::{BodyExt, Empty}; use mime::Mime; use rvstruct::ValueStruct; use std::collections::HashMap; @@ -17,10 +17,8 @@ use url::Url; pub struct HyperExtensions; impl HyperExtensions { - pub fn parse_query_params(request: &Request) -> HashMap { - request - .uri() - .query() + pub fn parse_query_params(uri: &Uri) -> HashMap { + uri.query() .map(|v| { url::form_urlencoded::parse(v.as_bytes()) .into_owned() @@ -29,13 +27,11 @@ impl HyperExtensions { .unwrap_or_default() } - pub fn hyper_redirect_to( - url: &str, - ) -> Result, Box> { + pub fn hyper_redirect_to(url: &str) -> AnyStdResult> { Response::builder() .status(hyper::http::StatusCode::FOUND) .header(hyper::header::LOCATION, url) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) } @@ -76,10 +72,10 @@ impl HyperExtensions { pub async fn http_body_to_string(body: T) -> AnyStdResult where - T: HttpBody, + T: hyper::body::Body, T::Error: std::error::Error + Sync + Send + 'static, { - let http_body = hyper::body::aggregate(body).await?; + let http_body = body.collect().await?.aggregate(); let mut http_reader = http_body.reader(); let mut http_body_str = String::new(); http_reader.read_to_string(&mut http_body_str)?; @@ -94,10 +90,14 @@ impl HyperExtensions { }) } - pub async fn decode_signed_response( - req: Request, + pub async fn decode_signed_response( + req: Request, signature_verifier: &SlackEventSignatureVerifier, - ) -> AnyStdResult<(String, Parts)> { + ) -> AnyStdResult<(String, Parts)> + where + B: hyper::body::Body, + B::Error: std::error::Error + Send + Sync + 'static, + { match ( req.headers() .get(SlackEventSignatureVerifier::SLACK_SIGNED_HASH_HEADER) diff --git a/src/hyper_tokio/listener/command_events.rs b/src/hyper_tokio/listener/command_events.rs index c4374739..5765f862 100644 --- a/src/hyper_tokio/listener/command_events.rs +++ b/src/hyper_tokio/listener/command_events.rs @@ -8,9 +8,11 @@ use crate::hyper_tokio::*; pub use crate::models::events::*; pub use crate::models::SlackResponseUrl; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; -use hyper::body::*; -use hyper::client::connect::Connect; +use http_body_util::Empty; +use http_body_util::{BodyExt, Full}; +use hyper::body::Incoming; use hyper::{Method, Request, Response, StatusCode}; +use hyper_util::client::legacy::connect::Connect; use std::collections::HashMap; use std::future::Future; use std::sync::Arc; @@ -24,21 +26,10 @@ impl SlackClientEventsHyperListener< impl Future> + 'static + Send, SlackClientHyperConnector, >, - ) -> impl Fn( - Request, - D, - ) -> BoxFuture< - 'a, - Result, Box>, - > - + 'a - + Send - + Clone + ) -> impl Fn(Request, D) -> BoxFuture<'a, AnyStdResult>> + 'a + Send + Clone where - D: Fn(Request) -> F + 'a + Send + Sync + Clone, - F: Future, Box>> - + 'a - + Send, + D: Fn(Request) -> F + 'a + Send + Sync + Clone, + F: Future>> + 'a + Send, R: Into>, { let signature_verifier: Arc = Arc::new( @@ -48,7 +39,7 @@ impl SlackClientEventsHyperListener< let error_handler = self.environment.error_handler.clone(); let user_state_storage = self.environment.user_state.clone(); - move |req: Request, chain: D| { + move |req: Request, chain: D| { let cfg = config.clone(); let sign_verifier = signature_verifier.clone(); let sc = client.clone(); @@ -117,13 +108,16 @@ impl SlackClientEventsHyperListener< "application/json; charset=utf-8", ) .body( - serde_json::to_string(&cresp) - .unwrap() - .into(), + Full::new( + serde_json::to_string(&cresp) + .unwrap() + .into(), + ) + .boxed(), ), None => Response::builder() .status(StatusCode::OK) - .body(Body::empty()), + .body(Empty::new().boxed()), } .map_err(|e| e.into()), Err(err) => { @@ -134,7 +128,7 @@ impl SlackClientEventsHyperListener< ); Response::builder() .status(status_code) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) } } @@ -147,7 +141,7 @@ impl SlackClientEventsHyperListener< ); Response::builder() .status(status_code) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) } } diff --git a/src/hyper_tokio/listener/interaction_events.rs b/src/hyper_tokio/listener/interaction_events.rs index a5cfe753..54be18e1 100644 --- a/src/hyper_tokio/listener/interaction_events.rs +++ b/src/hyper_tokio/listener/interaction_events.rs @@ -8,9 +8,10 @@ use crate::blocks::SlackViewSubmissionResponse; use crate::hyper_tokio::hyper_ext::HyperExtensions; use crate::hyper_tokio::*; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; -use hyper::body::*; -use hyper::client::connect::Connect; +use http_body_util::{BodyExt, Empty, Full}; +use hyper::body::Incoming; use hyper::{Method, Request, Response, StatusCode}; +use hyper_util::client::legacy::connect::Connect; use std::collections::HashMap; use std::future::Future; use std::sync::Arc; @@ -24,21 +25,10 @@ impl SlackClientEventsHyperListener< impl Future> + 'static + Send, SlackClientHyperConnector, >, - ) -> impl Fn( - Request, - D, - ) -> BoxFuture< - 'a, - Result, Box>, - > - + 'a - + Send - + Clone + ) -> impl Fn(Request, D) -> BoxFuture<'a, AnyStdResult>> + 'a + Send + Clone where - D: Fn(Request) -> F + 'a + Send + Sync + Clone, - F: Future, Box>> - + 'a - + Send, + D: Fn(Request) -> F + 'a + Send + Sync + Clone, + F: Future>> + 'a + Send, R: SlackInteractionEventResponse, { let signature_verifier: Arc = Arc::new( @@ -48,7 +38,7 @@ impl SlackClientEventsHyperListener< let error_handler = self.environment.error_handler.clone(); let user_state_storage = self.environment.user_state.clone(); - move |req: Request, chain: D| { + move |req: Request, chain: D| { let cfg = config.clone(); let sign_verifier = signature_verifier.clone(); let sc = client.clone(); @@ -90,7 +80,7 @@ impl SlackClientEventsHyperListener< let status_code = thread_error_handler(err, sc, thread_user_state_storage); Response::builder() .status(status_code) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) } } @@ -103,7 +93,7 @@ impl SlackClientEventsHyperListener< let status_code = thread_error_handler(err, sc, thread_user_state_storage); Response::builder() .status(status_code) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) } } @@ -112,7 +102,7 @@ impl SlackClientEventsHyperListener< let status_code = thread_error_handler(event_err, sc, thread_user_state_storage); Response::builder() .status(status_code) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) } } @@ -136,11 +126,11 @@ impl SlackInteractionEventResponse for () { match event { SlackInteractionEvent::ViewSubmission(_) => Response::builder() .status(StatusCode::OK) - .body("".into()) + .body(Empty::new().boxed()) .map_err(|e| e.into()), _ => Response::builder() .status(StatusCode::OK) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()), } } @@ -152,6 +142,6 @@ impl SlackInteractionEventResponse for SlackViewSubmissionResponse { Ok(Response::builder() .status(StatusCode::OK) .header("content-type", "application/json; charset=utf-8") - .body(Body::from(json_str))?) + .body(Full::new(json_str.into()).boxed())?) } } diff --git a/src/hyper_tokio/listener/mod.rs b/src/hyper_tokio/listener/mod.rs index b2c247bd..c3c63a46 100644 --- a/src/hyper_tokio/listener/mod.rs +++ b/src/hyper_tokio/listener/mod.rs @@ -1,17 +1,20 @@ -use crate::hyper_tokio::connector::SlackClientHyperConnector; - use std::future::Future; +use std::sync::Arc; use futures::future::{BoxFuture, FutureExt}; -use hyper::client::connect::Connect; -use hyper::{Body, Request, Response}; +use hyper::body::Incoming; +use hyper::{Request, Response}; +use hyper_util::client::legacy::connect::Connect; +use crate::hyper_tokio::connector::SlackClientHyperConnector; +use crate::hyper_tokio::Body; use crate::listener::SlackClientEventsListenerEnvironment; +use crate::AnyStdResult; + pub use command_events::*; pub use interaction_events::*; pub use oauth::*; pub use push_events::*; -use std::sync::Arc; mod command_events; mod interaction_events; @@ -33,21 +36,12 @@ impl SlackClientEventsHyperListener< pub fn chain_service_routes_fn<'a, R, D, FR, FD>( route: R, default: D, -) -> impl Fn( - Request, -) -> BoxFuture<'a, Result, Box>> - + 'a - + Send - + Clone +) -> impl Fn(Request) -> BoxFuture<'a, AnyStdResult>> + 'a + Send + Clone where - R: Fn(Request, D) -> FR + 'a + Clone + Send, - D: Fn(Request) -> FD + 'a + Clone + Send, - FR: Future, Box>> - + 'a - + Send, - FD: Future, Box>> - + 'a - + Send, + R: Fn(Request, D) -> FR + 'a + Clone + Send, + D: Fn(Request) -> FD + 'a + Clone + Send, + FR: Future>> + 'a + Send, + FD: Future>> + 'a + Send, { - move |req: Request| route(req, default.clone()).boxed() + move |req: Request| route(req, default.clone()).boxed() } diff --git a/src/hyper_tokio/listener/oauth.rs b/src/hyper_tokio/listener/oauth.rs index ecd2dfea..f65e555d 100644 --- a/src/hyper_tokio/listener/oauth.rs +++ b/src/hyper_tokio/listener/oauth.rs @@ -1,16 +1,16 @@ use crate::hyper_tokio::connector::SlackClientHyperConnector; -use crate::hyper_tokio::SlackClientEventsHyperListener; +use crate::hyper_tokio::hyper_ext::HyperExtensions; +use crate::hyper_tokio::{Body, SlackClientEventsHyperListener}; use crate::api::*; use crate::errors::*; use crate::listener::*; -use crate::{SlackClient, SlackClientHttpApiUri}; +use crate::{AnyStdResult, SlackClient, SlackClientHttpApiUri}; -use crate::hyper_tokio::hyper_ext::HyperExtensions; use futures::future::{BoxFuture, FutureExt}; -use hyper::body::*; -use hyper::client::connect::Connect; +use hyper::body::Incoming; use hyper::{Method, Request, Response}; +use hyper_util::client::legacy::connect::Connect; use rvstruct::*; use std::future::Future; use std::sync::Arc; @@ -18,9 +18,9 @@ use tracing::*; impl SlackClientEventsHyperListener { pub(crate) async fn slack_oauth_install_service( - _: Request, + _: Request, config: &SlackOAuthListenerConfig, - ) -> Result, Box> { + ) -> AnyStdResult> { let full_uri = SlackClientHttpApiUri::create_url_with_params( SlackOAuthListenerConfig::OAUTH_AUTHORIZE_URL_VALUE, &vec![ @@ -37,7 +37,7 @@ impl SlackClientEventsHyperListener< } pub(crate) async fn slack_oauth_callback_service( - req: Request, + req: Request, config: &SlackOAuthListenerConfig, client: Arc>>, user_state_storage: SlackClientEventsUserState, @@ -47,8 +47,8 @@ impl SlackClientEventsHyperListener< SlackClientHyperConnector, >, error_handler: BoxedErrorHandler>, - ) -> Result, Box> { - let params = HyperExtensions::parse_query_params(&req); + ) -> AnyStdResult> { + let params = HyperExtensions::parse_query_params(req.uri()); debug!("Received Slack OAuth callback: {:?}", ¶ms); match (params.get("code"), params.get("error")) { @@ -126,27 +126,16 @@ impl SlackClientEventsHyperListener< impl Future + 'static + Send, SlackClientHyperConnector, >, - ) -> impl Fn( - Request, - D, - ) -> BoxFuture< - 'a, - Result, Box>, - > - + 'a - + Send - + Clone + ) -> impl Fn(Request, D) -> BoxFuture<'a, AnyStdResult>> + 'a + Send + Clone where - D: Fn(Request) -> F + 'a + Send + Sync + Clone, - F: Future, Box>> - + 'a - + Send, + D: Fn(Request) -> F + 'a + Send + Sync + Clone, + F: Future>> + 'a + Send, { let client = self.environment.client.clone(); let listener_error_handler = self.environment.error_handler.clone(); let user_state_storage = self.environment.user_state.clone(); - move |req: Request, chain: D| { + move |req: Request, chain: D| { let cfg = config.clone(); let sc = client.clone(); let error_handler = listener_error_handler.clone(); diff --git a/src/hyper_tokio/listener/push_events.rs b/src/hyper_tokio/listener/push_events.rs index 30365846..b7c40c7f 100644 --- a/src/hyper_tokio/listener/push_events.rs +++ b/src/hyper_tokio/listener/push_events.rs @@ -3,16 +3,18 @@ use crate::hyper_tokio::connector::SlackClientHyperConnector; use crate::hyper_tokio::hyper_ext::HyperExtensions; use crate::hyper_tokio::*; use crate::listener::*; -pub use crate::models::events::*; use crate::signature_verifier::SlackEventSignatureVerifier; use futures::future::{BoxFuture, FutureExt, TryFutureExt}; -use hyper::body::*; -use hyper::client::connect::Connect; +use http_body_util::{BodyExt, Empty, Full}; +use hyper::body::Incoming; use hyper::{Method, Request, Response}; +use hyper_util::client::legacy::connect::Connect; use std::future::Future; use std::sync::Arc; use tracing::*; +pub use crate::models::events::*; + impl SlackClientEventsHyperListener { pub fn push_events_service_fn<'a, D, F>( &self, @@ -22,21 +24,10 @@ impl SlackClientEventsHyperListener< impl Future> + 'static + Send, SlackClientHyperConnector, >, - ) -> impl Fn( - Request, - D, - ) -> BoxFuture< - 'a, - Result, Box>, - > - + 'a - + Send - + Clone + ) -> impl Fn(Request, D) -> BoxFuture<'a, AnyStdResult>> + 'a + Send + Clone where - D: Fn(Request) -> F + 'a + Send + Sync + Clone, - F: Future, Box>> - + 'a - + Send, + D: Fn(Request) -> F + 'a + Send + Sync + Clone, + F: Future>> + 'a + Send, { let signature_verifier: Arc = Arc::new( SlackEventSignatureVerifier::new(&config.events_signing_secret), @@ -45,7 +36,7 @@ impl SlackClientEventsHyperListener< let error_handler = self.environment.error_handler.clone(); let user_state_storage = self.environment.user_state.clone(); - move |req: Request, chain: D| { + move |req: Request, chain: D| { let cfg = config.clone(); let sign_verifier = signature_verifier.clone(); let sc = client.clone(); @@ -76,9 +67,9 @@ impl SlackClientEventsHyperListener< ) .await { - Ok(_) => { - Ok(Response::new(Body::from(url_ver.challenge))) - } + Ok(_) => Ok(Response::new( + Full::new(url_ver.challenge.into()).boxed(), + )), Err(err) => { let status_code = thread_error_handler( err, @@ -87,7 +78,7 @@ impl SlackClientEventsHyperListener< ); Response::builder() .status(status_code) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) } } @@ -101,7 +92,7 @@ impl SlackClientEventsHyperListener< ) .await { - Ok(_) => Ok(Response::new(Body::empty())), + Ok(_) => Ok(Response::new(Empty::new().boxed())), Err(err) => { let status_code = thread_error_handler( err, @@ -110,7 +101,7 @@ impl SlackClientEventsHyperListener< ); Response::builder() .status(status_code) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) } } @@ -123,7 +114,7 @@ impl SlackClientEventsHyperListener< ); Response::builder() .status(status_code) - .body(Body::empty()) + .body(Empty::new().boxed()) .map_err(|e| e.into()) } }, diff --git a/src/hyper_tokio/mod.rs b/src/hyper_tokio/mod.rs index 3310ef14..9e1c080f 100644 --- a/src/hyper_tokio/mod.rs +++ b/src/hyper_tokio/mod.rs @@ -7,6 +7,7 @@ pub use crate::hyper_tokio::connector::SlackClientHyperConnector; pub use crate::hyper_tokio::connector::SlackClientHyperHttpsConnector; use crate::SlackClient; +use std::convert::Infallible; use crate::*; @@ -29,4 +30,7 @@ pub type SlackHyperClient = SlackClient; pub type SlackHyperListenerEnvironment = SlackClientEventsListenerEnvironment; -pub type SlackHyperHttpsConnector = hyper_rustls::HttpsConnector; +pub type SlackHyperHttpsConnector = + hyper_rustls::HttpsConnector; + +pub(crate) type Body = http_body_util::combinators::BoxBody; diff --git a/src/hyper_tokio/socket_mode/mod.rs b/src/hyper_tokio/socket_mode/mod.rs index 82dae6c4..f6543104 100644 --- a/src/hyper_tokio/socket_mode/mod.rs +++ b/src/hyper_tokio/socket_mode/mod.rs @@ -2,7 +2,7 @@ use crate::clients_manager::{SlackSocketModeClientsManager, SlackSocketModeClien use crate::hyper_tokio::connector::SlackClientHyperConnector; use crate::hyper_tokio::socket_mode::tokio_clients_manager::SlackSocketModeTokioClientsManager; use crate::listener::SlackClientEventsListenerEnvironment; -use hyper::client::connect::Connect; +use hyper_util::client::legacy::connect::Connect; use std::sync::Arc; mod tokio_clients_manager; diff --git a/src/hyper_tokio/socket_mode/tokio_clients_manager.rs b/src/hyper_tokio/socket_mode/tokio_clients_manager.rs index 46293afd..e5126036 100644 --- a/src/hyper_tokio/socket_mode/tokio_clients_manager.rs +++ b/src/hyper_tokio/socket_mode/tokio_clients_manager.rs @@ -1,6 +1,6 @@ use crate::*; use async_trait::async_trait; -use hyper::client::connect::Connect; +use hyper_util::client::legacy::connect::Connect; use std::sync::Arc; use crate::hyper_tokio::socket_mode::tungstenite_wss_client::SlackTungsteniteWssClient; diff --git a/src/listener.rs b/src/listener.rs index 5440c878..6459ba44 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -1,5 +1,5 @@ use crate::models::*; -use crate::{ClientResult, SlackClient, SlackClientHttpConnector}; +use crate::{BoxError, ClientResult, SlackClient, SlackClientHttpConnector}; use futures::executor::block_on; use futures::FutureExt; use rsb_derive::Builder; @@ -45,7 +45,7 @@ where } fn empty_error_handler( - err: Box, + err: BoxError, _client: Arc>, _user_state_storage: SlackClientEventsUserState, ) -> http::StatusCode { @@ -96,11 +96,8 @@ impl SlackClientEventsUserStateStorage { pub type BoxedErrorHandler = Box>; -pub type ErrorHandler = fn( - Box, - Arc>, - SlackClientEventsUserState, -) -> HttpStatusCode; +pub type ErrorHandler = + fn(BoxError, Arc>, SlackClientEventsUserState) -> HttpStatusCode; #[derive(Debug, PartialEq, Eq, Clone, Builder)] pub struct SlackCommandEventsListenerConfig { diff --git a/src/socket_mode/callbacks.rs b/src/socket_mode/callbacks.rs index 492c53c0..c79712fb 100644 --- a/src/socket_mode/callbacks.rs +++ b/src/socket_mode/callbacks.rs @@ -3,7 +3,7 @@ use crate::events::*; use crate::listener::{SlackClientEventsUserState, UserCallbackFunction}; use crate::models::events::{SlackCommandEvent, SlackCommandEventResponse}; use crate::models::socket_mode::SlackSocketModeHelloEvent; -use crate::{SlackClient, SlackClientHttpConnector, UserCallbackResult}; +use crate::{AnyStdResult, SlackClient, SlackClientHttpConnector, UserCallbackResult}; use futures::future::BoxFuture; use std::future::Future; use std::sync::Arc; @@ -115,7 +115,7 @@ where event: SlackCommandEvent, _client: Arc>, _states: SlackClientEventsUserState, - ) -> Result> { + ) -> AnyStdResult { warn!("No callback is specified for a command event: {:?}", event); Err(Box::new(SlackClientError::SystemError( SlackClientSystemError::new() diff --git a/src/socket_mode/clients_manager_listener.rs b/src/socket_mode/clients_manager_listener.rs index 4c552d2c..2487d125 100644 --- a/src/socket_mode/clients_manager_listener.rs +++ b/src/socket_mode/clients_manager_listener.rs @@ -17,7 +17,7 @@ pub trait SlackSocketModeClientListener { message_body: String, ) -> Option; - async fn on_error(&self, error: Box); + async fn on_error(&self, error: BoxError); async fn on_disconnect(&self, client_id: &SlackSocketModeWssClientId); } @@ -217,7 +217,7 @@ where } } - async fn on_error(&self, error: Box) { + async fn on_error(&self, error: BoxError) { self.listener_environment.error_handler.clone()( error, self.listener_environment.client.clone(),