From 8e40cf47885fff4831738db7b73cea93a9d324ff Mon Sep 17 00:00:00 2001 From: Florian Lemaitre Date: Wed, 6 Nov 2024 23:52:15 +0100 Subject: [PATCH] [WIP] Rust server --- .vscode/settings.json | 4 + packages/rust/armonik/Cargo.lock | 2 + packages/rust/armonik/Cargo.toml | 29 +++- packages/rust/armonik/build.rs | 95 ++++++------- packages/rust/armonik/examples/client.rs | 20 +++ packages/rust/armonik/examples/server.rs | 128 ++++++++++++++++++ packages/rust/armonik/src/lib.rs | 4 + .../rust/armonik/src/server/applications.rs | 30 ++++ packages/rust/armonik/src/server/auth.rs | 29 ++++ packages/rust/armonik/src/server/events.rs | 46 +++++++ packages/rust/armonik/src/server/mod.rs | 82 +++++++++++ packages/rust/armonik/src/server/sessions.rs | 65 +++++++++ packages/rust/armonik/src/utils.rs | 6 +- 13 files changed, 486 insertions(+), 54 deletions(-) create mode 100644 packages/rust/armonik/examples/client.rs create mode 100644 packages/rust/armonik/examples/server.rs create mode 100644 packages/rust/armonik/src/server/applications.rs create mode 100644 packages/rust/armonik/src/server/auth.rs create mode 100644 packages/rust/armonik/src/server/events.rs create mode 100644 packages/rust/armonik/src/server/mod.rs create mode 100644 packages/rust/armonik/src/server/sessions.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index 15c632896..1e9c5e111 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,5 +16,9 @@ "rust-analyzer.cachePriming.enable": true, "rust-analyzer.linkedProjects": [ "./packages/rust/armonik/Cargo.toml" + ], + "rust-analyzer.cargo.features": [ + "client", + "server" ] } diff --git a/packages/rust/armonik/Cargo.lock b/packages/rust/armonik/Cargo.lock index 5ffcdc1e8..7e837df7f 100644 --- a/packages/rust/armonik/Cargo.lock +++ b/packages/rust/armonik/Cargo.lock @@ -50,6 +50,8 @@ dependencies = [ "serial_test", "snafu", "tokio", + "tokio-stream", + "tokio-util", "tonic", "tonic-build", "tracing", diff --git a/packages/rust/armonik/Cargo.toml b/packages/rust/armonik/Cargo.toml index 4754f4f30..3630b72d8 100644 --- a/packages/rust/armonik/Cargo.toml +++ b/packages/rust/armonik/Cargo.toml @@ -11,6 +11,11 @@ version = "3.21.0-beta-0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["client"] +client = ["dep:rustls", "dep:hyper-rustls", "dep:hyper"] +server = ["tonic/server", "dep:tokio-util", "dep:tokio-stream", "dep:tokio"] + [dependencies] tonic = "0.12" prost = "0.13" @@ -19,9 +24,19 @@ futures = "0.3" async-trait = "0.1" snafu = "0.8" tracing = "0.1" -hyper = { version = "1.5", features = ["client", "http1", "http2"] } -hyper-rustls = { version = "0.27", features = ["http1", "http2"] } -rustls = { version = "0.23", features = ["ring"] } +hyper = { version = "1.5", features = [ + "client", + "http1", + "http2", +], optional = true } +hyper-rustls = { version = "0.27", features = [ + "http1", + "http2", +], optional = true } +rustls = { version = "0.23", features = ["ring"], optional = true } +tokio-util = { version = "0.7", optional = true } +tokio-stream = { version = "0.1", optional = true } +tokio = { version = "1.41", default-features = false, optional = true } [dev-dependencies] eyre = "0.6" @@ -39,3 +54,11 @@ tokio = { version = "1.41", features = [ [build-dependencies] tonic-build = "0.12" + +[[example]] +name = "client" +required-features = ["client"] + +[[example]] +name = "server" +required-features = ["server"] diff --git a/packages/rust/armonik/build.rs b/packages/rust/armonik/build.rs index 801f43a43..fa285ba6a 100644 --- a/packages/rust/armonik/build.rs +++ b/packages/rust/armonik/build.rs @@ -1,49 +1,52 @@ fn main() -> Result<(), Box> { - tonic_build::configure() - .build_client(true) - .build_server(true) - .compile_protos( - &[ - "protos/V1/agent_common.proto", - "protos/V1/agent_service.proto", - "protos/V1/applications_common.proto", - "protos/V1/applications_fields.proto", - "protos/V1/applications_filters.proto", - "protos/V1/applications_service.proto", - "protos/V1/auth_common.proto", - "protos/V1/auth_service.proto", - "protos/V1/events_common.proto", - "protos/V1/events_service.proto", - "protos/V1/filters_common.proto", - "protos/V1/objects.proto", - "protos/V1/partitions_common.proto", - "protos/V1/partitions_fields.proto", - "protos/V1/partitions_filters.proto", - "protos/V1/partitions_service.proto", - "protos/V1/result_status.proto", - "protos/V1/results_common.proto", - "protos/V1/results_fields.proto", - "protos/V1/results_filters.proto", - "protos/V1/results_service.proto", - "protos/V1/session_status.proto", - "protos/V1/sessions_common.proto", - "protos/V1/sessions_fields.proto", - "protos/V1/sessions_filters.proto", - "protos/V1/sessions_service.proto", - "protos/V1/sort_direction.proto", - "protos/V1/submitter_common.proto", - "protos/V1/submitter_service.proto", - "protos/V1/task_status.proto", - "protos/V1/tasks_common.proto", - "protos/V1/tasks_fields.proto", - "protos/V1/tasks_filters.proto", - "protos/V1/tasks_service.proto", - "protos/V1/versions_common.proto", - "protos/V1/versions_service.proto", - "protos/V1/worker_common.proto", - "protos/V1/worker_service.proto", - ], - &["protos/V1"], - )?; + let builder = tonic_build::configure().use_arc_self(true); + + #[cfg(feature = "client")] + let builder = builder.build_client(true); + #[cfg(feature = "server")] + let builder = builder.build_server(true); + builder.compile_protos( + &[ + "protos/V1/agent_common.proto", + "protos/V1/agent_service.proto", + "protos/V1/applications_common.proto", + "protos/V1/applications_fields.proto", + "protos/V1/applications_filters.proto", + "protos/V1/applications_service.proto", + "protos/V1/auth_common.proto", + "protos/V1/auth_service.proto", + "protos/V1/events_common.proto", + "protos/V1/events_service.proto", + "protos/V1/filters_common.proto", + "protos/V1/objects.proto", + "protos/V1/partitions_common.proto", + "protos/V1/partitions_fields.proto", + "protos/V1/partitions_filters.proto", + "protos/V1/partitions_service.proto", + "protos/V1/result_status.proto", + "protos/V1/results_common.proto", + "protos/V1/results_fields.proto", + "protos/V1/results_filters.proto", + "protos/V1/results_service.proto", + "protos/V1/session_status.proto", + "protos/V1/sessions_common.proto", + "protos/V1/sessions_fields.proto", + "protos/V1/sessions_filters.proto", + "protos/V1/sessions_service.proto", + "protos/V1/sort_direction.proto", + "protos/V1/submitter_common.proto", + "protos/V1/submitter_service.proto", + "protos/V1/task_status.proto", + "protos/V1/tasks_common.proto", + "protos/V1/tasks_fields.proto", + "protos/V1/tasks_filters.proto", + "protos/V1/tasks_service.proto", + "protos/V1/versions_common.proto", + "protos/V1/versions_service.proto", + "protos/V1/worker_common.proto", + "protos/V1/worker_service.proto", + ], + &["protos/V1"], + )?; Ok(()) } diff --git a/packages/rust/armonik/examples/client.rs b/packages/rust/armonik/examples/client.rs new file mode 100644 index 000000000..0e4c27d43 --- /dev/null +++ b/packages/rust/armonik/examples/client.rs @@ -0,0 +1,20 @@ +use tracing_subscriber::{prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() -> Result<(), eyre::Report> { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + let client = armonik::Client::new().await?; + + let session = tokio::time::timeout( + tokio::time::Duration::from_secs(1), + client.sessions().create([""], Default::default()), + ) + .await??; + + println!("Created session {session} using partition"); + + Ok(()) +} diff --git a/packages/rust/armonik/examples/server.rs b/packages/rust/armonik/examples/server.rs new file mode 100644 index 000000000..3465b4286 --- /dev/null +++ b/packages/rust/armonik/examples/server.rs @@ -0,0 +1,128 @@ +use std::sync::Arc; + +use tokio_util::sync::CancellationToken; +use tracing_subscriber::{prelude::*, EnvFilter}; + +use armonik::server::SessionsServiceExt; +use armonik::sessions; + +pub struct Server; + +impl armonik::server::SessionsService for Server { + /// Get a sessions list using pagination, filters and sorting. + async fn list( + self: Arc, + _request: sessions::list::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Get a session by its id. + async fn get( + self: Arc, + _request: sessions::get::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Cancel a session by its id. + async fn cancel( + self: Arc, + _request: sessions::cancel::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Create a session + async fn create( + self: Arc, + _request: sessions::create::Request, + cancellation_token: CancellationToken, + ) -> std::result::Result { + tracing::info!("create called"); + if let Some(()) = cancellation_token + .run_until_cancelled(tokio::time::sleep(tokio::time::Duration::from_secs(2))) + .await + { + tracing::info!("create returned"); + Ok(sessions::create::Response { + session_id: String::from("abc"), + }) + } else { + tracing::info!("client cancelled RPC"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + tracing::info!("future still running"); + Err(tonic::Status::aborted("client cancelled RPC")) + } + } + + /// Pause a session by its id. + async fn pause( + self: Arc, + _request: sessions::pause::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Resume a paused session by its id. + async fn resume( + self: Arc, + _request: sessions::resume::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Close a session by its id. + async fn close( + self: Arc, + _request: sessions::close::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Purge a session by its id. Removes Results data. + async fn purge( + self: Arc, + _request: sessions::purge::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Delete a session by its id. Removes metadata from Results, Sessions and Tasks associated to the session. + async fn delete( + self: Arc, + _request: sessions::delete::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } + + /// Stops clients and/or workers from submitting new tasks in the given session. + async fn stop_submission( + self: Arc, + _request: sessions::stop_submission::Request, + _cancellation_token: CancellationToken, + ) -> std::result::Result { + todo!() + } +} + +#[tokio::main] +pub async fn main() -> Result<(), eyre::Report> { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + tonic::transport::Server::builder() + .add_service(Server.sessions_server()) + .serve("127.0.0.1:3456".parse()?) + .await?; + Ok(()) +} diff --git a/packages/rust/armonik/src/lib.rs b/packages/rust/armonik/src/lib.rs index 594140db4..ee698ac77 100644 --- a/packages/rust/armonik/src/lib.rs +++ b/packages/rust/armonik/src/lib.rs @@ -1,9 +1,13 @@ //! Rust bindings for the ArmoniK API pub mod api; +#[cfg(feature = "client")] pub mod client; mod objects; +#[cfg(feature = "server")] +pub mod server; +#[cfg(feature = "client")] pub use client::{Client, ClientConfig}; pub use objects::*; diff --git a/packages/rust/armonik/src/server/applications.rs b/packages/rust/armonik/src/server/applications.rs new file mode 100644 index 000000000..b852bad15 --- /dev/null +++ b/packages/rust/armonik/src/server/applications.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use crate::api::v3; +use crate::applications; + +super::define_trait_methods! { + trait ApplicationsService { + fn applications::list; + } +} + +pub trait ApplicationsServiceExt { + fn applications_server(self) -> v3::applications::applications_server::ApplicationsServer + where + Self: Sized; +} + +impl ApplicationsServiceExt for T { + fn applications_server( + self, + ) -> v3::applications::applications_server::ApplicationsServer { + v3::applications::applications_server::ApplicationsServer::new(self) + } +} + +super::impl_trait_methods! { + impl (v3::applications::applications_server::Applications) for ApplicationsService { + fn list_applications(v3::applications::ListApplicationsRequest) -> v3::applications::ListApplicationsResponse { list } + } +} diff --git a/packages/rust/armonik/src/server/auth.rs b/packages/rust/armonik/src/server/auth.rs new file mode 100644 index 000000000..19cc49fcb --- /dev/null +++ b/packages/rust/armonik/src/server/auth.rs @@ -0,0 +1,29 @@ +use std::sync::Arc; + +use crate::api::v3; +use crate::auth; + +super::define_trait_methods! { + trait AuthService { + /// Get current user + fn auth::current_user; + } +} + +pub trait AuthServiceExt { + fn auth_server(self) -> v3::auth::authentication_server::AuthenticationServer + where + Self: Sized; +} + +impl AuthServiceExt for T { + fn auth_server(self) -> v3::auth::authentication_server::AuthenticationServer { + v3::auth::authentication_server::AuthenticationServer::new(self) + } +} + +super::impl_trait_methods! { + impl (v3::auth::authentication_server::Authentication) for AuthService { + fn get_current_user(v3::auth::GetCurrentUserRequest) -> v3::auth::GetCurrentUserResponse { current_user } + } +} diff --git a/packages/rust/armonik/src/server/events.rs b/packages/rust/armonik/src/server/events.rs new file mode 100644 index 000000000..fb7982d5b --- /dev/null +++ b/packages/rust/armonik/src/server/events.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use futures::StreamExt; + +use crate::api::v3; +use crate::events; + +pub trait EventsService { + fn subscribe( + self: Arc, + request: events::subscribe::Request, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> impl std::future::Future< + Output = Result< + impl tonic::codegen::tokio_stream::Stream< + Item = Result, + > + Send, + tonic::Status, + >, + > + Send; +} + +pub trait EventsServiceExt { + fn events_server(self) -> v3::events::events_server::EventsServer + where + Self: Sized; +} + +impl EventsServiceExt for T { + fn events_server(self) -> v3::events::events_server::EventsServer { + v3::events::events_server::EventsServer::new(self) + } +} + +#[async_trait::async_trait] +impl v3::events::events_server::Events for T { + type GetEventsStream = tokio_stream::wrappers::ReceiverStream< + Result, + >; + async fn get_events( + self: Arc, + request: tonic::Request, + ) -> Result, tonic::Status> { + super::impl_trait_methods!(stream server (self, request) {EventsService::subscribe}) + } +} diff --git a/packages/rust/armonik/src/server/mod.rs b/packages/rust/armonik/src/server/mod.rs new file mode 100644 index 000000000..237c81b9c --- /dev/null +++ b/packages/rust/armonik/src/server/mod.rs @@ -0,0 +1,82 @@ +mod applications; +mod auth; +mod events; +mod sessions; + +pub use applications::{ApplicationsService, ApplicationsServiceExt}; +pub use auth::{AuthService, AuthServiceExt}; +pub use events::{EventsService, EventsServiceExt}; +pub use sessions::{SessionsService, SessionsServiceExt}; + +macro_rules! define_trait_methods { + (trait $name:ident {$($(#[$attr:meta])* fn $service:ident::$method:ident ;)* $(--- $($body:tt)*)?}) => { + pub trait $name { + $( + $(#[$attr])* + fn $method( + self: Arc, + request: $service::$method::Request, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> impl std::future::Future> + Send; + )* + $($($body)*)? + } + }; +} + +macro_rules! impl_trait_methods { + (impl ($name:ty) for $type:ident {$(fn $method:ident($request:ty) -> $response:ty {$inner:ident})* $(--- $($body:tt)*)?}) => { + #[async_trait::async_trait] + impl $name for T { + $( + async fn $method( + self: Arc, + request: tonic::Request<$request>, + ) -> std::result::Result, tonic::Status> { + crate::server::impl_trait_methods!(unary (self, request) {T::$inner}) + } + )* + $($($body)*)? + } + }; + (unary ($self:ident, $request:ident) { $inner:path }) => { + { + let ct = tokio_util::sync::CancellationToken::new(); + let _cancel_guard = ct.clone().drop_guard(); + let fut = tokio::spawn(async move { $inner($self, $request.into_inner().into(), ct).await}); + match fut.await { + Ok(Ok(res)) => Ok(tonic::Response::new(res.into())), + Ok(Err(err)) => Err(err), + Err(err) => Err(tonic::Status::internal(err.to_string())), + } + } + }; + (stream server ($self:ident, $request:ident) { $inner:path }) => { + { + let ct = tokio_util::sync::CancellationToken::new(); + let _cancel_guard = ct.clone().drop_guard(); + let fut = tokio::spawn(async move { $inner($self, $request.into_inner().into(), ct).await }); + match fut.await { + Ok(Ok(stream)) => { + let (tx, rx) = tokio::sync::mpsc::channel(1); + tokio::spawn(async move { + let mut stream = std::pin::pin!(stream); + + while let Some(res) = stream.next().await { + _ = tx.send(res.map(Into::into)).await; + } + }); + + Ok(tonic::Response::new( + tokio_stream::wrappers::ReceiverStream::new(rx), + )) + } + Ok(Err(err)) => Err(err), + Err(err) => Err(tonic::Status::internal(err.to_string())), + } + } + }; +} + +use define_trait_methods; +use impl_trait_methods; diff --git a/packages/rust/armonik/src/server/sessions.rs b/packages/rust/armonik/src/server/sessions.rs new file mode 100644 index 000000000..1baecde01 --- /dev/null +++ b/packages/rust/armonik/src/server/sessions.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use crate::api::v3; +use crate::sessions; + +super::define_trait_methods! { + trait SessionsService { + /// Get a sessions list using pagination, filters and sorting. + fn sessions::list; + + /// Get a session by its id. + fn sessions::get; + + /// Cancel a session by its id. + fn sessions::cancel; + + /// Create a session + fn sessions::create; + + /// Pause a session by its id. + fn sessions::pause; + + /// Resume a paused session by its id. + fn sessions::resume; + + /// Close a session by its id. + fn sessions::close; + + /// Purge a session by its id. Removes Results data. + fn sessions::purge; + + /// Delete a session by its id. Removes metadata from Results, Sessions and Tasks associated to the session. + fn sessions::delete; + + /// Stops clients and/or workers from submitting new tasks in the given session. + fn sessions::stop_submission; + } +} + +pub trait SessionsServiceExt { + fn sessions_server(self) -> v3::sessions::sessions_server::SessionsServer + where + Self: Sized; +} + +impl SessionsServiceExt for T { + fn sessions_server(self) -> v3::sessions::sessions_server::SessionsServer { + v3::sessions::sessions_server::SessionsServer::new(self) + } +} + +super::impl_trait_methods! { + impl (v3::sessions::sessions_server::Sessions) for SessionsService { + fn list_sessions(v3::sessions::ListSessionsRequest) -> v3::sessions::ListSessionsResponse { list } + fn get_session(v3::sessions::GetSessionRequest) -> v3::sessions::GetSessionResponse { get } + fn cancel_session(v3::sessions::CancelSessionRequest) -> v3::sessions::CancelSessionResponse { cancel } + fn create_session(v3::sessions::CreateSessionRequest) -> v3::sessions::CreateSessionReply { create } + fn pause_session(v3::sessions::PauseSessionRequest) -> v3::sessions::PauseSessionResponse { pause } + fn resume_session(v3::sessions::ResumeSessionRequest) -> v3::sessions::ResumeSessionResponse { resume } + fn close_session(v3::sessions::CloseSessionRequest) -> v3::sessions::CloseSessionResponse { close } + fn purge_session(v3::sessions::PurgeSessionRequest) -> v3::sessions::PurgeSessionResponse { purge } + fn delete_session(v3::sessions::DeleteSessionRequest) -> v3::sessions::DeleteSessionResponse { delete } + fn stop_submission(v3::sessions::StopSubmissionRequest) -> v3::sessions::StopSubmissionResponse { stop_submission } + } +} diff --git a/packages/rust/armonik/src/utils.rs b/packages/rust/armonik/src/utils.rs index 3ecaf2ea5..6278988cc 100644 --- a/packages/rust/armonik/src/utils.rs +++ b/packages/rust/armonik/src/utils.rs @@ -69,6 +69,7 @@ pub enum ReadEnvError { #[derive(Debug)] pub(crate) struct InsecureCertVerifier; +#[cfg(feature = "client")] impl rustls::client::danger::ServerCertVerifier for InsecureCertVerifier { fn verify_server_cert( &self, @@ -118,11 +119,6 @@ impl rustls::client::danger::ServerCertVerifier for InsecureCertVerifier { } } -struct Foo {} -struct Bar(Vec); - -impl_vec_wrapper!(Bar(Foo)); - /// Implement all traits and functions to define a wrapper around a [`Vec`] /// /// # Examples