Skip to content

Commit

Permalink
Add proxy support
Browse files Browse the repository at this point in the history
Signed-off-by: Raymond Zhao <[email protected]>
  • Loading branch information
rayz committed Feb 24, 2025
1 parent fc9101d commit 734a0fc
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 13 deletions.
260 changes: 252 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ ahash = { version = "0.8", default-features = false, features = ["std", "runtime
async-compression = { version = "0.4.13", default-features = false }
bitmask-enum = { version = "2.2", default-features = false }
figment = { version = "0.10", default-features = false }
headers = { version = "0.4.0" }
http = { version = "1", default-features = false }
http-body = { version = "1", default-features = false }
http-body-util = { version = "0.1.0", default-features = false }
hyper = { version = "1", default-features = false }
hyper-proxy2 = { version = "0.1.0" }
hyper-rustls = { version = "0.27", default-features = false, features = ["aws-lc-rs", "http2", "rustls-native-certs"] }
hyper-util = { version = "0.1.10", default-features = false }
indexmap = { version = "2", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions lib/saluki-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ ddsketch-agent = { workspace = true }
float-cmp = { workspace = true, features = ["ratio"] }
futures = { workspace = true }
hashbrown = { workspace = true }
headers = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true, features = ["client"] }
hyper-proxy2 = { workspace = true }
hyper-rustls = { workspace = true }
hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "tokio"] }
indexmap = { workspace = true, features = ["std"] }
Expand Down
11 changes: 10 additions & 1 deletion lib/saluki-components/src/destinations/datadog/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use serde::Deserialize;

use super::{endpoints::EndpointConfiguration, retry::RetryConfiguration};
use super::{endpoints::EndpointConfiguration, proxy::ProxyConfiguration, retry::RetryConfiguration};

const fn default_endpoint_concurrency() -> usize {
1
Expand Down Expand Up @@ -48,6 +48,10 @@ pub struct ForwarderConfiguration {
/// Retry configuration.
#[serde(flatten)]
retry: RetryConfiguration,

/// Proxy configuration.
#[serde(flatten)]
proxy: Option<ProxyConfiguration>,
}

impl ForwarderConfiguration {
Expand Down Expand Up @@ -75,4 +79,9 @@ impl ForwarderConfiguration {
pub fn retry(&self) -> &RetryConfiguration {
&self.retry
}

/// Returns a reference to the proxy configuration.
pub fn proxy(&self) -> &Option<ProxyConfiguration> {
&self.proxy
}
}
9 changes: 6 additions & 3 deletions lib/saluki-components/src/destinations/datadog/common/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,15 @@ where
F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
{
let endpoints = config.endpoint().build_resolved_endpoints(maybe_refreshable_config)?;
let client = HttpClient::builder()
let mut client_builder = HttpClient::builder()
.with_request_timeout(config.request_timeout())
.with_retry_policy(config.retry().to_default_http_retry_policy())
.with_bytes_sent_counter(telemetry.bytes_sent().clone())
.with_endpoint_telemetry(metrics_builder, Some(endpoint_name))
.build()?;
.with_endpoint_telemetry(metrics_builder, Some(endpoint_name));
if let Some(proxy) = config.proxy() {
client_builder = client_builder.with_proxies(proxy.build()?);
}
let client = client_builder.build()?;

Ok(Self {
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pub mod config;
pub mod endpoints;
pub mod io;
pub mod middleware;
mod proxy;
mod retry;
pub mod telemetry;
44 changes: 44 additions & 0 deletions lib/saluki-components/src/destinations/datadog/common/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use headers::Authorization;
use http::Uri;
use hyper_proxy2::{Intercept, Proxy};
use saluki_error::GenericError;
use serde::Deserialize;
use url::Url;

#[derive(Clone, Deserialize)]
pub struct ProxyConfiguration {
/// The proxy server for HTTP requests.
#[serde(rename = "proxy_http")]
http_server: Option<String>,

/// The proxy server for HTTPS requests.
#[serde(rename = "proxy_https")]
https_server: Option<String>,
}

impl ProxyConfiguration {
/// Creates the list of proxies.
pub fn build(&self) -> Result<Vec<Proxy>, GenericError> {
let mut proxies = Vec::new();
if let Some(url) = &self.http_server {
proxies.push(self.new_proxy(url, Intercept::Http)?);
}
if let Some(url) = &self.https_server {
proxies.push(self.new_proxy(url, Intercept::Https)?);
}
Ok(proxies)
}

fn new_proxy(&self, url: &str, _intercept: Intercept) -> Result<Proxy, GenericError> {
let url = Url::parse(url)?;
let username = url.username();
let password = url.password().unwrap_or("");
let proxy_uri: Uri = url.as_str().parse()?;
let mut proxy = hyper_proxy2::Proxy::new(Intercept::All, proxy_uri);
if !username.is_empty() {
let auth = Authorization::basic(username, password);
proxy.set_authorization(auth);
}
Ok(proxy)
}
}
2 changes: 2 additions & 0 deletions lib/saluki-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ chrono = { workspace = true }
datadog-protos = { workspace = true }
ddsketch-agent = { workspace = true }
futures = { workspace = true }
headers = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
hyper = { workspace = true, features = ["client", "server"] }
hyper-proxy2 = { workspace = true }
hyper-rustls = { workspace = true }
hyper-util = { workspace = true, features = ["client", "client-legacy", "http1", "http2", "server", "tokio"] }
indexmap = { workspace = true, features = ["std"] }
Expand Down
21 changes: 20 additions & 1 deletion lib/saluki-io/src/net/client/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{future::Future, pin::Pin, task::Poll, time::Duration};

use http::{Request, Response, Uri};
use hyper::body::{Body, Incoming};
use hyper_proxy2::Proxy;
use hyper_util::{
client::legacy::{connect::capture_connection, Builder},
rt::{TokioExecutor, TokioTimer},
Expand Down Expand Up @@ -106,6 +107,7 @@ pub struct HttpClientBuilder<P = NoopRetryPolicy> {
retry_policy: P,
request_timeout: Option<Duration>,
endpoint_telemetry: Option<EndpointTelemetryLayer>,
proxies: Option<Vec<Proxy>>,
}

impl<P> HttpClientBuilder<P> {
Expand Down Expand Up @@ -183,9 +185,18 @@ impl<P> HttpClientBuilder<P> {
request_timeout: self.request_timeout,
retry_policy,
endpoint_telemetry: self.endpoint_telemetry,
proxies: self.proxies,
}
}

/// Sets the proxies to be used for outgoing requests.
///
/// Defaults to no proxies. (i.e requests will be sent directly without using a proxy).
pub fn with_proxies(mut self, proxies: Vec<Proxy>) -> Self {
self.proxies = Some(proxies);
self
}

/// Enables per-endpoint telemetry for HTTP transactions.
///
/// See [`EndpointTelemetryLayer`] for more information.
Expand Down Expand Up @@ -252,7 +263,14 @@ impl<P> HttpClientBuilder<P> {
{
let tls_config = self.tls_builder.build()?;
let connector = self.connector_builder.build(tls_config);
let client = self.hyper_builder.build(connector);
let mut proxy_connector = hyper_proxy2::ProxyConnector::new(connector)?;
if let Some(proxies) = &self.proxies {
for proxy in proxies {
let pc = proxy.to_owned();
proxy_connector.add_proxy(pc);
}
}
let client = self.hyper_builder.build(proxy_connector);

let inner = ServiceBuilder::new()
.retry(self.retry_policy)
Expand Down Expand Up @@ -280,6 +298,7 @@ impl Default for HttpClientBuilder {
request_timeout: Some(Duration::from_secs(20)),
retry_policy: NoopRetryPolicy,
endpoint_telemetry: None,
proxies: None,
}
}
}

0 comments on commit 734a0fc

Please sign in to comment.