diff --git a/src/analytics/relay_request.rs b/src/analytics/relay_request.rs index 7abf8403..2c6b933b 100644 --- a/src/analytics/relay_request.rs +++ b/src/analytics/relay_request.rs @@ -9,6 +9,7 @@ use { pub struct RelayResponseParams { pub request: Arc, + pub request_sdk: Option>, pub response_message_id: Arc, pub response_topic: Topic, @@ -31,6 +32,8 @@ pub struct RelayRequest { pub request_tag: u32, /// Time at which the request was received pub request_received_at: NaiveDateTime, + /// The SDK information of the request + pub request_sdk: Option>, /// Relay message ID of response pub response_message_id: Arc, @@ -55,6 +58,7 @@ impl From for RelayRequest { request_topic: params.request.topic.value().clone(), request_tag: params.request.tag, request_received_at: params.request.received_at.naive_utc(), + request_sdk: params.request_sdk, response_message_id: params.response_message_id.clone(), response_topic: params.response_topic.value().clone(), diff --git a/src/auth.rs b/src/auth.rs index 6b7a60fa..bb5d28f2 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -89,7 +89,7 @@ pub trait GetSharedClaims { fn get_shared_claims(&self) -> &SharedClaims; } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct WatchSubscriptionsRequestAuth { #[serde(flatten)] pub shared_claims: SharedClaims, @@ -100,6 +100,16 @@ pub struct WatchSubscriptionsRequestAuth { /// did:web of app domain to watch, or `null` for all domains #[serde(default)] pub app: Option, + /// Arbitrary-format platform and version of the SDK being used + #[validate(length(min = 1, max = 16))] + pub sdk: Option, +} + +impl WatchSubscriptionsRequestAuth { + pub fn validate(&self) -> Result<(), NotifyServerError> { + Validate::validate(&self) + .map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string())) + } } impl GetSharedClaims for WatchSubscriptionsRequestAuth { @@ -176,7 +186,7 @@ pub struct WatchSubscriptionsChangedResponseAuth { // Note: MessageAuth is different since it doesn't have `aud` // pub struct MessageAuth { -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct MessageResponseAuth { #[serde(flatten)] pub shared_claims: SharedClaims, @@ -186,6 +196,16 @@ pub struct MessageResponseAuth { pub sub: String, /// did:web of app domain pub app: DidWeb, + /// Arbitrary-format platform and version of the SDK being used + #[validate(length(min = 1, max = 16))] + pub sdk: Option, +} + +impl MessageResponseAuth { + pub fn validate(&self) -> Result<(), NotifyServerError> { + Validate::validate(&self) + .map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string())) + } } impl GetSharedClaims for MessageResponseAuth { @@ -194,7 +214,7 @@ impl GetSharedClaims for MessageResponseAuth { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct SubscriptionRequestAuth { #[serde(flatten)] pub shared_claims: SharedClaims, @@ -206,6 +226,16 @@ pub struct SubscriptionRequestAuth { pub app: DidWeb, /// space-delimited scope of notification types authorized by the user pub scp: String, + /// Arbitrary-format platform and version of the SDK being used + #[validate(length(min = 1, max = 16))] + pub sdk: Option, +} + +impl SubscriptionRequestAuth { + pub fn validate(&self) -> Result<(), NotifyServerError> { + Validate::validate(&self) + .map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string())) + } } impl GetSharedClaims for SubscriptionRequestAuth { @@ -233,7 +263,7 @@ impl GetSharedClaims for SubscriptionResponseAuth { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct SubscriptionUpdateRequestAuth { #[serde(flatten)] pub shared_claims: SharedClaims, @@ -245,6 +275,16 @@ pub struct SubscriptionUpdateRequestAuth { pub app: DidWeb, /// space-delimited scope of notification types authorized by the user pub scp: String, + /// Arbitrary-format platform and version of the SDK being used + #[validate(length(min = 1, max = 16))] + pub sdk: Option, +} + +impl SubscriptionUpdateRequestAuth { + pub fn validate(&self) -> Result<(), NotifyServerError> { + Validate::validate(&self) + .map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string())) + } } impl GetSharedClaims for SubscriptionUpdateRequestAuth { @@ -271,7 +311,7 @@ impl GetSharedClaims for SubscriptionUpdateResponseAuth { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct SubscriptionDeleteRequestAuth { #[serde(flatten)] pub shared_claims: SharedClaims, @@ -281,6 +321,16 @@ pub struct SubscriptionDeleteRequestAuth { pub sub: String, /// did:web of app domain pub app: DidWeb, + /// Arbitrary-format platform and version of the SDK being used + #[validate(length(min = 1, max = 16))] + pub sdk: Option, +} + +impl SubscriptionDeleteRequestAuth { + pub fn validate(&self) -> Result<(), NotifyServerError> { + Validate::validate(&self) + .map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string())) + } } impl GetSharedClaims for SubscriptionDeleteRequestAuth { @@ -320,6 +370,9 @@ pub struct SubscriptionGetNotificationsRequestAuth { #[serde(flatten)] #[validate(nested)] pub params: GetNotificationsParams, + /// Arbitrary-format platform and version of the SDK being used + #[validate(length(min = 1, max = 16))] + pub sdk: Option, } impl SubscriptionGetNotificationsRequestAuth { @@ -353,7 +406,7 @@ impl GetSharedClaims for SubscriptionGetNotificationsResponseAuth { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Validate)] pub struct SubscriptionMarkNotificationsAsReadRequestAuth { #[serde(flatten)] pub shared_claims: SharedClaims, @@ -365,10 +418,15 @@ pub struct SubscriptionMarkNotificationsAsReadRequestAuth { pub app: DidWeb, #[serde(flatten)] pub params: MarkNotificationsAsReadParams, + /// Arbitrary-format platform and version of the SDK being used + #[validate(length(min = 1, max = 16))] + pub sdk: Option, } impl SubscriptionMarkNotificationsAsReadRequestAuth { pub fn validate(&self) -> Result<(), NotifyServerError> { + Validate::validate(self) + .map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))?; self.params .validate_with_args(&MarkNotificationsAsReadParamsValidatorContext { all: self.params.all, diff --git a/src/publish_relay_message.rs b/src/publish_relay_message.rs index 2992a20f..668f8e8a 100644 --- a/src/publish_relay_message.rs +++ b/src/publish_relay_message.rs @@ -31,6 +31,7 @@ pub async fn publish_relay_message( relay_client: &Client, publish: &Publish, relay_request: Option>, + sdk: Option>, metrics: Option<&Metrics>, analytics: &NotifyAnalytics, ) -> Result<(), Error> { @@ -92,6 +93,7 @@ pub async fn publish_relay_message( let finished = Utc::now(); analytics.relay_request(RelayResponseParams { request: relay_request, + request_sdk: sdk, response_message_id: get_message_id(&publish.message).into(), response_topic: publish.topic.clone(), response_tag: publish.tag, @@ -122,6 +124,7 @@ pub async fn publish_relay_message( let finished = Utc::now(); analytics.relay_request(RelayResponseParams { request: relay_request, + request_sdk: sdk, response_message_id: get_message_id(&publish.message).into(), response_topic: publish.topic.clone(), response_tag: publish.tag, diff --git a/src/services/public_http_server/handlers/relay_webhook/error.rs b/src/services/public_http_server/handlers/relay_webhook/error.rs index 7d91845d..2f474b57 100644 --- a/src/services/public_http_server/handlers/relay_webhook/error.rs +++ b/src/services/public_http_server/handlers/relay_webhook/error.rs @@ -118,6 +118,9 @@ pub enum RelayMessageServerError { #[error("Subscription watcher send: {0}")] SubscriptionWatcherSend(SubscriptionWatcherSendError), + + #[error("Error sending sdk info via oneshot channel")] + SdkOneshotSend, } #[derive(Debug, thiserror::Error)] diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_delete.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_delete.rs index ad585509..a1d1e560 100644 --- a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_delete.rs +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_delete.rs @@ -46,6 +46,7 @@ use { rpc::Publish, }, std::{collections::HashSet, sync::Arc}, + tokio::sync::oneshot, tracing::{info, warn}, }; @@ -87,10 +88,12 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R let req = decrypt_message::(envelope, &sym_key)?; + let (sdk_tx, mut sdk_rx) = oneshot::channel(); async fn handle( state: &AppState, msg: &RelayIncomingMessage, req: &JsonRpcRequest, + sdk_tx: oneshot::Sender>>, subscriber: &SubscriberWithScope, project: &Project, project_client_id: DecodedClientId, @@ -111,6 +114,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R "request_auth.shared_claims.iss: {:?}", request_auth.shared_claims.iss ); + + request_auth + .validate() + .map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error? + + sdk_tx + .send(request_auth.sdk.map(Into::into)) + .map_err(|_| RelayMessageServerError::SdkOneshotSend)?; let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) .map_err(AuthError::JwtIssNotDidKey) @@ -231,7 +242,16 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R Ok((ResponseAuth { response_auth }, watchers_with_subscriptions)) } - let result = handle(state, &msg, &req, &subscriber, &project, project_client_id).await; + let result = handle( + state, + &msg, + &req, + sdk_tx, + &subscriber, + &project, + project_client_id, + ) + .await; let (response, watchers_with_subscriptions, result) = match result { Ok((result, watchers_with_subscriptions)) => ( @@ -248,10 +268,13 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R ), }; + let sdk = sdk_rx.try_recv().unwrap_or(None); + let msg = Arc::from(msg); let response_fut = { let msg = msg.clone(); + let sdk = sdk.clone(); async { let envelope = Envelope::::new(&sym_key, response) .map_err(RelayMessageServerError::EnvelopeEncryption)?; @@ -268,6 +291,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R prompt: false, }, Some(msg), + sdk, state.metrics.as_ref(), &state.analytics, ) @@ -285,6 +309,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R &state.notify_keys.authentication_client_id, &state.relay_client, msg, + sdk, state.metrics.as_ref(), &state.analytics, ) diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_get_notifications.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_get_notifications.rs index 8b560126..d6db8c8e 100644 --- a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_get_notifications.rs +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_get_notifications.rs @@ -41,6 +41,7 @@ use { rpc::{msg_id::get_message_id, Publish}, }, std::sync::Arc, + tokio::sync::oneshot, tracing::info, }; @@ -79,10 +80,12 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R let req = decrypt_message::(envelope, &sym_key)?; + let (sdk_tx, mut sdk_rx) = oneshot::channel(); async fn handle( state: &AppState, msg: &RelayIncomingMessage, req: &JsonRpcRequest, + sdk_tx: oneshot::Sender>>, subscriber: &SubscriberWithScope, project: &Project, ) -> Result { @@ -96,6 +99,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R "request_auth.shared_claims.iss: {:?}", request_auth.shared_claims.iss ); + + request_auth + .validate() + .map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error? + + sdk_tx + .send(request_auth.sdk.map(Into::into)) + .map_err(|_| RelayMessageServerError::SdkOneshotSend)?; let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) .map_err(AuthError::JwtIssNotDidKey) @@ -141,10 +152,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R (account, Arc::::from(domain)) }; - request_auth - .validate() - .map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error? - let data = get_notifications_for_subscriber( subscriber.id, request_auth.params, @@ -218,7 +225,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R Ok(AuthMessage { auth }) } - let result = handle(state, &msg, &req, &subscriber, &project).await; + let result = handle(state, &msg, &req, sdk_tx, &subscriber, &project).await; let response = match &result { Ok(result) => serde_json::to_vec(&JsonRpcResponse::new(req.id, result)) @@ -227,6 +234,8 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R .map_err(RelayMessageServerError::JsonRpcResponseErrorSerialization)?, }; + let sdk = sdk_rx.try_recv().unwrap_or(None); + let envelope = Envelope::::new(&sym_key, response) .map_err(RelayMessageServerError::EnvelopeEncryption)?; @@ -242,6 +251,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R prompt: false, }, Some(Arc::new(msg)), + sdk, state.metrics.as_ref(), &state.analytics, ) diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs index 8762e6c9..34e99be8 100644 --- a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_mark_notifications_as_read.rs @@ -39,6 +39,7 @@ use { rpc::{msg_id::get_message_id, Publish}, }, std::sync::Arc, + tokio::sync::oneshot, tracing::info, }; @@ -78,10 +79,12 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R let req = decrypt_message::(envelope, &sym_key)?; + let (sdk_tx, mut sdk_rx) = oneshot::channel(); async fn handle( state: &AppState, msg: &RelayIncomingMessage, req: &JsonRpcRequest, + sdk_tx: oneshot::Sender>>, subscriber: &SubscriberWithScope, project: &Project, ) -> Result { @@ -96,6 +99,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R "request_auth.shared_claims.iss: {:?}", request_auth.shared_claims.iss ); + + request_auth + .validate() + .map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error? + + sdk_tx + .send(request_auth.sdk.map(Into::into)) + .map_err(|_| RelayMessageServerError::SdkOneshotSend)?; let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) .map_err(AuthError::JwtIssNotDidKey) @@ -141,10 +152,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R (account, Arc::::from(domain)) }; - request_auth - .validate() - .map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error? - let data = mark_notifications_as_read( subscriber.id, request_auth.params.ids, @@ -205,7 +212,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R Ok(AuthMessage { auth }) } - let result = handle(state, &msg, &req, &subscriber, &project).await; + let result = handle(state, &msg, &req, sdk_tx, &subscriber, &project).await; let response = match &result { Ok(result) => serde_json::to_vec(&JsonRpcResponse::new(req.id, result)) @@ -214,6 +221,8 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R .map_err(RelayMessageServerError::JsonRpcResponseErrorSerialization)?, }; + let sdk = sdk_rx.try_recv().unwrap_or(None); + let envelope = Envelope::::new(&sym_key, response) .map_err(RelayMessageServerError::EnvelopeEncryption)?; @@ -229,6 +238,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R prompt: false, }, Some(Arc::new(msg)), + sdk, state.metrics.as_ref(), &state.analytics, ) diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_subscribe.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_subscribe.rs index f1e5e06d..8e4fe81f 100644 --- a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_subscribe.rs +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_subscribe.rs @@ -49,6 +49,7 @@ use { rpc::Publish, }, std::{collections::HashSet, sync::Arc}, + tokio::sync::oneshot, tracing::{info, instrument}, uuid::Uuid, x25519_dalek::{PublicKey, StaticSecret}, @@ -105,10 +106,12 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R let req = decrypt_message::(envelope, &sym_key)?; + let (sdk_tx, mut sdk_rx) = oneshot::channel(); async fn handle( state: &AppState, msg: &RelayIncomingMessage, req: &JsonRpcRequest, + sdk_tx: oneshot::Sender>>, project: &Project, project_client_id: DecodedClientId, client_public_key: &PublicKey, @@ -129,6 +132,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R "request_auth.shared_claims.iss: {:?}", request_auth.shared_claims.iss ); + + request_auth + .validate() + .map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error? + + sdk_tx + .send(request_auth.sdk.map(Into::into)) + .map_err(|_| RelayMessageServerError::SdkOneshotSend)?; let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) .map_err(AuthError::JwtIssNotDidKey) @@ -324,6 +335,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R state, &msg, &req, + sdk_tx, &project, project_client_id, &client_public_key, @@ -345,10 +357,13 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R ), }; + let sdk = sdk_rx.try_recv().unwrap_or(None); + let msg = Arc::new(msg); let response_fut = { let msg = msg.clone(); + let sdk = sdk.clone(); async { let envelope = Envelope::::new(&sym_key, response) .map_err(RelayMessageServerError::EnvelopeEncryption)?; @@ -366,6 +381,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R prompt: false, }, Some(msg), + sdk, state.metrics.as_ref(), &state.analytics, ) @@ -385,6 +401,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R &state.notify_keys.authentication_client_id, &state.relay_client, msg, + sdk, state.metrics.as_ref(), &state.analytics, ) diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_update.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_update.rs index 720898e3..ab8fdec2 100644 --- a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_update.rs +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_update.rs @@ -44,6 +44,7 @@ use { rpc::Publish, }, std::{collections::HashSet, sync::Arc}, + tokio::sync::oneshot, tracing::info, }; @@ -85,10 +86,12 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R let req = decrypt_message::(envelope, &sym_key)?; + let (sdk_tx, mut sdk_rx) = oneshot::channel(); async fn handle( state: &AppState, msg: &RelayIncomingMessage, req: &JsonRpcRequest, + sdk_tx: oneshot::Sender>>, subscriber: &SubscriberWithScope, project: &Project, project_client_id: DecodedClientId, @@ -109,6 +112,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R "request_auth.shared_claims.iss: {:?}", request_auth.shared_claims.iss ); + + request_auth + .validate() + .map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error? + + sdk_tx + .send(request_auth.sdk.map(Into::into)) + .map_err(|_| RelayMessageServerError::SdkOneshotSend)?; let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) .map_err(AuthError::JwtIssNotDidKey) @@ -226,7 +237,16 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R Ok((ResponseAuth { response_auth }, watchers_with_subscriptions)) } - let result = handle(state, &msg, &req, &subscriber, &project, project_client_id).await; + let result = handle( + state, + &msg, + &req, + sdk_tx, + &subscriber, + &project, + project_client_id, + ) + .await; let (response, watchers_with_subscriptions, result) = match result { Ok((result, watchers_with_subscriptions)) => ( @@ -243,10 +263,13 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R ), }; + let sdk = sdk_rx.try_recv().unwrap_or(None); + let msg = Arc::new(msg); let response_fut = { let msg = msg.clone(); + let sdk = sdk.clone(); async { let envelope = Envelope::::new(&sym_key, response) .map_err(RelayMessageServerError::EnvelopeEncryption)?; @@ -263,6 +286,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R prompt: false, }, Some(msg), + sdk, state.metrics.as_ref(), &state.analytics, ) @@ -280,6 +304,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R &state.notify_keys.authentication_client_id, &state.relay_client, msg, + sdk, state.metrics.as_ref(), &state.analytics, ) diff --git a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_watch_subscriptions.rs b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_watch_subscriptions.rs index 3ea9ca1d..951ceb13 100644 --- a/src/services/public_http_server/handlers/relay_webhook/handlers/notify_watch_subscriptions.rs +++ b/src/services/public_http_server/handlers/relay_webhook/handlers/notify_watch_subscriptions.rs @@ -47,6 +47,7 @@ use { sqlx::PgPool, std::sync::Arc, thiserror::Error, + tokio::sync::oneshot, tracing::{info, instrument}, x25519_dalek::PublicKey, }; @@ -80,9 +81,11 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R let req = decrypt_message::(envelope, &response_sym_key)?; + let (sdk_tx, mut sdk_rx) = oneshot::channel(); async fn handle( state: &AppState, req: &JsonRpcRequest, + sdk_tx: oneshot::Sender>>, response_sym_key: &[u8; 32], ) -> Result { info!("req.id: {}", req.id); @@ -96,6 +99,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R "request_auth.shared_claims.iss: {:?}", request_auth.shared_claims.iss ); + + request_auth + .validate() + .map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error? + + sdk_tx + .send(request_auth.sdk.map(Into::into)) + .map_err(|_| RelayMessageServerError::SdkOneshotSend)?; let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) .map_err(AuthError::JwtIssNotDidKey) @@ -205,7 +216,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R Ok(ResponseAuth { response_auth }) } - let result = handle(state, &req, &response_sym_key).await; + let result = handle(state, &req, sdk_tx, &response_sym_key).await; let response = match &result { Ok(result) => serde_json::to_vec(&JsonRpcResponse::new(req.id, result)) @@ -214,6 +225,8 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R .map_err(RelayMessageServerError::JsonRpcResponseErrorSerialization)?, }; + let sdk = sdk_rx.try_recv().unwrap_or(None); + let envelope = Envelope::::new(&response_sym_key, response) .map_err(RelayMessageServerError::EnvelopeEncryption)?; let base64_notification = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); @@ -229,6 +242,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R prompt: false, }, Some(Arc::new(msg)), + sdk, state.metrics.as_ref(), &state.analytics, ) @@ -418,6 +432,7 @@ pub async fn prepare_subscription_watchers( Ok((source_subscriptions, watchers_with_subscriptions)) } +#[allow(clippy::too_many_arguments)] #[instrument(skip_all)] pub async fn send_to_subscription_watchers( watchers_with_subscriptions: Vec<(SubscriptionWatcherQuery, Vec)>, @@ -425,12 +440,14 @@ pub async fn send_to_subscription_watchers( authentication_client_id: &DecodedClientId, http_client: &relay_client::http::Client, relay_request: Arc, + sdk: Option>, metrics: Option<&Metrics>, analytics: &NotifyAnalytics, ) -> Result<(), SubscriptionWatcherSendError> { let results = futures_util::stream::iter(watchers_with_subscriptions) .map(|(watcher, subscriptions)| { let relay_request = relay_request.clone(); + let sdk = sdk.clone(); async move { info!( "Timing: Sending watchSubscriptionsChanged to watcher.did_key: {}", @@ -445,6 +462,7 @@ pub async fn send_to_subscription_watchers( authentication_client_id, http_client, relay_request, + sdk, metrics, analytics, ) @@ -494,6 +512,7 @@ async fn send( authentication_client_id: &DecodedClientId, http_client: &relay_client::http::Client, relay_request: Arc, + sdk: Option>, metrics: Option<&Metrics>, analytics: &NotifyAnalytics, ) -> Result<(), SubscriptionWatcherSendError> { @@ -539,6 +558,7 @@ async fn send( prompt: false, }, Some(relay_request), + sdk, metrics, analytics, ) diff --git a/src/services/publisher_service/mod.rs b/src/services/publisher_service/mod.rs index 88889316..73cde5f2 100644 --- a/src/services/publisher_service/mod.rs +++ b/src/services/publisher_service/mod.rs @@ -354,7 +354,7 @@ async fn process_notification( prompt: true, }; let message_id = publish.msg_id(); - publish_relay_message(relay_client, &publish, None, metrics, analytics) + publish_relay_message(relay_client, &publish, None, None, metrics, analytics) .await .map_err(ProcessNotificationError::RelayPublish)?; diff --git a/terraform/README.md b/terraform/README.md index c773f9f5..4613e79d 100644 --- a/terraform/README.md +++ b/terraform/README.md @@ -19,7 +19,7 @@ To authenticate, run `terraform login` and follow the instructions. | Name | Version | |------|---------| -| [aws](#provider\_aws) | 5.47.0 | +| [aws](#provider\_aws) | 5.48.0 | | [random](#provider\_random) | 3.6.1 | | [terraform](#provider\_terraform) | n/a | ## Modules diff --git a/tests/integration.rs b/tests/integration.rs index ef95ba7b..567945c8 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -2845,6 +2845,7 @@ async fn publish_notify_message_response( ksu: identity_key_details.keys_server_url.to_string(), sub: account.to_did_pkh(), app: did_web, + sdk: None, }, &identity_key_details.signing_key, ), @@ -2922,6 +2923,7 @@ async fn publish_update_request( sub: account.to_did_pkh(), app, scp: encode_scope(notification_types), + sdk: None, }, &identity_key_details.signing_key, ), @@ -3048,6 +3050,7 @@ async fn publish_delete_request( ksu: identity_key_details.keys_server_url.to_string(), sub: account.to_did_pkh(), app, + sdk: None, }, &identity_key_details.signing_key, ), @@ -3169,6 +3172,7 @@ async fn publish_get_notifications_request( sub: account.to_did_pkh(), app, params, + sdk: None, }, &identity_key_details.signing_key, ), @@ -3263,6 +3267,7 @@ async fn publish_mark_notifications_as_read_request( sub: account.to_did_pkh(), app, params, + sdk: None, }, &identity_key_details.signing_key, ), diff --git a/tests/utils/notify_relay_api.rs b/tests/utils/notify_relay_api.rs index e2812345..527e2815 100644 --- a/tests/utils/notify_relay_api.rs +++ b/tests/utils/notify_relay_api.rs @@ -73,6 +73,7 @@ async fn publish_watch_subscriptions_request( ksu: identity_key_details.keys_server_url.to_string(), sub: account.to_did_pkh(), app, + sdk: None, }, &identity_key_details.signing_key, ), @@ -252,6 +253,7 @@ async fn publish_subscribe_request( .collect::>() .join(" "), app, + sdk: None, }, &identity_key_details.signing_key, ),