Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enhance Serving To Support Flatmap Operations #2324

Merged
merged 9 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ struct TimestampedPending {
#[derive(Clone)]
pub(crate) enum LagReader {
Source(Source),
// TODO: Arc<[T]>
ISB(Vec<JetstreamReader>), // multiple partitions
}

Expand Down
33 changes: 22 additions & 11 deletions rust/serving/src/app/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,23 @@ use state::State as CallbackState;
/// store for storing the state
pub(crate) mod store;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct CallbackRequest {
/// As message passes through each component (map, transformer, sink, etc.). it emits a beacon via callback
/// to inform that message has been processed by this component.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct Callback {
pub(crate) id: String,
pub(crate) vertex: String,
pub(crate) cb_time: u64,
pub(crate) from_vertex: String,
/// Due to flat-map operation, we can have 0 or more responses.
pub(crate) responses: Vec<Response>,
}

/// It contains details about the `To` vertex via tags (conditional forwarding).
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct Response {
/// If tags is None, the message is forwarded to all vertices, if len(Vec) == 0, it means that
/// the message has been dropped.
pub(crate) tags: Option<Vec<String>>,
}

Expand Down Expand Up @@ -68,7 +79,7 @@ async fn callback_save<T: Send + Sync + Clone + Store>(

async fn callback<T: Send + Sync + Clone + Store>(
State(app_state): State<CallbackAppState<T>>,
Json(payload): Json<Vec<CallbackRequest>>,
Json(payload): Json<Vec<Callback>>,
) -> Result<(), ApiError> {
app_state
.callback_state
Expand Down Expand Up @@ -107,12 +118,12 @@ mod tests {
let state = CallbackState::new(msg_graph, store).await.unwrap();
let app = callback_handler("ID".to_owned(), state);

let payload = vec![CallbackRequest {
let payload = vec![Callback {
id: "test_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
}];

let res = Request::builder()
Expand Down Expand Up @@ -143,26 +154,26 @@ mod tests {
let app = callback_handler("ID".to_owned(), state);

let payload = vec![
CallbackRequest {
Callback {
id: "test_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: "test_id".to_string(),
vertex: "cat".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: "test_id".to_string(),
vertex: "out".to_string(),
cb_time: 12345,
from_vertex: "cat".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
];

Expand Down
68 changes: 32 additions & 36 deletions rust/serving/src/app/callback/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::{
use tokio::sync::oneshot;

use super::store::Store;
use crate::app::callback::{store::PayloadToSave, CallbackRequest};
use crate::app::callback::{store::PayloadToSave, Callback};
use crate::app::tracker::MessageGraph;
use crate::Error;

struct RequestState {
// Channel to notify when all callbacks for a message is received
tx: oneshot::Sender<Result<String, Error>>,
// CallbackRequest is immutable, while vtx_visited can grow.
vtx_visited: Vec<Arc<CallbackRequest>>,
vtx_visited: Vec<Arc<Callback>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -81,15 +81,14 @@ where
/// insert_callback_requests is used to insert the callback requests.
pub(crate) async fn insert_callback_requests(
&mut self,
cb_requests: Vec<CallbackRequest>,
cb_requests: Vec<Callback>,
) -> Result<(), Error> {
/*
TODO: should we consider batching the requests and then processing them?
that way algorithm can be invoked only once for a batch of requests
instead of invoking it for each request.
*/
let cb_requests: Vec<Arc<CallbackRequest>> =
cb_requests.into_iter().map(Arc::new).collect();
let cb_requests: Vec<Arc<Callback>> = cb_requests.into_iter().map(Arc::new).collect();
let redis_payloads: Vec<PayloadToSave> = cb_requests
.iter()
.cloned()
Expand Down Expand Up @@ -153,23 +152,18 @@ where
id: &str,
) -> Result<String, Error> {
// If the id is not found in the in-memory store, fetch from Redis
let callbacks: Vec<Arc<CallbackRequest>> =
match self.retrieve_callbacks_from_storage(id).await {
Ok(callbacks) => callbacks,
Err(e) => {
return Err(e);
}
};
let callbacks: Vec<Arc<Callback>> = match self.retrieve_callbacks_from_storage(id).await {
Ok(callbacks) => callbacks,
Err(e) => {
return Err(e);
}
};
// check if the sub graph can be generated
self.get_subgraph(id.to_string(), callbacks)
}

// Generate subgraph from the given callbacks
fn get_subgraph(
&self,
id: String,
callbacks: Vec<Arc<CallbackRequest>>,
) -> Result<String, Error> {
fn get_subgraph(&self, id: String, callbacks: Vec<Arc<Callback>>) -> Result<String, Error> {
match self
.msg_graph_generator
.generate_subgraph_from_callbacks(id, callbacks)
Expand Down Expand Up @@ -204,7 +198,7 @@ where

// Get the Callback value for the given ID
// TODO: Generate json serialized data here itself to avoid cloning.
fn get_callbacks_from_memory(&self, id: &str) -> Option<Vec<Arc<CallbackRequest>>> {
fn get_callbacks_from_memory(&self, id: &str) -> Option<Vec<Arc<Callback>>> {
let guard = self.callbacks.lock().expect("Getting lock on State");
guard.get(id).map(|state| state.vtx_visited.clone())
}
Expand All @@ -213,9 +207,9 @@ where
async fn retrieve_callbacks_from_storage(
&mut self,
id: &str,
) -> Result<Vec<Arc<CallbackRequest>>, Error> {
) -> Result<Vec<Arc<Callback>>, Error> {
// If the id is not found in the in-memory store, fetch from Redis
let callbacks: Vec<Arc<CallbackRequest>> = match self.store.retrieve_callbacks(id).await {
let callbacks: Vec<Arc<Callback>> = match self.store.retrieve_callbacks(id).await {
Ok(response) => response.into_iter().collect(),
Err(e) => {
return Err(e);
Expand All @@ -232,11 +226,11 @@ where

#[cfg(test)]
mod tests {
use axum::body::Bytes;

use super::*;
use crate::app::callback::store::memstore::InMemoryStore;
use crate::app::callback::Response;
use crate::pipeline::PipelineDCG;
use axum::body::Bytes;

const PIPELINE_SPEC_ENCODED: &str = "eyJ2ZXJ0aWNlcyI6W3sibmFtZSI6ImluIiwic291cmNlIjp7InNlcnZpbmciOnsiYXV0aCI6bnVsbCwic2VydmljZSI6dHJ1ZSwibXNnSURIZWFkZXJLZXkiOiJYLU51bWFmbG93LUlkIiwic3RvcmUiOnsidXJsIjoicmVkaXM6Ly9yZWRpczo2Mzc5In19fSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIiLCJlbnYiOlt7Im5hbWUiOiJSVVNUX0xPRyIsInZhbHVlIjoiZGVidWcifV19LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InBsYW5uZXIiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJwbGFubmVyIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InRpZ2VyIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsidGlnZXIiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZG9nIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZG9nIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6ImVsZXBoYW50IiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZWxlcGhhbnQiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiYXNjaWlhcnQiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJhc2NpaWFydCJdLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJidWlsdGluIjpudWxsLCJncm91cEJ5IjpudWxsfSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19fSx7Im5hbWUiOiJzZXJ2ZS1zaW5rIiwic2luayI6eyJ1ZHNpbmsiOnsiY29udGFpbmVyIjp7ImltYWdlIjoic2VydmVzaW5rOjAuMSIsImVudiI6W3sibmFtZSI6Ik5VTUFGTE9XX0NBTExCQUNLX1VSTF9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctQ2FsbGJhY2stVXJsIn0seyJuYW1lIjoiTlVNQUZMT1dfTVNHX0lEX0hFQURFUl9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctSWQifV0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn19LCJyZXRyeVN0cmF0ZWd5Ijp7fX0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZXJyb3Itc2luayIsInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InNlcnZlc2luazowLjEiLCJlbnYiOlt7Im5hbWUiOiJOVU1BRkxPV19DQUxMQkFDS19VUkxfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUNhbGxiYWNrLVVybCJ9LHsibmFtZSI6Ik5VTUFGTE9XX01TR19JRF9IRUFERVJfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUlkIn1dLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9fSwicmV0cnlTdHJhdGVneSI6e319LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19XSwiZWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoicGxhbm5lciIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImFzY2lpYXJ0IiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiYXNjaWlhcnQiXX19fSx7ImZyb20iOiJwbGFubmVyIiwidG8iOiJ0aWdlciIsImNvbmRpdGlvbnMiOnsidGFncyI6eyJvcGVyYXRvciI6Im9yIiwidmFsdWVzIjpbInRpZ2VyIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZG9nIiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiZG9nIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZWxlcGhhbnQiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlbGVwaGFudCJdfX19LHsiZnJvbSI6InRpZ2VyIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZG9nIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZWxlcGhhbnQiLCJ0byI6InNlcnZlLXNpbmsiLCJjb25kaXRpb25zIjpudWxsfSx7ImZyb20iOiJhc2NpaWFydCIsInRvIjoic2VydmUtc2luayIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImVycm9yLXNpbmsiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlcnJvciJdfX19XSwibGlmZWN5Y2xlIjp7fSwid2F0ZXJtYXJrIjp7fX0=";

Expand Down Expand Up @@ -271,47 +265,49 @@ mod tests {

// Test insert_callback_requests
let cbs = vec![
CallbackRequest {
Callback {
id: id.clone(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "planner".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: Some(vec!["tiger".to_owned(), "asciiart".to_owned()]),
responses: vec![Response {
tags: Some(vec!["tiger".to_owned(), "asciiart".to_owned()]),
}],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "tiger".to_string(),
cb_time: 12345,
from_vertex: "planner".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "asciiart".to_string(),
cb_time: 12345,
from_vertex: "planner".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "serve-sink".to_string(),
cb_time: 12345,
from_vertex: "tiger".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
CallbackRequest {
Callback {
id: id.clone(),
vertex: "serve-sink".to_string(),
cb_time: 12345,
from_vertex: "asciiart".to_string(),
tags: None,
responses: vec![Response { tags: None }],
},
];
state.insert_callback_requests(cbs).await.unwrap();
Expand Down Expand Up @@ -345,12 +341,12 @@ mod tests {
let store = InMemoryStore::new();
let mut state = State::new(msg_graph, store).await.unwrap();

let cbs = vec![CallbackRequest {
let cbs = vec![Callback {
id: "nonexistent_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
}];

// Try to insert callback requests for an ID that hasn't been registered
Expand Down
12 changes: 3 additions & 9 deletions rust/serving/src/app/callback/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use crate::app::callback::CallbackRequest;
use crate::app::callback::Callback;

// in-memory store
pub(crate) mod memstore;
Expand All @@ -9,10 +9,7 @@ pub(crate) mod redisstore;

pub(crate) enum PayloadToSave {
/// Callback as sent by Numaflow to track the progression
Callback {
key: String,
value: Arc<CallbackRequest>,
},
Callback { key: String, value: Arc<Callback> },
/// Data sent by the Numaflow pipeline which is to be delivered as the response
DatumFromPipeline {
key: String,
Expand All @@ -26,10 +23,7 @@ pub(crate) enum PayloadToSave {
pub(crate) trait LocalStore {
async fn save(&mut self, messages: Vec<PayloadToSave>) -> crate::Result<()>;
/// retrieve the callback payloads
async fn retrieve_callbacks(
&mut self,
id: &str,
) -> Result<Vec<Arc<CallbackRequest>>, crate::Error>;
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<Callback>>, crate::Error>;
async fn retrieve_datum(&mut self, id: &str) -> Result<Vec<Vec<u8>>, crate::Error>;
async fn ready(&mut self) -> bool;
}
16 changes: 8 additions & 8 deletions rust/serving/src/app/callback/store/memstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use super::PayloadToSave;
use crate::app::callback::CallbackRequest;
use crate::app::callback::Callback;
use crate::consts::SAVED;
use crate::Error;

Expand Down Expand Up @@ -55,14 +55,14 @@ impl super::Store for InMemoryStore {

/// Retrieves callbacks for a given id from the `HashMap`.
/// Each callback is deserialized from bytes into a `CallbackRequest`.
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<CallbackRequest>>, Error> {
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<Callback>>, Error> {
let data = self.data.lock().unwrap();
match data.get(id) {
Some(result) => {
let messages: Result<Vec<_>, _> = result
.iter()
.map(|msg| {
let cbr: CallbackRequest = serde_json::from_slice(msg).map_err(|_| {
let cbr: Callback = serde_json::from_slice(msg).map_err(|_| {
Error::StoreRead(
"Failed to parse CallbackRequest from bytes".to_string(),
)
Expand Down Expand Up @@ -98,18 +98,18 @@ mod tests {

use super::*;
use crate::app::callback::store::{PayloadToSave, Store};
use crate::app::callback::CallbackRequest;
use crate::app::callback::{Callback, Response};

#[tokio::test]
async fn test_save_and_retrieve_callbacks() {
let mut store = InMemoryStore::new();
let key = "test_key".to_string();
let value = Arc::new(CallbackRequest {
let value = Arc::new(Callback {
id: "test_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
});

// Save a callback
Expand Down Expand Up @@ -179,12 +179,12 @@ mod tests {
#[tokio::test]
async fn test_save_invalid_callback() {
let mut store = InMemoryStore::new();
let value = Arc::new(CallbackRequest {
let value = Arc::new(Callback {
id: "test_id".to_string(),
vertex: "in".to_string(),
cb_time: 12345,
from_vertex: "in".to_string(),
tags: None,
responses: vec![Response { tags: None }],
});

// Try to save a callback with an invalid key
Expand Down
Loading
Loading