Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] main from linkerd:main #196

Merged
merged 5 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ jobs:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- uses: Swatinem/rust-cache@f0deed1e0edfc6a9be95417288c0e1099b1eeec3
- run: just fetch
- name: Run cargo deny check bans licenses sources
uses: EmbarkStudios/cargo-deny-action@0484eedcba649433ebd03d9b7c9c002746bbc4b9
with:
command: check bans licenses sources
- run: cargo deny --all-features check bans licenses sources
- run: just check-fmt
- run: just clippy
- run: just doc
Expand Down
14 changes: 9 additions & 5 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,30 @@ docker-image := docker-repo + ":" + docker-tag

# The architecture name to use for packages. Either 'amd64', 'arm64', or 'arm'.
arch := "amd64"
# The OS name to use for packages. Either 'linux' or 'windows'.
os := "linux"

libc := 'gnu'

# If a `arch` is specified, then we change the default cargo `--target`
# to support cross-compilation. Otherwise, we use `rustup` to find the default.
_target := if arch == 'amd64' {
_target := if os + '-' + arch == "linux-amd64" {
"x86_64-unknown-linux-" + libc
} else if arch == "arm64" {
} else if os + '-' + arch == "linux-arm64" {
"aarch64-unknown-linux-" + libc
} else if arch == "arm" {
} else if os + '-' + arch == "linux-arm" {
"armv7-unknown-linux-" + libc + "eabihf"
} else if os + '-' + arch == "windows-amd64" {
"x86_64-pc-windows-" + libc
} else {
error("unsupported arch=" + arch)
error("unsupported: os=" + os + " arch=" + arch + " libc=" + libc)
}

_cargo := 'just-cargo profile=' + profile + ' target=' + _target + ' toolchain=' + toolchain

_target_dir := "target" / _target / profile
_target_bin := _target_dir / "linkerd2-proxy"
_package_name := "linkerd2-proxy-" + package_version + "-" + arch + if libc == 'musl' { '-static' } else { '' }
_package_name := "linkerd2-proxy-" + package_version + "-" + arch + (if libc == 'musl' { '-static' } else { '' }) + (if os == 'windows' { '.exe' } else { '' })
_package_dir := "target/package" / _package_name
shasum := "shasum -a 256"

Expand Down
9 changes: 8 additions & 1 deletion linkerd/app/integration/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ enum Run {
Http2,
}

pub type Running = Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;

fn run(
addr: SocketAddr,
version: Run,
Expand All @@ -235,7 +237,12 @@ fn run(
false
};

let (running_tx, running) = running();
let (running_tx, running) = {
let (tx, rx) = oneshot::channel();
let rx = Box::pin(rx.map(|_| ()));
(tx, rx)
};

let conn = Conn {
addr,
absolute_uris,
Expand Down
13 changes: 1 addition & 12 deletions linkerd/app/integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use futures::{future, FutureExt, TryFuture, TryFutureExt};
pub use http::{HeaderMap, Request, Response, StatusCode};
pub use http_body::Body;
pub use linkerd_app as app;
pub use linkerd_app_core::{drain, Addr};
pub use linkerd_app_core::{drain, Addr, Error};
pub use linkerd_app_test::*;
pub use linkerd_tracing::test::*;
use socket2::Socket;
Expand All @@ -50,8 +50,6 @@ pub use tower::Service;
pub const ENV_TEST_PATIENCE_MS: &str = "RUST_TEST_PATIENCE_MS";
pub const DEFAULT_TEST_PATIENCE: Duration = Duration::from_millis(15);

pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Retry an assertion up to a specified number of times, waiting
/// `RUST_TEST_PATIENCE_MS` between retries.
///
Expand Down Expand Up @@ -219,15 +217,6 @@ impl Shutdown {

pub type ShutdownRx = Pin<Box<dyn Future<Output = ()> + Send>>;

/// A channel used to signal when a Client's related connection is running or closed.
pub fn running() -> (oneshot::Sender<()>, Running) {
let (tx, rx) = oneshot::channel();
let rx = Box::pin(rx.map(|_| ()));
(tx, rx)
}

pub type Running = Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;

pub fn s(bytes: &[u8]) -> &str {
::std::str::from_utf8(bytes).unwrap()
}
Expand Down
44 changes: 26 additions & 18 deletions linkerd/app/integration/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,8 @@ type ResponseFuture =

impl<B> tower::Service<http::Request<B>> for SyncSvc
where
B: Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Send + 'static,
B: Body,
B::Error: std::fmt::Debug,
{
type Response = http::Response<hyper::Body>;
type Error = String;
Expand All @@ -205,20 +204,29 @@ where
}

fn call(&mut self, req: http::Request<B>) -> Self::Future {
// this is okay to do because the body should always be complete, we
// just can't prove it.
let req = futures::executor::block_on(async move {
let (parts, body) = req.into_parts();
let body = match body.collect().await.map(http_body::Collected::to_bytes) {
Ok(body) => body,
Err(_) => unreachable!("body should not fail"),
};
http::Request::from_parts(parts, body)
});
Box::pin(
self.0
.send_req(req.map(Into::into))
.map_err(|err| err.to_string()),
)
let Self(client) = self;
let req = req.map(Self::collect_body).map(Into::into);
let fut = client.send_req(req).map_err(|err| err.to_string());
Box::pin(fut)
}
}

impl SyncSvc {
/// Collects the given [`Body`], returning a [`Bytes`].
///
/// NB: This blocks the current thread until the provided body has been collected. This is
/// an acceptable practice in test code for the sake of simplicitly, because we will always
/// provide [`SyncSvc`] with bodies that are complete.
fn collect_body<B>(body: B) -> Bytes
where
B: Body,
B::Error: std::fmt::Debug,
{
futures::executor::block_on(async move {
body.collect()
.await
.expect("body should not fail")
.to_bytes()
})
}
}