Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
4t145 committed Jan 4, 2024
1 parent c263353 commit 9184393
Show file tree
Hide file tree
Showing 13 changed files with 267 additions and 312 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ tardis = { version = "=0.1.0-rc.7" }

# Http
http = { version = "0.2" }
rustls = { version = "0.21.0" }
rustls = { version = "0.22.1" }
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1.2", features = ["server-auto", "tokio", "client-legacy", "client"] }
http-body-util = { version = "0.1" }
hyper-rustls = { version = "0.24" }
hyper-tls = { version = "0.6.0"}
rustls-pemfile = { version = "1" }
tokio-rustls = { version = "0.24", default-features = false }
tokio-rustls = { version = "0.25", default-features = false }

# K8s
kube = { version = "0.85", features = ["runtime", "derive"] }
Expand Down
22 changes: 14 additions & 8 deletions kernel-common/src/inner_model/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Display, str::FromStr};
use std::{fmt::Display, str::FromStr, net::IpAddr};

use super::plugin_filter::SgRouteFilter;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -46,7 +46,7 @@ pub struct SgListener {
/// Name is the name of the Listener. This name MUST be unique within a Gateway.
pub name: String,
/// Ip bound to the Listener. Default is 0.0.0.0
pub ip: Option<String>,
pub ip: Option<IpAddr>,
/// Port is the network port. Multiple listeners may use the same port, subject
/// to the Listener compatibility rules.
pub port: u16,
Expand Down Expand Up @@ -77,17 +77,23 @@ pub enum SgProtocol {
Wss,
}

impl Display for SgProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl SgProtocol {
pub const fn as_str(&self) -> &'static str {
match self {
SgProtocol::Http => write!(f, "http"),
SgProtocol::Https => write!(f, "https"),
SgProtocol::Ws => write!(f, "ws"),
SgProtocol::Wss => write!(f, "wss"),
SgProtocol::Http => "http",
SgProtocol::Https => "https",
SgProtocol::Ws => "ws",
SgProtocol::Wss => "wss",
}
}
}

impl Display for SgProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}

/// GatewayTLSConfig describes a TLS configuration.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SgTlsConfig {
Expand Down
3 changes: 2 additions & 1 deletion kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ urlencoding.workspace = true
async-compression.workspace = true
http-body-util.workspace = true
hyper-util.workspace = true
hyper-tls.workspace = true
kernel-common = { path = "../kernel-common" }
tardis = { workspace = true, features = ["future", "crypto", "tls"] }
http.workspace = true
rustls = { workspace = true, features = ["dangerous_configuration"] }
rustls = { workspace = true }
hyper.workspace = true
hyper-rustls.workspace = true
rustls-pemfile.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/config/config_by_k8s.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ async fn process_http_route_config(mut http_route_objs: Vec<HttpSpaceroute>) ->
},
http_route_obj.spec.inner.parent_refs.as_ref().ok_or_else(|| TardisError::format_error("[SG.Config] HttpRoute [spec.parentRefs] is required", ""))?[0].name
);
let priority=http_route_obj.annotations().get(kernel_common::constants::ANNOTATION_RESOURCE_PRIORITY).and_then(|a| a.parse::<i64>().ok()).unwrap_or(0);
let priority = http_route_obj.annotations().get(kernel_common::constants::ANNOTATION_RESOURCE_PRIORITY).and_then(|a| a.parse::<i64>().ok()).unwrap_or(0);
let http_route_config = SgHttpRoute {
name: get_k8s_obj_unique(&http_route_obj),
gateway_name: rel_gateway_name,
Expand Down
186 changes: 99 additions & 87 deletions kernel/src/functions/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,47 @@ use std::{
time::Duration,
};

use crate::plugins::context::SgRoutePluginContext;
use http::{HeaderMap, HeaderValue, Method, Request, Response, StatusCode};
use hyper::Error;
use crate::plugins::context::{SgRouteFilterRequestAction, SgRoutePluginContext};
use http_body_util::Empty;
use hyper::{body::Incoming, Error};
use hyper::{header::HeaderValue, HeaderMap, Method, Request, Response, StatusCode, Uri};
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
use hyper_util::client::legacy::connect::HttpConnector;
use kernel_common::inner_model::gateway::SgProtocol;
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerifier};
use tardis::{
basic::{error::TardisError, result::TardisResult},
log,
tokio::time::timeout,
tokio::{time::timeout, self},
};

const DEFAULT_TIMEOUT_MS: u64 = 5000;
type Client = hyper_util::client::legacy::Client<HttpsConnector<HttpConnector>, ()>;
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(5000);

static DEFAULT_CLIENT: OnceLock<Client<HttpsConnector<HttpConnector>>> = OnceLock::new();
static DEFAULT_CLIENT: OnceLock<Client> = OnceLock::new();

pub fn init() -> TardisResult<&'static Client<HttpsConnector<HttpConnector>>> {
pub fn init() -> TardisResult<&'static Client> {
if DEFAULT_CLIENT.get().is_none() {
let _ = DEFAULT_CLIENT.set(do_init(false)?);
}
Ok(default_client())
}

pub fn get_ignore_validation_clint() -> TardisResult<Client<HttpsConnector<HttpConnector>>> {
pub fn get_ignore_validation_clint() -> TardisResult<Client> {
do_init(true)
}

fn do_init(ignore_validation: bool) -> TardisResult<Client<HttpsConnector<HttpConnector>>> {
fn do_init(ignore_validation: bool) -> TardisResult<Client> {
fn get_tls_config(ignore: bool) -> rustls::ClientConfig {
if ignore {
get_rustls_config_dangerous()
} else {
rustls::ClientConfig::builder().with_safe_defaults().with_native_roots().with_no_client_auth()
}
}

let tokio_rt = hyper_util::rt::TokioExecutor::default();
let https = hyper_rustls::HttpsConnectorBuilder::new().with_tls_config(get_tls_config(ignore_validation)).https_or_http().enable_http1().build();
let tls_client = Client::builder().build(https);
let tls_client = Client::builder(tokio_rt).build(https);

Ok(tls_client)
}
Expand All @@ -54,84 +58,96 @@ pub fn get_rustls_config_dangerous() -> rustls::ClientConfig {

config
}

#[derive(Debug)]
pub struct NoCertificateVerification {}
impl rustls::client::ServerCertVerifier for NoCertificateVerification {
impl ServerCertVerifier for NoCertificateVerification {
fn verify_tls12_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}

fn verify_tls13_signature(
&self,
message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}

fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
Ok(HandshakeSignatureValid::assertion())
}

fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
end_entity: &rustls::pki_types::CertificateDer<'_>,
intermediates: &[rustls::pki_types::CertificateDer<'_>],
server_name: &rustls::pki_types::ServerName<'_>,
ocsp_response: &[u8],
now: rustls::pki_types::UnixTime,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}
}

#[inline]
fn default_client() -> &'static Client<HttpsConnector<HttpConnector>> {
fn default_client() -> &'static Client {
DEFAULT_CLIENT.get().expect("DEFAULT_CLIENT not initialized")
}

pub struct RequestConfig {
timeout: Option<Duration>,

pub timeout: Option<Duration>,
}


pub async fn request(
client: &Client<HttpsConnector<HttpConnector>>,
rule_timeout_ms: Option<u64>,
redirect: bool,
mut ctx: SgRoutePluginContext,
) -> TardisResult<SgRoutePluginContext> {
if redirect {
ctx = do_request(client, &ctx.request.get_uri().to_string(), rule_timeout_ms, ctx).await?;
impl SgRoutePluginContext {
pub async fn request(&mut self, client: &Client, mut config: RequestConfig) -> TardisResult<()> {
if SgRouteFilterRequestAction::Redirect == self.get_action() {
self.do_request(client, &config).await?;
}
if let Some(backend) = self.get_chosen_backend_mut() {
let mut base_uri = backend.build_base_uri();
if let Some(prq) = self.request.get_uri().path_and_query() {
base_uri.path_and_query(prq)
}
let uri = base_uri.build()?;
config.timeout = backend.timeout_ms.map(Duration::from_millis).or(config.timeout);
*self.request.get_uri_mut() = uri;
self.do_request(client, &config)
}
Ok(())
}
if let Some(backend) = ctx.get_chose_backend() {
let scheme = backend.protocol.as_ref().unwrap_or(&SgProtocol::Http);
let host = format!("{}{}", backend.name_or_host, backend.namespace.as_ref().map(|n| format!(".{n}")).unwrap_or("".to_string()));
let port = if (backend.port == 0 || backend.port == 80) && scheme == &SgProtocol::Http || (backend.port == 0 || backend.port == 443) && scheme == &SgProtocol::Https {
"".to_string()
} else {
format!(":{}", backend.port)
pub async fn do_request(&mut self, client: &Client, config: &RequestConfig) -> TardisResult<()> {
let ctx = match raw_request(
Some(client),
self.request.get_method().clone(),
self.request.get_uri(),
self.request.take_body(),
self.request.get_headers(),
config.timeout,
)
.await
{
Ok(response) => self.resp(response.status(), response.headers().clone(), response.into_body()),
Err(e) => self.resp_from_error(e),
};
let url = format!("{}://{}{}{}", scheme, host, port, ctx.request.get_uri().path_and_query().map(|p| p.as_str()).unwrap_or(""));
let timeout_ms = if let Some(timeout_ms) = backend.timeout_ms { Some(timeout_ms) } else { rule_timeout_ms };
ctx = do_request(client, &url, timeout_ms, ctx).await?;
ctx.set_chose_backend(backend);
Ok(ctx)
}
Ok(ctx)
}

async fn do_request(client: &Client<HttpsConnector<HttpConnector>>, url: &str, timeout_ms: Option<u64>, mut ctx: SgRoutePluginContext) -> TardisResult<SgRoutePluginContext> {
let ctx = match raw_request(
Some(client),
ctx.request.get_method().clone(),
url,
ctx.request.take_body(),
ctx.request.get_headers(),
timeout_ms,
)
.await
{
Ok(response) => ctx.resp(response.status(), response.headers().clone(), response.into_body()),
Err(e) => ctx.resp_from_error(e),
};
Ok(ctx)
}

pub async fn raw_request(
client: Option<&Client<HttpsConnector<HttpConnector>>>,
pub async fn raw_request<B>(
client: Option<&Client>,
method: Method,
url: &str,
body: Body,
url: &Uri,
body: Incoming,
headers: &HeaderMap<HeaderValue>,
timeout_ms: Option<u64>,
) -> TardisResult<Response<Body>> {
let timeout_ms = timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS);
timeout: Option<Duration>,
) -> TardisResult<Response<Incoming>> {
let timeout_ms = timeout.unwrap_or(DEFAULT_TIMEOUT);
let method_str = method.to_string();
let url_str = url.to_string();

Expand All @@ -152,10 +168,10 @@ pub async fn raw_request(
req = req.uri(url);
let req = req.body(body).map_err(|error| TardisError::internal_error(&format!("[SG.Route] Build request method {method_str} url {url_str} error:{error}"), ""))?;
let req = if let Some(client) = client { client.request(req) } else { init()?.request(req) };
let response = match timeout(Duration::from_millis(timeout_ms), req).await {
let response = match tokio::time::timeout(Duration::from_millis(timeout_ms), req).await {
Ok(response) => response.map_err(|error: Error| TardisError::custom("502", &format!("[SG.Client] Request method {method_str} url {url_str} error: {error}"), "")),
Err(_) => {
Response::builder().status(StatusCode::GATEWAY_TIMEOUT).body(Body::empty()).map_err(|e| TardisError::internal_error(&format!("[SG.Client] timeout error: {e}"), ""))
Response::builder().status(StatusCode::GATEWAY_TIMEOUT).body(Empty::default()).map_err(|e| TardisError::internal_error(&format!("[SG.Client] timeout error: {e}"), ""))
}
}?;
Ok(response)
Expand All @@ -167,15 +183,13 @@ mod tests {
use hyper::body::Body;
use tardis::{basic::result::TardisResult, tokio};

use crate::plugins::context::AvailableBackendInst;
use crate::{
functions::http_client::{init, request},
plugins::context::SgRoutePluginContext,
};
use hyper::{client::HttpConnector, Client};
use crate::plugins::context::{AvailableBackendInst, SgRouteFilterRequestAction};
use crate::{functions::http_client::init, plugins::context::SgRoutePluginContext};
use hyper_rustls::HttpsConnector;
use kernel_common::inner_model::gateway::SgProtocol;

use super::Client;

#[tokio::test]
async fn test_request() -> TardisResult<()> {
let client = init().unwrap();
Expand Down Expand Up @@ -340,19 +354,17 @@ mod tests {

// Because this unit test depends on the external url,
// it may be due to the failure of the external url, so add retry
async fn retry_test_request(
client: &Client<HttpsConnector<HttpConnector>>,
rule_timeout_ms: Option<u64>,
redirect: bool,
mut ctx: SgRoutePluginContext,
) -> TardisResult<SgRoutePluginContext> {
async fn retry_test_request(client: &Client, rule_timeout_ms: Option<u64>, redirect: bool, mut ctx: SgRoutePluginContext) -> TardisResult<SgRoutePluginContext> {
if redirect {
ctx.set_action(SgRouteFilterRequestAction::Redirect);
}
let clone_body = ctx.request.dump_body().await?;
let mut clone_ctx = ctx.clone();
clone_ctx.request.set_body(clone_body);
let mut result = request(client, rule_timeout_ms, redirect, ctx).await?;
if !result.response.get_status_code().is_success() {
result = request(client, rule_timeout_ms, redirect, clone_ctx).await?;
ctx.request(client, rule_timeout_ms).await?;
if !ctx.response.get_status_code().is_success() {
clone_ctx.request(client, rule_timeout_ms).await?;
}
Ok(result)
Ok(clone_ctx)
}
}
Loading

0 comments on commit 9184393

Please sign in to comment.