Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SDK analytics #545

Merged
merged 4 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/analytics/relay_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {

pub struct RelayResponseParams {
pub request: Arc<RelayIncomingMessage>,
pub request_sdk: Option<Arc<str>>,

pub response_message_id: Arc<str>,
pub response_topic: Topic,
Expand All @@ -31,6 +32,8 @@ pub struct RelayRequest {
pub request_tag: u32,
/// Time at which the request was received
pub request_received_at: NaiveDateTime,
/// The SDK information of the request
pub request_sdk: Option<Arc<str>>,

/// Relay message ID of response
pub response_message_id: Arc<str>,
Expand All @@ -55,6 +58,7 @@ impl From<RelayResponseParams> for RelayRequest {
request_topic: params.request.topic.value().clone(),
request_tag: params.request.tag,
request_received_at: params.request.received_at.naive_utc(),
request_sdk: params.request_sdk,

response_message_id: params.response_message_id.clone(),
response_topic: params.response_topic.value().clone(),
Expand Down
70 changes: 64 additions & 6 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub trait GetSharedClaims {
fn get_shared_claims(&self) -> &SharedClaims;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct WatchSubscriptionsRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -100,6 +100,16 @@ pub struct WatchSubscriptionsRequestAuth {
/// did:web of app domain to watch, or `null` for all domains
#[serde(default)]
pub app: Option<DidWeb>,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl WatchSubscriptionsRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for WatchSubscriptionsRequestAuth {
Expand Down Expand Up @@ -176,7 +186,7 @@ pub struct WatchSubscriptionsChangedResponseAuth {
// Note: MessageAuth is different since it doesn't have `aud`
// pub struct MessageAuth {

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct MessageResponseAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -186,6 +196,16 @@ pub struct MessageResponseAuth {
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl MessageResponseAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for MessageResponseAuth {
Expand All @@ -194,7 +214,7 @@ impl GetSharedClaims for MessageResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct SubscriptionRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -206,6 +226,16 @@ pub struct SubscriptionRequestAuth {
pub app: DidWeb,
/// space-delimited scope of notification types authorized by the user
pub scp: String,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionRequestAuth {
Expand Down Expand Up @@ -233,7 +263,7 @@ impl GetSharedClaims for SubscriptionResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct SubscriptionUpdateRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -245,6 +275,16 @@ pub struct SubscriptionUpdateRequestAuth {
pub app: DidWeb,
/// space-delimited scope of notification types authorized by the user
pub scp: String,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionUpdateRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionUpdateRequestAuth {
Expand All @@ -271,7 +311,7 @@ impl GetSharedClaims for SubscriptionUpdateResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct SubscriptionDeleteRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -281,6 +321,16 @@ pub struct SubscriptionDeleteRequestAuth {
pub sub: String,
/// did:web of app domain
pub app: DidWeb,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionDeleteRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(&self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))
}
}

impl GetSharedClaims for SubscriptionDeleteRequestAuth {
Expand Down Expand Up @@ -320,6 +370,9 @@ pub struct SubscriptionGetNotificationsRequestAuth {
#[serde(flatten)]
#[validate(nested)]
pub params: GetNotificationsParams,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionGetNotificationsRequestAuth {
Expand Down Expand Up @@ -353,7 +406,7 @@ impl GetSharedClaims for SubscriptionGetNotificationsResponseAuth {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
pub struct SubscriptionMarkNotificationsAsReadRequestAuth {
#[serde(flatten)]
pub shared_claims: SharedClaims,
Expand All @@ -365,10 +418,15 @@ pub struct SubscriptionMarkNotificationsAsReadRequestAuth {
pub app: DidWeb,
#[serde(flatten)]
pub params: MarkNotificationsAsReadParams,
/// Arbitrary-format platform and version of the SDK being used
#[validate(length(min = 1, max = 16))]
pub sdk: Option<String>,
}

impl SubscriptionMarkNotificationsAsReadRequestAuth {
pub fn validate(&self) -> Result<(), NotifyServerError> {
Validate::validate(self)
.map_err(|error| NotifyServerError::UnprocessableEntity(error.to_string()))?;
self.params
.validate_with_args(&MarkNotificationsAsReadParamsValidatorContext {
all: self.params.all,
Expand Down
3 changes: 3 additions & 0 deletions src/publish_relay_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub async fn publish_relay_message(
relay_client: &Client,
publish: &Publish,
relay_request: Option<Arc<RelayIncomingMessage>>,
sdk: Option<Arc<str>>,
metrics: Option<&Metrics>,
analytics: &NotifyAnalytics,
) -> Result<(), Error<PublishError>> {
Expand Down Expand Up @@ -92,6 +93,7 @@ pub async fn publish_relay_message(
let finished = Utc::now();
analytics.relay_request(RelayResponseParams {
request: relay_request,
request_sdk: sdk,
response_message_id: get_message_id(&publish.message).into(),
response_topic: publish.topic.clone(),
response_tag: publish.tag,
Expand Down Expand Up @@ -122,6 +124,7 @@ pub async fn publish_relay_message(
let finished = Utc::now();
analytics.relay_request(RelayResponseParams {
request: relay_request,
request_sdk: sdk,
response_message_id: get_message_id(&publish.message).into(),
response_topic: publish.topic.clone(),
response_tag: publish.tag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ pub enum RelayMessageServerError {

#[error("Subscription watcher send: {0}")]
SubscriptionWatcherSend(SubscriptionWatcherSendError),

#[error("Error sending sdk info via oneshot channel")]
SdkOneshotSend,
}

#[derive(Debug, thiserror::Error)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use {
rpc::Publish,
},
std::{collections::HashSet, sync::Arc},
tokio::sync::oneshot,
tracing::{info, warn},
};

Expand Down Expand Up @@ -87,10 +88,12 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R

let req = decrypt_message::<NotifyDelete, _>(envelope, &sym_key)?;

let (sdk_tx, mut sdk_rx) = oneshot::channel();
async fn handle(
state: &AppState,
msg: &RelayIncomingMessage,
req: &JsonRpcRequest<NotifyDelete>,
sdk_tx: oneshot::Sender<Option<Arc<str>>>,
subscriber: &SubscriberWithScope,
project: &Project,
project_client_id: DecodedClientId,
Expand All @@ -111,6 +114,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
"request_auth.shared_claims.iss: {:?}",
request_auth.shared_claims.iss
);

request_auth
.validate()
.map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error?

sdk_tx
.send(request_auth.sdk.map(Into::into))
.map_err(|_| RelayMessageServerError::SdkOneshotSend)?;
let request_iss_client_id =
DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss)
.map_err(AuthError::JwtIssNotDidKey)
Expand Down Expand Up @@ -231,7 +242,16 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
Ok((ResponseAuth { response_auth }, watchers_with_subscriptions))
}

let result = handle(state, &msg, &req, &subscriber, &project, project_client_id).await;
let result = handle(
state,
&msg,
&req,
sdk_tx,
&subscriber,
&project,
project_client_id,
)
.await;

let (response, watchers_with_subscriptions, result) = match result {
Ok((result, watchers_with_subscriptions)) => (
Expand All @@ -248,10 +268,13 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
),
};

let sdk = sdk_rx.try_recv().unwrap_or(None);

let msg = Arc::from(msg);

let response_fut = {
let msg = msg.clone();
let sdk = sdk.clone();
async {
let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)
.map_err(RelayMessageServerError::EnvelopeEncryption)?;
Expand All @@ -268,6 +291,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
prompt: false,
},
Some(msg),
sdk,
state.metrics.as_ref(),
&state.analytics,
)
Expand All @@ -285,6 +309,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
&state.notify_keys.authentication_client_id,
&state.relay_client,
msg,
sdk,
state.metrics.as_ref(),
&state.analytics,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use {
rpc::{msg_id::get_message_id, Publish},
},
std::sync::Arc,
tokio::sync::oneshot,
tracing::info,
};

Expand Down Expand Up @@ -79,10 +80,12 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R

let req = decrypt_message::<AuthMessage, _>(envelope, &sym_key)?;

let (sdk_tx, mut sdk_rx) = oneshot::channel();
async fn handle(
state: &AppState,
msg: &RelayIncomingMessage,
req: &JsonRpcRequest<AuthMessage>,
sdk_tx: oneshot::Sender<Option<Arc<str>>>,
subscriber: &SubscriberWithScope,
project: &Project,
) -> Result<AuthMessage, RelayMessageError> {
Expand All @@ -96,6 +99,14 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
"request_auth.shared_claims.iss: {:?}",
request_auth.shared_claims.iss
);

request_auth
.validate()
.map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error?

sdk_tx
.send(request_auth.sdk.map(Into::into))
.map_err(|_| RelayMessageServerError::SdkOneshotSend)?;
let request_iss_client_id =
DecodedClientId::try_from_did_key(&request_auth.shared_claims.iss)
.map_err(AuthError::JwtIssNotDidKey)
Expand Down Expand Up @@ -141,10 +152,6 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
(account, Arc::<str>::from(domain))
};

request_auth
.validate()
.map_err(RelayMessageServerError::NotifyServer)?; // TODO change to client error?

let data = get_notifications_for_subscriber(
subscriber.id,
request_auth.params,
Expand Down Expand Up @@ -218,7 +225,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
Ok(AuthMessage { auth })
}

let result = handle(state, &msg, &req, &subscriber, &project).await;
let result = handle(state, &msg, &req, sdk_tx, &subscriber, &project).await;

let response = match &result {
Ok(result) => serde_json::to_vec(&JsonRpcResponse::new(req.id, result))
Expand All @@ -227,6 +234,8 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
.map_err(RelayMessageServerError::JsonRpcResponseErrorSerialization)?,
};

let sdk = sdk_rx.try_recv().unwrap_or(None);

let envelope = Envelope::<EnvelopeType0>::new(&sym_key, response)
.map_err(RelayMessageServerError::EnvelopeEncryption)?;

Expand All @@ -242,6 +251,7 @@ pub async fn handle(msg: RelayIncomingMessage, state: &AppState) -> Result<(), R
prompt: false,
},
Some(Arc::new(msg)),
sdk,
state.metrics.as_ref(),
&state.analytics,
)
Expand Down
Loading
Loading