From 5a95185a1f200a1f58d82b5052dda4e68eca0eb3 Mon Sep 17 00:00:00 2001 From: Bilal Mahmoud Date: Fri, 8 Nov 2024 13:04:06 +0100 Subject: [PATCH] H-3565: Remove `Role` and use associated type in subsystem traits (#5608) --- libs/@local/graph/api/src/rpc/account.rs | 135 ++++++---- libs/@local/graph/api/src/rpc/auth.rs | 69 +++-- libs/@local/graph/api/src/rpc/echo.rs | 63 +++-- libs/@local/graph/api/src/rpc/mod.rs | 11 - libs/@local/graph/api/src/rpc/session.rs | 2 +- libs/@local/harpc/server/examples/account.rs | 266 +++++++------------ libs/@local/harpc/server/src/delegate.rs | 2 +- libs/@local/harpc/server/src/router.rs | 6 +- libs/@local/harpc/system/src/delegate.rs | 5 +- libs/@local/harpc/system/src/lib.rs | 1 - libs/@local/harpc/system/src/role.rs | 56 ---- 11 files changed, 287 insertions(+), 329 deletions(-) delete mode 100644 libs/@local/harpc/system/src/role.rs diff --git a/libs/@local/graph/api/src/rpc/account.rs b/libs/@local/graph/api/src/rpc/account.rs index 2862a25c088..aaa1cad2183 100644 --- a/libs/@local/graph/api/src/rpc/account.rs +++ b/libs/@local/graph/api/src/rpc/account.rs @@ -1,5 +1,8 @@ use alloc::{borrow::Cow, sync::Arc}; -use core::error::{self, Error}; +use core::{ + error::{self, Error}, + marker::PhantomData, +}; use error_stack::{Report, ResultExt as _}; use graph::store::StorePool; @@ -10,7 +13,7 @@ use harpc_server::{ session::Session, utils::{delegate_call_discrete, parse_procedure_id}, }; -use harpc_system::{delegate::SubsystemDelegate, role::Role}; +use harpc_system::delegate::SubsystemDelegate; use harpc_tower::{body::Body, either::Either, request::Request, response::Response}; use harpc_types::{error_code::ErrorCode, response_kind::ResponseKind}; use hash_graph_authorization::{ @@ -31,7 +34,7 @@ use hash_graph_types::{ }; use hash_temporal_client::TemporalClient; -use super::{role, session::Account}; +use super::session::Account; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct PermissionResponse { @@ -50,43 +53,43 @@ impl Error for AccountNotFoundError { } } +#[must_use] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, derive_more::Display, derive_more::Error)] #[display("unable to fullfil account request")] pub struct AccountError; -pub trait AccountSystem -where - R: Role, -{ +pub trait AccountSystem { + type ExecutionScope; + async fn create_account( &self, - session: R::Session, + scope: Self::ExecutionScope, params: InsertAccountIdParams, ) -> Result>; async fn create_account_group( &self, - session: R::Session, + scope: Self::ExecutionScope, params: InsertAccountGroupIdParams, ) -> Result>; async fn check_account_group_permission( &self, - session: R::Session, + scope: Self::ExecutionScope, account_group_id: AccountGroupId, permission: AccountGroupPermission, ) -> Result>; async fn add_account_group_member( &self, - session: R::Session, + scope: Self::ExecutionScope, account_group_id: AccountGroupId, account_id: AccountId, ) -> Result<(), Report>; async fn remove_account_group_member( &self, - session: R::Session, + scope: Self::ExecutionScope, account_group_id: AccountGroupId, account_id: AccountId, ) -> Result<(), Report>; @@ -258,17 +261,19 @@ where } } -impl AccountSystem for AccountServer +impl AccountSystem for AccountServer where S: StorePool + Send + Sync, A: AuthorizationApiPool + Send + Sync, { + type ExecutionScope = Session; + async fn create_account( &self, - session: Session, + scope: Session, params: InsertAccountIdParams, ) -> Result> { - let actor_id = Self::actor(&session)?; + let actor_id = Self::actor(&scope)?; let mut store = self.store().await?; @@ -283,10 +288,10 @@ where async fn create_account_group( &self, - session: Session, + scope: Session, params: InsertAccountGroupIdParams, ) -> Result> { - let actor_id = Self::actor(&session)?; + let actor_id = Self::actor(&scope)?; let mut store = self.store().await?; @@ -319,11 +324,11 @@ where async fn check_account_group_permission( &self, - session: Session, + scope: Session, account_group_id: AccountGroupId, permission: AccountGroupPermission, ) -> Result> { - let actor_id = Self::actor(&session)?; + let actor_id = Self::actor(&scope)?; let auth = self.authorization_api().await?; @@ -351,11 +356,11 @@ where async fn add_account_group_member( &self, - session: Session, + scope: Session, account_group_id: AccountGroupId, account_id: AccountId, ) -> Result<(), Report> { - let actor_id = Self::actor(&session)?; + let actor_id = Self::actor(&scope)?; let mut auth = self.authorization_api().await?; @@ -377,8 +382,8 @@ where if !check.has_permission { return Err(Report::new(Forbidden { - subsystem: session.request_info().subsystem, - procedure: session.request_info().procedure, + subsystem: scope.request_info().subsystem, + procedure: scope.request_info().procedure, reason: Cow::Borrowed("actor does not have permission to add account group member"), }) .change_context(AccountError)); @@ -403,11 +408,11 @@ where async fn remove_account_group_member( &self, - session: Session, + scope: Session, account_group_id: AccountGroupId, account_id: AccountId, ) -> Result<(), Report> { - let actor_id = Self::actor(&session)?; + let actor_id = Self::actor(&scope)?; let mut auth = self.authorization_api().await?; @@ -428,7 +433,7 @@ where .change_context(AccountError)?; if !check.has_permission { - let request_info = session.request_info(); + let request_info = scope.request_info(); return Err(Report::new(Forbidden { subsystem: request_info.subsystem, @@ -465,24 +470,26 @@ pub struct AccountDelegate { } impl AccountDelegate { + #[must_use] pub const fn new(inner: T) -> Self { Self { inner } } } -impl SubsystemDelegate, C> for AccountDelegate +impl SubsystemDelegate for AccountDelegate where T: AccountSystem< - role::Server, create_account(..): Send, create_account_group(..): Send, check_account_group_permission(..): Send, add_account_group_member(..): Send, remove_account_group_member(..): Send, + ExecutionScope: Send, > + Send, C: Encoder + ReportDecoder + Clone + Send, { type Error = Report; + type ExecutionScope = T::ExecutionScope; type Subsystem = meta::AccountSystem; type Body @@ -493,7 +500,7 @@ where async fn call( self, request: Request, - session: Session, + scope: T::ExecutionScope, codec: C, ) -> Result>, Self::Error> where @@ -509,14 +516,14 @@ where match id { meta::AccountProcedureId::CreateAccount => { delegate_call_discrete(request, codec, |params| async move { - self.inner.create_account(session, params).await + self.inner.create_account(scope, params).await }) .await .map(|response| response.map_body(Either::Left)) } meta::AccountProcedureId::CreateAccountGroup => { delegate_call_discrete(request, codec, |params| async move { - self.inner.create_account_group(session, params).await + self.inner.create_account_group(scope, params).await }) .await .map(|response| response.map_body(Either::Left).map_body(Either::Right)) @@ -526,7 +533,7 @@ where codec, |(account_group_id, permission)| async move { self.inner - .check_account_group_permission(session, account_group_id, permission) + .check_account_group_permission(scope, account_group_id, permission) .await }, ) @@ -542,7 +549,7 @@ where codec, |(account_group_id, account_id)| async move { self.inner - .add_account_group_member(session, account_group_id, account_id) + .add_account_group_member(scope, account_group_id, account_id) .await }, ) @@ -559,7 +566,7 @@ where codec, |(account_group_id, account_id)| async move { self.inner - .remove_account_group_member(session, account_group_id, account_id) + .remove_account_group_member(scope, account_group_id, account_id) .await }, ) @@ -576,30 +583,51 @@ where } // TODO: this can be auto generated by the `harpc` crate -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct AccountClient; +#[derive_where::derive_where(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct AccountClient { + _session: PhantomData *const S>, + _codec: PhantomData *const C>, +} + +impl AccountClient { + #[must_use] + pub fn new() -> Self { + Self { + _session: PhantomData, + _codec: PhantomData, + } + } +} -impl AccountSystem> for AccountClient +impl Default for AccountClient { + fn default() -> Self { + Self::new() + } +} + +impl AccountSystem for AccountClient where - Svc: harpc_client::connection::ConnectionService, + S: harpc_client::connection::ConnectionService, C: harpc_client::connection::ConnectionCodec, { + type ExecutionScope = Connection; + async fn create_account( &self, - session: Connection, + scope: Connection, params: InsertAccountIdParams, ) -> Result> { - invoke_call_discrete(session, meta::AccountProcedureId::CreateAccount, [params]) + invoke_call_discrete(scope, meta::AccountProcedureId::CreateAccount, [params]) .await .change_context(AccountError) } async fn create_account_group( &self, - session: Connection, + scope: Connection, params: InsertAccountGroupIdParams, ) -> Result> { - invoke_call_discrete(session, meta::AccountProcedureId::CreateAccountGroup, [ + invoke_call_discrete(scope, meta::AccountProcedureId::CreateAccountGroup, [ params, ]) .await @@ -608,12 +636,12 @@ where async fn check_account_group_permission( &self, - session: Connection, + scope: Connection, account_group_id: AccountGroupId, permission: AccountGroupPermission, ) -> Result> { invoke_call_discrete( - session, + scope, meta::AccountProcedureId::CheckAccountGroupPermission, [(account_group_id, permission)], ) @@ -623,28 +651,27 @@ where async fn add_account_group_member( &self, - session: Connection, + scope: Connection, account_group_id: AccountGroupId, account_id: AccountId, ) -> Result<(), Report> { - invoke_call_discrete(session, meta::AccountProcedureId::AddAccountGroupMember, [ - (account_group_id, account_id), - ]) + invoke_call_discrete(scope, meta::AccountProcedureId::AddAccountGroupMember, [( + account_group_id, + account_id, + )]) .await .change_context(AccountError) } async fn remove_account_group_member( &self, - session: Connection, + scope: Connection, account_group_id: AccountGroupId, account_id: AccountId, ) -> Result<(), Report> { - invoke_call_discrete( - session, - meta::AccountProcedureId::RemoveAccountGroupMember, - [(account_group_id, account_id)], - ) + invoke_call_discrete(scope, meta::AccountProcedureId::RemoveAccountGroupMember, [ + (account_group_id, account_id), + ]) .await .change_context(AccountError) } diff --git a/libs/@local/graph/api/src/rpc/auth.rs b/libs/@local/graph/api/src/rpc/auth.rs index 43db35f60e8..b10ab921341 100644 --- a/libs/@local/graph/api/src/rpc/auth.rs +++ b/libs/@local/graph/api/src/rpc/auth.rs @@ -1,4 +1,4 @@ -use core::fmt::Debug; +use core::{fmt::Debug, marker::PhantomData}; use error_stack::{Report, ResultExt as _}; use harpc_client::{connection::Connection, utils::invoke_call_discrete}; @@ -13,19 +13,19 @@ use harpc_tower::{body::Body, request::Request, response::Response}; use harpc_types::response_kind::ResponseKind; use hash_graph_types::account::AccountId; -use super::{role, session::Account}; +use super::session::Account; +#[must_use] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, derive_more::Display, derive_more::Error)] #[display("unable to authenticate user")] pub struct AuthenticationError; -pub trait AuthenticationSystem -where - R: role::Role, -{ +pub trait AuthenticationSystem { + type ExecutionScope; + async fn authenticate( &self, - session: R::Session, + scope: Self::ExecutionScope, actor_id: AccountId, ) -> Result<(), Report>; } @@ -92,13 +92,15 @@ pub mod meta { #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct AuthenticationServer; -impl AuthenticationSystem for AuthenticationServer { +impl AuthenticationSystem for AuthenticationServer { + type ExecutionScope = Session; + async fn authenticate( &self, - session: Session, + scope: Session, actor_id: AccountId, ) -> Result<(), Report> { - session + scope .update(Account { actor_id: Some(actor_id), }) @@ -114,12 +116,20 @@ pub struct AuthenticationDelegate { inner: T, } -impl SubsystemDelegate, C> for AuthenticationDelegate +impl AuthenticationDelegate { + #[must_use] + pub const fn new(inner: T) -> Self { + Self { inner } + } +} + +impl SubsystemDelegate for AuthenticationDelegate where - T: AuthenticationSystem + Send, + T: AuthenticationSystem + Send, C: Encoder + ReportDecoder + Clone + Send, { type Error = Report; + type ExecutionScope = T::ExecutionScope; type Subsystem = meta::AuthenticationSystem; type Body @@ -130,7 +140,7 @@ where async fn call( self, request: Request, - session: Session, + scope: T::ExecutionScope, codec: C, ) -> Result>, Self::Error> where @@ -141,7 +151,7 @@ where match id { meta::AuthenticationProcedureId::Authenticate => { delegate_call_discrete(request, codec, |actor_id| async move { - self.inner.authenticate(session, actor_id).await + self.inner.authenticate(scope, actor_id).await }) .await } @@ -151,19 +161,40 @@ where // TODO: this can be auto generated by the `harpc` crate #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct AuthenticationClient; +pub struct AuthenticationClient { + _service: PhantomData *const S>, + _codec: PhantomData *const C>, +} -impl AuthenticationSystem> for AuthenticationClient +impl AuthenticationClient { + #[must_use] + pub const fn new() -> Self { + Self { + _service: PhantomData, + _codec: PhantomData, + } + } +} + +impl Default for AuthenticationClient { + fn default() -> Self { + Self::new() + } +} + +impl AuthenticationSystem for AuthenticationClient where - Svc: harpc_client::connection::ConnectionService, + S: harpc_client::connection::ConnectionService, C: harpc_client::connection::ConnectionCodec, { + type ExecutionScope = Connection; + async fn authenticate( &self, - session: Connection, + scope: Connection, actor_id: AccountId, ) -> Result<(), Report> { - invoke_call_discrete(session, meta::AuthenticationProcedureId::Authenticate, [ + invoke_call_discrete(scope, meta::AuthenticationProcedureId::Authenticate, [ actor_id, ]) .await diff --git a/libs/@local/graph/api/src/rpc/echo.rs b/libs/@local/graph/api/src/rpc/echo.rs index fa9b93da2d6..0cd4daaa74f 100644 --- a/libs/@local/graph/api/src/rpc/echo.rs +++ b/libs/@local/graph/api/src/rpc/echo.rs @@ -1,3 +1,5 @@ +use core::marker::PhantomData; + use error_stack::{Report, ResultExt as _}; use harpc_client::{connection::Connection, utils::invoke_call_discrete}; use harpc_codec::{decode::ReportDecoder, encode::Encoder}; @@ -6,23 +8,23 @@ use harpc_server::{ session::Session, utils::{delegate_call_discrete, parse_procedure_id}, }; -use harpc_system::{delegate::SubsystemDelegate, role::Role}; +use harpc_system::delegate::SubsystemDelegate; use harpc_tower::{body::Body, request::Request, response::Response}; use harpc_types::response_kind::ResponseKind; -use super::{role, session::Account}; +use super::session::Account; +#[must_use] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, derive_more::Display, derive_more::Error)] #[display("unable to fullfil ping request")] pub struct EchoError; -pub trait EchoSystem -where - R: Role, -{ +pub trait EchoSystem { + type ExecutionScope; + async fn echo( &self, - session: R::Session, + scope: Self::ExecutionScope, payload: Box, ) -> Result, Report>; } @@ -88,7 +90,9 @@ pub mod meta { #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct EchoServer; -impl EchoSystem for EchoServer { +impl EchoSystem for EchoServer { + type ExecutionScope = Session; + async fn echo( &self, _: Session, @@ -105,17 +109,19 @@ pub struct EchoDelegate { } impl EchoDelegate { + #[must_use] pub const fn new(inner: T) -> Self { Self { inner } } } -impl SubsystemDelegate, C> for EchoDelegate +impl SubsystemDelegate for EchoDelegate where - T: EchoSystem + Send, + T: EchoSystem + Send, C: Encoder + ReportDecoder + Clone + Send, { type Error = Report; + type ExecutionScope = T::ExecutionScope; type Subsystem = meta::EchoSystem; type Body @@ -126,7 +132,7 @@ where async fn call( self, request: Request, - session: Session, + scope: T::ExecutionScope, codec: C, ) -> Result>, Self::Error> where @@ -137,7 +143,7 @@ where match id { meta::EchoProcedureId::Echo => { delegate_call_discrete(request, codec, |payload| async move { - self.inner.echo(session, payload).await + self.inner.echo(scope, payload).await }) .await } @@ -145,20 +151,41 @@ where } } -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct EchoClient; +#[derive_where::derive_where(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct EchoClient { + _service: PhantomData *const S>, + _codec: PhantomData *const C>, +} -impl EchoSystem> for EchoClient +impl EchoClient { + #[must_use] + pub const fn new() -> Self { + Self { + _service: PhantomData, + _codec: PhantomData, + } + } +} + +impl Default for EchoClient { + fn default() -> Self { + Self::new() + } +} + +impl EchoSystem for EchoClient where - Svc: harpc_client::connection::ConnectionService, + S: harpc_client::connection::ConnectionService, C: harpc_client::connection::ConnectionCodec, { + type ExecutionScope = Connection; + async fn echo( &self, - session: Connection, + scope: Connection, payload: Box, ) -> Result, Report> { - invoke_call_discrete(session, meta::EchoProcedureId::Echo, [payload]) + invoke_call_discrete(scope, meta::EchoProcedureId::Echo, [payload]) .await .change_context(EchoError) } diff --git a/libs/@local/graph/api/src/rpc/mod.rs b/libs/@local/graph/api/src/rpc/mod.rs index 2a8a6e623b3..5fc7cdae78e 100644 --- a/libs/@local/graph/api/src/rpc/mod.rs +++ b/libs/@local/graph/api/src/rpc/mod.rs @@ -6,17 +6,6 @@ mod session; use harpc_system::SubsystemIdentifier; use harpc_types::subsystem::SubsystemId; -mod role { - use harpc_client::connection::Connection; - use harpc_server::session::Session; - pub(crate) use harpc_system::role::Role; - - use super::session::Account; - - pub(crate) type Server = harpc_system::role::Server>; - pub(crate) type Client = harpc_system::role::Client>; -} - #[derive(Debug, Copy, Clone)] pub enum GraphSubsystemId { Echo, diff --git a/libs/@local/graph/api/src/rpc/session.rs b/libs/@local/graph/api/src/rpc/session.rs index 42009199c92..b2837ce06de 100644 --- a/libs/@local/graph/api/src/rpc/session.rs +++ b/libs/@local/graph/api/src/rpc/session.rs @@ -1,6 +1,6 @@ use hash_graph_types::account::AccountId; #[derive(Debug, Clone, Default)] -pub(crate) struct Account { +pub struct Account { pub actor_id: Option, } diff --git a/libs/@local/harpc/server/examples/account.rs b/libs/@local/harpc/server/examples/account.rs index c2a52e4b95d..3926bc3086d 100644 --- a/libs/@local/harpc/server/examples/account.rs +++ b/libs/@local/harpc/server/examples/account.rs @@ -1,6 +1,10 @@ -#![feature(never_type, impl_trait_in_assoc_type, result_flattening)] +#![feature( + never_type, + impl_trait_in_assoc_type, + result_flattening, + return_type_notation +)] #![expect( - clippy::unwrap_used, clippy::print_stdout, clippy::use_debug, unused_variables, @@ -9,41 +13,42 @@ extern crate alloc; -use alloc::vec; -use core::{error::Error, fmt::Debug}; +use core::{fmt::Debug, marker::PhantomData}; use std::time::Instant; -use bytes::Buf; -use error_stack::{FutureExt as _, Report, ResultExt as _}; +use error_stack::{Report, ResultExt as _}; use frunk::HList; -use futures::{Stream, StreamExt as _, TryFutureExt as _, TryStreamExt as _, pin_mut, stream}; -use harpc_client::{Client, ClientConfig, connection::Connection}; -use harpc_codec::{decode::Decoder, encode::Encoder, json::JsonCodec}; -use harpc_server::{Server, ServerConfig, router::RouterBuilder, serve::serve, session::SessionId}; +use harpc_client::{ + Client, ClientConfig, + connection::{Connection, ConnectionCodec, ConnectionService}, + utils::invoke_call_discrete, +}; +use harpc_codec::{decode::ReportDecoder, encode::Encoder, json::JsonCodec}; +use harpc_server::{ + Server, ServerConfig, + error::DelegationError, + router::RouterBuilder, + serve::serve, + utils::{delegate_call_discrete, parse_procedure_id}, +}; use harpc_system::{ Subsystem, SubsystemIdentifier, delegate::SubsystemDelegate, procedure::{Procedure, ProcedureIdentifier}, - role, }; use harpc_tower::{ - Extensions, - body::{Body, BodyExt as _}, + body::Body, layer::{ body_report::HandleBodyReportLayer, boxed::BoxedResponseLayer, report::HandleReportLayer, }, - request::{self, Request}, - response::{Parts, Response}, + request::Request, + response::Response, }; use harpc_types::{ - procedure::{ProcedureDescriptor, ProcedureId}, - response_kind::ResponseKind, - subsystem::SubsystemId, - version::Version, + procedure::ProcedureId, response_kind::ResponseKind, subsystem::SubsystemId, version::Version, }; use hash_graph_types::account::AccountId; use multiaddr::multiaddr; -use tower::ServiceExt as _; use uuid::Uuid; #[derive(Debug, Copy, Clone)] @@ -115,39 +120,44 @@ impl Procedure for CreateAccount { const ID: ::ProcedureId = AccountProcedureId::CreateAccount; } -#[derive(Debug, Copy, Clone, PartialEq, Eq, thiserror::Error)] -enum AccountError { - #[error("unable to establish connection to server")] - Connection, - #[error("unable to encode request")] - Encode, - #[error("unable to decode response")] - Decode, - #[error("expected at least a single response")] - ExpectedResponse, -} +#[must_use] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, derive_more::Display, derive_more::Error)] +#[display("unable to fullfil account request")] +pub struct AccountError; -trait AccountSystem -where - R: role::Role, -{ - fn create_account( +trait AccountSystem { + type ExecutionScope; + + async fn create_account( &self, - session: &R::Session, + scope: Self::ExecutionScope, payload: CreateAccount, - ) -> impl Future>> + Send; + ) -> Result>; } -#[derive(Debug, Clone)] -struct AccountSystemImpl; +#[derive_where::derive_where(Debug, Clone)] +struct AccountSystemImpl { + _scope: PhantomData *const S>, +} + +impl AccountSystemImpl { + #[must_use] + const fn new() -> Self { + Self { + _scope: PhantomData, + } + } +} -impl AccountSystem> for AccountSystemImpl +impl AccountSystem for AccountSystemImpl where S: Send + Sync, { + type ExecutionScope = S; + async fn create_account( &self, - session: &S, + scope: Self::ExecutionScope, payload: CreateAccount, ) -> Result> { Ok(AccountId::new(Uuid::new_v4())) @@ -155,106 +165,41 @@ where } #[derive(Debug, Clone)] -struct AccountSystemClient; +struct AccountSystemClient { + _service: PhantomData *const S>, + _codec: PhantomData *const C>, +} -impl - AccountSystem>> for AccountSystemClient +impl AccountSystemClient { + const fn new() -> Self { + Self { + _service: PhantomData, + _codec: PhantomData, + } + } +} + +impl Default for AccountSystemClient { + fn default() -> Self { + Self::new() + } +} + +impl AccountSystem for AccountSystemClient where - // TODO: I want to get rid of the boxed stream here, the problem is just that `Output` has `` - // as a type parameter, therefore cannot parameterize over it, unless we box or duplicate the - // trait requirement. both are not great solutions. - Svc: tower::Service< - Request>>, - Response = Response, - Error = Report, - Future: Send, - > + Clone - + Send - + Sync, - St: Stream> + Send + Sync, - ResData: Buf, - C: Encoder, Buf: Send + 'static> - + Decoder> - + Clone - + Send - + Sync, - DecoderError: Error + Send + Sync + 'static, - EncoderError: Error + Send + Sync + 'static, - ServiceError: Error + Send + Sync + 'static, + S: ConnectionService, + C: ConnectionCodec, { - fn create_account( + type ExecutionScope = Connection; + + async fn create_account( &self, - session: &Connection, + scope: Connection, payload: CreateAccount, - ) -> impl Future>> { - let codec = session.codec().clone(); - let connection = session.clone(); - - // In theory we could also skip the allocation here, but the problem is that in that case we - // would send data that *might* be malformed, or is missing data. Instead of skipping said - // data we allocate. In future we might want to instead have something like - // tracing::error or a panic instead, but this is sufficient for now. - // (more importantly it also opt us out of having a stream as input that we then encode, - // which should be fine?) - // - // In theory we'd need to be able to propagate the error into the transport layer, while - // possible we would await yet another challenge, what happens if the transport layer - // encounters an error? We can't very well send that error to the server just for us to - // return it, the server might already be processing things and now suddenly needs to stop? - // So we'd need to panic or filter on the client and would have partially committed data on - // the server. - // - // This circumvents the problem because we just return an error early, in the future - if - // the need arises - we might want to investigate request cancellation (which should be - // possible in the protocol). - // - // That'd allow us to cancel the request but would make response handling *a lot* more - // complex. - // - // This isn't a solved problem at all in e.g. rust in general, because there are some things - // you can't just cancel. How do you roll back a potentially already committed transaction? - // The current hypothesis is that the overhead required for one less allocation simply isn't - // worth it, but in the future we might want to revisit this. - codec - .clone() - .encode(stream::iter([payload])) - .try_collect() - .change_context(AccountError::Encode) - .map_ok(|bytes: Vec<_>| { - Request::from_parts( - request::Parts { - subsystem: Account::descriptor(), - procedure: ProcedureDescriptor { - id: CreateAccount::ID.into_id(), - }, - session: SessionId::CLIENT, - extensions: Extensions::new(), - }, - stream::iter(bytes), - ) - }) - .and_then(move |request| { - connection - .oneshot(request) - .change_context(AccountError::Connection) - }) - .and_then(move |response| { - let (parts, body) = response.into_parts(); - - let data = codec.decode(body); - - async move { - tokio::pin!(data); - - let data = data - .next() - .await - .ok_or_else(|| Report::new(AccountError::ExpectedResponse))? - .change_context(AccountError::Decode)?; - - Ok(data) - } - }) + ) -> Result> { + invoke_call_discrete(scope, AccountProcedureId::CreateAccount, [payload]) + .await + .change_context(AccountError) } } @@ -263,13 +208,20 @@ struct AccountServerDelegate { subsystem: T, } -impl SubsystemDelegate for AccountServerDelegate +impl AccountServerDelegate { + #[must_use] + const fn new(subsystem: T) -> Self { + Self { subsystem } + } +} + +impl SubsystemDelegate for AccountServerDelegate where - T: AccountSystem> + Send + Sync, - S: Send + Sync, - C: Encoder + Decoder + Clone + Send + Sync + 'static, + T: AccountSystem + Send + Sync, + C: Encoder + ReportDecoder + Clone + Send, { - type Error = Report; + type Error = Report; + type ExecutionScope = T::ExecutionScope; type Subsystem = Account; type Body @@ -280,30 +232,20 @@ where async fn call( self, request: Request, - session: S, + scope: T::ExecutionScope, codec: C, ) -> Result>, Self::Error> where B: Body + Send + Sync, { - let session_id = request.session(); - let ProcedureDescriptor { id } = request.procedure(); - let id = AccountProcedureId::from_id(id).unwrap(); + let id = parse_procedure_id(&request)?; match id { AccountProcedureId::CreateAccount => { - let body = request.into_body(); - let data = body.into_stream().into_data_stream(); - - let stream = codec.clone().decode(data); - pin_mut!(stream); - - let payload = stream.next().await.unwrap().unwrap(); - - let account_id = self.subsystem.create_account(&session, payload).await?; - let data = codec.encode(stream::iter([account_id])); - - Ok(Response::from_ok(Parts::new(session_id), data)) + delegate_call_discrete(request, codec, |payload| async move { + self.subsystem.create_account(scope, payload).await + }) + .await } } } @@ -319,9 +261,7 @@ async fn server() { .layer(HandleReportLayer::new()) .layer(HandleBodyReportLayer::new()) }) - .register(AccountServerDelegate { - subsystem: AccountSystemImpl, - }); + .register(AccountServerDelegate::new(AccountSystemImpl::new())); let task = router.background_task(server.events()); tokio::spawn(task.into_future()); @@ -342,7 +282,7 @@ async fn client() { let client = Client::new(ClientConfig::default(), JsonCodec).expect("should be able to start service"); - let service = AccountSystemClient; + let service = AccountSystemClient::new(); let connection = client .connect(multiaddr![Ip4([127, 0, 0, 1]), Tcp(10500_u16)]) @@ -352,7 +292,7 @@ async fn client() { for _ in 0..16 { let now = Instant::now(); let account_id = service - .create_account(&connection, CreateAccount { id: None }) + .create_account(connection.clone(), CreateAccount { id: None }) .await .expect("should be able to create account"); diff --git a/libs/@local/harpc/server/src/delegate.rs b/libs/@local/harpc/server/src/delegate.rs index 4fbe1fd5241..99a60739e97 100644 --- a/libs/@local/harpc/server/src/delegate.rs +++ b/libs/@local/harpc/server/src/delegate.rs @@ -43,7 +43,7 @@ impl SubsystemDelegateService { impl Service> for SubsystemDelegateService where - D: SubsystemDelegate, C> + Clone + Send, + D: SubsystemDelegate> + Clone + Send, S: Default + Clone + Send + Sync + 'static, C: Clone + Send + 'static, ReqBody: Body + Send + Sync, diff --git a/libs/@local/harpc/server/src/router.rs b/libs/@local/harpc/server/src/router.rs index b26a745b7b5..f1369ddccb6 100644 --- a/libs/@local/harpc/server/src/router.rs +++ b/libs/@local/harpc/server/src/router.rs @@ -20,7 +20,7 @@ use tower::{Layer, Service, ServiceBuilder, layer::util::Identity}; use crate::{ delegate::SubsystemDelegateService, route::{Handler, Route}, - session::{self, SessionStorage}, + session::{self, Session, SessionStorage}, }; pub struct RouterBuilder { @@ -58,7 +58,7 @@ impl RouterBuilder { type ServiceHandler = Handler< >>::Service, - <>::Subsystem as Subsystem>::SubsystemId, + <>::Subsystem as Subsystem>::SubsystemId, >; impl RouterBuilder { @@ -87,7 +87,7 @@ impl RouterBuilder { delegate: D, ) -> RouterBuilder, R>, L, S, C> where - D: SubsystemDelegate + Clone + Send, + D: SubsystemDelegate> + Clone + Send, L: Layer>, S: Default + Send + Sync + 'static, C: Clone + Send + 'static, diff --git a/libs/@local/harpc/system/src/delegate.rs b/libs/@local/harpc/system/src/delegate.rs index bf9fde3623f..80bdfd96ce9 100644 --- a/libs/@local/harpc/system/src/delegate.rs +++ b/libs/@local/harpc/system/src/delegate.rs @@ -17,9 +17,10 @@ use crate::Subsystem; /// /// The caller must verify that the version and service of the incoming request match those of /// [`Self::Subsystem`]. -pub trait SubsystemDelegate { +pub trait SubsystemDelegate { /// The inner service type that this delegate wraps. type Subsystem: Subsystem; + type ExecutionScope; type Error; @@ -41,7 +42,7 @@ pub trait SubsystemDelegate { fn call( self, request: Request, - session: S, + scope: Self::ExecutionScope, codec: C, ) -> impl Future>, Self::Error>> + Send where diff --git a/libs/@local/harpc/system/src/lib.rs b/libs/@local/harpc/system/src/lib.rs index 87e20aa855e..0914bff72cd 100644 --- a/libs/@local/harpc/system/src/lib.rs +++ b/libs/@local/harpc/system/src/lib.rs @@ -10,7 +10,6 @@ use self::{metadata::Deprecation, procedure::ProcedureIdentifier}; pub mod delegate; pub mod metadata; pub mod procedure; -pub mod role; pub trait SubsystemIdentifier: Copy { fn from_id(id: SubsystemId) -> Option diff --git a/libs/@local/harpc/system/src/role.rs b/libs/@local/harpc/system/src/role.rs deleted file mode 100644 index 3af20bac821..00000000000 --- a/libs/@local/harpc/system/src/role.rs +++ /dev/null @@ -1,56 +0,0 @@ -use core::marker::PhantomData; - -mod sealed { - pub trait Sealed {} -} - -/// Represents a role in a client-server communication model. -/// -/// This trait defines the basic structure for different roles in a networked -/// application. Two implementors of this trait are provided: [`Server`] and [`Client`]. -/// -/// The trait cannot be implemented outside of this crate. -pub trait Role: sealed::Sealed { - /// The session type associated with this role. - /// - /// This allows different roles to have different session structures, - /// tailored to their specific needs in the communication process. - type Session: Send + Sync; -} - -/// Role representing the server side of a session. -/// -/// The server role is responsible for handling incoming requests and maintaining -/// stateful sessions. Unlike clients, servers can persist state between requests. -/// -/// The associated `Session` type allows the server to store and access -/// session-specific data across multiple requests. -pub struct Server { - _marker: PhantomData *const S>, -} - -impl sealed::Sealed for Server where S: Send + Sync {} - -impl Role for Server -where - S: Send + Sync, -{ - type Session = S; -} - -/// Role representing the client side of a session. -/// -/// The client role is stateless and focused solely on the ability to send requests to the server. -/// This contrasts with the server role, which can maintain state between requests. -pub struct Client { - _marker: PhantomData *const S>, -} - -impl sealed::Sealed for Client {} - -impl Role for Client -where - S: Send + Sync, -{ - type Session = S; -}