Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] main from linkerd:main #197

Merged
merged 7 commits into from
Mar 4, 2025
25 changes: 16 additions & 9 deletions linkerd/app/integration/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use http::{Request, Response};
use linkerd_app_core::proxy::http::TracingExecutor;
use parking_lot::Mutex;
use std::io;
Expand All @@ -7,9 +8,10 @@ use tokio_rustls::rustls::{self, ClientConfig};
use tracing::info_span;

type ClientError = hyper::Error;
type Request = http::Request<hyper::Body>;
type Response = http::Response<hyper::Body>;
type Sender = mpsc::UnboundedSender<(Request, oneshot::Sender<Result<Response, ClientError>>)>;
type Sender = mpsc::UnboundedSender<(
Request<hyper::Body>,
oneshot::Sender<Result<Response<hyper::Body>, ClientError>>,
)>;

#[derive(Clone)]
pub struct TlsConfig {
Expand Down Expand Up @@ -77,6 +79,7 @@ pub fn http2_tls<T: Into<String>>(addr: SocketAddr, auth: T, tls: TlsConfig) ->
pub fn tcp(addr: SocketAddr) -> tcp::TcpClient {
tcp::client(addr)
}

pub struct Client {
addr: SocketAddr,
run: Run,
Expand Down Expand Up @@ -132,11 +135,12 @@ impl Client {
pub fn request(
&self,
builder: http::request::Builder,
) -> impl Future<Output = Result<Response, ClientError>> + Send + Sync + 'static {
) -> impl Future<Output = Result<Response<hyper::Body>, ClientError>> + Send + Sync + 'static
{
self.send_req(builder.body(Bytes::new().into()).unwrap())
}

pub async fn request_body(&self, req: Request) -> Response {
pub async fn request_body(&self, req: Request<hyper::Body>) -> Response<hyper::Body> {
self.send_req(req).await.expect("response")
}

Expand All @@ -155,8 +159,9 @@ impl Client {
#[tracing::instrument(skip(self))]
pub(crate) fn send_req(
&self,
mut req: Request,
) -> impl Future<Output = Result<Response, ClientError>> + Send + Sync + 'static {
mut req: Request<hyper::Body>,
) -> impl Future<Output = Result<Response<hyper::Body>, ClientError>> + Send + Sync + 'static
{
if req.uri().scheme().is_none() {
if self.tls.is_some() {
*req.uri_mut() = format!("https://{}{}", self.authority, req.uri().path())
Expand Down Expand Up @@ -227,8 +232,10 @@ fn run(
version: Run,
tls: Option<TlsConfig>,
) -> (Sender, JoinHandle<()>, Running) {
let (tx, rx) =
mpsc::unbounded_channel::<(Request, oneshot::Sender<Result<Response, ClientError>>)>();
let (tx, rx) = mpsc::unbounded_channel::<(
Request<hyper::Body>,
oneshot::Sender<Result<Response<hyper::Body>, ClientError>>,
)>();

let test_name = thread_name();
let absolute_uris = if let Run::Http1 { absolute_uris } = version {
Expand Down
118 changes: 41 additions & 77 deletions linkerd/app/integration/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::app_core::svc::http::TracingExecutor;
use super::*;
use http::{Request, Response};
use std::{
io,
sync::atomic::{AtomicUsize, Ordering},
Expand All @@ -12,23 +13,35 @@ pub fn new() -> Server {
}

pub fn http1() -> Server {
Server::http1()
Server {
routes: Default::default(),
version: Run::Http1,
tls: None,
}
}

pub fn http1_tls(tls: Arc<ServerConfig>) -> Server {
Server::http1_tls(tls)
Server {
routes: Default::default(),
version: Run::Http1,
tls: Some(tls),
}
}

pub fn http2() -> Server {
Server::http2()
Server {
routes: Default::default(),
version: Run::Http2,
tls: None,
}
}

pub fn http2_tls(tls: Arc<ServerConfig>) -> Server {
Server::http2_tls(tls)
}

pub fn tcp() -> tcp::TcpServer {
tcp::server()
Server {
routes: Default::default(),
version: Run::Http2,
tls: Some(tls),
}
}

pub struct Server {
Expand All @@ -45,9 +58,8 @@ pub struct Listening {
pub(super) http_version: Option<Run>,
}

type Request = http::Request<hyper::Body>;
type Response = http::Response<hyper::Body>;
type RspFuture = Pin<Box<dyn Future<Output = Result<Response, BoxError>> + Send + Sync + 'static>>;
type RspFuture =
Pin<Box<dyn Future<Output = Result<Response<hyper::Body>, Error>> + Send + Sync + 'static>>;

impl Listening {
pub fn connections(&self) -> usize {
Expand Down Expand Up @@ -92,29 +104,6 @@ impl Listening {
}

impl Server {
fn new(run: Run, tls: Option<Arc<ServerConfig>>) -> Self {
Server {
routes: HashMap::new(),
version: run,
tls,
}
}
fn http1() -> Self {
Server::new(Run::Http1, None)
}

fn http1_tls(tls: Arc<ServerConfig>) -> Self {
Server::new(Run::Http1, Some(tls))
}

fn http2() -> Self {
Server::new(Run::Http2, None)
}

fn http2_tls(tls: Arc<ServerConfig>) -> Self {
Server::new(Run::Http2, Some(tls))
}

/// Return a string body as a 200 OK response, with the string as
/// the response body.
pub fn route(mut self, path: &str, resp: &str) -> Self {
Expand All @@ -126,21 +115,21 @@ impl Server {
/// to send back.
pub fn route_fn<F>(self, path: &str, cb: F) -> Self
where
F: Fn(Request) -> Response + Send + Sync + 'static,
F: Fn(Request<hyper::Body>) -> Response<hyper::Body> + Send + Sync + 'static,
{
self.route_async(path, move |req| {
let res = cb(req);
async move { Ok::<_, BoxError>(res) }
async move { Ok::<_, Error>(res) }
})
}

/// Call a closure when the request matches, returning a Future of
/// a response to send back.
pub fn route_async<F, U>(mut self, path: &str, cb: F) -> Self
where
F: Fn(Request) -> U + Send + Sync + 'static,
U: TryFuture<Ok = Response> + Send + Sync + 'static,
U::Error: Into<BoxError> + Send + 'static,
F: Fn(Request<hyper::Body>) -> U + Send + Sync + 'static,
U: TryFuture<Ok = Response<hyper::Body>> + Send + Sync + 'static,
U::Error: Into<Error> + Send + 'static,
{
let func = move |req| Box::pin(cb(req).map_err(Into::into)) as RspFuture;
self.routes.insert(path.into(), Route(Box::new(func)));
Expand All @@ -153,9 +142,9 @@ impl Server {
let resp = resp.clone();
async move {
tokio::time::sleep(latency).await;
Ok::<_, BoxError>(
Ok::<_, Error>(
http::Response::builder()
.status(200)
.status(StatusCode::OK)
.body(hyper::Body::from(resp.clone()))
.unwrap(),
)
Expand Down Expand Up @@ -193,7 +182,7 @@ impl Server {
drain.clone(),
async move {
tracing::info!("support server running");
let mut new_svc = NewSvc(Arc::new(self.routes));
let svc = Svc(Arc::new(self.routes));
if let Some(delay) = delay {
let _ = listening_tx.take().unwrap().send(());
delay.await;
Expand All @@ -213,15 +202,10 @@ impl Server {
.instrument(span.clone())
.await?;
let srv_conn_count = srv_conn_count.clone();
let svc = new_svc.call(());
let svc = svc.clone();
let f = async move {
tracing::trace!("serving...");
let svc = svc.await;
tracing::trace!("service acquired");
srv_conn_count.fetch_add(1, Ordering::Release);
let svc = svc.map_err(|e| {
tracing::error!("support/server new_service error: {}", e)
})?;
let result = match self.version {
Run::Http1 => hyper::server::conn::http1::Builder::new()
.serve_connection(sock, svc)
Expand Down Expand Up @@ -266,15 +250,15 @@ pub(super) enum Run {
Http2,
}

struct Route(Box<dyn Fn(Request) -> RspFuture + Send + Sync>);
struct Route(Box<dyn Fn(Request<hyper::Body>) -> RspFuture + Send + Sync>);

impl Route {
fn string(body: &str) -> Route {
let body = Bytes::from(body.to_string());
Route(Box::new(move |_| {
Box::pin(future::ok(
http::Response::builder()
.status(200)
.status(StatusCode::OK)
.body(hyper::Body::from(body.clone()))
.unwrap(),
))
Expand All @@ -288,13 +272,11 @@ impl std::fmt::Debug for Route {
}
}

type BoxError = Box<dyn std::error::Error + Send + Sync>;

#[derive(Debug)]
#[derive(Clone, Debug)]
struct Svc(Arc<HashMap<String, Route>>);

impl Svc {
fn route(&mut self, req: Request) -> RspFuture {
fn route(&mut self, req: Request<hyper::Body>) -> RspFuture {
match self.0.get(req.uri().path()) {
Some(Route(ref func)) => {
tracing::trace!(path = %req.uri().path(), "found route for path");
Expand All @@ -303,7 +285,7 @@ impl Svc {
None => {
tracing::warn!("server 404: {:?}", req.uri().path());
let res = http::Response::builder()
.status(404)
.status(StatusCode::NOT_FOUND)
.body(Default::default())
.unwrap();
Box::pin(async move { Ok(res) })
Expand All @@ -312,37 +294,20 @@ impl Svc {
}
}

impl tower::Service<Request> for Svc {
type Response = Response;
type Error = BoxError;
impl tower::Service<Request<hyper::Body>> for Svc {
type Response = Response<hyper::Body>;
type Error = Error;
type Future = RspFuture;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request) -> Self::Future {
fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
self.route(req)
}
}

#[derive(Debug)]
struct NewSvc(Arc<HashMap<String, Route>>);

impl Service<()> for NewSvc {
type Response = Svc;
type Error = ::std::io::Error;
type Future = future::Ready<Result<Svc, Self::Error>>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _: ()) -> Self::Future {
future::ok(Svc(Arc::clone(&self.0)))
}
}

async fn accept_connection(
io: TcpStream,
tls: Option<Arc<ServerConfig>>,
Expand All @@ -357,7 +322,6 @@ async fn accept_connection(
_running: None,
})
}

None => Ok(RunningIo {
io: Box::pin(io),
abs_form: false,
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/tests/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn nonblocking_identity_detection() {

let msg1 = "custom tcp hello\n";
let msg2 = "custom tcp bye";
let srv = server::tcp()
let srv = crate::tcp::server()
.accept(move |read| {
assert_eq!(read, msg1.as_bytes());
msg2
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/tests/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn tcp_waits_for_proxies_to_close() {
let msg1 = "custom tcp hello\n";
let msg2 = "custom tcp bye";

let srv = server::tcp()
let srv = crate::tcp::server()
// Trigger a shutdown while TCP stream is busy
.accept_fut(move |mut sock| {
async move {
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/tests/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl TcpFixture {
const BYE_MSG: &'static str = "custom tcp bye";

async fn server() -> server::Listening {
server::tcp()
crate::tcp::server()
.accept(move |read| {
assert_eq!(read, Self::HELLO_MSG.as_bytes());
TcpFixture::BYE_MSG
Expand Down
Loading