diff --git a/src/auth.rs b/src/auth.rs index c7e263b5..090cf716 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -611,16 +611,14 @@ pub const KEYS_SERVER_IDENTITY_ENDPOINT: &str = "/identity"; pub const KEYS_SERVER_IDENTITY_ENDPOINT_PUBLIC_KEY_QUERY: &str = "publicKey"; pub async fn verify_identity( - iss: &str, + iss_client_id: &DecodedClientId, ksu: &str, sub: &str, redis: Option<&Arc>, metrics: Option<&Metrics>, ) -> Result { let mut url = Url::parse(ksu)?.join(KEYS_SERVER_IDENTITY_ENDPOINT)?; - let pubkey = DecodedClientId::try_from_did_key(iss) - .map_err(AuthError::JwtIssNotDidKey)? - .to_string(); + let pubkey = iss_client_id.to_string(); url.query_pairs_mut() .append_pair(KEYS_SERVER_IDENTITY_ENDPOINT_PUBLIC_KEY_QUERY, &pubkey); diff --git a/src/model/types/mod.rs b/src/model/types/mod.rs index 78c42e9b..221b402d 100644 --- a/src/model/types/mod.rs +++ b/src/model/types/mod.rs @@ -1,6 +1,7 @@ use { + crate::{services::websocket_server::decode_key, utils::get_client_id}, chrono::{DateTime, Utc}, - relay_rpc::domain::{ProjectId, Topic}, + relay_rpc::domain::{DecodedClientId, ProjectId, Topic}, sqlx::FromRow, uuid::Uuid, }; @@ -24,6 +25,14 @@ pub struct Project { pub subscribe_private_key: String, } +impl Project { + pub fn get_authentication_client_id(&self) -> crate::error::Result { + Ok(get_client_id(&ed25519_dalek::VerifyingKey::from_bytes( + &decode_key(&self.authentication_public_key)?, + )?)) + } +} + #[derive(Debug, FromRow)] pub struct Subscriber { pub id: Uuid, diff --git a/src/services/websocket_server/handlers/notify_delete.rs b/src/services/websocket_server/handlers/notify_delete.rs index f55cd994..eada4312 100644 --- a/src/services/websocket_server/handlers/notify_delete.rs +++ b/src/services/websocket_server/handlers/notify_delete.rs @@ -12,7 +12,12 @@ use { registry::storage::redis::Redis, services::websocket_server::{ decode_key, - handlers::{decrypt_message, notify_watch_subscriptions::update_subscription_watchers}, + handlers::{ + decrypt_message, + notify_watch_subscriptions::{ + prepare_subscription_watchers, send_to_subscription_watchers, + }, + }, NotifyDelete, NotifyRequest, NotifyResponse, ResponseAuth, }, spec::{ @@ -55,6 +60,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) -> let project = get_project_by_id(subscriber.project, &state.postgres, state.metrics.as_ref()).await?; info!("project.id: {}", project.id); + let project_client_id = project.get_authentication_client_id()?; let envelope = Envelope::::from_bytes( base64::engine::general_purpose::STANDARD.decode(msg.message.to_string())?, @@ -64,17 +70,20 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) -> let msg: NotifyRequest = decrypt_message(envelope, &sym_key)?; - let sub_auth = from_jwt::(&msg.params.delete_auth)?; + let request_auth = from_jwt::(&msg.params.delete_auth)?; info!( - "sub_auth.shared_claims.iss: {:?}", - sub_auth.shared_claims.iss + "request_auth.shared_claims.iss: {:?}", + request_auth.shared_claims.iss ); - if sub_auth.app.domain() != project.app_domain { + let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) + .map_err(AuthError::JwtIssNotDidKey)?; + + if request_auth.app.domain() != project.app_domain { Err(Error::AppDoesNotMatch)?; } let (account, siwe_domain) = { - if sub_auth.shared_claims.act != NOTIFY_DELETE_ACT { + if request_auth.shared_claims.act != NOTIFY_DELETE_ACT { return Err(AuthError::InvalidAct)?; } @@ -83,9 +92,9 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) -> app, domain, } = verify_identity( - &sub_auth.shared_claims.iss, - &sub_auth.ksu, - &sub_auth.sub, + &request_iss_client_id, + &request_auth.ksu, + &request_auth.sub, state.redis.as_ref(), state.metrics.as_ref(), ) @@ -122,7 +131,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) -> project_id: project.project_id, pk: subscriber.id, account: account.clone(), - updated_by_iss: sub_auth.shared_claims.iss.clone().into(), + updated_by_iss: request_auth.shared_claims.iss.clone().into(), updated_by_domain: siwe_domain, method: NotifyClientMethod::Unsubscribe, old_scope: subscriber.scope.into_iter().map(Into::into).collect(), @@ -131,56 +140,63 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) -> topic, }); - let identity = DecodedClientId(decode_key(&project.authentication_public_key)?); - - let now = Utc::now(); - let response_message = SubscriptionDeleteResponseAuth { - shared_claims: SharedClaims { - iat: now.timestamp() as u64, - exp: add_ttl(now, NOTIFY_DELETE_RESPONSE_TTL).timestamp() as u64, - iss: identity.to_did_key(), - aud: sub_auth.shared_claims.iss, - act: NOTIFY_DELETE_RESPONSE_ACT.to_owned(), - mjv: "1".to_owned(), - }, - sub: account.to_did_pkh(), - app: DidWeb::from_domain(project.app_domain.clone()), - sbs: vec![], - }; - let response_auth = sign_jwt( - response_message, - &ed25519_dalek::SigningKey::from_bytes(&decode_key(&project.authentication_private_key)?), - )?; - - let response = NotifyResponse::new(msg.id, ResponseAuth { response_auth }); - - let envelope = Envelope::::new(&sym_key, response)?; - - let base64_notification = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); - - let response_topic = topic_from_key(&sym_key); - - publish_relay_message( - &state.relay_http_client, - &Publish { - topic: response_topic, - message: base64_notification.into(), - tag: NOTIFY_DELETE_RESPONSE_TAG, - ttl_secs: NOTIFY_DELETE_RESPONSE_TTL.as_secs() as u32, - prompt: false, - }, + let (sbs, watchers_with_subscriptions) = prepare_subscription_watchers( + &request_iss_client_id, + &request_auth.shared_claims.mjv, + &account, + &project.app_domain, + &state.postgres, state.metrics.as_ref(), ) .await?; - update_subscription_watchers( + { + let now = Utc::now(); + let response_message = SubscriptionDeleteResponseAuth { + shared_claims: SharedClaims { + iat: now.timestamp() as u64, + exp: add_ttl(now, NOTIFY_DELETE_RESPONSE_TTL).timestamp() as u64, + iss: project_client_id.to_did_key(), + aud: request_auth.shared_claims.iss, + act: NOTIFY_DELETE_RESPONSE_ACT.to_owned(), + mjv: "1".to_owned(), + }, + sub: account.to_did_pkh(), + app: DidWeb::from_domain(project.app_domain.clone()), + sbs, + }; + let response_auth = sign_jwt( + response_message, + &ed25519_dalek::SigningKey::from_bytes(&decode_key( + &project.authentication_private_key, + )?), + )?; + let response = NotifyResponse::new(msg.id, ResponseAuth { response_auth }); + let envelope = Envelope::::new(&sym_key, response)?; + let base64_notification = + base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); + + publish_relay_message( + &state.relay_http_client, + &Publish { + topic: topic_from_key(&sym_key), + message: base64_notification.into(), + tag: NOTIFY_DELETE_RESPONSE_TAG, + ttl_secs: NOTIFY_DELETE_RESPONSE_TTL.as_secs() as u32, + prompt: false, + }, + state.metrics.as_ref(), + ) + .await?; + } + + send_to_subscription_watchers( + watchers_with_subscriptions, &account, - &project.app_domain, - &state.postgres, - &state.relay_http_client.clone(), - state.metrics.as_ref(), &state.notify_keys.authentication_secret, &state.notify_keys.authentication_client_id, + &state.relay_http_client.clone(), + state.metrics.as_ref(), ) .await?; diff --git a/src/services/websocket_server/handlers/notify_get_notifications.rs b/src/services/websocket_server/handlers/notify_get_notifications.rs index b6d17b33..797dc987 100644 --- a/src/services/websocket_server/handlers/notify_get_notifications.rs +++ b/src/services/websocket_server/handlers/notify_get_notifications.rs @@ -66,17 +66,20 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { let msg: NotifyRequest = decrypt_message(envelope, &sym_key)?; - let request = from_jwt::(&msg.params.auth)?; + let request_auth = from_jwt::(&msg.params.auth)?; info!( - "sub_auth.shared_claims.iss: {:?}", - request.shared_claims.iss + "request_auth.shared_claims.iss: {:?}", + request_auth.shared_claims.iss ); - if request.app.domain() != project.app_domain { + let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) + .map_err(AuthError::JwtIssNotDidKey)?; + + if request_auth.app.domain() != project.app_domain { Err(Error::AppDoesNotMatch)?; } let account = { - if request.shared_claims.act != NOTIFY_GET_NOTIFICATIONS_ACT { + if request_auth.shared_claims.act != NOTIFY_GET_NOTIFICATIONS_ACT { return Err(AuthError::InvalidAct)?; } @@ -85,9 +88,9 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { app, domain: _, } = verify_identity( - &request.shared_claims.iss, - &request.ksu, - &request.sub, + &request_iss_client_id, + &request_auth.ksu, + &request_auth.sub, state.redis.as_ref(), state.metrics.as_ref(), ) @@ -104,11 +107,11 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { account }; - request.validate()?; + request_auth.validate()?; let data = get_notifications_for_subscriber( subscriber.id, - request.params, + request_auth.params, &state.postgres, state.metrics.as_ref(), ) @@ -122,7 +125,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { iat: now.timestamp() as u64, exp: add_ttl(now, NOTIFY_GET_NOTIFICATIONS_RESPONSE_TTL).timestamp() as u64, iss: identity.to_did_key(), - aud: request.shared_claims.iss, + aud: request_iss_client_id.to_did_key(), act: NOTIFY_GET_NOTIFICATIONS_RESPONSE_ACT.to_owned(), mjv: "1".to_owned(), }, diff --git a/src/services/websocket_server/handlers/notify_subscribe.rs b/src/services/websocket_server/handlers/notify_subscribe.rs index ab747212..c40bc0e1 100644 --- a/src/services/websocket_server/handlers/notify_subscribe.rs +++ b/src/services/websocket_server/handlers/notify_subscribe.rs @@ -15,7 +15,10 @@ use { websocket_server::{ decode_key, derive_key, handlers::{ - decrypt_message, notify_watch_subscriptions::update_subscription_watchers, + decrypt_message, + notify_watch_subscriptions::{ + prepare_subscription_watchers, send_to_subscription_watchers, + }, }, NotifyRequest, NotifyResponse, NotifySubscribe, ResponseAuth, }, @@ -64,6 +67,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { e => e.into(), })?; info!("project.id: {}", project.id); + let project_client_id = project.get_authentication_client_id()?; let envelope = Envelope::::from_bytes( base64::engine::general_purpose::STANDARD.decode(msg.message.to_string())?, @@ -85,17 +89,20 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { let msg: NotifyRequest = decrypt_message(envelope, &sym_key)?; let id = msg.id; - let sub_auth = from_jwt::(&msg.params.subscription_auth)?; + let request_auth = from_jwt::(&msg.params.subscription_auth)?; info!( - "sub_auth.shared_claims.iss: {:?}", - sub_auth.shared_claims.iss + "request_auth.shared_claims.iss: {:?}", + request_auth.shared_claims.iss ); - if sub_auth.app.domain() != project.app_domain { + let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) + .map_err(AuthError::JwtIssNotDidKey)?; + + if request_auth.app.domain() != project.app_domain { Err(Error::AppDoesNotMatch)?; } let (account, siwe_domain) = { - if sub_auth.shared_claims.act != NOTIFY_SUBSCRIBE_ACT { + if request_auth.shared_claims.act != NOTIFY_SUBSCRIBE_ACT { return Err(AuthError::InvalidAct)?; } @@ -104,9 +111,9 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { app, domain, } = verify_identity( - &sub_auth.shared_claims.iss, - &sub_auth.ksu, - &sub_auth.sub, + &request_iss_client_id, + &request_auth.ksu, + &request_auth.sub, state.redis.as_ref(), state.metrics.as_ref(), ) @@ -128,39 +135,12 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { let secret = StaticSecret::random_from_rng(chacha20poly1305::aead::OsRng); - let identity = DecodedClientId(decode_key(&project.authentication_public_key)?); - - let now = Utc::now(); - let response_message = SubscriptionResponseAuth { - shared_claims: SharedClaims { - iat: now.timestamp() as u64, - exp: add_ttl(now, NOTIFY_SUBSCRIBE_RESPONSE_TTL).timestamp() as u64, - iss: identity.to_did_key(), - aud: sub_auth.shared_claims.iss.clone(), - act: NOTIFY_SUBSCRIBE_RESPONSE_ACT.to_owned(), - mjv: "1".to_owned(), - }, - sub: account.to_did_pkh(), - app: DidWeb::from_domain(project.app_domain.clone()), - sbs: vec![], - }; - let response_auth = sign_jwt( - response_message, - &ed25519_dalek::SigningKey::from_bytes(&decode_key(&project.authentication_private_key)?), - )?; - - let response = NotifyResponse::new(msg.id, ResponseAuth { response_auth }); - // Technically we don't need to derive based on client_public_key anymore; we just need a symkey. But this is technical // debt from when clients derived the same symkey on their end via Diffie-Hellman. But now they use the value from // watch subscriptions. let notify_key = derive_key(&client_public_key, &secret)?; - let envelope = Envelope::::new(&sym_key, response)?; - - let base64_notification = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); - - let scope = parse_scope(&sub_auth.scp)?; + let scope = parse_scope(&request_auth.scp)?; let notify_topic = topic_from_key(¬ify_key); @@ -205,7 +185,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { project_id, pk: subscriber_id, account: account.clone(), - updated_by_iss: sub_auth.shared_claims.iss.into(), + updated_by_iss: request_iss_client_id.to_did_key().into(), updated_by_domain: siwe_domain, method: NotifyClientMethod::Subscribe, old_scope: HashSet::new(), @@ -215,6 +195,53 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { }); info!("Timing: Finished recording SubscriberUpdateParams"); + let (sbs, watchers_with_subscriptions) = prepare_subscription_watchers( + &request_iss_client_id, + &request_auth.shared_claims.mjv, + &account, + &project.app_domain, + &state.postgres, + state.metrics.as_ref(), + ) + .await?; + + let now = Utc::now(); + let response_message = SubscriptionResponseAuth { + shared_claims: SharedClaims { + iat: now.timestamp() as u64, + exp: add_ttl(now, NOTIFY_SUBSCRIBE_RESPONSE_TTL).timestamp() as u64, + iss: project_client_id.to_did_key(), + aud: request_iss_client_id.to_did_key(), + act: NOTIFY_SUBSCRIBE_RESPONSE_ACT.to_owned(), + mjv: "1".to_owned(), + }, + sub: account.to_did_pkh(), + app: DidWeb::from_domain(project.app_domain.clone()), + sbs, + }; + let response_auth = sign_jwt( + response_message, + &ed25519_dalek::SigningKey::from_bytes(&decode_key(&project.authentication_private_key)?), + )?; + let response = NotifyResponse::new(msg.id, ResponseAuth { response_auth }); + let envelope = Envelope::::new(&sym_key, response)?; + let base64_notification = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); + + info!("Publishing subscribe response to topic: {response_topic}"); + publish_relay_message( + &state.relay_http_client, + &Publish { + topic: response_topic, + message: base64_notification.into(), + tag: NOTIFY_SUBSCRIBE_RESPONSE_TAG, + ttl_secs: NOTIFY_SUBSCRIBE_RESPONSE_TTL.as_secs() as u32, + prompt: false, + }, + state.metrics.as_ref(), + ) + .await?; + info!("Finished publishing subscribe response"); + // Send noop to extend ttl of relay's mapping info!("Timing: Publishing noop to notify_topic"); publish_relay_message( @@ -235,21 +262,6 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { .await?; info!("Timing: Finished publishing noop to notify_topic"); - info!("Publishing subscribe response to topic: {response_topic}"); - publish_relay_message( - &state.relay_http_client, - &Publish { - topic: response_topic, - message: base64_notification.into(), - tag: NOTIFY_SUBSCRIBE_RESPONSE_TAG, - ttl_secs: NOTIFY_SUBSCRIBE_RESPONSE_TTL.as_secs() as u32, - prompt: false, - }, - state.metrics.as_ref(), - ) - .await?; - info!("Finished publishing subscribe response"); - let welcome_notification = get_welcome_notification(project.id, &state.postgres, state.metrics.as_ref()).await?; if let Some(welcome_notification) = welcome_notification { @@ -285,19 +297,13 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { info!("Welcome notification not enabled"); } - // TODO use the `iss` to find the "watcher" associated with this client. - // 1 function should return the watcher info and the sbs for it (all or just 1) - // Then we should extract out the one relevant for this client and send it to that one first in the response - // Then we send to all the other watchers/clients - // TODO do it based on the `mjv` value - update_subscription_watchers( + send_to_subscription_watchers( + watchers_with_subscriptions, &account, - &project.app_domain, - &state.postgres, - &state.relay_http_client.clone(), - state.metrics.as_ref(), &state.notify_keys.authentication_secret, &state.notify_keys.authentication_client_id, + &state.relay_http_client.clone(), + state.metrics.as_ref(), ) .await?; diff --git a/src/services/websocket_server/handlers/notify_update.rs b/src/services/websocket_server/handlers/notify_update.rs index f8e489e7..8018b2b7 100644 --- a/src/services/websocket_server/handlers/notify_update.rs +++ b/src/services/websocket_server/handlers/notify_update.rs @@ -1,5 +1,5 @@ use { - super::notify_watch_subscriptions::update_subscription_watchers, + super::notify_watch_subscriptions::prepare_subscription_watchers, crate::{ analytics::subscriber_update::{NotifyClientMethod, SubscriberUpdateParams}, auth::{ @@ -12,8 +12,11 @@ use { rate_limit::{self, Clock}, registry::storage::redis::Redis, services::websocket_server::{ - decode_key, handlers::decrypt_message, NotifyRequest, NotifyResponse, NotifyUpdate, - ResponseAuth, + decode_key, + handlers::{ + decrypt_message, notify_watch_subscriptions::send_to_subscription_watchers, + }, + NotifyRequest, NotifyResponse, NotifyUpdate, ResponseAuth, }, spec::{ NOTIFY_UPDATE_ACT, NOTIFY_UPDATE_RESPONSE_ACT, NOTIFY_UPDATE_RESPONSE_TAG, @@ -54,6 +57,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { let project = get_project_by_id(subscriber.project, &state.postgres, state.metrics.as_ref()).await?; info!("project.id: {}", project.id); + let project_client_id = project.get_authentication_client_id()?; let envelope = Envelope::::from_bytes( base64::engine::general_purpose::STANDARD.decode(msg.message.to_string())?, @@ -63,17 +67,19 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { let msg: NotifyRequest = decrypt_message(envelope, &sym_key)?; - let sub_auth = from_jwt::(&msg.params.update_auth)?; + let request_auth = from_jwt::(&msg.params.update_auth)?; info!( - "sub_auth.shared_claims.iss: {:?}", - sub_auth.shared_claims.iss + "request_auth.shared_claims.iss: {:?}", + request_auth.shared_claims.iss ); - if sub_auth.app.domain() != project.app_domain { + let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) + .map_err(AuthError::JwtIssNotDidKey)?; + if request_auth.app.domain() != project.app_domain { Err(Error::AppDoesNotMatch)?; } let (account, siwe_domain) = { - if sub_auth.shared_claims.act != NOTIFY_UPDATE_ACT { + if request_auth.shared_claims.act != NOTIFY_UPDATE_ACT { return Err(AuthError::InvalidAct)?; } @@ -82,9 +88,9 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { app, domain, } = verify_identity( - &sub_auth.shared_claims.iss, - &sub_auth.ksu, - &sub_auth.sub, + &request_iss_client_id, + &request_auth.ksu, + &request_auth.sub, state.redis.as_ref(), state.metrics.as_ref(), ) @@ -102,7 +108,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { }; let old_scope = subscriber.scope.into_iter().collect::>(); - let new_scope = parse_scope(&sub_auth.scp)?; + let new_scope = parse_scope(&request_auth.scp)?; let subscriber = update_subscriber( project.id, @@ -127,7 +133,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { project_id: project.project_id, pk: subscriber.id, account: account.clone(), - updated_by_iss: sub_auth.shared_claims.iss.clone().into(), + updated_by_iss: request_auth.shared_claims.iss.clone().into(), updated_by_domain: siwe_domain, method: NotifyClientMethod::Update, old_scope, @@ -136,21 +142,29 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { topic, }); - let identity = DecodedClientId(decode_key(&project.authentication_public_key)?); + let (sbs, watchers_with_subscriptions) = prepare_subscription_watchers( + &request_iss_client_id, + &request_auth.shared_claims.mjv, + &account, + &project.app_domain, + &state.postgres, + state.metrics.as_ref(), + ) + .await?; let now = Utc::now(); let response_message = SubscriptionUpdateResponseAuth { shared_claims: SharedClaims { iat: now.timestamp() as u64, exp: add_ttl(now, NOTIFY_UPDATE_RESPONSE_TTL).timestamp() as u64, - iss: identity.to_did_key(), - aud: sub_auth.shared_claims.iss, + iss: project_client_id.to_did_key(), + aud: request_iss_client_id.to_did_key(), act: NOTIFY_UPDATE_RESPONSE_ACT.to_owned(), mjv: "1".to_owned(), }, sub: account.to_did_pkh(), app: DidWeb::from_domain(project.app_domain.clone()), - sbs: vec![], + sbs, }; let response_auth = sign_jwt( response_message, @@ -158,17 +172,13 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { )?; let response = NotifyResponse::new(msg.id, ResponseAuth { response_auth }); - let envelope = Envelope::::new(&sym_key, response)?; - let base64_notification = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); - let response_topic = topic_from_key(&sym_key); - publish_relay_message( &state.relay_http_client, &Publish { - topic: response_topic, + topic: topic_from_key(&sym_key), message: base64_notification.into(), tag: NOTIFY_UPDATE_RESPONSE_TAG, ttl_secs: NOTIFY_UPDATE_RESPONSE_TTL.as_secs() as u32, @@ -178,14 +188,13 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { ) .await?; - update_subscription_watchers( + send_to_subscription_watchers( + watchers_with_subscriptions, &account, - &project.app_domain, - &state.postgres, - &state.relay_http_client.clone(), - state.metrics.as_ref(), &state.notify_keys.authentication_secret, &state.notify_keys.authentication_client_id, + &state.relay_http_client.clone(), + state.metrics.as_ref(), ) .await?; diff --git a/src/services/websocket_server/handlers/notify_watch_subscriptions.rs b/src/services/websocket_server/handlers/notify_watch_subscriptions.rs index 15ab60b9..2d102285 100644 --- a/src/services/websocket_server/handlers/notify_watch_subscriptions.rs +++ b/src/services/websocket_server/handlers/notify_watch_subscriptions.rs @@ -11,7 +11,7 @@ use { helpers::{ get_project_by_app_domain, get_subscription_watchers_for_account_by_app_or_all_app, get_subscriptions_by_account, get_subscriptions_by_account_and_app, - upsert_subscription_watcher, SubscriberWithProject, + upsert_subscription_watcher, SubscriberWithProject, SubscriptionWatcherQuery, }, types::AccountId, }, @@ -78,6 +78,8 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { "request_auth.shared_claims.iss: {:?}", request_auth.shared_claims.iss ); + let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss) + .map_err(AuthError::JwtIssNotDidKey)?; // Verify request let authorization = { @@ -86,7 +88,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { } verify_identity( - &request_auth.shared_claims.iss, + &request_iss_client_id, &request_auth.ksu, &request_auth.sub, state.redis.as_ref(), @@ -142,16 +144,13 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> { // Respond { - let identity: DecodedClientId = - DecodedClientId(state.notify_keys.authentication_public.to_bytes()); - let now = Utc::now(); let response_message = WatchSubscriptionsResponseAuth { shared_claims: SharedClaims { iat: now.timestamp() as u64, exp: add_ttl(now, NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_TTL).timestamp() as u64, - iss: identity.to_did_key(), - aud: request_auth.shared_claims.iss, + iss: state.notify_keys.authentication_client_id.to_did_key(), + aud: request_iss_client_id.to_did_key(), act: NOTIFY_WATCH_SUBSCRIPTIONS_RESPONSE_ACT.to_owned(), mjv: "1".to_owned(), }, @@ -251,73 +250,20 @@ pub async fn collect_subscriptions( Ok(subscriptions) } -// TODO do async outside of websocket request handler +#[allow(clippy::type_complexity)] #[instrument(skip_all, fields(account = %account, app_domain = %app_domain))] -pub async fn update_subscription_watchers( +pub async fn prepare_subscription_watchers( + source_client_id: &DecodedClientId, + mjv: &str, account: &AccountId, app_domain: &str, postgres: &PgPool, - http_client: &relay_client::http::Client, metrics: Option<&Metrics>, - authentication_secret: &ed25519_dalek::SigningKey, - authentication_client_id: &DecodedClientId, -) -> Result<()> { - info!("Called update_subscription_watchers"); - - #[allow(clippy::too_many_arguments)] - #[instrument(skip_all, fields(account = %account, aud = %aud, subscriptions_count = %subscriptions.len()))] - async fn send( - subscriptions: Vec, - account: &AccountId, - aud: String, - sym_key: &str, - authentication_secret: &ed25519_dalek::SigningKey, - authentication_client_id: &DecodedClientId, - http_client: &relay_client::http::Client, - metrics: Option<&Metrics>, - ) -> Result<()> { - let now = Utc::now(); - let response_message = WatchSubscriptionsChangedRequestAuth { - shared_claims: SharedClaims { - iat: now.timestamp() as u64, - exp: add_ttl(now, NOTIFY_SUBSCRIPTIONS_CHANGED_TTL).timestamp() as u64, - iss: authentication_client_id.to_did_key(), - aud, - act: NOTIFY_SUBSCRIPTIONS_CHANGED_ACT.to_owned(), - mjv: "1".to_owned(), - }, - sub: account.to_did_pkh(), - sbs: subscriptions, - }; - let auth = sign_jwt(response_message, authentication_secret)?; - let request = NotifyRequest::new( - NOTIFY_SUBSCRIPTIONS_CHANGED_METHOD, - NotifySubscriptionsChanged { - subscriptions_changed_auth: auth, - }, - ); - - let sym_key = decode_key(sym_key)?; - let envelope = Envelope::::new(&sym_key, request)?; - let base64_notification = - base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); - - let topic = topic_from_key(&sym_key); - publish_relay_message( - http_client, - &Publish { - topic, - message: base64_notification.into(), - tag: NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, - ttl_secs: NOTIFY_SUBSCRIPTIONS_CHANGED_TTL.as_secs() as u32, - prompt: false, - }, - metrics, - ) - .await?; - - Ok(()) - } +) -> Result<( + Vec, + Vec<(SubscriptionWatcherQuery, Vec)>, +)> { + info!("Called prepare_subscription_watchers"); // TODO can we combine collect_subscriptions() and get_subscription_watchers_for_account_by_app_or_all_app() queries? @@ -338,6 +284,11 @@ pub async fn update_subscription_watchers( ) .await?; info!("Timing: Finished querying get_subscription_watchers_for_account_by_app_or_all_app"); + + let mut source_subscriptions = None; + let mut watchers_with_subscriptions = Vec::with_capacity(subscription_watchers.len()); + + let source_did_key = source_client_id.to_did_key(); for watcher in subscription_watchers { let subscriptions = if watcher.project.is_some() { app_subscriptions.clone() @@ -345,6 +296,36 @@ pub async fn update_subscription_watchers( all_account_subscriptions.clone() }; + let source_is_this_watcher = source_did_key == watcher.did_key; + if source_is_this_watcher { + assert!( + source_subscriptions.is_none(), + "Found multiple subscription watchers for same did_key: {}", + watcher.did_key + ); + source_subscriptions = Some(subscriptions.clone()); + } + + if !source_is_this_watcher || mjv == "0" { + watchers_with_subscriptions.push((watcher, subscriptions)); + } + } + + // In-case the source client never called watchSubscriptions, we can still give back a response + let source_subscriptions = source_subscriptions.unwrap_or(app_subscriptions); + + Ok((source_subscriptions, watchers_with_subscriptions)) +} + +pub async fn send_to_subscription_watchers( + watchers_with_subscriptions: Vec<(SubscriptionWatcherQuery, Vec)>, + account: &AccountId, + authentication_secret: &ed25519_dalek::SigningKey, + authentication_client_id: &DecodedClientId, + http_client: &relay_client::http::Client, + metrics: Option<&Metrics>, +) -> Result<()> { + for (watcher, subscriptions) in watchers_with_subscriptions { info!( "Timing: Sending watchSubscriptionsChanged to watcher.did_key: {}", watcher.did_key @@ -365,6 +346,59 @@ pub async fn update_subscription_watchers( watcher.did_key ); } + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all, fields(account = %account, aud = %aud, subscriptions_count = %subscriptions.len()))] +async fn send( + subscriptions: Vec, + account: &AccountId, + aud: String, + sym_key: &str, + authentication_secret: &ed25519_dalek::SigningKey, + authentication_client_id: &DecodedClientId, + http_client: &relay_client::http::Client, + metrics: Option<&Metrics>, +) -> Result<()> { + let now = Utc::now(); + let response_message = WatchSubscriptionsChangedRequestAuth { + shared_claims: SharedClaims { + iat: now.timestamp() as u64, + exp: add_ttl(now, NOTIFY_SUBSCRIPTIONS_CHANGED_TTL).timestamp() as u64, + iss: authentication_client_id.to_did_key(), + aud, + act: NOTIFY_SUBSCRIPTIONS_CHANGED_ACT.to_owned(), + mjv: "1".to_owned(), + }, + sub: account.to_did_pkh(), + sbs: subscriptions, + }; + let auth = sign_jwt(response_message, authentication_secret)?; + let request = NotifyRequest::new( + NOTIFY_SUBSCRIPTIONS_CHANGED_METHOD, + NotifySubscriptionsChanged { + subscriptions_changed_auth: auth, + }, + ); + + let sym_key = decode_key(sym_key)?; + let envelope = Envelope::::new(&sym_key, request)?; + let base64_notification = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes()); + + let topic = topic_from_key(&sym_key); + publish_relay_message( + http_client, + &Publish { + topic, + message: base64_notification.into(), + tag: NOTIFY_SUBSCRIPTIONS_CHANGED_TAG, + ttl_secs: NOTIFY_SUBSCRIPTIONS_CHANGED_TTL.as_secs() as u32, + prompt: false, + }, + metrics, + ) + .await?; Ok(()) } diff --git a/tests/integration.rs b/tests/integration.rs index 092ad971..d54da023 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -6629,7 +6629,6 @@ async fn watch_subscriptions_multiple_clients_mjv_v0(notify_server: &NotifyServe #[test_context(NotifyServerContext)] #[tokio::test] -#[ignore] async fn watch_subscriptions_multiple_clients_mjv_v1(notify_server: &NotifyServerContext) { let (account_signing_key, account) = generate_account();