Skip to content

Commit

Permalink
[WIP] Rust server
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Nov 9, 2024
1 parent 3e97b32 commit 8e40cf4
Show file tree
Hide file tree
Showing 13 changed files with 486 additions and 54 deletions.
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@
"rust-analyzer.cachePriming.enable": true,
"rust-analyzer.linkedProjects": [
"./packages/rust/armonik/Cargo.toml"
],
"rust-analyzer.cargo.features": [
"client",
"server"
]
}
2 changes: 2 additions & 0 deletions packages/rust/armonik/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 26 additions & 3 deletions packages/rust/armonik/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ version = "3.21.0-beta-0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["client"]
client = ["dep:rustls", "dep:hyper-rustls", "dep:hyper"]
server = ["tonic/server", "dep:tokio-util", "dep:tokio-stream", "dep:tokio"]

[dependencies]
tonic = "0.12"
prost = "0.13"
Expand All @@ -19,9 +24,19 @@ futures = "0.3"
async-trait = "0.1"
snafu = "0.8"
tracing = "0.1"
hyper = { version = "1.5", features = ["client", "http1", "http2"] }
hyper-rustls = { version = "0.27", features = ["http1", "http2"] }
rustls = { version = "0.23", features = ["ring"] }
hyper = { version = "1.5", features = [
"client",
"http1",
"http2",
], optional = true }
hyper-rustls = { version = "0.27", features = [
"http1",
"http2",
], optional = true }
rustls = { version = "0.23", features = ["ring"], optional = true }
tokio-util = { version = "0.7", optional = true }
tokio-stream = { version = "0.1", optional = true }
tokio = { version = "1.41", default-features = false, optional = true }

[dev-dependencies]
eyre = "0.6"
Expand All @@ -39,3 +54,11 @@ tokio = { version = "1.41", features = [

[build-dependencies]
tonic-build = "0.12"

[[example]]
name = "client"
required-features = ["client"]

[[example]]
name = "server"
required-features = ["server"]
95 changes: 49 additions & 46 deletions packages/rust/armonik/build.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,52 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_client(true)
.build_server(true)
.compile_protos(
&[
"protos/V1/agent_common.proto",
"protos/V1/agent_service.proto",
"protos/V1/applications_common.proto",
"protos/V1/applications_fields.proto",
"protos/V1/applications_filters.proto",
"protos/V1/applications_service.proto",
"protos/V1/auth_common.proto",
"protos/V1/auth_service.proto",
"protos/V1/events_common.proto",
"protos/V1/events_service.proto",
"protos/V1/filters_common.proto",
"protos/V1/objects.proto",
"protos/V1/partitions_common.proto",
"protos/V1/partitions_fields.proto",
"protos/V1/partitions_filters.proto",
"protos/V1/partitions_service.proto",
"protos/V1/result_status.proto",
"protos/V1/results_common.proto",
"protos/V1/results_fields.proto",
"protos/V1/results_filters.proto",
"protos/V1/results_service.proto",
"protos/V1/session_status.proto",
"protos/V1/sessions_common.proto",
"protos/V1/sessions_fields.proto",
"protos/V1/sessions_filters.proto",
"protos/V1/sessions_service.proto",
"protos/V1/sort_direction.proto",
"protos/V1/submitter_common.proto",
"protos/V1/submitter_service.proto",
"protos/V1/task_status.proto",
"protos/V1/tasks_common.proto",
"protos/V1/tasks_fields.proto",
"protos/V1/tasks_filters.proto",
"protos/V1/tasks_service.proto",
"protos/V1/versions_common.proto",
"protos/V1/versions_service.proto",
"protos/V1/worker_common.proto",
"protos/V1/worker_service.proto",
],
&["protos/V1"],
)?;
let builder = tonic_build::configure().use_arc_self(true);

#[cfg(feature = "client")]
let builder = builder.build_client(true);
#[cfg(feature = "server")]
let builder = builder.build_server(true);
builder.compile_protos(
&[
"protos/V1/agent_common.proto",
"protos/V1/agent_service.proto",
"protos/V1/applications_common.proto",
"protos/V1/applications_fields.proto",
"protos/V1/applications_filters.proto",
"protos/V1/applications_service.proto",
"protos/V1/auth_common.proto",
"protos/V1/auth_service.proto",
"protos/V1/events_common.proto",
"protos/V1/events_service.proto",
"protos/V1/filters_common.proto",
"protos/V1/objects.proto",
"protos/V1/partitions_common.proto",
"protos/V1/partitions_fields.proto",
"protos/V1/partitions_filters.proto",
"protos/V1/partitions_service.proto",
"protos/V1/result_status.proto",
"protos/V1/results_common.proto",
"protos/V1/results_fields.proto",
"protos/V1/results_filters.proto",
"protos/V1/results_service.proto",
"protos/V1/session_status.proto",
"protos/V1/sessions_common.proto",
"protos/V1/sessions_fields.proto",
"protos/V1/sessions_filters.proto",
"protos/V1/sessions_service.proto",
"protos/V1/sort_direction.proto",
"protos/V1/submitter_common.proto",
"protos/V1/submitter_service.proto",
"protos/V1/task_status.proto",
"protos/V1/tasks_common.proto",
"protos/V1/tasks_fields.proto",
"protos/V1/tasks_filters.proto",
"protos/V1/tasks_service.proto",
"protos/V1/versions_common.proto",
"protos/V1/versions_service.proto",
"protos/V1/worker_common.proto",
"protos/V1/worker_service.proto",
],
&["protos/V1"],
)?;
Ok(())
}
20 changes: 20 additions & 0 deletions packages/rust/armonik/examples/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use tracing_subscriber::{prelude::*, EnvFilter};

#[tokio::main]
async fn main() -> Result<(), eyre::Report> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(EnvFilter::from_default_env())
.init();
let client = armonik::Client::new().await?;

let session = tokio::time::timeout(
tokio::time::Duration::from_secs(1),
client.sessions().create([""], Default::default()),
)
.await??;

println!("Created session {session} using partition");

Ok(())
}
128 changes: 128 additions & 0 deletions packages/rust/armonik/examples/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::sync::Arc;

use tokio_util::sync::CancellationToken;
use tracing_subscriber::{prelude::*, EnvFilter};

use armonik::server::SessionsServiceExt;
use armonik::sessions;

pub struct Server;

impl armonik::server::SessionsService for Server {
/// Get a sessions list using pagination, filters and sorting.
async fn list(
self: Arc<Self>,
_request: sessions::list::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::list::Response, tonic::Status> {
todo!()
}

/// Get a session by its id.
async fn get(
self: Arc<Self>,
_request: sessions::get::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::get::Response, tonic::Status> {
todo!()
}

/// Cancel a session by its id.
async fn cancel(
self: Arc<Self>,
_request: sessions::cancel::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::cancel::Response, tonic::Status> {
todo!()
}

/// Create a session
async fn create(
self: Arc<Self>,
_request: sessions::create::Request,
cancellation_token: CancellationToken,
) -> std::result::Result<sessions::create::Response, tonic::Status> {
tracing::info!("create called");
if let Some(()) = cancellation_token
.run_until_cancelled(tokio::time::sleep(tokio::time::Duration::from_secs(2)))
.await
{
tracing::info!("create returned");
Ok(sessions::create::Response {
session_id: String::from("abc"),
})
} else {
tracing::info!("client cancelled RPC");
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tracing::info!("future still running");
Err(tonic::Status::aborted("client cancelled RPC"))
}
}

/// Pause a session by its id.
async fn pause(
self: Arc<Self>,
_request: sessions::pause::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::pause::Response, tonic::Status> {
todo!()
}

/// Resume a paused session by its id.
async fn resume(
self: Arc<Self>,
_request: sessions::resume::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::resume::Response, tonic::Status> {
todo!()
}

/// Close a session by its id.
async fn close(
self: Arc<Self>,
_request: sessions::close::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::close::Response, tonic::Status> {
todo!()
}

/// Purge a session by its id. Removes Results data.
async fn purge(
self: Arc<Self>,
_request: sessions::purge::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::purge::Response, tonic::Status> {
todo!()
}

/// Delete a session by its id. Removes metadata from Results, Sessions and Tasks associated to the session.
async fn delete(
self: Arc<Self>,
_request: sessions::delete::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::delete::Response, tonic::Status> {
todo!()
}

/// Stops clients and/or workers from submitting new tasks in the given session.
async fn stop_submission(
self: Arc<Self>,
_request: sessions::stop_submission::Request,
_cancellation_token: CancellationToken,
) -> std::result::Result<sessions::stop_submission::Response, tonic::Status> {
todo!()
}
}

#[tokio::main]
pub async fn main() -> Result<(), eyre::Report> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(EnvFilter::from_default_env())
.init();
tonic::transport::Server::builder()
.add_service(Server.sessions_server())
.serve("127.0.0.1:3456".parse()?)
.await?;
Ok(())
}
4 changes: 4 additions & 0 deletions packages/rust/armonik/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
//! Rust bindings for the ArmoniK API
pub mod api;
#[cfg(feature = "client")]
pub mod client;
mod objects;
#[cfg(feature = "server")]
pub mod server;

#[cfg(feature = "client")]
pub use client::{Client, ClientConfig};
pub use objects::*;

Expand Down
30 changes: 30 additions & 0 deletions packages/rust/armonik/src/server/applications.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::sync::Arc;

use crate::api::v3;
use crate::applications;

super::define_trait_methods! {
trait ApplicationsService {
fn applications::list;
}
}

pub trait ApplicationsServiceExt {
fn applications_server(self) -> v3::applications::applications_server::ApplicationsServer<Self>
where
Self: Sized;
}

impl<T: ApplicationsService + Send + Sync + 'static> ApplicationsServiceExt for T {
fn applications_server(
self,
) -> v3::applications::applications_server::ApplicationsServer<Self> {
v3::applications::applications_server::ApplicationsServer::new(self)
}
}

super::impl_trait_methods! {
impl (v3::applications::applications_server::Applications) for ApplicationsService {
fn list_applications(v3::applications::ListApplicationsRequest) -> v3::applications::ListApplicationsResponse { list }
}
}
29 changes: 29 additions & 0 deletions packages/rust/armonik/src/server/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::sync::Arc;

use crate::api::v3;
use crate::auth;

super::define_trait_methods! {
trait AuthService {
/// Get current user
fn auth::current_user;
}
}

pub trait AuthServiceExt {
fn auth_server(self) -> v3::auth::authentication_server::AuthenticationServer<Self>
where
Self: Sized;
}

impl<T: AuthService + Send + Sync + 'static> AuthServiceExt for T {
fn auth_server(self) -> v3::auth::authentication_server::AuthenticationServer<Self> {
v3::auth::authentication_server::AuthenticationServer::new(self)
}
}

super::impl_trait_methods! {
impl (v3::auth::authentication_server::Authentication) for AuthService {
fn get_current_user(v3::auth::GetCurrentUserRequest) -> v3::auth::GetCurrentUserResponse { current_user }
}
}
Loading

0 comments on commit 8e40cf4

Please sign in to comment.