Skip to content

Commit

Permalink
Expose RPC server in binary (#5610)
Browse files Browse the repository at this point in the history
  • Loading branch information
indietyp authored Nov 9, 2024
1 parent 5a95185 commit c4b5ed9
Show file tree
Hide file tree
Showing 19 changed files with 347 additions and 51 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions apps/hash-graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ hash-graph-test-server = { workspace = true, optional = true }
hash-temporal-client = { workspace = true }
hash-tracing = { workspace = true, features = ["clap"] }
type-system = { workspace = true }
harpc-codec = { workspace = true, features = ["json"] }
harpc-server = { workspace = true }

# Third party dependencies
axum = { workspace = true }
Expand All @@ -48,6 +50,7 @@ tokio = { workspace = true }
tokio-postgres = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
tracing = { workspace = true }
multiaddr = { workspace = true }

[features]
test-server = ["dep:hash-graph-test-server"]
Expand Down
2 changes: 2 additions & 0 deletions apps/hash-graph/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"@blockprotocol/type-system-rs": "0.0.0-private",
"@rust/error-stack": "0.5.0",
"@rust/graph": "0.0.0-private",
"@rust/harpc-codec": "0.0.0-private",
"@rust/harpc-server": "0.0.0-private",
"@rust/hash-codec": "0.0.0-private",
"@rust/hash-graph-api": "0.0.0-private",
"@rust/hash-graph-authorization": "0.0.0-private",
Expand Down
1 change: 1 addition & 0 deletions apps/hash-graph/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![forbid(unsafe_code)]
#![feature(async_closure)]
#![expect(
unreachable_pub,
reason = "This is a binary but as we want to document this crate as well this should be a \
Expand Down
152 changes: 132 additions & 20 deletions apps/hash-graph/src/subcommand/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ use error_stack::{Report, ResultExt as _};
use graph::{
ontology::domain_validator::DomainValidator,
store::{
DatabaseConnectionInfo, DatabasePoolConfig, FetchingPool, PostgresStorePool, StorePool as _,
DatabaseConnectionInfo, DatabasePoolConfig, FetchingPool, PostgresStorePool, StorePool,
},
};
use hash_graph_api::rest::{RestRouterDependencies, rest_api_router};
use harpc_codec::json::JsonCodec;
use harpc_server::Server;
use hash_graph_api::{
rest::{RestRouterDependencies, rest_api_router},
rpc::Dependencies,
};
use hash_graph_authorization::{
AuthorizationApi as _, NoAuthorization,
AuthorizationApi as _, AuthorizationApiPool, NoAuthorization,
backend::{SpiceDbOpenApi, ZanzibarBackend as _},
zanzibar::ZanzibarClient,
};
use hash_temporal_client::TemporalClientConfig;
use multiaddr::{Multiaddr, Protocol};
use regex::Regex;
use reqwest::{Client, Url};
use tokio::{net::TcpListener, time::timeout};
Expand Down Expand Up @@ -59,6 +65,38 @@ impl TryFrom<ApiAddress> for SocketAddr {
}
}

#[derive(Debug, Clone, Parser)]
pub struct RpcAddress {
/// The host the RPC client is listening at.
#[clap(long, default_value = "127.0.0.1", env = "HASH_GRAPH_RPC_HOST")]
pub rpc_host: String,

/// The port the RPC client is listening at.
#[clap(long, default_value_t = 4002, env = "HASH_GRAPH_RPC_PORT")]
pub rpc_port: u16,
}

impl fmt::Display for RpcAddress {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "{}:{}", self.rpc_host, self.rpc_port)
}
}

impl TryFrom<RpcAddress> for SocketAddr {
type Error = Report<AddrParseError>;

fn try_from(address: RpcAddress) -> Result<Self, Report<AddrParseError>> {
address
.to_string()
.parse::<Self>()
.attach_printable(address)
}
}

#[expect(
clippy::struct_excessive_bools,
reason = "CLI arguments are boolean flags."
)]
#[derive(Debug, Parser)]
pub struct ServerArgs {
#[clap(flatten)]
Expand All @@ -67,10 +105,18 @@ pub struct ServerArgs {
#[clap(flatten)]
pub pool_config: DatabasePoolConfig,

/// The address the REST client is listening at.
/// The address the REST server is listening at.
#[clap(flatten)]
pub api_address: ApiAddress,

/// Enable the experimental RPC server.
#[clap(long, default_value_t = false, env = "HASH_GRAPH_RPC_ENABLED")]
pub rpc_enabled: bool,

/// The address the RPC server is listening at.
#[clap(flatten)]
pub rpc_address: RpcAddress,

/// The address for the type fetcher RPC server is listening at.
#[clap(flatten)]
pub type_fetcher_address: TypeFetcherAddress,
Expand Down Expand Up @@ -134,6 +180,54 @@ pub struct ServerArgs {
pub temporal_port: u16,
}

fn server_rpc<S, A>(
address: RpcAddress,
dependencies: Dependencies<S, A, ()>,
) -> Result<(), Report<GraphError>>
where
S: StorePool + Send + Sync + 'static,
A: AuthorizationApiPool + Send + Sync + 'static,
{
let server = Server::new(harpc_server::ServerConfig::default()).change_context(GraphError)?;

let (router, task) = hash_graph_api::rpc::rpc_router(
Dependencies {
store: dependencies.store,
authorization_api: dependencies.authorization_api,
temporal_client: dependencies.temporal_client,
codec: JsonCodec,
},
server.events(),
);

tokio::spawn(task.into_future());

let socket_address: SocketAddr = SocketAddr::try_from(address).change_context(GraphError)?;
let mut address = Multiaddr::empty();
match socket_address {
SocketAddr::V4(v4) => {
address.push(Protocol::Ip4(*v4.ip()));
address.push(Protocol::Tcp(v4.port()));
}
SocketAddr::V6(v6) => {
address.push(Protocol::Ip6(*v6.ip()));
address.push(Protocol::Tcp(v6.port()));
}
}

#[expect(clippy::significant_drop_tightening, reason = "false positive")]
tokio::spawn(async move {
let stream = server
.listen(address)
.await
.expect("server should be able to listen on address");

harpc_server::serve::serve(stream, router).await;
});

Ok(())
}

pub async fn server(args: ServerArgs) -> Result<(), Report<GraphError>> {
if args.healthcheck {
return wait_healthcheck(
Expand Down Expand Up @@ -186,24 +280,42 @@ pub async fn server(args: ServerArgs) -> Result<(), Report<GraphError>> {
let mut zanzibar_client = ZanzibarClient::new(spicedb_client);
zanzibar_client.seed().await.change_context(GraphError)?;

let router = rest_api_router(RestRouterDependencies {
store: Arc::new(pool),
authorization_api: Arc::new(zanzibar_client),
domain_regex: DomainValidator::new(args.allowed_url_domain),
temporal_client: if let Some(host) = args.temporal_host {
Some(
TemporalClientConfig::new(
Url::from_str(&format!("{}:{}", host, args.temporal_port))
.change_context(GraphError)?,
)
.change_context(GraphError)?
.await
.change_context(GraphError)?,
let temporal_client_fn = async |host: Option<String>, port: u16| {
if let Some(host) = host {
TemporalClientConfig::new(
Url::from_str(&format!("{host}:{port}")).change_context(GraphError)?,
)
.change_context(GraphError)?
.await
.map(Some)
.change_context(GraphError)
} else {
None
},
});
Ok(None)
}
};

let router = {
let dependencies = RestRouterDependencies {
store: Arc::new(pool),
authorization_api: Arc::new(zanzibar_client),
domain_regex: DomainValidator::new(args.allowed_url_domain),
temporal_client: temporal_client_fn(args.temporal_host.clone(), args.temporal_port)
.await?,
};

if args.rpc_enabled {
tracing::info!("Starting RPC server...");

server_rpc(args.rpc_address, Dependencies {
store: Arc::clone(&dependencies.store),
authorization_api: Arc::clone(&dependencies.authorization_api),
temporal_client: temporal_client_fn(args.temporal_host, args.temporal_port).await?,
codec: (),
})?;
}

rest_api_router(dependencies)
};

tracing::info!("Listening on {}", args.api_address);
axum::serve(
Expand Down
10 changes: 5 additions & 5 deletions libs/@local/graph/api/src/rpc/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ pub mod meta {
#[derive(Debug)]
#[derive_where::derive_where(Clone)]
pub struct AccountServer<S, A> {
authorization_api_pool: Arc<A>,
temporal_client: Option<Arc<TemporalClient>>,
store_pool: Arc<S>,
pub authorization_api_pool: Arc<A>,
pub temporal_client: Option<Arc<TemporalClient>>,
pub store_pool: Arc<S>,
}

impl<S, A> AccountServer<S, A>
Expand Down Expand Up @@ -495,7 +495,7 @@ where
type Body<Source>
= impl Body<Control: AsRef<ResponseKind>, Error = <C as Encoder>::Error>
where
Source: Body<Control = !, Error: Send + Sync> + Send + Sync;
Source: Body<Control = !, Error: Send + Sync> + Send;

async fn call<B>(
self,
Expand All @@ -504,7 +504,7 @@ where
codec: C,
) -> Result<Response<Self::Body<B>>, Self::Error>
where
B: Body<Control = !, Error: Send + Sync> + Send + Sync,
B: Body<Control = !, Error: Send + Sync> + Send,
{
let id = parse_procedure_id(&request)?;

Expand Down
4 changes: 2 additions & 2 deletions libs/@local/graph/api/src/rpc/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ where
type Body<Source>
= impl Body<Control: AsRef<ResponseKind>, Error = <C as Encoder>::Error>
where
Source: Body<Control = !, Error: Send + Sync> + Send + Sync;
Source: Body<Control = !, Error: Send + Sync> + Send;

async fn call<B>(
self,
Expand All @@ -144,7 +144,7 @@ where
codec: C,
) -> Result<Response<Self::Body<B>>, Self::Error>
where
B: Body<Control = !, Error: Send + Sync> + Send + Sync,
B: Body<Control = !, Error: Send + Sync> + Send,
{
let id = parse_procedure_id(&request)?;

Expand Down
4 changes: 2 additions & 2 deletions libs/@local/graph/api/src/rpc/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ where
type Body<Source>
= impl Body<Control: AsRef<ResponseKind>, Error = <C as Encoder>::Error>
where
Source: Body<Control = !, Error: Send + Sync> + Send + Sync;
Source: Body<Control = !, Error: Send + Sync> + Send;

async fn call<B>(
self,
Expand All @@ -136,7 +136,7 @@ where
codec: C,
) -> Result<Response<Self::Body<B>>, Self::Error>
where
B: Body<Control = !, Error: Send + Sync> + Send + Sync,
B: Body<Control = !, Error: Send + Sync> + Send,
{
let id = parse_procedure_id(&request)?;

Expand Down
66 changes: 66 additions & 0 deletions libs/@local/graph/api/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,30 @@ pub mod auth;
pub mod echo;
mod session;

use alloc::sync::Arc;

use graph::store::StorePool;
use harpc_codec::{decode::ReportDecoder, encode::ReportEncoder};
use harpc_server::{
route::Route,
router::{Router, RouterBuilder},
session::Task,
};
use harpc_system::SubsystemIdentifier;
use harpc_tower::{
body::server::request::RequestBody,
layer::{body_report::HandleBodyReportLayer, report::HandleReportLayer},
};
use harpc_types::subsystem::SubsystemId;
use hash_graph_authorization::AuthorizationApiPool;
use hash_temporal_client::TemporalClient;

use self::{
account::{AccountDelegate, AccountServer},
auth::{AuthenticationDelegate, AuthenticationServer},
echo::{EchoDelegate, EchoServer},
session::Account,
};

#[derive(Debug, Copy, Clone)]
pub enum GraphSubsystemId {
Expand Down Expand Up @@ -34,3 +56,47 @@ impl SubsystemIdentifier for GraphSubsystemId {
}
}
}

pub struct Dependencies<S, A, C> {
pub store: Arc<S>,
pub authorization_api: Arc<A>,
pub temporal_client: Option<TemporalClient>,
pub codec: C,
}

#[expect(
clippy::significant_drop_tightening,
reason = "false-positive in `AccountServer`"
)]
pub fn rpc_router<S, A, C, N>(
dependencies: Dependencies<S, A, C>,
notifications: N,
) -> (
Router<impl Route<RequestBody, ResponseBody: Send, Future: Send> + Send>,
Task<Account, N>,
)
where
S: StorePool + Send + Sync + 'static,
A: AuthorizationApiPool + Send + Sync + 'static,
C: ReportEncoder + ReportDecoder + Clone + Send + Sync + 'static,
{
let builder = RouterBuilder::new(dependencies.codec)
.with_builder(|builder| {
builder
.layer(HandleReportLayer::new())
.layer(HandleBodyReportLayer::new())
})
.register(AuthenticationDelegate::new(AuthenticationServer))
.register(AccountDelegate::new(AccountServer {
store_pool: dependencies.store,
authorization_api_pool: dependencies.authorization_api,
temporal_client: dependencies.temporal_client.map(Arc::new),
}))
.register(EchoDelegate::new(EchoServer));

let task = builder.background_task(notifications);

let router = builder.build();

(router, task)
}
Loading

0 comments on commit c4b5ed9

Please sign in to comment.