From 848d30dcb178d76a634912c12caac0685571115e Mon Sep 17 00:00:00 2001 From: Samir Talwar Date: Thu, 22 Aug 2024 11:00:23 +0200 Subject: [PATCH] DOES NOT COMPILE --- crates/sdk/src/connector.rs | 31 +++++++++++---- crates/sdk/src/connector/example.rs | 15 ++++++- crates/sdk/src/default_main.rs | 47 +++++++++++++++------- crates/sdk/src/state.rs | 62 +++++++++++++++++++++++++---- 4 files changed, 123 insertions(+), 32 deletions(-) diff --git a/crates/sdk/src/connector.rs b/crates/sdk/src/connector.rs index f8cbe08..5ae8890 100644 --- a/crates/sdk/src/connector.rs +++ b/crates/sdk/src/connector.rs @@ -40,9 +40,9 @@ pub use error::*; #[async_trait] pub trait Connector: Send { /// The type of validated configuration - type Configuration: Sync + Send; + type Configuration: Send + Sync; /// The type of unserializable state - type State: Sync + Send; + type State: Send + Sync; /// Update any metrics from the state /// @@ -140,8 +140,19 @@ pub trait Connector: Send { /// /// See [`Connector`] for further details. #[async_trait] -pub trait ConnectorSetup { - type Connector: Connector; +pub trait ConnectorSetup: + ParseConfiguration::Configuration> + + InitState< + Configuration = ::Configuration, + State = ::State, + > + 'static +{ + type Connector: Connector + 'static; +} + +#[async_trait] +pub trait ParseConfiguration { + type Configuration; /// Validate the configuration provided by the user, returning a configuration error or a /// validated [`Connector::Configuration`]. @@ -151,7 +162,13 @@ pub trait ConnectorSetup { async fn parse_configuration( &self, configuration_dir: impl AsRef + Send, - ) -> Result<::Configuration>; + ) -> Result; +} + +#[async_trait] +pub trait InitState: Send + Sync { + type Configuration; + type State; /// Initialize the connector's in-memory state. /// @@ -162,7 +179,7 @@ pub trait ConnectorSetup { /// registry. async fn try_init_state( &self, - configuration: &::Configuration, + configuration: &Self::Configuration, metrics: &mut prometheus::Registry, - ) -> Result<::State>; + ) -> Result; } diff --git a/crates/sdk/src/connector/example.rs b/crates/sdk/src/connector/example.rs index 0bfe0bc..c2c2e03 100644 --- a/crates/sdk/src/connector/example.rs +++ b/crates/sdk/src/connector/example.rs @@ -10,8 +10,8 @@ use super::*; pub struct Example {} #[async_trait] -impl ConnectorSetup for Example { - type Connector = Self; +impl ParseConfiguration for Example { + type Configuration = (); async fn parse_configuration( &self, @@ -19,6 +19,12 @@ impl ConnectorSetup for Example { ) -> Result<::Configuration> { Ok(()) } +} + +#[async_trait] +impl InitState for Example { + type Configuration = (); + type State = (); async fn try_init_state( &self, @@ -29,6 +35,11 @@ impl ConnectorSetup for Example { } } +#[async_trait] +impl ConnectorSetup for Example { + type Connector = Self; +} + #[async_trait] impl Connector for Example { type Configuration = (); diff --git a/crates/sdk/src/default_main.rs b/crates/sdk/src/default_main.rs index 5bceede..7cd7e66 100644 --- a/crates/sdk/src/default_main.rs +++ b/crates/sdk/src/default_main.rs @@ -262,10 +262,9 @@ pub async fn init_server_state( setup: Setup, config_directory: impl AsRef + Send, ) -> Result> { - let mut metrics = Registry::new(); + let metrics = Registry::new(); let configuration = setup.parse_configuration(config_directory).await?; - let state = setup.try_init_state(&configuration, &mut metrics).await?; - Ok(ServerState::new(configuration, state, metrics)) + Ok(ServerState::new(configuration, setup, metrics)) } pub fn create_router( @@ -354,8 +353,11 @@ fn auth_handler( } } -async fn get_metrics(State(state): State>) -> Result { - fetch_metrics::(state.configuration(), state.state(), state.metrics()) +async fn get_metrics(State(state): State>) -> Result +where + C::State: Clone, +{ + fetch_metrics::(state.configuration(), state.state().await?, state.metrics()) } async fn get_capabilities() -> JsonResponse { @@ -367,8 +369,11 @@ async fn get_capabilities() -> JsonResponse .into() } -async fn get_health_readiness(State(state): State>) -> Result<()> { - C::get_health_readiness(state.configuration(), state.state()).await +async fn get_health_readiness(State(state): State>) -> Result<()> +where + C::State: Clone, +{ + C::get_health_readiness(state.configuration(), state.state().await?).await } async fn get_schema( @@ -380,29 +385,41 @@ async fn get_schema( async fn post_query_explain( State(state): State>, WithRejection(Json(request), _): WithRejection, JsonRejection>, -) -> Result> { - C::query_explain(state.configuration(), state.state(), request).await +) -> Result> +where + C::State: Clone, +{ + C::query_explain(state.configuration(), state.state().await?, request).await } async fn post_mutation_explain( State(state): State>, WithRejection(Json(request), _): WithRejection, JsonRejection>, -) -> Result> { - C::mutation_explain(state.configuration(), state.state(), request).await +) -> Result> +where + C::State: Clone, +{ + C::mutation_explain(state.configuration(), state.state().await?, request).await } async fn post_mutation( State(state): State>, WithRejection(Json(request), _): WithRejection, JsonRejection>, -) -> Result> { - C::mutation(state.configuration(), state.state(), request).await +) -> Result> +where + C::State: Clone, +{ + C::mutation(state.configuration(), state.state().await?, request).await } async fn post_query( State(state): State>, WithRejection(Json(request), _): WithRejection, JsonRejection>, -) -> Result> { - C::query(state.configuration(), state.state(), request).await +) -> Result> +where + C::State: Clone, +{ + C::query(state.configuration(), state.state().await?, request).await } #[cfg(feature = "ndc-test")] diff --git a/crates/sdk/src/state.rs b/crates/sdk/src/state.rs index 9d01417..00d01cc 100644 --- a/crates/sdk/src/state.rs +++ b/crates/sdk/src/state.rs @@ -1,16 +1,23 @@ -use crate::connector::Connector; +use std::sync::{Arc, Mutex, OnceLock}; -#[derive(Debug)] +use crate::connector::error::*; +use crate::connector::{Connector, InitState}; + +/// Everything we need to keep in memory. pub struct ServerState { configuration: C::Configuration, - state: C::State, + state: Arc<( + OnceLock, + Mutex>>, + )>, metrics: prometheus::Registry, } +// Server state must be cloneable even if the underlying connector is not. +// We only require `Connector::Configuration` to be cloneable. impl Clone for ServerState where C::Configuration: Clone, - C::State: Clone, { fn clone(&self) -> Self { Self { @@ -22,27 +29,66 @@ where } impl ServerState { + /// Construct a new server state. pub fn new( configuration: C::Configuration, - state: C::State, + init_state: impl InitState + 'static, metrics: prometheus::Registry, ) -> Self { Self { configuration, - state, + state: Arc::new((OnceLock::new(), Mutex::new(Box::new(init_state)))), metrics, } } + /// The server configuration. pub fn configuration(&self) -> &C::Configuration { &self.configuration } - pub fn state(&self) -> &C::State { - &self.state + /// The server state. + /// + /// If the state has not yet been initialized, this initializes it. + /// + /// On initialization failure, this function will also fail, and subsequent calls will retry. + pub async fn state(&self) -> Result<&C::State> { + // If the state is already created, return it. + if let Some(state) = self.state.0.get() { + return Ok(state); + } + + { + let init_state = self.state.1.lock().map_err(|_| poisoned())?; + match self.state.0.get() { + // If the state was created before we acquired the lock, return it. + Some(state) => Ok(state), + // If not, let's call `setup.try_init_state` to create it. + // Failures are propagated outwards. + None => { + let new_state = init_state + .try_init_state(&self.configuration, &mut self.metrics.clone()) + .await?; + self.state.0.set(new_state); + self.state + .0 + .get() + .ok_or_else(|| unreachable!("uninitialized state")) + } + } + } } + /// The server metrics. pub fn metrics(&self) -> &prometheus::Registry { &self.metrics } } + +fn poisoned() -> ErrorResponse { + ErrorResponse::new( + http::StatusCode::INTERNAL_SERVER_ERROR, + "The state has become corrupted.".to_string(), + serde_json::Value::Null, + ) +}