Skip to content

Commit

Permalink
feat: sbs response
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 committed Jan 10, 2024
1 parent 56e187a commit 0b86012
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 229 deletions.
6 changes: 2 additions & 4 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,16 +611,14 @@ pub const KEYS_SERVER_IDENTITY_ENDPOINT: &str = "/identity";
pub const KEYS_SERVER_IDENTITY_ENDPOINT_PUBLIC_KEY_QUERY: &str = "publicKey";

pub async fn verify_identity(
iss: &str,
iss_client_id: &DecodedClientId,
ksu: &str,
sub: &str,
redis: Option<&Arc<Redis>>,
metrics: Option<&Metrics>,
) -> Result<Authorization> {
let mut url = Url::parse(ksu)?.join(KEYS_SERVER_IDENTITY_ENDPOINT)?;
let pubkey = DecodedClientId::try_from_did_key(iss)
.map_err(AuthError::JwtIssNotDidKey)?
.to_string();
let pubkey = iss_client_id.to_string();
url.query_pairs_mut()
.append_pair(KEYS_SERVER_IDENTITY_ENDPOINT_PUBLIC_KEY_QUERY, &pubkey);

Expand Down
11 changes: 10 additions & 1 deletion src/model/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
crate::{services::websocket_server::decode_key, utils::get_client_id},
chrono::{DateTime, Utc},
relay_rpc::domain::{ProjectId, Topic},
relay_rpc::domain::{DecodedClientId, ProjectId, Topic},
sqlx::FromRow,
uuid::Uuid,
};
Expand All @@ -24,6 +25,14 @@ pub struct Project {
pub subscribe_private_key: String,
}

impl Project {
pub fn get_authentication_client_id(&self) -> crate::error::Result<DecodedClientId> {
Ok(get_client_id(&ed25519_dalek::VerifyingKey::from_bytes(
&decode_key(&self.authentication_public_key)?,
)?))
}
}

#[derive(Debug, FromRow)]
pub struct Subscriber {
pub id: Uuid,
Expand Down
122 changes: 69 additions & 53 deletions src/services/websocket_server/handlers/notify_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ use {
registry::storage::redis::Redis,
services::websocket_server::{
decode_key,
handlers::{decrypt_message, notify_watch_subscriptions::update_subscription_watchers},
handlers::{
decrypt_message,
notify_watch_subscriptions::{
prepare_subscription_watchers, send_to_subscription_watchers,
},
},
NotifyDelete, NotifyRequest, NotifyResponse, ResponseAuth,
},
spec::{
Expand Down Expand Up @@ -55,6 +60,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) ->
let project =
get_project_by_id(subscriber.project, &state.postgres, state.metrics.as_ref()).await?;
info!("project.id: {}", project.id);
let project_client_id = project.get_authentication_client_id()?;

let envelope = Envelope::<EnvelopeType0>::from_bytes(
base64::engine::general_purpose::STANDARD.decode(msg.message.to_string())?,
Expand All @@ -64,17 +70,20 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) ->

let msg: NotifyRequest<NotifyDelete> = decrypt_message(envelope, &sym_key)?;

let sub_auth = from_jwt::<SubscriptionDeleteRequestAuth>(&msg.params.delete_auth)?;
let request_auth = from_jwt::<SubscriptionDeleteRequestAuth>(&msg.params.delete_auth)?;
info!(
"sub_auth.shared_claims.iss: {:?}",
sub_auth.shared_claims.iss
"request_auth.shared_claims.iss: {:?}",
request_auth.shared_claims.iss
);
if sub_auth.app.domain() != project.app_domain {
let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss)
.map_err(AuthError::JwtIssNotDidKey)?;

if request_auth.app.domain() != project.app_domain {
Err(Error::AppDoesNotMatch)?;
}

let (account, siwe_domain) = {
if sub_auth.shared_claims.act != NOTIFY_DELETE_ACT {
if request_auth.shared_claims.act != NOTIFY_DELETE_ACT {
return Err(AuthError::InvalidAct)?;
}

Expand All @@ -83,9 +92,9 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) ->
app,
domain,
} = verify_identity(
&sub_auth.shared_claims.iss,
&sub_auth.ksu,
&sub_auth.sub,
&request_iss_client_id,
&request_auth.ksu,
&request_auth.sub,
state.redis.as_ref(),
state.metrics.as_ref(),
)
Expand Down Expand Up @@ -122,7 +131,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) ->
project_id: project.project_id,
pk: subscriber.id,
account: account.clone(),
updated_by_iss: sub_auth.shared_claims.iss.clone().into(),
updated_by_iss: request_auth.shared_claims.iss.clone().into(),
updated_by_domain: siwe_domain,
method: NotifyClientMethod::Unsubscribe,
old_scope: subscriber.scope.into_iter().map(Into::into).collect(),
Expand All @@ -131,56 +140,63 @@ pub async fn handle(msg: PublishedMessage, state: &AppState, client: &Client) ->
topic,
});

let identity = DecodedClientId(decode_key(&project.authentication_public_key)?);

let now = Utc::now();
let response_message = SubscriptionDeleteResponseAuth {
shared_claims: SharedClaims {
iat: now.timestamp() as u64,
exp: add_ttl(now, NOTIFY_DELETE_RESPONSE_TTL).timestamp() as u64,
iss: identity.to_did_key(),
aud: sub_auth.shared_claims.iss,
act: NOTIFY_DELETE_RESPONSE_ACT.to_owned(),
mjv: "1".to_owned(),
},
sub: account.to_did_pkh(),
app: DidWeb::from_domain(project.app_domain.clone()),
sbs: vec![],
};
let response_auth = sign_jwt(
response_message,
&ed25519_dalek::SigningKey::from_bytes(&decode_key(&project.authentication_private_key)?),
)?;

let response = NotifyResponse::new(msg.id, ResponseAuth { response_auth });

let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)?;

let base64_notification = base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes());

let response_topic = topic_from_key(&sym_key);

publish_relay_message(
&state.relay_http_client,
&Publish {
topic: response_topic,
message: base64_notification.into(),
tag: NOTIFY_DELETE_RESPONSE_TAG,
ttl_secs: NOTIFY_DELETE_RESPONSE_TTL.as_secs() as u32,
prompt: false,
},
let (sbs, watchers_with_subscriptions) = prepare_subscription_watchers(
&request_iss_client_id,
&request_auth.shared_claims.mjv,
&account,
&project.app_domain,
&state.postgres,
state.metrics.as_ref(),
)
.await?;

update_subscription_watchers(
{
let now = Utc::now();
let response_message = SubscriptionDeleteResponseAuth {
shared_claims: SharedClaims {
iat: now.timestamp() as u64,
exp: add_ttl(now, NOTIFY_DELETE_RESPONSE_TTL).timestamp() as u64,
iss: project_client_id.to_did_key(),
aud: request_auth.shared_claims.iss,
act: NOTIFY_DELETE_RESPONSE_ACT.to_owned(),
mjv: "1".to_owned(),
},
sub: account.to_did_pkh(),
app: DidWeb::from_domain(project.app_domain.clone()),
sbs,
};
let response_auth = sign_jwt(
response_message,
&ed25519_dalek::SigningKey::from_bytes(&decode_key(
&project.authentication_private_key,
)?),
)?;
let response = NotifyResponse::new(msg.id, ResponseAuth { response_auth });
let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)?;
let base64_notification =
base64::engine::general_purpose::STANDARD.encode(envelope.to_bytes());

publish_relay_message(
&state.relay_http_client,
&Publish {
topic: topic_from_key(&sym_key),
message: base64_notification.into(),
tag: NOTIFY_DELETE_RESPONSE_TAG,
ttl_secs: NOTIFY_DELETE_RESPONSE_TTL.as_secs() as u32,
prompt: false,
},
state.metrics.as_ref(),
)
.await?;
}

send_to_subscription_watchers(
watchers_with_subscriptions,
&account,
&project.app_domain,
&state.postgres,
&state.relay_http_client.clone(),
state.metrics.as_ref(),
&state.notify_keys.authentication_secret,
&state.notify_keys.authentication_client_id,
&state.relay_http_client.clone(),
state.metrics.as_ref(),
)
.await?;

Expand Down
25 changes: 14 additions & 11 deletions src/services/websocket_server/handlers/notify_get_notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,20 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> {

let msg: NotifyRequest<AuthMessage> = decrypt_message(envelope, &sym_key)?;

let request = from_jwt::<SubscriptionGetNotificationsRequestAuth>(&msg.params.auth)?;
let request_auth = from_jwt::<SubscriptionGetNotificationsRequestAuth>(&msg.params.auth)?;
info!(
"sub_auth.shared_claims.iss: {:?}",
request.shared_claims.iss
"request_auth.shared_claims.iss: {:?}",
request_auth.shared_claims.iss
);
if request.app.domain() != project.app_domain {
let request_iss_client_id = DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss)
.map_err(AuthError::JwtIssNotDidKey)?;

if request_auth.app.domain() != project.app_domain {
Err(Error::AppDoesNotMatch)?;
}

let account = {
if request.shared_claims.act != NOTIFY_GET_NOTIFICATIONS_ACT {
if request_auth.shared_claims.act != NOTIFY_GET_NOTIFICATIONS_ACT {
return Err(AuthError::InvalidAct)?;
}

Expand All @@ -85,9 +88,9 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> {
app,
domain: _,
} = verify_identity(
&request.shared_claims.iss,
&request.ksu,
&request.sub,
&request_iss_client_id,
&request_auth.ksu,
&request_auth.sub,
state.redis.as_ref(),
state.metrics.as_ref(),
)
Expand All @@ -104,11 +107,11 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> {
account
};

request.validate()?;
request_auth.validate()?;

let data = get_notifications_for_subscriber(
subscriber.id,
request.params,
request_auth.params,
&state.postgres,
state.metrics.as_ref(),
)
Expand All @@ -122,7 +125,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<()> {
iat: now.timestamp() as u64,
exp: add_ttl(now, NOTIFY_GET_NOTIFICATIONS_RESPONSE_TTL).timestamp() as u64,
iss: identity.to_did_key(),
aud: request.shared_claims.iss,
aud: request_iss_client_id.to_did_key(),
act: NOTIFY_GET_NOTIFICATIONS_RESPONSE_ACT.to_owned(),
mjv: "1".to_owned(),
},
Expand Down
Loading

0 comments on commit 0b86012

Please sign in to comment.