diff --git a/Cargo.toml b/Cargo.toml index 8dd73b40..b915e179 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ rustls = { version = "0.22.1" } hyper = { version = "1", features = ["full"] } hyper-util = { version = "0.1.2", features = ["server-auto", "tokio", "client-legacy", "client"] } http-body-util = { version = "0.1" } -hyper-rustls = { version = "0.24" } +hyper-rustls = { version = "0.25" } hyper-tls = { version = "0.6.0"} rustls-pemfile = { version = "1" } tokio-rustls = { version = "0.25", default-features = false } diff --git a/kernel/src/functions/http_client.rs b/kernel/src/functions/http_client.rs index 9749145f..f68261c6 100644 --- a/kernel/src/functions/http_client.rs +++ b/kernel/src/functions/http_client.rs @@ -1,11 +1,15 @@ use std::{ + fmt::Debug, sync::{Arc, OnceLock}, time::Duration, }; use crate::plugins::context::{SgRouteFilterRequestAction, SgRoutePluginContext}; -use http_body_util::Empty; -use hyper::{body::Incoming, Error}; +use http_body_util::{combinators::BoxBody, Empty}; +use hyper::{ + body::{Body, Incoming}, + Error, +}; use hyper::{header::HeaderValue, HeaderMap, Method, Request, Response, StatusCode, Uri}; use hyper_rustls::{ConfigBuilderExt, HttpsConnector}; use hyper_util::client::legacy::connect::HttpConnector; @@ -14,10 +18,10 @@ use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerifier}; use tardis::{ basic::{error::TardisError, result::TardisResult}, log, - tokio::{time::timeout, self}, + tokio::{self, time::timeout}, }; -type Client = hyper_util::client::legacy::Client, ()>; +type Client = hyper_util::client::legacy::Client, BoxBody>; const DEFAULT_TIMEOUT: Duration = Duration::from_millis(5000); static DEFAULT_CLIENT: OnceLock = OnceLock::new(); @@ -139,22 +143,22 @@ impl SgRoutePluginContext { } } -pub async fn raw_request( +pub async fn raw_request>( client: Option<&Client>, method: Method, url: &Uri, - body: Incoming, + body: B, headers: &HeaderMap, timeout: Option, ) -> TardisResult> { - let timeout_ms = timeout.unwrap_or(DEFAULT_TIMEOUT); + let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT); let method_str = method.to_string(); let url_str = url.to_string(); if log::level_enabled!(log::Level::TRACE) { - log::trace!("[SG.Client] Request method {method_str} url {url_str} header {headers:?} {body:?}, timeout {timeout_ms} ms",); + log::trace!("[SG.Client] Request method {method_str} url {url_str} header {headers:?} {body:?}, timeout {timeout:?} ms",); } else if log::level_enabled!(log::Level::DEBUG) { - log::debug!("[SG.Client] Request method {method_str} url {url_str} header {headers:?}, timeout {timeout_ms} ms",); + log::debug!("[SG.Client] Request method {method_str} url {url_str} header {headers:?}, timeout {timeout:?} ms",); } let mut req = Request::builder(); @@ -167,8 +171,12 @@ pub async fn raw_request( } req = req.uri(url); let req = req.body(body).map_err(|error| TardisError::internal_error(&format!("[SG.Route] Build request method {method_str} url {url_str} error:{error}"), ""))?; - let req = if let Some(client) = client { client.request(req) } else { init()?.request(req) }; - let response = match tokio::time::timeout(Duration::from_millis(timeout_ms), req).await { + let req = if let Some(client) = client { + client.request(req) + } else { + init()?.request(req) + }; + let response = match tokio::time::timeout(timeout, req).await { Ok(response) => response.map_err(|error: Error| TardisError::custom("502", &format!("[SG.Client] Request method {method_str} url {url_str} error: {error}"), "")), Err(_) => { Response::builder().status(StatusCode::GATEWAY_TIMEOUT).body(Empty::default()).map_err(|e| TardisError::internal_error(&format!("[SG.Client] timeout error: {e}"), "")) diff --git a/kernel/src/functions/http_route.rs b/kernel/src/functions/http_route.rs index 96942e80..586e177e 100644 --- a/kernel/src/functions/http_route.rs +++ b/kernel/src/functions/http_route.rs @@ -10,8 +10,7 @@ use crate::{ filters::{self, BoxSgPluginFilter, SgPluginFilterInitDto}, }, }; -use http::{header::UPGRADE, HeaderValue, Request, Response}; -use hyper::{body::Incoming, StatusCode}; +use hyper::{body::Incoming, StatusCode, Request, Response, header::{HeaderValue, UPGRADE}}; use crate::plugins::context::AvailableBackendInst; use itertools::Itertools; diff --git a/kernel/src/helpers/url_helper.rs b/kernel/src/helpers/url_helper.rs index 98408f12..5b00a465 100644 --- a/kernel/src/helpers/url_helper.rs +++ b/kernel/src/helpers/url_helper.rs @@ -1,6 +1,6 @@ use std::str::FromStr; -use http::Uri; +use hyper::Uri; use tardis::{ basic::{error::TardisError, result::TardisResult}, url::Url, diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 050c3e8b..99c32204 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -40,7 +40,7 @@ pub mod functions; pub mod helpers; pub mod instance; pub mod plugins; - +mod utils; #[inline] pub async fn startup_k8s(namespace: Option) -> TardisResult<()> { k8s_client::inst( diff --git a/kernel/src/plugins/context.rs b/kernel/src/plugins/context.rs index 3b1811d5..9adf64f7 100644 --- a/kernel/src/plugins/context.rs +++ b/kernel/src/plugins/context.rs @@ -1,14 +1,16 @@ -use http::uri::Builder; -use http::{HeaderMap, HeaderName, HeaderValue, Method, Response, StatusCode, Uri, Version}; -use http_body_util::{BodyExt, Collected}; -use hyper::body::{Body, Incoming}; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Collected, Empty, Full, BodyStream}; +use hyper::body::{Body, Bytes, Incoming}; +use hyper::header::{HeaderName, HeaderValue}; +use hyper::{HeaderMap, Method, Response, StatusCode, Uri, Version, http}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::convert::Infallible; use std::net::SocketAddr; use std::ops::{Deref, DerefMut}; +use std::time::Duration; use tardis::basic::error::TardisError; use tardis::basic::result::TardisResult; - use tardis::TardisFuns; use kernel_common::inner_model::gateway::SgProtocol; @@ -60,7 +62,7 @@ impl AvailableBackendInst { } } - pub fn build_base_uri(&self) -> Builder { + pub fn build_base_uri(&self) -> hyper::http::uri::Builder { let scheme = self.protocol.as_ref().unwrap_or(&SgProtocol::Http); let port = if (self.port == 0 || self.port == 80) && scheme == &SgProtocol::Http || (self.port == 0 || self.port == 443) && scheme == &SgProtocol::Https { None @@ -163,7 +165,7 @@ pub struct SgCtxRequest { pub version: MaybeModified, pub headers: MaybeModified>, pub remote_addr: SocketAddr, - pub body: Incoming, + pub body: BoxBody, } impl SgCtxRequest { @@ -267,16 +269,22 @@ impl SgCtxRequest { &self.remote_addr } - pub fn take_body(&mut self) -> Incoming { - std::mem::take(&mut self.body) + pub fn take_body(&mut self) -> BoxBody { + self.replace_body(Empty::default()) } - pub fn replace_body(&mut self, body: impl Into) -> Incoming { - std::mem::replace(&mut self.body, body.into()) + pub fn replace_body(&mut self, body: B) -> BoxBody + where + B: Body, + { + std::mem::replace(&mut self.body, body.boxed()) } #[inline] - pub fn set_body(&mut self, body: impl Into) { + pub fn set_body(&mut self, body: B) + where + B: Body, + { let _ = self.replace_body(body); } @@ -294,7 +302,8 @@ impl SgCtxRequest { /// this method will read all of the body and clone it, and it's body will become an once stream which holds the whole body. pub async fn dump_body(&mut self) -> TardisResult { let bytes = self.take_body_into_bytes().await?; - self.set_body(bytes.clone()); + let body = Full::new(bytes.clone()).map_err(crate::utils::never::); + self.set_body(BoxBody::new(body)); Ok(bytes) } } @@ -401,36 +410,45 @@ impl SgCtxResponse { Ok(()) } - #[inline] - pub fn take_body(&mut self) -> Incoming { - std::mem::take(&mut self.body) + pub fn take_body(&mut self) -> BoxBody { + self.replace_body(Empty::default()) } - #[inline] - pub fn replace_body(&mut self, body: impl Into) -> Incoming { - std::mem::replace(&mut self.body, body.into()) + pub fn take_body_stream(&mut self) -> BodyStream> { + BodyStream::new(self.take_body()) + } + + pub fn replace_body(&mut self, body: B) -> BoxBody + where + B: Body, + { + std::mem::replace(&mut self.body, body.boxed()) } #[inline] - pub fn set_body(&mut self, body: impl Into) { + pub fn set_body(&mut self, body: B) + where + B: Body, + { let _ = self.replace_body(body); } - /// it's a shortcut for [take_body](SgCtxResponse) + [hyper::body::to_bytes] + /// it's a shortcut for [take_body](SgCtxRequest) + [hyper::body::to_bytes] pub async fn take_body_into_bytes(&mut self) -> TardisResult { self.take_body().collect().await.map(Collected::to_bytes).map_err(|e| TardisError::format_error(&format!("[SG.Filter] fail to collect body into bytes: {e}"), "")) } - /// it's a shortcut for [take_body](SgCtxResponse) + [hyper::body::aggregate] + /// it's a shortcut for [`take_body`](SgCtxRequest) + [hyper::body::aggregate] pub async fn take_body_into_buf(&mut self) -> TardisResult { self.take_body().collect().await.map(Collected::aggregate).map_err(|e| TardisError::format_error(&format!("[SG.Filter] fail to aggregate body: {e}"), "")) } /// # Performance - /// This method will read **all** of the body and **clone** it, and it's body will become an once stream which holds the whole body. + /// this method will read all of the body and clone it, and it's body will become an once stream which holds the whole body. pub async fn dump_body(&mut self) -> TardisResult { let bytes = self.take_body_into_bytes().await?; - self.set_body(bytes.clone()); + let body = Full::new(bytes.clone()).map_err(crate::utils::never::); + self.set_body(BoxBody::new(body)); Ok(bytes) } } @@ -653,6 +671,10 @@ impl SgRoutePluginContext { } } + pub fn get_timeout(&self) -> Option { + self.get_timeout_ms().map(Duration::from_millis) + } + pub fn get_rule_matched(&self) -> Option { self.chosen_route_rule.as_ref().and_then(|r| r.matched_match.clone()) } diff --git a/kernel/src/plugins/filters.rs b/kernel/src/plugins/filters.rs index c4a48582..0e8cf6d4 100644 --- a/kernel/src/plugins/filters.rs +++ b/kernel/src/plugins/filters.rs @@ -11,6 +11,7 @@ pub mod status; use async_trait::async_trait; use core::fmt; +use std::future::Future; use serde_json::Value; use std::collections::HashMap; @@ -146,9 +147,9 @@ pub trait SgPluginFilter: Send + Sync + 'static { } } - async fn init(&mut self, init_dto: &SgPluginFilterInitDto) -> TardisResult<()>; + fn init(&mut self, init_dto: &SgPluginFilterInitDto) -> impl Future> + Send; - async fn destroy(&self) -> TardisResult<()>; + fn destroy(&self) -> impl Future> + Send; /// Request Filtering: /// @@ -158,7 +159,7 @@ pub trait SgPluginFilter: Send + Sync + 'static { /// instance. /// - `ctx`: A mutable context object that holds information about the /// request and allows for modifications. - async fn req_filter(&self, id: &str, ctx: SgRoutePluginContext) -> TardisResult<(bool, SgRoutePluginContext)>; + fn req_filter(&self, id: &str, ctx: SgRoutePluginContext) -> impl Future> + Send; /// Response Filtering: /// @@ -168,7 +169,7 @@ pub trait SgPluginFilter: Send + Sync + 'static { /// instance. /// - `ctx`: A mutable context object that holds information about the /// request and allows for modifications. - async fn resp_filter(&self, id: &str, ctx: SgRoutePluginContext) -> TardisResult<(bool, SgRoutePluginContext)>; + fn resp_filter(&self, id: &str, ctx: SgRoutePluginContext) -> impl Future> + Send; fn boxed(self) -> BoxSgPluginFilter where @@ -178,7 +179,7 @@ pub trait SgPluginFilter: Send + Sync + 'static { } } -pub fn http_common_modify_path(uri: &http::Uri, modify_path: &Option, matched_match_inst: Option<&SgHttpRouteMatchInst>) -> TardisResult> { +pub fn http_common_modify_path(uri: &hyper::Uri, modify_path: &Option, matched_match_inst: Option<&SgHttpRouteMatchInst>) -> TardisResult> { if let Some(modify_path) = &modify_path { let mut uri = Url::parse(&uri.to_string())?; match modify_path.kind { diff --git a/kernel/src/plugins/filters/compression.rs b/kernel/src/plugins/filters/compression.rs index 91962792..fac6f527 100644 --- a/kernel/src/plugins/filters/compression.rs +++ b/kernel/src/plugins/filters/compression.rs @@ -2,9 +2,13 @@ use std::{cmp::Ordering, pin::Pin}; use crate::def_filter; use async_compression::tokio::bufread::{BrotliDecoder, BrotliEncoder, DeflateDecoder, DeflateEncoder, GzipDecoder, GzipEncoder}; -use http::{header, HeaderValue}; -use hyper::body::Body; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, BodyStream, StreamBody}; +use hyper::body::{Body, Frame}; +use hyper::http::{header, HeaderValue}; use serde::{Deserialize, Serialize}; +use tardis::basic::error::TardisError; +use tardis::futures_util::{Stream, StreamExt}; use tardis::{basic::result::TardisResult, futures_util::TryStreamExt, tokio::io::BufReader}; use tokio_util::io::{ReaderStream, StreamReader}; @@ -66,7 +70,6 @@ impl CompressionType { } } - impl SgPluginFilter for SgFilterCompression { fn accept(&self) -> super::SgPluginFilterAccept { super::SgPluginFilterAccept { @@ -97,6 +100,7 @@ impl SgPluginFilter for SgFilterCompression { fn convert_error(err: hyper::Error) -> std::io::Error { std::io::Error::new(std::io::ErrorKind::Other, err) } + let read_stream_mapper = |b: std::io::Result| b.map(Frame::data).map_err(TardisError::from); if desired_response_encoding == resp_encode_type { return Ok((true, ctx)); } @@ -107,10 +111,12 @@ impl SgPluginFilter for SgFilterCompression { CompressionType::Br => ctx.response.set_header(header::CONTENT_ENCODING, CompressionType::Br.into())?, } } - let mut body = ctx.response.take_body(); - body = if let Some(resp_encode_type) = resp_encode_type { + if let Some(resp_encode_type) = resp_encode_type { + let s = StreamExt::map(ctx.response.take_body_stream(), |maybe_frame| { + maybe_frame.map_err(std::io::Error::other).map(|f| f.into_data().unwrap_or_default()) + }); ctx.response.remove_header(header::CONTENT_LENGTH)?; - let bytes_reader = StreamReader::new(body.map_err(convert_error)); + let bytes_reader = StreamReader::new(s); let mut read_stream: Pin> = match resp_encode_type { CompressionType::Gzip => Box::pin(GzipDecoder::new(bytes_reader)), CompressionType::Deflate => Box::pin(DeflateDecoder::new(bytes_reader)), @@ -123,22 +129,22 @@ impl SgPluginFilter for SgFilterCompression { CompressionType::Br => Box::pin(BrotliEncoder::new(BufReader::new(read_stream))), }; } - Body::wrap_stream(ReaderStream::new(read_stream)) + ctx.response.set_body(StreamBody::new(ReaderStream::new(read_stream).map(read_stream_mapper))); } else if let Some(desired_response_encoding) = desired_response_encoding { + let s = StreamExt::map(ctx.response.take_body_stream(), |maybe_frame| { + maybe_frame.map_err(std::io::Error::other).map(|f| f.into_data().unwrap_or_default()) + }); ctx.response.remove_header(header::CONTENT_LENGTH)?; - let bytes_reader = StreamReader::new(body.map_err(convert_error)); + ctx.response.get_headers_mut().insert(hyper::header::TRANSFER_ENCODING, hyper::header::HeaderValue::from_static("chunked")); + let bytes_reader = StreamReader::new(s); match desired_response_encoding { - CompressionType::Gzip => Body::wrap_stream(ReaderStream::new(GzipEncoder::new(bytes_reader))), - CompressionType::Deflate => Body::wrap_stream(ReaderStream::new(DeflateEncoder::new(bytes_reader))), - CompressionType::Br => Body::wrap_stream(ReaderStream::new(BrotliEncoder::new(bytes_reader))), + CompressionType::Gzip => ctx.response.set_body(StreamBody::new(ReaderStream::new(GzipEncoder::new(bytes_reader)).map(read_stream_mapper))), + CompressionType::Deflate => ctx.response.set_body(StreamBody::new(ReaderStream::new(DeflateEncoder::new(bytes_reader)).map(read_stream_mapper))), + CompressionType::Br => ctx.response.set_body(StreamBody::new(ReaderStream::new(BrotliEncoder::new(bytes_reader)).map(read_stream_mapper))), } } else { - body + // ctx.response.take_body() }; - - ctx.response.set_body(body); - // let body = ctx.response.dump_body().await?; - // dbg!(body); Ok((true, ctx)) } } diff --git a/kernel/src/plugins/filters/header_modifier.rs b/kernel/src/plugins/filters/header_modifier.rs index 3ae8a98c..d5bb2f10 100644 --- a/kernel/src/plugins/filters/header_modifier.rs +++ b/kernel/src/plugins/filters/header_modifier.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use http::HeaderName; +use hyper::http::header::HeaderName; use super::{SgPluginFilter, SgPluginFilterAccept, SgPluginFilterInitDto, SgPluginFilterKind, SgRoutePluginContext}; use crate::def_filter; @@ -10,7 +10,6 @@ use tardis::basic::result::TardisResult; def_filter!(SG_FILTER_HEADER_MODIFIER_CODE, SgFilterHeaderModifierDef, SgFilterHeaderModifier); - impl SgPluginFilter for SgFilterHeaderModifier { fn accept(&self) -> SgPluginFilterAccept { SgPluginFilterAccept { diff --git a/kernel/src/plugins/filters/inject.rs b/kernel/src/plugins/filters/inject.rs index 2c1a50bf..6c683227 100644 --- a/kernel/src/plugins/filters/inject.rs +++ b/kernel/src/plugins/filters/inject.rs @@ -1,6 +1,8 @@ +use std::time::Duration; + use crate::def_filter; use async_trait::async_trait; -use http::{HeaderName, Method}; +use hyper::{header::HeaderName, Method, Uri}; use serde::{Deserialize, Serialize}; use tardis::basic::{error::TardisError, result::TardisResult}; @@ -12,9 +14,9 @@ def_filter!("inject", SgFilterInjectDef, SgFilterInject); #[derive(Default, Debug, Serialize, Deserialize, Clone)] pub struct SgFilterInject { - pub req_inject_url: Option, + pub req_inject_url: Option, pub req_timeout_ms: Option, - pub resp_inject_url: Option, + pub resp_inject_url: Option, pub resp_timeout_ms: Option, } @@ -24,7 +26,6 @@ const SG_INJECT_REAL_METHOD: HeaderName = HeaderName::from_static("sg-inject-rea #[allow(clippy::declare_interior_mutable_const)] const SG_INJECT_REAL_URL: HeaderName = HeaderName::from_static("sg-inject-real-url"); - impl SgPluginFilter for SgFilterInject { fn accept(&self) -> super::SgPluginFilterAccept { super::SgPluginFilterAccept { @@ -47,7 +48,15 @@ impl SgPluginFilter for SgFilterInject { let real_url = ctx.request.get_uri().clone(); ctx.request.set_header(SG_INJECT_REAL_METHOD, real_method.as_str())?; ctx.request.set_header(SG_INJECT_REAL_URL, &real_url.to_string())?; - let mut resp = http_client::raw_request(None, Method::PUT, req_inject_url, ctx.request.take_body(), ctx.request.get_headers(), self.req_timeout_ms).await?; + let mut resp = http_client::raw_request( + None, + Method::PUT, + req_inject_url, + ctx.request.take_body(), + ctx.request.get_headers(), + self.req_timeout_ms.map(Duration::from_millis), + ) + .await?; let new_req_headers = resp.headers_mut(); let new_req_method = new_req_headers .get(SG_INJECT_REAL_METHOD) @@ -97,7 +106,7 @@ impl SgPluginFilter for SgFilterInject { resp_inject_url, ctx.response.take_body(), ctx.response.get_headers(), - self.resp_timeout_ms, + self.resp_timeout_ms.map(Duration::from_millis), ) .await?; ctx = ctx.resp(resp.status(), resp.headers().clone(), resp.into_body()); diff --git a/kernel/src/plugins/filters/maintenance.rs b/kernel/src/plugins/filters/maintenance.rs index b27412af..0671c9b5 100644 --- a/kernel/src/plugins/filters/maintenance.rs +++ b/kernel/src/plugins/filters/maintenance.rs @@ -1,12 +1,13 @@ +use crate::def_filter; use async_trait::async_trait; -use http::header; +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use hyper::header; use ipnet::IpNet; -use std::net::IpAddr; -use std::ops::Range; - -use crate::def_filter; use itertools::Itertools; use serde::{Deserialize, Serialize}; +use std::net::IpAddr; +use std::ops::Range; use tardis::basic::{error::TardisError, result::TardisResult}; use tardis::chrono::{Local, NaiveTime}; @@ -71,7 +72,6 @@ impl Default for SgFilterMaintenance { } } - impl SgPluginFilter for SgFilterMaintenance { fn accept(&self) -> super::SgPluginFilterAccept { super::SgPluginFilterAccept { @@ -142,12 +142,12 @@ impl SgPluginFilter for SgFilterMaintenance { "## ); - ctx.response.set_body(body); + ctx.response.set_body(Full::new(Bytes::from(body)).map_err(crate::utils::never)); } else if content_type.contains(&"application/json") || accept_type.contains(&"application/json") { let msg = self.msg.clone(); return Err(TardisError::forbidden(&msg, "")); } else { - ctx.response.set_body(format!("

{}

", self.title)); + ctx.response.set_body(Full::new(Bytes::from(format!("

{}

", self.title))).map_err(crate::utils::never)); } Ok((false, ctx)) } else { diff --git a/kernel/src/plugins/filters/redirect.rs b/kernel/src/plugins/filters/redirect.rs index 8264f2d3..30d64765 100644 --- a/kernel/src/plugins/filters/redirect.rs +++ b/kernel/src/plugins/filters/redirect.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use http::StatusCode; +use hyper::StatusCode; use kernel_common::gatewayapi_support_filter::{SgFilterRedirect, SG_FILTER_REDIRECT_CODE}; use tardis::basic::{error::TardisError, result::TardisResult}; diff --git a/kernel/src/plugins/filters/retry.rs b/kernel/src/plugins/filters/retry.rs index 629a0764..042d0d1e 100644 --- a/kernel/src/plugins/filters/retry.rs +++ b/kernel/src/plugins/filters/retry.rs @@ -1,6 +1,10 @@ use std::{sync::Arc, thread}; -use hyper::body::{Bytes, Body}; +use http_body_util::{BodyExt, Full}; +use hyper::{ + body::{Body, Bytes}, + Uri, +}; use itertools::Itertools; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; @@ -61,7 +65,6 @@ pub enum BackOff { Random, } - impl SgPluginFilter for SgFilterRetry { fn accept(&self) -> super::SgPluginFilterAccept { super::SgPluginFilterAccept { @@ -99,13 +102,13 @@ impl SgPluginFilter for SgFilterRetry { rng.gen_range(self.base_interval..self.max_interval) } }; - let time_out = ctx.get_timeout_ms(); + let time_out = ctx.get_timeout(); log::trace!("[SG.Filter.Retry] retry request retry_times:{} next_retry_backoff:{}", retry_count, backoff_interval); match http_client::raw_request( None, ctx.request.get_method().clone(), - &choose_backend_url(&mut ctx), - req_body.clone().map(Body::from).unwrap_or_default(), + &choose_backend_uri(&mut ctx), + Full::new(req_body.clone().unwrap_or_default()).map_err(crate::utils::never), ctx.request.get_headers(), time_out, ) @@ -126,7 +129,7 @@ impl SgPluginFilter for SgFilterRetry { } } -fn choose_backend_url(ctx: &mut SgRoutePluginContext) -> String { +fn choose_backend_uri(ctx: &mut SgRoutePluginContext) -> Uri { let backend_name = ctx.get_chose_backend_name(); let available_backend = ctx.get_available_backend(); if backend_name.is_some() { @@ -138,9 +141,9 @@ fn choose_backend_url(ctx: &mut SgRoutePluginContext) -> String { } else { available_backend.get(0) }; - backend.map(|backend| backend.build_base_uri()).unwrap_or_else(|| "".to_string()) + backend.map(|backend| backend.build_base_uri()).unwrap_or_default() } else { - ctx.request.get_uri().to_string() + ctx.request.get_uri() } } diff --git a/kernel/src/plugins/filters/status.rs b/kernel/src/plugins/filters/status.rs index 40ca632f..e689c60b 100644 --- a/kernel/src/plugins/filters/status.rs +++ b/kernel/src/plugins/filters/status.rs @@ -1,10 +1,11 @@ -use std::net::IpAddr; +use std::net::{IpAddr, SocketAddr}; use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use http::Request; +use hyper::body::Incoming; use hyper::server::conn::{http1, http2}; use hyper::service::service_fn; +use hyper::Request; use hyper_util::rt::{TokioExecutor, TokioIo}; use serde::{Deserialize, Serialize}; @@ -102,7 +103,7 @@ impl SgPluginFilter for SgFilterStatus { } let addr_ip: IpAddr = self.serv_addr.parse().map_err(|e| TardisError::conflict(&format!("[SG.Filter.Status] serv_addr parse error: {e}"), ""))?; - let addr = (addr_ip, self.port).into(); + let addr: SocketAddr = (addr_ip, self.port).into(); let title = Arc::new(Mutex::new(self.title.clone())); let gateway_name = Arc::new(Mutex::new(init_dto.gateway_name.clone())); let cache_key = Arc::new(Mutex::new(get_cache_key(self, &init_dto.gateway_name))); @@ -124,18 +125,22 @@ impl SgPluginFilter for SgFilterStatus { } }; match conn { - Ok((stream, socket)) => { + Ok((stream, _socket)) => { let io = TokioIo::new(stream); + let gateway_name = gateway_name.clone(); + let cache_key = cache_key.clone(); + let title = title.clone(); server.serve_connection( io, - service_fn(move |request: Request<()>| status_plugin::create_status_html(request, gateway_name.clone(), cache_key.clone(), title.clone())), - ) + service_fn(move |_request: Request| status_plugin::create_status_html(gateway_name.clone(), cache_key.clone(), title.clone())), + ); } Err(e) => { log::error!("[SG.Filter.Status] accept error: {e}"); - }, + } } } + return Ok(()); }; let join = tokio::spawn(task); diff --git a/kernel/src/plugins/filters/status/status_plugin.rs b/kernel/src/plugins/filters/status/status_plugin.rs index aba92a99..828b6260 100644 --- a/kernel/src/plugins/filters/status/status_plugin.rs +++ b/kernel/src/plugins/filters/status/status_plugin.rs @@ -1,5 +1,4 @@ -use http::{Request, Response}; -use hyper::body::Body; +use hyper::{body::Body, Response}; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -40,7 +39,7 @@ impl Status { } pub(crate) async fn create_status_html( - _: Request<()>, + // _: Request<()>, _gateway_name: Arc>, _cache_key: Arc>, title: Arc>, diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs new file mode 100644 index 00000000..951b4491 --- /dev/null +++ b/kernel/src/utils.rs @@ -0,0 +1,3 @@ +pub fn never(_: A) -> R { + unreachable!("never function been called") +} \ No newline at end of file