From 10447a2e7560b404b378f8e390489513893ceffa Mon Sep 17 00:00:00 2001 From: Bilal Mahmoud Date: Wed, 6 Nov 2024 19:29:27 +0100 Subject: [PATCH] H-3552: `graph`: Integrate `harpc` into the graph (#5577) Co-authored-by: Tim Diekmann <21277928+TimDiekmann@users.noreply.github.com> --- Cargo.lock | 35 + apps/hash-graph/libs/api/Cargo.toml | 13 +- apps/hash-graph/libs/api/package.json | 6 + apps/hash-graph/libs/api/src/lib.rs | 8 +- apps/hash-graph/libs/api/src/rest/mod.rs | 6 +- apps/hash-graph/libs/api/src/rpc/account.rs | 708 ++++++++++++++++++ apps/hash-graph/libs/api/src/rpc/auth.rs | 189 +++++ apps/hash-graph/libs/api/src/rpc/echo.rs | 183 +++++ apps/hash-graph/libs/api/src/rpc/mod.rs | 15 + apps/hash-graph/libs/api/src/rpc/session.rs | 6 + apps/hash-graph/libs/store/src/account/mod.rs | 6 +- libs/@local/harpc/client/Cargo.toml | 7 +- libs/@local/harpc/client/package.json | 4 +- .../harpc/client/src/connection/alias.rs | 79 ++ .../@local/harpc/client/src/connection/mod.rs | 3 + libs/@local/harpc/client/src/error.rs | 90 +++ libs/@local/harpc/client/src/lib.rs | 2 + libs/@local/harpc/client/src/utils.rs | 99 +++ libs/@local/harpc/codec/src/decode.rs | 16 + libs/@local/harpc/server/Cargo.toml | 4 +- libs/@local/harpc/server/examples/account.rs | 5 +- libs/@local/harpc/server/src/delegate.rs | 11 +- libs/@local/harpc/server/src/error.rs | 155 +++- libs/@local/harpc/server/src/lib.rs | 1 + libs/@local/harpc/server/src/route.rs | 7 +- libs/@local/harpc/server/src/session.rs | 43 +- libs/@local/harpc/server/src/utils.rs | 95 +++ libs/@local/harpc/service/src/lib.rs | 17 +- libs/@local/harpc/service/src/procedure.rs | 11 +- libs/@local/harpc/types/src/error_code.rs | 16 +- libs/@local/harpc/types/src/procedure.rs | 16 + libs/@local/harpc/types/src/service.rs | 18 + 32 files changed, 1815 insertions(+), 59 deletions(-) create mode 100644 apps/hash-graph/libs/api/src/rpc/account.rs create mode 100644 apps/hash-graph/libs/api/src/rpc/auth.rs create mode 100644 apps/hash-graph/libs/api/src/rpc/echo.rs create mode 100644 apps/hash-graph/libs/api/src/rpc/mod.rs create mode 100644 apps/hash-graph/libs/api/src/rpc/session.rs create mode 100644 libs/@local/harpc/client/src/connection/alias.rs create mode 100644 libs/@local/harpc/client/src/error.rs create mode 100644 libs/@local/harpc/client/src/utils.rs create mode 100644 libs/@local/harpc/server/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index dbcaedeebfd..5a7d0ac716f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2475,10 +2475,20 @@ dependencies = [ "axum 0.7.7", "axum-core 0.4.5", "bytes", + "derive-where", + "derive_more 1.0.0", "error-stack", + "frunk", + "futures", "graph", "graph-type-defs", "graph-types", + "harpc-client", + "harpc-codec", + "harpc-server", + "harpc-service", + "harpc-tower", + "harpc-types", "hash-graph-store", "hash-status", "http 1.1.0", @@ -2493,6 +2503,7 @@ dependencies = [ "temporal-client 0.0.0", "temporal-versioning", "time", + "tower 0.5.1", "tower-http", "tracing", "tracing-opentelemetry", @@ -2639,12 +2650,17 @@ version = "0.0.0" dependencies = [ "bytes", "derive-where", + "derive_more 1.0.0", "error-stack", "futures", "harpc-codec", "harpc-net", + "harpc-service", "harpc-tower", + "harpc-types", "multiaddr", + "serde", + "serde-value", "thiserror", "tokio-util", "tower 0.5.1", @@ -4693,6 +4709,15 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "orx-concurrent-option" version = "1.3.0" @@ -6120,6 +6145,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.214" diff --git a/apps/hash-graph/libs/api/Cargo.toml b/apps/hash-graph/libs/api/Cargo.toml index 8cf33fdd9c2..66601805f7b 100644 --- a/apps/hash-graph/libs/api/Cargo.toml +++ b/apps/hash-graph/libs/api/Cargo.toml @@ -20,6 +20,8 @@ graph = { workspace = true, public = true, features = ["utoipa"] } graph-types = { workspace = true, public = true, features = ["utoipa"] } hash-graph-store = { workspace = true, features = ["utoipa"] } temporal-client = { workspace = true, public = true } +harpc-server = { workspace = true, public = true } +harpc-client = { workspace = true, public = true } # Public third-party dependencies axum = { workspace = true, public = true } @@ -27,10 +29,14 @@ axum-core = { workspace = true, public = true } http = { workspace = true, public = true } tower-http = { workspace = true, public = true } tracing = { workspace = true, public = true } +error-stack = { workspace = true, features = ["futures", "spantrace", "unstable"] } # Private workspace dependencies -error-stack = { workspace = true, features = ["spantrace"] } graph-type-defs = { workspace = true } +harpc-codec = { workspace = true } +harpc-service = { workspace = true } +harpc-tower = { workspace = true } +harpc-types = { workspace = true } hash-status = { workspace = true } temporal-versioning = { workspace = true } type-system = { workspace = true, features = ["utoipa"] } @@ -39,6 +45,10 @@ validation = { workspace = true, features = ["utoipa"] } # Private third-party dependencies async-trait = { workspace = true } bytes = { workspace = true } +derive-where = { workspace = true } +derive_more = { version = "1.0.0", features = ["display", "error"] } +frunk = "0.4.3" +futures = { workspace = true } hyper = { workspace = true } include_dir = { workspace = true } mime = { workspace = true } @@ -48,6 +58,7 @@ sentry = { workspace = true } serde = { workspace = true, features = ['derive'] } serde_json = { workspace = true } time = { workspace = true } +tower = { workspace = true } tracing-opentelemetry = { workspace = true } utoipa = { workspace = true } uuid = { workspace = true } diff --git a/apps/hash-graph/libs/api/package.json b/apps/hash-graph/libs/api/package.json index 81012f5ff32..a189821de15 100644 --- a/apps/hash-graph/libs/api/package.json +++ b/apps/hash-graph/libs/api/package.json @@ -14,6 +14,12 @@ "@rust/graph": "0.0.0-private", "@rust/graph-type-defs": "0.0.0-private", "@rust/graph-types": "0.0.0-private", + "@rust/harpc-client": "0.0.0-private", + "@rust/harpc-codec": "0.0.0-private", + "@rust/harpc-server": "0.0.0-private", + "@rust/harpc-service": "0.0.0-private", + "@rust/harpc-tower": "0.0.0-private", + "@rust/harpc-types": "0.0.0-private", "@rust/hash-graph-store": "0.0.0-private", "@rust/hash-status": "0.0.0-private", "@rust/temporal-client": "0.0.0-private", diff --git a/apps/hash-graph/libs/api/src/lib.rs b/apps/hash-graph/libs/api/src/lib.rs index f601a6818f5..8d6217f7d4c 100644 --- a/apps/hash-graph/libs/api/src/lib.rs +++ b/apps/hash-graph/libs/api/src/lib.rs @@ -1,5 +1,11 @@ -#![feature(impl_trait_in_assoc_type)] +#![feature( + impl_trait_in_assoc_type, + never_type, + return_type_notation, + error_generic_member_access +)] extern crate alloc; pub mod rest; +pub mod rpc; diff --git a/apps/hash-graph/libs/api/src/rest/mod.rs b/apps/hash-graph/libs/api/src/rest/mod.rs index 09be595f0f3..101ad37ecce 100644 --- a/apps/hash-graph/libs/api/src/rest/mod.rs +++ b/apps/hash-graph/libs/api/src/rest/mod.rs @@ -62,7 +62,7 @@ use hash_graph_store::{ use hash_status::Status; use include_dir::{Dir, include_dir}; use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use temporal_client::TemporalClient; use temporal_versioning::{ ClosedTemporalBound, DecisionTime, LeftClosedTemporalInterval, LimitedTemporalBound, @@ -116,9 +116,9 @@ impl FromRequestParts for AuthenticatedUserHeader { } } -#[derive(Debug, Serialize, ToSchema)] +#[derive(Debug, Serialize, Deserialize, ToSchema)] pub struct PermissionResponse { - has_permission: bool, + pub has_permission: bool, } pub trait RestApiStore: Store + TypeFetcher { diff --git a/apps/hash-graph/libs/api/src/rpc/account.rs b/apps/hash-graph/libs/api/src/rpc/account.rs new file mode 100644 index 00000000000..099410da797 --- /dev/null +++ b/apps/hash-graph/libs/api/src/rpc/account.rs @@ -0,0 +1,708 @@ +use alloc::{borrow::Cow, sync::Arc}; +use core::error::{self, Error}; + +use authorization::{ + AuthorizationApi as _, AuthorizationApiPool, + backend::ModifyRelationshipOperation, + schema::{ + AccountGroupMemberSubject, AccountGroupPermission, AccountGroupRelationAndSubject, + WebOwnerSubject, + }, + zanzibar::Consistency, +}; +use error_stack::{Report, ResultExt as _}; +use graph::store::StorePool; +use graph_types::{ + account::{AccountGroupId, AccountId}, + owned_by_id::OwnedById, +}; +use harpc_client::{connection::Connection, utils::invoke_call_discrete}; +use harpc_codec::{decode::ReportDecoder, encode::Encoder}; +use harpc_server::{ + error::{DelegationError, Forbidden}, + session::Session, + utils::{delegate_call_discrete, parse_procedure_id}, +}; +use harpc_service::{delegate::ServiceDelegate, role::Role}; +use harpc_tower::{body::Body, either::Either, request::Request, response::Response}; +use harpc_types::{error_code::ErrorCode, response_kind::ResponseKind}; +use hash_graph_store::account::{ + AccountStore as _, InsertAccountGroupIdParams, InsertAccountIdParams, +}; +use temporal_client::TemporalClient; + +use super::{role, session::Account}; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct PermissionResponse { + pub has_permission: bool, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, derive_more::Display)] +#[display("account {id} does not exist in the graph")] +pub struct AccountNotFoundError { + id: AccountId, +} + +impl Error for AccountNotFoundError { + fn provide<'a>(&'a self, request: &mut error::Request<'a>) { + request.provide_value(ErrorCode::RESOURCE_NOT_FOUND); + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, derive_more::Display, derive_more::Error)] +#[display("unable to fullfil account request")] +pub struct AccountError; + +pub trait AccountService +where + R: Role, +{ + async fn create_account( + &self, + session: R::Session, + params: InsertAccountIdParams, + ) -> Result>; + + async fn create_account_group( + &self, + session: R::Session, + params: InsertAccountGroupIdParams, + ) -> Result>; + + async fn check_account_group_permission( + &self, + session: R::Session, + account_group_id: AccountGroupId, + permission: AccountGroupPermission, + ) -> Result>; + + async fn add_account_group_member( + &self, + session: R::Session, + account_group_id: AccountGroupId, + account_id: AccountId, + ) -> Result<(), Report>; + + async fn remove_account_group_member( + &self, + session: R::Session, + account_group_id: AccountGroupId, + account_id: AccountId, + ) -> Result<(), Report>; +} + +// TODO: this can be auto generated by the `harpc` crate +pub mod meta { + //! The `meta` module contains the metadata for the account service. + //! In the future this will be automatically generated by the `harpc` crate. + + use frunk::HList; + use harpc_service::{ + Service, + metadata::Metadata, + procedure::{Procedure, ProcedureIdentifier}, + }; + use harpc_types::{procedure::ProcedureId, service::ServiceId, version::Version}; + + pub enum AccountProcedureId { + CreateAccount, + CreateAccountGroup, + CheckAccountGroupPermission, + AddAccountGroupMember, + RemoveAccountGroupMember, + } + + impl ProcedureIdentifier for AccountProcedureId { + type Service = AccountService; + + fn from_id(id: ProcedureId) -> Option { + match id.value() { + 0x00 => Some(Self::CreateAccount), + 0x01 => Some(Self::CreateAccountGroup), + 0x02 => Some(Self::CheckAccountGroupPermission), + 0x03 => Some(Self::AddAccountGroupMember), + 0x04 => Some(Self::RemoveAccountGroupMember), + _ => None, + } + } + + fn into_id(self) -> ProcedureId { + match self { + Self::CreateAccount => ProcedureId::new(0x00), + Self::CreateAccountGroup => ProcedureId::new(0x01), + Self::CheckAccountGroupPermission => ProcedureId::new(0x02), + Self::AddAccountGroupMember => ProcedureId::new(0x03), + Self::RemoveAccountGroupMember => ProcedureId::new(0x04), + } + } + } + + pub struct AccountService; + + impl Service for AccountService { + type ProcedureId = AccountProcedureId; + type Procedures = HList![ + ProcedureCreateAccount, + ProcedureCreateAccountGroup, + ProcedureCheckAccountGroupPermission, + ProcedureAddAccountGroupMember, + ProcedureRemoveAccountGroupMember + ]; + + const ID: ServiceId = ServiceId::new(0x01); + const VERSION: Version = Version { + major: 0x00, + minor: 0x00, + }; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } + + pub struct ProcedureCreateAccount; + + impl Procedure for ProcedureCreateAccount { + type Service = AccountService; + + const ID: ::ProcedureId = AccountProcedureId::CreateAccount; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } + + pub struct ProcedureCreateAccountGroup; + + impl Procedure for ProcedureCreateAccountGroup { + type Service = AccountService; + + const ID: ::ProcedureId = AccountProcedureId::CreateAccountGroup; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } + + pub struct ProcedureCheckAccountGroupPermission; + + impl Procedure for ProcedureCheckAccountGroupPermission { + type Service = AccountService; + + const ID: ::ProcedureId = + AccountProcedureId::CheckAccountGroupPermission; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } + + pub struct ProcedureAddAccountGroupMember; + + impl Procedure for ProcedureAddAccountGroupMember { + type Service = AccountService; + + const ID: ::ProcedureId = + AccountProcedureId::AddAccountGroupMember; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } + + pub struct ProcedureRemoveAccountGroupMember; + + impl Procedure for ProcedureRemoveAccountGroupMember { + type Service = AccountService; + + const ID: ::ProcedureId = + AccountProcedureId::RemoveAccountGroupMember; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } +} + +#[derive(Debug)] +#[derive_where::derive_where(Clone)] +pub struct AccountServer { + authorization_api_pool: Arc, + temporal_client: Option>, + store_pool: Arc, +} + +impl AccountServer +where + S: StorePool + Send + Sync, + A: AuthorizationApiPool + Send + Sync, +{ + async fn authorization_api(&self) -> Result, Report> { + self.authorization_api_pool + .acquire() + .await + .inspect_err(|error| { + tracing::error!(?error, "Could not acquire access to the authorization API"); + }) + .change_context(AccountError) + } + + async fn store(&self) -> Result>, Report> { + let authorization_api = self.authorization_api().await?; + + self.store_pool + .acquire(authorization_api, self.temporal_client.clone()) + .await + .inspect_err(|report| { + tracing::error!(error=?report, "Could not acquire store"); + }) + .change_context(AccountError) + } + + fn actor(session: &Session) -> Result> { + let &Account { + actor_id: Some(actor_id), + } = session.get() + else { + let request_info = session.request_info(); + + return Err(Report::new(Forbidden { + service: request_info.service, + procedure: request_info.procedure, + reason: Cow::Borrowed("user authentication required"), + }) + .change_context(AccountError)); + }; + + Ok(actor_id) + } +} + +impl AccountService for AccountServer +where + S: StorePool + Send + Sync, + A: AuthorizationApiPool + Send + Sync, +{ + async fn create_account( + &self, + session: Session, + params: InsertAccountIdParams, + ) -> Result> { + let actor_id = Self::actor(&session)?; + + let mut store = self.store().await?; + + let account_id = params.account_id; + store + .insert_account_id(actor_id, params) + .await + .change_context(AccountError)?; + + Ok(account_id) + } + + async fn create_account_group( + &self, + session: Session, + params: InsertAccountGroupIdParams, + ) -> Result> { + let actor_id = Self::actor(&session)?; + + let mut store = self.store().await?; + + let account = store + .identify_owned_by_id(OwnedById::from(actor_id)) + .await + .inspect_err(|report| { + tracing::error!(error=?report, "Could not identify account"); + }) + .change_context(AccountError)?; + + if account != (WebOwnerSubject::Account { id: actor_id }) { + tracing::error!("Account does not exist in the graph"); + return Err( + Report::new(AccountNotFoundError { id: actor_id }).change_context(AccountError) + ); + } + + let account_group_id = params.account_group_id; + store + .insert_account_group_id(actor_id, params) + .await + .inspect_err(|report| { + tracing::error!(error=?report, "Could not create account id"); + }) + .change_context(AccountError)?; + + Ok(account_group_id) + } + + async fn check_account_group_permission( + &self, + session: Session, + account_group_id: AccountGroupId, + permission: AccountGroupPermission, + ) -> Result> { + let actor_id = Self::actor(&session)?; + + let auth = self.authorization_api().await?; + + let check = auth + .check_account_group_permission( + actor_id, + permission, + account_group_id, + Consistency::FullyConsistent, + ) + .await + .inspect_err(|error| { + tracing::error!( + ?error, + "Could not check if permission on the account group is granted to the \ + specified actor" + ); + }) + .change_context(AccountError)?; + + Ok(PermissionResponse { + has_permission: check.has_permission, + }) + } + + async fn add_account_group_member( + &self, + session: Session, + account_group_id: AccountGroupId, + account_id: AccountId, + ) -> Result<(), Report> { + let actor_id = Self::actor(&session)?; + + let mut auth = self.authorization_api().await?; + + let check = auth + .check_account_group_permission( + actor_id, + AccountGroupPermission::AddMember, + account_group_id, + Consistency::FullyConsistent, + ) + .await + .inspect_err(|error| { + tracing::error!( + ?error, + "Could not check if account group member can be added" + ); + }) + .change_context(AccountError)?; + + if !check.has_permission { + return Err(Report::new(Forbidden { + service: session.request_info().service, + procedure: session.request_info().procedure, + reason: Cow::Borrowed("actor does not have permission to add account group member"), + }) + .change_context(AccountError)); + } + + auth.modify_account_group_relations([( + ModifyRelationshipOperation::Create, + account_group_id, + AccountGroupRelationAndSubject::Member { + subject: AccountGroupMemberSubject::Account { id: account_id }, + level: 0, + }, + )]) + .await + .inspect_err(|error| { + tracing::error!(?error, "Could not add account group member"); + }) + .change_context(AccountError)?; + + Ok(()) + } + + async fn remove_account_group_member( + &self, + session: Session, + account_group_id: AccountGroupId, + account_id: AccountId, + ) -> Result<(), Report> { + let actor_id = Self::actor(&session)?; + + let mut auth = self.authorization_api().await?; + + let check = auth + .check_account_group_permission( + actor_id, + AccountGroupPermission::RemoveMember, + account_group_id, + Consistency::FullyConsistent, + ) + .await + .inspect_err(|error| { + tracing::error!( + ?error, + "Could not check if account group member can be removed" + ); + }) + .change_context(AccountError)?; + + if !check.has_permission { + let request_info = session.request_info(); + + return Err(Report::new(Forbidden { + service: request_info.service, + procedure: request_info.procedure, + reason: Cow::Borrowed( + "actor does not have permission to remove account group member", + ), + }) + .change_context(AccountError)); + } + + auth.modify_account_group_relations([( + ModifyRelationshipOperation::Delete, + account_group_id, + AccountGroupRelationAndSubject::Member { + subject: AccountGroupMemberSubject::Account { id: account_id }, + level: 0, + }, + )]) + .await + .inspect_err(|error| { + tracing::error!(?error, "Could not remove account group member"); + }) + .change_context(AccountError)?; + + Ok(()) + } +} + +// TODO: this can be auto generated by the `harpc` crate +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct AccountDelegate { + inner: T, +} + +impl AccountDelegate { + pub const fn new(inner: T) -> Self { + Self { inner } + } +} + +impl ServiceDelegate, C> for AccountDelegate +where + T: AccountService< + role::Server, + create_account(..): Send, + create_account_group(..): Send, + check_account_group_permission(..): Send, + add_account_group_member(..): Send, + remove_account_group_member(..): Send, + > + Send, + C: Encoder + ReportDecoder + Clone + Send, +{ + type Error = Report; + type Service = meta::AccountService; + + type Body + = impl Body, Error = ::Error> + where + Source: Body + Send + Sync; + + async fn call( + self, + request: Request, + session: Session, + codec: C, + ) -> Result>, Self::Error> + where + B: Body + Send + Sync, + { + let id = parse_procedure_id(&request)?; + + // The Either chain here isn't... great, but the only other way would be to box. To box we'd + // need to require that the `Decoder` is both `Send` and `Sync`, which it can be, + // but to completely write out the trait bound is a bit of a pain. + // We would instead most likely need to add `+ Sync` to the GAT, which would over-constrain + // it unnecessarily, but would _in theory_ allow us to remove the `Either` chain. + match id { + meta::AccountProcedureId::CreateAccount => { + delegate_call_discrete(request, codec, |params| async move { + self.inner.create_account(session, params).await + }) + .await + .map(|response| response.map_body(Either::Left)) + } + meta::AccountProcedureId::CreateAccountGroup => { + delegate_call_discrete(request, codec, |params| async move { + self.inner.create_account_group(session, params).await + }) + .await + .map(|response| response.map_body(Either::Left).map_body(Either::Right)) + } + meta::AccountProcedureId::CheckAccountGroupPermission => delegate_call_discrete( + request, + codec, + |(account_group_id, permission)| async move { + self.inner + .check_account_group_permission(session, account_group_id, permission) + .await + }, + ) + .await + .map(|response| { + response + .map_body(Either::Left) + .map_body(Either::Right) + .map_body(Either::Right) + }), + meta::AccountProcedureId::AddAccountGroupMember => delegate_call_discrete( + request, + codec, + |(account_group_id, account_id)| async move { + self.inner + .add_account_group_member(session, account_group_id, account_id) + .await + }, + ) + .await + .map(|response| { + response + .map_body(Either::Left) + .map_body(Either::Right) + .map_body(Either::Right) + .map_body(Either::Right) + }), + meta::AccountProcedureId::RemoveAccountGroupMember => delegate_call_discrete( + request, + codec, + |(account_group_id, account_id)| async move { + self.inner + .remove_account_group_member(session, account_group_id, account_id) + .await + }, + ) + .await + .map(|response| { + response + .map_body(Either::Right) + .map_body(Either::Right) + .map_body(Either::Right) + .map_body(Either::Right) + }), + } + } +} + +// TODO: this can be auto generated by the `harpc` crate +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct AccountClient; + +impl AccountService> for AccountClient +where + Svc: harpc_client::connection::ConnectionService, + C: harpc_client::connection::ConnectionCodec, +{ + async fn create_account( + &self, + session: Connection, + params: InsertAccountIdParams, + ) -> Result> { + invoke_call_discrete(session, meta::AccountProcedureId::CreateAccount, [params]) + .await + .change_context(AccountError) + } + + async fn create_account_group( + &self, + session: Connection, + params: InsertAccountGroupIdParams, + ) -> Result> { + invoke_call_discrete(session, meta::AccountProcedureId::CreateAccountGroup, [ + params, + ]) + .await + .change_context(AccountError) + } + + async fn check_account_group_permission( + &self, + session: Connection, + account_group_id: AccountGroupId, + permission: AccountGroupPermission, + ) -> Result> { + invoke_call_discrete( + session, + meta::AccountProcedureId::CheckAccountGroupPermission, + [(account_group_id, permission)], + ) + .await + .change_context(AccountError) + } + + async fn add_account_group_member( + &self, + session: Connection, + account_group_id: AccountGroupId, + account_id: AccountId, + ) -> Result<(), Report> { + invoke_call_discrete(session, meta::AccountProcedureId::AddAccountGroupMember, [ + (account_group_id, account_id), + ]) + .await + .change_context(AccountError) + } + + async fn remove_account_group_member( + &self, + session: Connection, + account_group_id: AccountGroupId, + account_id: AccountId, + ) -> Result<(), Report> { + invoke_call_discrete( + session, + meta::AccountProcedureId::RemoveAccountGroupMember, + [(account_group_id, account_id)], + ) + .await + .change_context(AccountError) + } +} diff --git a/apps/hash-graph/libs/api/src/rpc/auth.rs b/apps/hash-graph/libs/api/src/rpc/auth.rs new file mode 100644 index 00000000000..e2141656f3b --- /dev/null +++ b/apps/hash-graph/libs/api/src/rpc/auth.rs @@ -0,0 +1,189 @@ +use core::fmt::Debug; + +use error_stack::{Report, ResultExt as _}; +use graph_types::account::AccountId; +use harpc_client::{connection::Connection, utils::invoke_call_discrete}; +use harpc_codec::{decode::ReportDecoder, encode::Encoder}; +use harpc_server::{ + error::DelegationError, + session::Session, + utils::{delegate_call_discrete, parse_procedure_id}, +}; +use harpc_service::delegate::ServiceDelegate; +use harpc_tower::{body::Body, request::Request, response::Response}; +use harpc_types::response_kind::ResponseKind; + +use super::{role, session::Account}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, derive_more::Display, derive_more::Error)] +#[display("unable to authenticate user")] +pub struct AuthenticationError; + +pub trait AuthenticationService +where + R: role::Role, +{ + async fn authenticate( + &self, + session: R::Session, + actor_id: AccountId, + ) -> Result<(), Report>; +} + +// TODO: this can be auto generated by the `harpc` crate +pub mod meta { + //! The `meta` module contains the metadata for the account service. + //! In the future this will be automatically generated by the `harpc` crate. + + use frunk::HList; + use harpc_service::{ + Service, + metadata::Metadata, + procedure::{Procedure, ProcedureIdentifier}, + }; + use harpc_types::{procedure::ProcedureId, service::ServiceId, version::Version}; + + pub enum AuthenticationProcedureId { + Authenticate, + } + + impl ProcedureIdentifier for AuthenticationProcedureId { + type Service = AuthenticationService; + + fn from_id(id: ProcedureId) -> Option { + match id.value() { + 0x00 => Some(Self::Authenticate), + _ => None, + } + } + + fn into_id(self) -> ProcedureId { + match self { + Self::Authenticate => ProcedureId::new(0x00), + } + } + } + + pub struct AuthenticationService; + + impl Service for AuthenticationService { + type ProcedureId = AuthenticationProcedureId; + type Procedures = HList![ProcedureAuthenticate]; + + const ID: ServiceId = ServiceId::new(0x00); + const VERSION: Version = Version { + major: 0x00, + minor: 0x00, + }; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } + + pub struct ProcedureAuthenticate; + + impl Procedure for ProcedureAuthenticate { + type Service = AuthenticationService; + + const ID: ::ProcedureId = AuthenticationProcedureId::Authenticate; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct AuthenticationServer; + +impl AuthenticationService for AuthenticationServer { + async fn authenticate( + &self, + session: Session, + actor_id: AccountId, + ) -> Result<(), Report> { + session + .update(Account { + actor_id: Some(actor_id), + }) + .await; + + Ok(()) + } +} + +// TODO: this can be auto generated by the `harpc` crate +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct AuthenticationDelegate { + inner: T, +} + +impl ServiceDelegate, C> for AuthenticationDelegate +where + T: AuthenticationService + Send, + C: Encoder + ReportDecoder + Clone + Send, +{ + type Error = Report; + type Service = meta::AuthenticationService; + + type Body + = impl Body, Error = ::Error> + where + Source: Body + Send + Sync; + + async fn call( + self, + request: Request, + session: Session, + codec: C, + ) -> Result>, Self::Error> + where + B: Body + Send + Sync, + { + let id = parse_procedure_id(&request)?; + + match id { + meta::AuthenticationProcedureId::Authenticate => { + delegate_call_discrete(request, codec, |actor_id| async move { + self.inner.authenticate(session, actor_id).await + }) + .await + } + } + } +} + +// TODO: this can be auto generated by the `harpc` crate +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct AuthenticationClient; + +impl AuthenticationService> for AuthenticationClient +where + Svc: harpc_client::connection::ConnectionService, + C: harpc_client::connection::ConnectionCodec, +{ + async fn authenticate( + &self, + session: Connection, + actor_id: AccountId, + ) -> Result<(), Report> { + invoke_call_discrete(session, meta::AuthenticationProcedureId::Authenticate, [ + actor_id, + ]) + .await + .change_context(AuthenticationError) + } +} diff --git a/apps/hash-graph/libs/api/src/rpc/echo.rs b/apps/hash-graph/libs/api/src/rpc/echo.rs new file mode 100644 index 00000000000..50867b4815c --- /dev/null +++ b/apps/hash-graph/libs/api/src/rpc/echo.rs @@ -0,0 +1,183 @@ +use error_stack::{Report, ResultExt as _}; +use harpc_client::{connection::Connection, utils::invoke_call_discrete}; +use harpc_codec::{decode::ReportDecoder, encode::Encoder}; +use harpc_server::{ + error::DelegationError, + session::Session, + utils::{delegate_call_discrete, parse_procedure_id}, +}; +use harpc_service::{delegate::ServiceDelegate, role::Role}; +use harpc_tower::{body::Body, request::Request, response::Response}; +use harpc_types::response_kind::ResponseKind; + +use super::{role, session::Account}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, derive_more::Display, derive_more::Error)] +#[display("unable to fullfil ping request")] +pub struct EchoError; + +pub trait EchoService +where + R: Role, +{ + async fn echo( + &self, + session: R::Session, + payload: Box, + ) -> Result, Report>; +} + +// TODO: this can be auto generated by the `harpc` crate +pub mod meta { + //! The `meta` module contains the metadata for the ping service. + //! In the future this will be automatically generated by the `harpc` crate. + + use frunk::HList; + use harpc_service::{ + Service, + metadata::Metadata, + procedure::{Procedure, ProcedureIdentifier}, + }; + use harpc_types::{procedure::ProcedureId, service::ServiceId, version::Version}; + + pub enum EchoProcedureId { + Echo, + } + + impl ProcedureIdentifier for EchoProcedureId { + type Service = EchoService; + + fn from_id(id: ProcedureId) -> Option { + match id.value() { + 0x00 => Some(Self::Echo), + _ => None, + } + } + + fn into_id(self) -> ProcedureId { + match self { + Self::Echo => ProcedureId::new(0x00), + } + } + } + + pub struct EchoService; + + impl Service for EchoService { + type ProcedureId = EchoProcedureId; + type Procedures = HList![ProcedureEcho]; + + const ID: ServiceId = ServiceId::new(0x02); + const VERSION: Version = Version { + major: 0x00, + minor: 0x00, + }; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } + + pub struct ProcedureEcho; + + impl Procedure for ProcedureEcho { + type Service = EchoService; + + const ID: ::ProcedureId = EchoProcedureId::Echo; + + fn metadata() -> Metadata { + Metadata { + since: Version { + major: 0x00, + minor: 0x00, + }, + deprecation: None, + } + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct EchoServer; + +impl EchoService for EchoServer { + async fn echo( + &self, + _: Session, + payload: Box, + ) -> Result, Report> { + Ok(payload) + } +} + +// TODO: this can be auto generated by the `harpc` crate +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct EchoDelegate { + inner: T, +} + +impl EchoDelegate { + pub const fn new(inner: T) -> Self { + Self { inner } + } +} + +impl ServiceDelegate, C> for EchoDelegate +where + T: EchoService + Send, + C: Encoder + ReportDecoder + Clone + Send, +{ + type Error = Report; + type Service = meta::EchoService; + + type Body + = impl Body, Error = ::Error> + where + Source: Body + Send + Sync; + + async fn call( + self, + request: Request, + session: Session, + codec: C, + ) -> Result>, Self::Error> + where + B: Body + Send + Sync, + { + let id = parse_procedure_id(&request)?; + + match id { + meta::EchoProcedureId::Echo => { + delegate_call_discrete(request, codec, |payload| async move { + self.inner.echo(session, payload).await + }) + .await + } + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct EchoClient; + +impl EchoService> for EchoClient +where + Svc: harpc_client::connection::ConnectionService, + C: harpc_client::connection::ConnectionCodec, +{ + async fn echo( + &self, + session: Connection, + payload: Box, + ) -> Result, Report> { + invoke_call_discrete(session, meta::EchoProcedureId::Echo, [payload]) + .await + .change_context(EchoError) + } +} diff --git a/apps/hash-graph/libs/api/src/rpc/mod.rs b/apps/hash-graph/libs/api/src/rpc/mod.rs new file mode 100644 index 00000000000..e1e072dc2a1 --- /dev/null +++ b/apps/hash-graph/libs/api/src/rpc/mod.rs @@ -0,0 +1,15 @@ +pub mod account; +pub mod auth; +pub mod echo; +mod session; + +mod role { + use harpc_client::connection::Connection; + use harpc_server::session::Session; + pub(crate) use harpc_service::role::Role; + + use super::session::Account; + + pub(crate) type Server = harpc_service::role::Server>; + pub(crate) type Client = harpc_service::role::Client>; +} diff --git a/apps/hash-graph/libs/api/src/rpc/session.rs b/apps/hash-graph/libs/api/src/rpc/session.rs new file mode 100644 index 00000000000..d183770114b --- /dev/null +++ b/apps/hash-graph/libs/api/src/rpc/session.rs @@ -0,0 +1,6 @@ +use graph_types::account::AccountId; + +#[derive(Debug, Clone, Default)] +pub(crate) struct Account { + pub actor_id: Option, +} diff --git a/apps/hash-graph/libs/store/src/account/mod.rs b/apps/hash-graph/libs/store/src/account/mod.rs index 8f6ece10eeb..c5051a4f89e 100644 --- a/apps/hash-graph/libs/store/src/account/mod.rs +++ b/apps/hash-graph/libs/store/src/account/mod.rs @@ -4,7 +4,7 @@ use graph_types::{ account::{AccountGroupId, AccountId}, owned_by_id::OwnedById, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use thiserror::Error; fn random_account_id() -> AccountId { @@ -19,7 +19,7 @@ fn random_account_group_id() -> AccountGroupId { #[error("Could not insert account")] pub struct AccountInsertionError; -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct InsertAccountIdParams { @@ -31,7 +31,7 @@ pub struct InsertAccountIdParams { #[error("Could not insert account group")] pub struct AccountGroupInsertionError; -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct InsertAccountGroupIdParams { diff --git a/libs/@local/harpc/client/Cargo.toml b/libs/@local/harpc/client/Cargo.toml index b42a2d74da1..bea6892f584 100644 --- a/libs/@local/harpc/client/Cargo.toml +++ b/libs/@local/harpc/client/Cargo.toml @@ -11,14 +11,17 @@ publish.workspace = true [dependencies] # Public workspace dependencies harpc-tower = { workspace = true, public = true } +harpc-service = { workspace = true, public = true } # Public third-party dependencies tower = { workspace = true, public = true } +serde-value = {version = "0.7.0", public = true } # Private workspace dependencies -error-stack = { workspace = true } +error-stack = { workspace = true, features = ["unstable", "futures"] } harpc-codec = { workspace = true } harpc-net = { workspace = true } +harpc-types = { workspace = true } # Private third-party dependencies bytes = { workspace = true } @@ -27,6 +30,8 @@ futures = { workspace = true } multiaddr = { workspace = true } thiserror = { workspace = true } tokio-util = { workspace = true } +serde = { workspace = true, features = ["derive"] } +derive_more = { version = "1.0.0", features = ["display"] } [lints] workspace = true diff --git a/libs/@local/harpc/client/package.json b/libs/@local/harpc/client/package.json index 1a88c0269ca..24aa0895cc3 100644 --- a/libs/@local/harpc/client/package.json +++ b/libs/@local/harpc/client/package.json @@ -7,6 +7,8 @@ "@rust/error-stack": "0.5.0", "@rust/harpc-codec": "0.0.0-private", "@rust/harpc-net": "0.0.0-private", - "@rust/harpc-tower": "0.0.0-private" + "@rust/harpc-service": "0.0.0-private", + "@rust/harpc-tower": "0.0.0-private", + "@rust/harpc-types": "0.0.0-private" } } diff --git a/libs/@local/harpc/client/src/connection/alias.rs b/libs/@local/harpc/client/src/connection/alias.rs new file mode 100644 index 00000000000..fd62176ab96 --- /dev/null +++ b/libs/@local/harpc/client/src/connection/alias.rs @@ -0,0 +1,79 @@ +//! Trait aliases used for services and codecs, to simplify the trait bounds. + +use alloc::vec; +use core::error::Error; + +use bytes::Buf; +use error_stack::Report; +use futures::stream; +use harpc_codec::{decode::Decoder, encode::Encoder}; +use harpc_tower::{request::Request, response::Response}; + +pub type ConnectionRequestStream = stream::Iter::Buf>>; + +pub trait ConnectionService: + tower::Service< + Request>>, + Response = Response, + Error = Report, + Future: Send, + > + Clone + + Send + + Sync +where + C: Encoder, +{ + type ResponseData: Buf; + type ResponseError; + + type ServiceError: Error + Send + Sync + 'static; + type ResponseStream: futures::Stream> + + Send + + Sync; +} + +impl ConnectionService for T +where + T: tower::Service< + Request>, + Response = Response, + Error = Report, + Future: Send, + > + Clone + + Send + + Sync, + C: Encoder, + St: futures::Stream> + Send + Sync, + ResData: Buf, + ServiceError: Error + Send + Sync + 'static, +{ + type ResponseData = ResData; + type ResponseError = ResError; + type ResponseStream = St; + type ServiceError = ServiceError; +} + +pub trait ConnectionCodec: + Encoder, Buf: Send> + + Decoder> + + Clone + + Send + + Sync +{ + type EncoderError: Error + Send + Sync + 'static; + type DecoderError: Error + Send + Sync + 'static; +} + +impl ConnectionCodec for C +where + C: Encoder, Buf: Send> + + Decoder> + + Clone + + Send + + Sync, + EncoderError: Error + Send + Sync + 'static, + DecoderError: Error + Send + Sync + 'static, +{ + type DecoderError = DecoderError; + type EncoderError = EncoderError; +} diff --git a/libs/@local/harpc/client/src/connection/mod.rs b/libs/@local/harpc/client/src/connection/mod.rs index b4a42d43cdc..da58957ab29 100644 --- a/libs/@local/harpc/client/src/connection/mod.rs +++ b/libs/@local/harpc/client/src/connection/mod.rs @@ -5,6 +5,9 @@ use futures::Stream; use harpc_tower::request::Request; use tower::Service; +pub use self::alias::{ConnectionCodec, ConnectionRequestStream, ConnectionService}; + +mod alias; pub mod default; pub mod service; diff --git a/libs/@local/harpc/client/src/error.rs b/libs/@local/harpc/client/src/error.rs new file mode 100644 index 00000000000..5d091620080 --- /dev/null +++ b/libs/@local/harpc/client/src/error.rs @@ -0,0 +1,90 @@ +use core::{ + error::Error, + fmt::{self, Display}, +}; + +// H-xxxx: error-stack reports are currently not de-serializable, see: +#[derive( + Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, derive_more::Display, +)] +#[display("The remote server has encountered an error: {_0:?}")] +pub struct RemoteError(serde_value::Value); + +impl Error for RemoteError {} + +impl RemoteError { + #[must_use] + pub const fn new(value: serde_value::Value) -> Self { + Self(value) + } +} + +impl From for RemoteError { + fn from(value: serde_value::Value) -> Self { + Self(value) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ResponseExpectedItemCountMismatch { + min: Option, + max: Option, +} + +impl ResponseExpectedItemCountMismatch { + #[must_use] + pub const fn exactly(expected: usize) -> Self { + Self { + min: Some(expected), + max: Some(expected), + } + } + + #[must_use] + pub const fn at_least(min: usize) -> Self { + Self { + min: Some(min), + max: None, + } + } + + #[must_use] + pub const fn at_most(max: usize) -> Self { + Self { + min: None, + max: Some(max), + } + } + + #[must_use] + pub const fn with_min(mut self, min: usize) -> Self { + self.min = Some(min); + self + } + + #[must_use] + pub const fn with_max(mut self, max: usize) -> Self { + self.max = Some(max); + self + } +} + +impl Display for ResponseExpectedItemCountMismatch { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match (self.min, self.max) { + (Some(min), Some(max)) if min == max => write!(fmt, "expected length of {min}"), + (Some(min), Some(max)) => write!(fmt, "expected length between {min} and {max}"), + (Some(min), None) => write!(fmt, "expected length of at least {min}"), + (None, Some(max)) => write!(fmt, "expected length of at most {max}"), + (None, None) => fmt.write_str("expected length"), + } + } +} + +impl Error for ResponseExpectedItemCountMismatch {} + +#[derive(Debug, Clone, PartialEq, Eq, derive_more::Display)] +#[display("The client has encountered an error while making a call to the remote server.")] +pub struct RemoteInvocationError; + +impl Error for RemoteInvocationError {} diff --git a/libs/@local/harpc/client/src/lib.rs b/libs/@local/harpc/client/src/lib.rs index 676b15bfc63..aafce70b478 100644 --- a/libs/@local/harpc/client/src/lib.rs +++ b/libs/@local/harpc/client/src/lib.rs @@ -3,6 +3,8 @@ extern crate alloc; pub mod connection; +pub mod error; +pub mod utils; use alloc::sync::Arc; diff --git a/libs/@local/harpc/client/src/utils.rs b/libs/@local/harpc/client/src/utils.rs new file mode 100644 index 00000000000..c849a23dae5 --- /dev/null +++ b/libs/@local/harpc/client/src/utils.rs @@ -0,0 +1,99 @@ +//! Common utilities used to implement various traits as well as used in the proc-macro. + +use core::error::Error; + +use error_stack::{Report, ResultExt as _, TryReportStreamExt as _}; +use futures::{StreamExt as _, stream}; +use harpc_codec::encode::Encoder; +use harpc_net::session::server::SessionId; +use harpc_service::{Service, procedure::ProcedureIdentifier}; +use harpc_tower::{ + Extensions, + request::{self, Request}, +}; +use harpc_types::{procedure::ProcedureDescriptor, service::ServiceDescriptor}; +use tower::ServiceExt as _; + +use crate::{ + connection::{Connection, ConnectionCodec, ConnectionRequestStream, ConnectionService}, + error::{RemoteError, RemoteInvocationError, ResponseExpectedItemCountMismatch}, +}; + +/// Encode a request of an iterator of items. +pub async fn encode_request_iter( + codec: E, + procedure: P, + items: impl IntoIterator + Send, +) -> Result>, Report<[C]>> +where + P: ProcedureIdentifier + Send, + E: Encoder, Buf: Send> + Send, + C: Error + Send + Sync + 'static, +{ + let items: Vec<_> = codec + .encode(stream::iter(items)) + .try_collect_reports() + .await?; + + Ok(Request::from_parts( + request::Parts { + service: ServiceDescriptor { + id: ::ID, + version: ::VERSION, + }, + procedure: ProcedureDescriptor { + id: procedure.into_id(), + }, + session: SessionId::CLIENT, + extensions: Extensions::new(), + }, + stream::iter(items), + )) +} + +/// Delegates a call to a closure with a predetermined amount of inputs and outputs. +/// +/// # Errors +/// +/// This function returns a `Report` in the following cases: +/// - If encoding the request fails +/// - If the service call fails +/// - If the response doesn't contain exactly one item +/// - If decoding the response fails +/// - If the remote server returns an error +pub async fn invoke_call_discrete( + connection: Connection, + procedure: impl ProcedureIdentifier + Send, + request: impl IntoIterator + Send, +) -> Result> +where + Svc: ConnectionService, + C: ConnectionCodec, + O: serde::de::DeserializeOwned, +{ + let (service, codec) = connection.into_parts(); + + let request = encode_request_iter(codec.clone(), procedure, request) + .await + .change_context(RemoteInvocationError)?; + + let response = service + .oneshot(request) + .await + .change_context(RemoteInvocationError)?; + + let (_, body) = response.into_parts(); + + let items = codec.decode(body); + let mut items = core::pin::pin!(items); + + let data: Result<_, _> = items + .next() + .await + .ok_or_else(|| Report::new(ResponseExpectedItemCountMismatch::exactly(1))) + .change_context(RemoteInvocationError)? + .change_context(RemoteInvocationError)?; + + data.map_err(RemoteError::new) + .change_context(RemoteInvocationError) +} diff --git a/libs/@local/harpc/codec/src/decode.rs b/libs/@local/harpc/codec/src/decode.rs index d23877ca709..b228d3634fb 100644 --- a/libs/@local/harpc/codec/src/decode.rs +++ b/libs/@local/harpc/codec/src/decode.rs @@ -1,4 +1,7 @@ +use core::error::Error; + use bytes::Buf; +use error_stack::Report; use futures_core::{Stream, TryStream}; pub trait Decoder { @@ -14,3 +17,16 @@ pub trait Decoder { T: serde::de::DeserializeOwned, S: TryStream + Send; } + +// This trait is needed for service delegate, as otherwise the TAIT captures the underlying generic. +pub trait ReportDecoder: Decoder> { + type Context: Error + Send + Sync + 'static; +} + +impl ReportDecoder for T +where + T: Decoder>, + C: Error + Send + Sync + 'static, +{ + type Context = C; +} diff --git a/libs/@local/harpc/server/Cargo.toml b/libs/@local/harpc/server/Cargo.toml index 6aa3f96b148..3d417af2aeb 100644 --- a/libs/@local/harpc/server/Cargo.toml +++ b/libs/@local/harpc/server/Cargo.toml @@ -11,20 +11,20 @@ publish.workspace = true [dependencies] # Public workspace dependencies harpc-service = { workspace = true, public = true } +harpc-net = { workspace = true, public = true } # Public third-party dependencies frunk_core = { version = "0.4.3", public = true } tower = { workspace = true, public = true, features = ["make"] } # Private workspace dependencies -harpc-net = { workspace = true } harpc-tower = { workspace = true } harpc-types = { workspace = true } # Private third-party dependencies bytes = { workspace = true } derive-where = { workspace = true } -derive_more = { version = "1.0.0", features = ["display", "error"] } +derive_more = { version = "1.0.0", features = ["debug", "display", "error"] } error-stack = { workspace = true } frunk = "0.4.3" futures = { workspace = true } diff --git a/libs/@local/harpc/server/examples/account.rs b/libs/@local/harpc/server/examples/account.rs index ccd60e82c1f..438b1a77aa5 100644 --- a/libs/@local/harpc/server/examples/account.rs +++ b/libs/@local/harpc/server/examples/account.rs @@ -20,8 +20,7 @@ use futures::{Stream, StreamExt as _, TryFutureExt as _, TryStreamExt as _, pin_ use graph_types::account::AccountId; use harpc_client::{Client, ClientConfig, connection::Connection}; use harpc_codec::{decode::Decoder, encode::Encoder, json::JsonCodec}; -use harpc_net::session::server::SessionId; -use harpc_server::{Server, ServerConfig, router::RouterBuilder, serve::serve}; +use harpc_server::{Server, ServerConfig, router::RouterBuilder, serve::serve, session::SessionId}; use harpc_service::{ Service, delegate::ServiceDelegate, @@ -53,6 +52,8 @@ enum AccountProcedureId { } impl ProcedureIdentifier for AccountProcedureId { + type Service = Account; + fn from_id(id: ProcedureId) -> Option { match id.value() { 0 => Some(Self::CreateAccount), diff --git a/libs/@local/harpc/server/src/delegate.rs b/libs/@local/harpc/server/src/delegate.rs index 821270f35b6..75752f173b4 100644 --- a/libs/@local/harpc/server/src/delegate.rs +++ b/libs/@local/harpc/server/src/delegate.rs @@ -8,7 +8,7 @@ use harpc_service::delegate::ServiceDelegate; use harpc_tower::{body::Body, request::Request, response::Response}; use tower::Service; -use crate::session::{Session, SessionStorage}; +use crate::session::{RequestInfo, Session, SessionStorage}; /// Bridge between `harpc-service` and `tower`. /// @@ -16,7 +16,7 @@ use crate::session::{Session, SessionStorage}; /// for taking the incoming request, selecting the appropriate session and codec, and then /// delegating the request to the inner service (which is cloned). /// -/// A concious decision was made not to have `ServiceDelegate` be a `Service`, as it allows for +/// A conscious decision was made not to have `ServiceDelegate` be a `Service`, as it allows for /// greater ergonomics, and allows server implementation that are not based on tower in the future. /// For example, because of the inherit `oneshot` nature of our tower implementation, having `&mut /// self` as a parameter would be more confusing than helpful. @@ -67,7 +67,12 @@ where async move { let storage = session; - let session = storage.get_or_insert(req.session()).await; + let session = storage + .get_or_insert(req.session(), RequestInfo { + service: req.service(), + procedure: req.procedure(), + }) + .await; delegate.call(req, session, codec).await } diff --git a/libs/@local/harpc/server/src/error.rs b/libs/@local/harpc/server/src/error.rs index 63769e96621..dbf2780e16a 100644 --- a/libs/@local/harpc/server/src/error.rs +++ b/libs/@local/harpc/server/src/error.rs @@ -1,28 +1,133 @@ -use core::error::Error; - -use harpc_types::{error_code::ErrorCode, service::ServiceId, version::Version}; - -#[derive( - Debug, - Copy, - Clone, - PartialEq, - Eq, - PartialOrd, - Ord, - Hash, - derive_more::Display, - serde::Serialize, - serde::Deserialize, -)] -#[display("service by id {service:?} and version {version:?} not found")] -pub struct NotFound { - pub service: ServiceId, - pub version: Version, -} - -impl Error for NotFound { +use alloc::borrow::Cow; +use core::{ + error::Error, + fmt::{self, Display}, +}; + +use harpc_types::{ + error_code::ErrorCode, + procedure::{ProcedureDescriptor, ProcedureId}, + service::ServiceDescriptor, +}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, derive_more::Display)] +#[display("service {service} not found")] +pub struct ServiceNotFound { + pub service: ServiceDescriptor, +} + +impl Error for ServiceNotFound { + fn provide<'a>(&'a self, request: &mut core::error::Request<'a>) { + request.provide_value(ErrorCode::SERVICE_NOT_FOUND); + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, derive_more::Display)] +#[display("procedure {procedure} not found in service {service}")] +pub struct ProcedureNotFound { + pub service: ServiceDescriptor, + + pub procedure: ProcedureId, +} + +impl Error for ProcedureNotFound { + fn provide<'a>(&'a self, request: &mut core::error::Request<'a>) { + request.provide_value(ErrorCode::PROCEDURE_NOT_FOUND); + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Forbidden { + pub service: ServiceDescriptor, + pub procedure: ProcedureDescriptor, + + pub reason: Cow<'static, str>, +} + +impl Display for Forbidden { + fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!( + fmt, + "forbidden to call {}::{}", + self.service, self.procedure + )?; + + if !self.reason.is_empty() { + write!(fmt, ", reason: {}", self.reason)?; + } + + Ok(()) + } +} + +impl Error for Forbidden { + fn provide<'a>(&'a self, request: &mut core::error::Request<'a>) { + request.provide_value(ErrorCode::FORBIDDEN); + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RequestExpectedItemCountMismatch { + min: Option, + max: Option, +} + +impl RequestExpectedItemCountMismatch { + #[must_use] + pub const fn exactly(expected: usize) -> Self { + Self { + min: Some(expected), + max: Some(expected), + } + } + + #[must_use] + pub const fn at_least(min: usize) -> Self { + Self { + min: Some(min), + max: None, + } + } + + #[must_use] + pub const fn at_most(max: usize) -> Self { + Self { + min: None, + max: Some(max), + } + } + + #[must_use] + pub const fn with_min(mut self, min: usize) -> Self { + self.min = Some(min); + self + } + + #[must_use] + pub const fn with_max(mut self, max: usize) -> Self { + self.max = Some(max); + self + } +} + +impl Display for RequestExpectedItemCountMismatch { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match (self.min, self.max) { + (Some(min), Some(max)) if min == max => write!(fmt, "expected length of {min}"), + (Some(min), Some(max)) => write!(fmt, "expected length between {min} and {max}"), + (Some(min), None) => write!(fmt, "expected length of at least {min}"), + (None, Some(max)) => write!(fmt, "expected length of at most {max}"), + (None, None) => fmt.write_str("expected length"), + } + } +} + +impl Error for RequestExpectedItemCountMismatch { fn provide<'a>(&'a self, request: &mut core::error::Request<'a>) { - request.provide_value(ErrorCode::NOT_FOUND); + request.provide_value(ErrorCode::REQUEST_EXPECTED_ITEM_COUNT_MISMATCH); } } + +#[derive(Debug, Copy, Clone, PartialEq, Eq, derive_more::Display, derive_more::Error)] +#[display("unable to delegate request to service implementation")] +pub struct DelegationError; diff --git a/libs/@local/harpc/server/src/lib.rs b/libs/@local/harpc/server/src/lib.rs index aa8a8128504..9cb7f37c9f3 100644 --- a/libs/@local/harpc/server/src/lib.rs +++ b/libs/@local/harpc/server/src/lib.rs @@ -9,6 +9,7 @@ pub mod router; pub mod serve; pub mod session; +pub mod utils; use core::{ pin::Pin, diff --git a/libs/@local/harpc/server/src/route.rs b/libs/@local/harpc/server/src/route.rs index d61f3a49657..654b43f6d41 100644 --- a/libs/@local/harpc/server/src/route.rs +++ b/libs/@local/harpc/server/src/route.rs @@ -12,7 +12,7 @@ use harpc_tower::{ use harpc_types::{response_kind::ResponseKind, service::ServiceId, version::Version}; use tower::{Service, ServiceExt as _, util::Oneshot}; -use crate::error::NotFound; +use crate::error::ServiceNotFound; #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct Handler { @@ -132,9 +132,8 @@ impl Route for HNil { where ReqBody: Body + Send + Sync, { - let error = NotFound { - service: request.service().id, - version: request.service().version, + let error = ServiceNotFound { + service: request.service(), }; let session = request.session(); diff --git a/libs/@local/harpc/server/src/session.rs b/libs/@local/harpc/server/src/session.rs index add8c7f4dcc..663aec0ca93 100644 --- a/libs/@local/harpc/server/src/session.rs +++ b/libs/@local/harpc/server/src/session.rs @@ -3,16 +3,28 @@ use core::{fmt::Debug, sync::atomic::AtomicUsize, time::Duration}; use std::{collections::HashSet, sync::Mutex}; use futures::{Stream, StreamExt as _}; -use harpc_net::session::server::{SessionEvent, SessionEventError, SessionId}; +pub use harpc_net::session::server::SessionId; +use harpc_net::session::server::{SessionEvent, SessionEventError}; +use harpc_types::{procedure::ProcedureDescriptor, service::ServiceDescriptor}; use scc::{ebr::Guard, hash_index::Entry}; use tokio::pin; use tokio_util::sync::CancellationToken; +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct RequestInfo { + pub service: ServiceDescriptor, + pub procedure: ProcedureDescriptor, +} + +#[derive(derive_more::Debug)] pub struct Session { + #[debug(skip)] storage: Arc>, key: SessionId, value: Arc, + + request_info: RequestInfo, } impl Session @@ -68,6 +80,12 @@ where false } } + + /// Request information associated with the current request. + #[must_use] + pub const fn request_info(&self) -> RequestInfo { + self.request_info + } } impl AsRef for Session { @@ -78,10 +96,10 @@ impl AsRef for Session { #[derive(Debug)] struct Marked { - // we use an std Mutex here, because we do not use the guard across an await point, therefore + // We use an std mutex here, because we do not use the guard across an await point, therefore // an std mutex is faster, smaller and more efficient. inner: Mutex>, - // SeqCst is not needed as we don't require total ordering across all threads. + // `SeqCst` is not needed as we don't require total ordering across all threads. len: AtomicUsize, } @@ -153,7 +171,7 @@ impl Marked { /// the session values), the session values are stored as `Arc`. /// Values could be stored as `T` instead, but that would mean that a service could potentially /// simply lose access to the value if it is removed from the storage during a call, severely -/// impacting ergnomics. +/// impacting ergonomics. /// The underlying storage is lock-free for any read operations. #[derive_where::derive_where(Debug; T: Debug + 'static)] pub struct SessionStorage { @@ -183,10 +201,14 @@ impl SessionStorage where T: Default + Send + Sync + 'static, { - pub(crate) async fn get_or_insert(self: Arc, session_id: SessionId) -> Session { + pub(crate) async fn get_or_insert( + self: Arc, + session_id: SessionId, + request_info: RequestInfo, + ) -> Session { self.marked.remove(session_id); - // shortcut, which is completely lock-free + // Shortcut, which is completely lock-free if let Some(value) = self .storage .peek_with(&session_id, |_, value| Arc::clone(value)) @@ -195,16 +217,21 @@ where storage: Arc::clone(&self), key: session_id, value, + request_info, }; } - let entry = self.storage.entry_async(session_id).await.or_default(); - let value = Arc::clone(entry.get()); + let value = { + let entry = self.storage.entry_async(session_id).await.or_default(); + + Arc::clone(entry.get()) + }; Session { storage: Arc::clone(&self), key: session_id, value, + request_info, } } } diff --git a/libs/@local/harpc/server/src/utils.rs b/libs/@local/harpc/server/src/utils.rs new file mode 100644 index 00000000000..e36a0d418c4 --- /dev/null +++ b/libs/@local/harpc/server/src/utils.rs @@ -0,0 +1,95 @@ +//! Common utilities used to implement various traits as well as used in the proc-macro. + +use core::{array, pin::pin}; + +use error_stack::{Report, ResultExt as _}; +use futures::{StreamExt as _, stream}; +use harpc_codec::{decode::ReportDecoder, encode::Encoder}; +use harpc_service::{Service, procedure::ProcedureIdentifier}; +use harpc_tower::{ + body::{Body, BodyExt as _, Frame, controlled::Controlled, stream::StreamBody}, + request::Request, + response::{self, Response}, +}; +use harpc_types::{ + procedure::ProcedureDescriptor, response_kind::ResponseKind, service::ServiceDescriptor, +}; + +use crate::error::{DelegationError, ProcedureNotFound, RequestExpectedItemCountMismatch}; + +/// Parses the procedure identifier from the given request. +/// +/// This function extracts the procedure identifier from the request and converts it +/// into the specified `ProcedureIdentifier` type. +/// +/// # Errors +/// +/// Returns a `DelegationError` if the procedure identifier cannot be parsed into the specified type +/// and is therefore not found. +pub fn parse_procedure_id(request: &Request) -> Result> +where + P: ProcedureIdentifier, +{ + let ProcedureDescriptor { id } = request.procedure(); + + P::from_id(id) + .ok_or(ProcedureNotFound { + service: ServiceDescriptor { + id: ::ID, + version: ::VERSION, + }, + procedure: id, + }) + .change_context(DelegationError) +} + +/// Delegates a call to a closure with a single input and output. +pub async fn delegate_call_discrete( + request: Request, + codec: C, + closure: impl FnOnce(I) -> Fut + Send, +) -> Result< + Response< + // Precise capturing of types isn't implemented yet, so we're going to painful + // route, as we don't want to capture any unnecessary types. + Controlled< + ResponseKind, + StreamBody< + stream::MapOk< + ::Output>>, + fn(::Buf) -> Frame<::Buf, !>, + >, + >, + >, + >, + Report, +> +where + B: Body + Send + Sync, + I: serde::de::DeserializeOwned, + O: serde::Serialize + Send, + C: Encoder + ReportDecoder + Clone + Send, + Fut: Future + Send, +{ + let session_id = request.session(); + + let body = request.into_body(); + let data = body.into_stream().into_data_stream(); + + let stream = codec.clone().decode(data); + let mut stream = pin!(stream); + + let payload = stream + .next() + .await + .ok_or_else(|| RequestExpectedItemCountMismatch::exactly(1)) + .change_context(DelegationError)? + .change_context(DelegationError)?; + + let response = closure(payload).await; + + let data = codec.encode(stream::iter([response])); + + // In theory we could also box this, or use `Either` if we have multiple responses + Ok(Response::from_ok(response::Parts::new(session_id), data)) +} diff --git a/libs/@local/harpc/service/src/lib.rs b/libs/@local/harpc/service/src/lib.rs index e454ae0e748..81b7007b533 100644 --- a/libs/@local/harpc/service/src/lib.rs +++ b/libs/@local/harpc/service/src/lib.rs @@ -1,6 +1,9 @@ #![feature(never_type, marker_trait_attr)] -use harpc_types::{service::ServiceId, version::Version}; +use harpc_types::{ + service::{ServiceDescriptor, ServiceId}, + version::Version, +}; use self::{metadata::Metadata, procedure::ProcedureIdentifier}; @@ -10,13 +13,21 @@ pub mod procedure; pub mod role; pub trait Service { - type ProcedureId: ProcedureIdentifier; - /// Heteregenous list of procedures that are part of this service, used for type-level + type ProcedureId: ProcedureIdentifier; + /// Heterogeneous list of procedures that are part of this service, used for type-level /// validation. type Procedures; const ID: ServiceId; const VERSION: Version; + #[must_use] + fn descriptor() -> ServiceDescriptor { + ServiceDescriptor { + id: Self::ID, + version: Self::VERSION, + } + } + fn metadata() -> Metadata; } diff --git a/libs/@local/harpc/service/src/procedure.rs b/libs/@local/harpc/service/src/procedure.rs index 70c4bd3a726..be6ff06b963 100644 --- a/libs/@local/harpc/service/src/procedure.rs +++ b/libs/@local/harpc/service/src/procedure.rs @@ -1,5 +1,5 @@ use frunk::HCons; -use harpc_types::procedure::ProcedureId; +use harpc_types::procedure::{ProcedureDescriptor, ProcedureId}; use crate::{Service, metadata::Metadata}; @@ -14,6 +14,8 @@ impl IncludesProcedure for HCons where Head: Proce impl IncludesProcedure

for HCons where Tail: IncludesProcedure

{} pub trait ProcedureIdentifier: Sized { + type Service: Service; + fn from_id(id: ProcedureId) -> Option; fn into_id(self) -> ProcedureId; } @@ -23,5 +25,12 @@ pub trait Procedure: Sized { const ID: ::ProcedureId; + #[must_use] + fn descriptor() -> ProcedureDescriptor { + ProcedureDescriptor { + id: Self::ID.into_id(), + } + } + fn metadata() -> Metadata; } diff --git a/libs/@local/harpc/types/src/error_code.rs b/libs/@local/harpc/types/src/error_code.rs index 8bb0cf18740..bc9ab6adbf2 100644 --- a/libs/@local/harpc/types/src/error_code.rs +++ b/libs/@local/harpc/types/src/error_code.rs @@ -85,7 +85,21 @@ define_error_code_consts! { /// The combination of service and version requirement could not be found on the server. /// /// The HTTP equivalent is 404 Not Found. - NOT_FOUND + SERVICE_NOT_FOUND, + /// The service was found, but the procedure was not. + /// + /// The HTTP equivalent is 404 Not Found. + PROCEDURE_NOT_FOUND, + /// The resource was not found. + /// + /// The HTTP equivalent is 404 Not Found. + RESOURCE_NOT_FOUND, + /// The client is not allowed to call the procedure. + /// + /// The HTTP equivalent is 403 Forbidden. + FORBIDDEN, + /// The amount of items in the request stream does not match the expected amount. + REQUEST_EXPECTED_ITEM_COUNT_MISMATCH ], // 0xFF_xx = server errors /// Errors that occur in a session and are issued by the server. diff --git a/libs/@local/harpc/types/src/procedure.rs b/libs/@local/harpc/types/src/procedure.rs index 940d542072f..df10c879a17 100644 --- a/libs/@local/harpc/types/src/procedure.rs +++ b/libs/@local/harpc/types/src/procedure.rs @@ -1,3 +1,5 @@ +use core::fmt::Display; + #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "proptest", derive(test_strategy::Arbitrary))] @@ -21,9 +23,23 @@ impl ProcedureId { } } +impl Display for ProcedureId { + fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let Self(value) = self; + + write!(fmt, "{value:#06x}") + } +} + #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "proptest", derive(test_strategy::Arbitrary))] pub struct ProcedureDescriptor { pub id: ProcedureId, } + +impl Display for ProcedureDescriptor { + fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + Display::fmt(&self.id, fmt) + } +} diff --git a/libs/@local/harpc/types/src/service.rs b/libs/@local/harpc/types/src/service.rs index 4700482bc14..c1fb5cb8ede 100644 --- a/libs/@local/harpc/types/src/service.rs +++ b/libs/@local/harpc/types/src/service.rs @@ -1,3 +1,5 @@ +use core::fmt::Display; + use crate::version::Version; #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -23,6 +25,14 @@ impl ServiceId { } } +impl Display for ServiceId { + fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let Self(value) = self; + + write!(fmt, "{value:#06X}") + } +} + #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "proptest", derive(test_strategy::Arbitrary))] @@ -30,3 +40,11 @@ pub struct ServiceDescriptor { pub id: ServiceId, pub version: Version, } + +impl Display for ServiceDescriptor { + fn fmt(&self, fmt: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let &Self { id, version } = self; + + write!(fmt, "{id}@{version}",) + } +}