From 83f90ad0506626ab061a1aff9259603e9afd21cc Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Mon, 13 Jan 2025 20:58:07 +0530 Subject: [PATCH 1/5] feat: support flatmap operation in serving Signed-off-by: Yashash H L --- rust/serving/src/app/callback.rs | 38 +- rust/serving/src/app/callback/state.rs | 87 +- rust/serving/src/app/callback/store.rs | 12 +- .../src/app/callback/store/memstore.rs | 22 +- .../src/app/callback/store/redisstore.rs | 26 +- rust/serving/src/app/jetstream_proxy.rs | 31 +- rust/serving/src/app/tracker.rs | 873 +++++++++++++----- 7 files changed, 794 insertions(+), 295 deletions(-) diff --git a/rust/serving/src/app/callback.rs b/rust/serving/src/app/callback.rs index b4d43868ee..6932aadb5f 100644 --- a/rust/serving/src/app/callback.rs +++ b/rust/serving/src/app/callback.rs @@ -13,11 +13,17 @@ use state::State as CallbackState; pub(crate) mod store; #[derive(Debug, Serialize, Deserialize, Clone)] -pub(crate) struct CallbackRequest { +pub(crate) struct Callback { pub(crate) id: String, pub(crate) vertex: String, pub(crate) cb_time: u64, pub(crate) from_vertex: String, + pub(crate) responses: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub(crate) struct Response { + pub(crate) index: u16, pub(crate) tags: Option>, } @@ -68,7 +74,7 @@ async fn callback_save( async fn callback( State(app_state): State>, - Json(payload): Json>, + Json(payload): Json>, ) -> Result<(), ApiError> { app_state .callback_state @@ -107,12 +113,15 @@ mod tests { let state = CallbackState::new(msg_graph, store).await.unwrap(); let app = callback_handler("ID".to_owned(), state); - let payload = vec![CallbackRequest { + let payload = vec![Callback { id: "test_id".to_string(), vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }]; let res = Request::builder() @@ -143,26 +152,35 @@ mod tests { let app = callback_handler("ID".to_owned(), state); let payload = vec![ - CallbackRequest { + Callback { id: "test_id".to_string(), vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }, - CallbackRequest { + Callback { id: "test_id".to_string(), vertex: "cat".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }, - CallbackRequest { + Callback { id: "test_id".to_string(), vertex: "out".to_string(), cb_time: 12345, from_vertex: "cat".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }, ]; diff --git a/rust/serving/src/app/callback/state.rs b/rust/serving/src/app/callback/state.rs index 293478ead2..f86b4a45ef 100644 --- a/rust/serving/src/app/callback/state.rs +++ b/rust/serving/src/app/callback/state.rs @@ -6,7 +6,7 @@ use std::{ use tokio::sync::oneshot; use super::store::Store; -use crate::app::callback::{store::PayloadToSave, CallbackRequest}; +use crate::app::callback::{store::PayloadToSave, Callback}; use crate::app::tracker::MessageGraph; use crate::Error; @@ -14,7 +14,7 @@ struct RequestState { // Channel to notify when all callbacks for a message is received tx: oneshot::Sender>, // CallbackRequest is immutable, while vtx_visited can grow. - vtx_visited: Vec>, + vtx_visited: Vec>, } #[derive(Clone)] @@ -81,15 +81,14 @@ where /// insert_callback_requests is used to insert the callback requests. pub(crate) async fn insert_callback_requests( &mut self, - cb_requests: Vec, + cb_requests: Vec, ) -> Result<(), Error> { /* TODO: should we consider batching the requests and then processing them? that way algorithm can be invoked only once for a batch of requests instead of invoking it for each request. */ - let cb_requests: Vec> = - cb_requests.into_iter().map(Arc::new).collect(); + let cb_requests: Vec> = cb_requests.into_iter().map(Arc::new).collect(); let redis_payloads: Vec = cb_requests .iter() .cloned() @@ -153,23 +152,18 @@ where id: &str, ) -> Result { // If the id is not found in the in-memory store, fetch from Redis - let callbacks: Vec> = - match self.retrieve_callbacks_from_storage(id).await { - Ok(callbacks) => callbacks, - Err(e) => { - return Err(e); - } - }; + let callbacks: Vec> = match self.retrieve_callbacks_from_storage(id).await { + Ok(callbacks) => callbacks, + Err(e) => { + return Err(e); + } + }; // check if the sub graph can be generated self.get_subgraph(id.to_string(), callbacks) } // Generate subgraph from the given callbacks - fn get_subgraph( - &self, - id: String, - callbacks: Vec>, - ) -> Result { + fn get_subgraph(&self, id: String, callbacks: Vec>) -> Result { match self .msg_graph_generator .generate_subgraph_from_callbacks(id, callbacks) @@ -204,7 +198,7 @@ where // Get the Callback value for the given ID // TODO: Generate json serialized data here itself to avoid cloning. - fn get_callbacks_from_memory(&self, id: &str) -> Option>> { + fn get_callbacks_from_memory(&self, id: &str) -> Option>> { let guard = self.callbacks.lock().expect("Getting lock on State"); guard.get(id).map(|state| state.vtx_visited.clone()) } @@ -213,9 +207,9 @@ where async fn retrieve_callbacks_from_storage( &mut self, id: &str, - ) -> Result>, Error> { + ) -> Result>, Error> { // If the id is not found in the in-memory store, fetch from Redis - let callbacks: Vec> = match self.store.retrieve_callbacks(id).await { + let callbacks: Vec> = match self.store.retrieve_callbacks(id).await { Ok(response) => response.into_iter().collect(), Err(e) => { return Err(e); @@ -232,11 +226,11 @@ where #[cfg(test)] mod tests { - use axum::body::Bytes; - use super::*; use crate::app::callback::store::memstore::InMemoryStore; + use crate::app::callback::Response; use crate::pipeline::PipelineDCG; + use axum::body::Bytes; const PIPELINE_SPEC_ENCODED: &str = "eyJ2ZXJ0aWNlcyI6W3sibmFtZSI6ImluIiwic291cmNlIjp7InNlcnZpbmciOnsiYXV0aCI6bnVsbCwic2VydmljZSI6dHJ1ZSwibXNnSURIZWFkZXJLZXkiOiJYLU51bWFmbG93LUlkIiwic3RvcmUiOnsidXJsIjoicmVkaXM6Ly9yZWRpczo2Mzc5In19fSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIiLCJlbnYiOlt7Im5hbWUiOiJSVVNUX0xPRyIsInZhbHVlIjoiZGVidWcifV19LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InBsYW5uZXIiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJwbGFubmVyIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InRpZ2VyIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsidGlnZXIiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZG9nIiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZG9nIl0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sImJ1aWx0aW4iOm51bGwsImdyb3VwQnkiOm51bGx9LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6ImVsZXBoYW50IiwidWRmIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImFzY2lpOjAuMSIsImFyZ3MiOlsiZWxlcGhhbnQiXSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwiYnVpbHRpbiI6bnVsbCwiZ3JvdXBCeSI6bnVsbH0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiYXNjaWlhcnQiLCJ1ZGYiOnsiY29udGFpbmVyIjp7ImltYWdlIjoiYXNjaWk6MC4xIiwiYXJncyI6WyJhc2NpaWFydCJdLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJidWlsdGluIjpudWxsLCJncm91cEJ5IjpudWxsfSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19fSx7Im5hbWUiOiJzZXJ2ZS1zaW5rIiwic2luayI6eyJ1ZHNpbmsiOnsiY29udGFpbmVyIjp7ImltYWdlIjoic2VydmVzaW5rOjAuMSIsImVudiI6W3sibmFtZSI6Ik5VTUFGTE9XX0NBTExCQUNLX1VSTF9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctQ2FsbGJhY2stVXJsIn0seyJuYW1lIjoiTlVNQUZMT1dfTVNHX0lEX0hFQURFUl9LRVkiLCJ2YWx1ZSI6IlgtTnVtYWZsb3ctSWQifV0sInJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn19LCJyZXRyeVN0cmF0ZWd5Ijp7fX0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImltYWdlUHVsbFBvbGljeSI6Ik5ldmVyIn0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fX0seyJuYW1lIjoiZXJyb3Itc2luayIsInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InNlcnZlc2luazowLjEiLCJlbnYiOlt7Im5hbWUiOiJOVU1BRkxPV19DQUxMQkFDS19VUkxfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUNhbGxiYWNrLVVybCJ9LHsibmFtZSI6Ik5VTUFGTE9XX01TR19JRF9IRUFERVJfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUlkIn1dLCJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9fSwicmV0cnlTdHJhdGVneSI6e319LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19XSwiZWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoicGxhbm5lciIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImFzY2lpYXJ0IiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiYXNjaWlhcnQiXX19fSx7ImZyb20iOiJwbGFubmVyIiwidG8iOiJ0aWdlciIsImNvbmRpdGlvbnMiOnsidGFncyI6eyJvcGVyYXRvciI6Im9yIiwidmFsdWVzIjpbInRpZ2VyIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZG9nIiwiY29uZGl0aW9ucyI6eyJ0YWdzIjp7Im9wZXJhdG9yIjoib3IiLCJ2YWx1ZXMiOlsiZG9nIl19fX0seyJmcm9tIjoicGxhbm5lciIsInRvIjoiZWxlcGhhbnQiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlbGVwaGFudCJdfX19LHsiZnJvbSI6InRpZ2VyIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZG9nIiwidG8iOiJzZXJ2ZS1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH0seyJmcm9tIjoiZWxlcGhhbnQiLCJ0byI6InNlcnZlLXNpbmsiLCJjb25kaXRpb25zIjpudWxsfSx7ImZyb20iOiJhc2NpaWFydCIsInRvIjoic2VydmUtc2luayIsImNvbmRpdGlvbnMiOm51bGx9LHsiZnJvbSI6InBsYW5uZXIiLCJ0byI6ImVycm9yLXNpbmsiLCJjb25kaXRpb25zIjp7InRhZ3MiOnsib3BlcmF0b3IiOiJvciIsInZhbHVlcyI6WyJlcnJvciJdfX19XSwibGlmZWN5Y2xlIjp7fSwid2F0ZXJtYXJrIjp7fX0="; @@ -271,47 +265,65 @@ mod tests { // Test insert_callback_requests let cbs = vec![ - CallbackRequest { + Callback { id: id.clone(), vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }, - CallbackRequest { + Callback { id: id.clone(), vertex: "planner".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: Some(vec!["tiger".to_owned(), "asciiart".to_owned()]), + responses: vec![Response { + index: 0, + tags: Some(vec!["tiger".to_owned(), "asciiart".to_owned()]), + }], }, - CallbackRequest { + Callback { id: id.clone(), vertex: "tiger".to_string(), cb_time: 12345, from_vertex: "planner".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }, - CallbackRequest { + Callback { id: id.clone(), vertex: "asciiart".to_string(), cb_time: 12345, from_vertex: "planner".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }, - CallbackRequest { + Callback { id: id.clone(), vertex: "serve-sink".to_string(), cb_time: 12345, from_vertex: "tiger".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }, - CallbackRequest { + Callback { id: id.clone(), vertex: "serve-sink".to_string(), cb_time: 12345, from_vertex: "asciiart".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }, ]; state.insert_callback_requests(cbs).await.unwrap(); @@ -345,12 +357,15 @@ mod tests { let store = InMemoryStore::new(); let mut state = State::new(msg_graph, store).await.unwrap(); - let cbs = vec![CallbackRequest { + let cbs = vec![Callback { id: "nonexistent_id".to_string(), vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }]; // Try to insert callback requests for an ID that hasn't been registered diff --git a/rust/serving/src/app/callback/store.rs b/rust/serving/src/app/callback/store.rs index af5f3c4368..4165a93406 100644 --- a/rust/serving/src/app/callback/store.rs +++ b/rust/serving/src/app/callback/store.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use crate::app::callback::CallbackRequest; +use crate::app::callback::Callback; // in-memory store pub(crate) mod memstore; @@ -9,10 +9,7 @@ pub(crate) mod redisstore; pub(crate) enum PayloadToSave { /// Callback as sent by Numaflow to track the progression - Callback { - key: String, - value: Arc, - }, + Callback { key: String, value: Arc }, /// Data sent by the Numaflow pipeline which is to be delivered as the response DatumFromPipeline { key: String, @@ -26,10 +23,7 @@ pub(crate) enum PayloadToSave { pub(crate) trait LocalStore { async fn save(&mut self, messages: Vec) -> crate::Result<()>; /// retrieve the callback payloads - async fn retrieve_callbacks( - &mut self, - id: &str, - ) -> Result>, crate::Error>; + async fn retrieve_callbacks(&mut self, id: &str) -> Result>, crate::Error>; async fn retrieve_datum(&mut self, id: &str) -> Result>, crate::Error>; async fn ready(&mut self) -> bool; } diff --git a/rust/serving/src/app/callback/store/memstore.rs b/rust/serving/src/app/callback/store/memstore.rs index a9cbaea31d..d5260d4563 100644 --- a/rust/serving/src/app/callback/store/memstore.rs +++ b/rust/serving/src/app/callback/store/memstore.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use super::PayloadToSave; -use crate::app::callback::CallbackRequest; +use crate::app::callback::Callback; use crate::consts::SAVED; use crate::Error; @@ -55,14 +55,14 @@ impl super::Store for InMemoryStore { /// Retrieves callbacks for a given id from the `HashMap`. /// Each callback is deserialized from bytes into a `CallbackRequest`. - async fn retrieve_callbacks(&mut self, id: &str) -> Result>, Error> { + async fn retrieve_callbacks(&mut self, id: &str) -> Result>, Error> { let data = self.data.lock().unwrap(); match data.get(id) { Some(result) => { let messages: Result, _> = result .iter() .map(|msg| { - let cbr: CallbackRequest = serde_json::from_slice(msg).map_err(|_| { + let cbr: Callback = serde_json::from_slice(msg).map_err(|_| { Error::StoreRead( "Failed to parse CallbackRequest from bytes".to_string(), ) @@ -98,18 +98,21 @@ mod tests { use super::*; use crate::app::callback::store::{PayloadToSave, Store}; - use crate::app::callback::CallbackRequest; + use crate::app::callback::{Callback, Response}; #[tokio::test] async fn test_save_and_retrieve_callbacks() { let mut store = InMemoryStore::new(); let key = "test_key".to_string(); - let value = Arc::new(CallbackRequest { + let value = Arc::new(Callback { id: "test_id".to_string(), vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }); // Save a callback @@ -179,12 +182,15 @@ mod tests { #[tokio::test] async fn test_save_invalid_callback() { let mut store = InMemoryStore::new(); - let value = Arc::new(CallbackRequest { + let value = Arc::new(Callback { id: "test_id".to_string(), vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }); // Try to save a callback with an invalid key diff --git a/rust/serving/src/app/callback/store/redisstore.rs b/rust/serving/src/app/callback/store/redisstore.rs index 4439e7ce8e..16197b387e 100644 --- a/rust/serving/src/app/callback/store/redisstore.rs +++ b/rust/serving/src/app/callback/store/redisstore.rs @@ -7,7 +7,7 @@ use redis::RedisError; use tokio::sync::Semaphore; use super::PayloadToSave; -use crate::app::callback::CallbackRequest; +use crate::app::callback::Callback; use crate::config::RedisConfig; use crate::consts::SAVED; use crate::Error; @@ -130,7 +130,7 @@ impl super::Store for RedisConnection { Ok(()) } - async fn retrieve_callbacks(&mut self, id: &str) -> Result>, Error> { + async fn retrieve_callbacks(&mut self, id: &str) -> Result>, Error> { let result: Result>, RedisError> = redis::cmd(LRANGE) .arg(id) .arg(0) @@ -147,7 +147,7 @@ impl super::Store for RedisConnection { let messages: Result, _> = result .into_iter() .map(|msg| { - let cbr: CallbackRequest = serde_json::from_slice(&msg).map_err(|e| { + let cbr: Callback = serde_json::from_slice(&msg).map_err(|e| { Error::StoreRead(format!("Parsing payload from bytes - {}", e)) })?; Ok(Arc::new(cbr)) @@ -201,11 +201,11 @@ impl super::Store for RedisConnection { #[cfg(feature = "redis-tests")] #[cfg(test)] mod tests { - use axum::body::Bytes; - use redis::AsyncCommands; - use super::*; use crate::app::callback::store::LocalStore; + use crate::app::callback::Response; + use axum::body::Bytes; + use redis::AsyncCommands; #[tokio::test] async fn test_redis_store() { @@ -225,12 +225,15 @@ mod tests { let ps_cb = PayloadToSave::Callback { key: key.clone(), - value: Arc::new(CallbackRequest { + value: Arc::new(Callback { id: String::from("1234"), vertex: String::from("prev_vertex"), cb_time: 1234, from_vertex: String::from("next_vertex"), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }), }; @@ -290,12 +293,15 @@ mod tests { .expect("Failed to connect to Redis"); let key = uuid::Uuid::new_v4().to_string(); - let value = Arc::new(CallbackRequest { + let value = Arc::new(Callback { id: String::from("test-redis-ttl"), vertex: String::from("vertex"), cb_time: 1234, from_vertex: String::from("next_vertex"), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }); // Save with TTL of 1 second diff --git a/rust/serving/src/app/jetstream_proxy.rs b/rust/serving/src/app/jetstream_proxy.rs index eb083d57e9..001cbc421e 100644 --- a/rust/serving/src/app/jetstream_proxy.rs +++ b/rust/serving/src/app/jetstream_proxy.rs @@ -260,10 +260,11 @@ mod tests { use tower::ServiceExt; use super::*; + use crate::app::callback; use crate::app::callback::state::State as CallbackState; use crate::app::callback::store::memstore::InMemoryStore; use crate::app::callback::store::PayloadToSave; - use crate::app::callback::CallbackRequest; + use crate::app::callback::Callback; use crate::app::tracker::MessageGraph; use crate::pipeline::PipelineDCG; use crate::{Error, Settings}; @@ -277,10 +278,7 @@ mod tests { async fn save(&mut self, _messages: Vec) -> crate::Result<()> { Ok(()) } - async fn retrieve_callbacks( - &mut self, - _id: &str, - ) -> Result>, Error> { + async fn retrieve_callbacks(&mut self, _id: &str) -> Result>, Error> { Ok(vec![]) } async fn retrieve_datum(&mut self, _id: &str) -> Result>, Error> { @@ -355,28 +353,37 @@ mod tests { resp } - fn create_default_callbacks(id: &str) -> Vec { + fn create_default_callbacks(id: &str) -> Vec { vec![ - CallbackRequest { + Callback { id: id.to_string(), vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: None, + responses: vec![callback::Response { + index: 0, + tags: None, + }], }, - CallbackRequest { + Callback { id: id.to_string(), vertex: "cat".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - tags: None, + responses: vec![callback::Response { + index: 0, + tags: None, + }], }, - CallbackRequest { + Callback { id: id.to_string(), vertex: "out".to_string(), cb_time: 12345, from_vertex: "cat".to_string(), - tags: None, + responses: vec![callback::Response { + index: 0, + tags: None, + }], }, ] } diff --git a/rust/serving/src/app/tracker.rs b/rust/serving/src/app/tracker.rs index 33137f45db..9cdaa71854 100644 --- a/rust/serving/src/app/tracker.rs +++ b/rust/serving/src/app/tracker.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use crate::app::callback::CallbackRequest; +use crate::app::callback::Callback; use crate::pipeline::{Edge, OperatorType, PipelineDCG}; use crate::Error; @@ -24,8 +24,6 @@ struct Subgraph { blocks: Vec, } -const DROP: &str = "U+005C__DROP__"; - /// MessageGraph is a struct that generates the graph from the source vertex to the downstream vertices /// for a message using the given callbacks. pub(crate) struct MessageGraph { @@ -44,7 +42,7 @@ pub(crate) struct Block { // whether it has been visited or not. It is used to keep track of the visited callbacks. #[derive(Debug)] struct CallbackRequestWrapper { - callback_request: Arc, + callback_request: Arc, visited: bool, } @@ -56,7 +54,7 @@ impl MessageGraph { pub(crate) fn generate_subgraph_from_callbacks( &self, id: String, - callbacks: Vec>, + callbacks: Vec>, ) -> Result, Error> { // Create a HashMap to map each vertex to its corresponding callbacks let mut callback_map: HashMap> = HashMap::new(); @@ -126,7 +124,7 @@ impl MessageGraph { callback_map: &mut HashMap>, subgraph: &mut Subgraph, ) -> bool { - let mut current_callback: Option> = None; + let mut current_callback: Option> = None; // we need to borrow the callback_map as mutable to update the visited flag of the callback // so that next time when we visit the same callback, we can skip it. Because there can be cases @@ -160,67 +158,73 @@ impl MessageGraph { cb_time: current_callback.cb_time, }); - // if the current vertex has a DROP tag, then we should not proceed further - // and return true - if current_callback - .tags - .as_ref() - .map_or(false, |tags| tags.contains(&DROP.to_string())) - { + // if there are no responses, means the message is dropped, we can return true + if current_callback.responses.is_empty() { return true; } - // recursively invoke the downstream vertices of the current vertex, if any - if let Some(edges) = self.dag.get(¤t) { - for edge in edges { - // check if the edge should proceed based on the conditions - // if there are no conditions, we should proceed with the edge - // if there are conditions, we should check the tags of the current callback - // with the tags of the edge and the operator of the tags to decide if we should - // proceed with the edge - let should_proceed = edge - .conditions - .as_ref() - // If the edge has conditions, get the tags - .and_then(|conditions| conditions.tags.as_ref()) - // If there are no conditions or tags, default to true (i.e., proceed with the edge) - // If there are tags, compare the tags with the current callback's tags and the operator - // to decide if we should proceed with the edge. - .map_or(true, |tags| { - current_callback - .tags - .as_ref() - // If the current callback has no tags we should not proceed with the edge for "and" and "or" operators - // because we expect the current callback to have tags specified in the edge. - // If the current callback has no tags we should proceed with the edge for "not" operator. - // because we don't expect the current callback to have tags specified in the edge. - .map_or( - tags.operator.as_ref() == Some(&OperatorType::Not), - |callback_tags| { - tags.operator.as_ref().map_or(false, |operator| { - // If there is no operator, default to false (i.e., do not proceed with the edge) - // If there is an operator, compare the current callback's tags with the edge's tags - compare_slice(operator, callback_tags, &tags.values) - }) - }, - ) - }); - - // if the conditions are not met, then proceed to the next edge - if !should_proceed { - continue; - } - - // proceed to the downstream vertex - // if any of the downstream vertex returns false, then we should return false. - if !self.generate_subgraph(edge.to.clone(), current.clone(), callback_map, subgraph) - { - return false; + // iterate over the responses of the current callback, for flatmap operation there can + // more than one response, so we need to make sure all the responses are processed. + // For example a -> b -> c, lets say vertex a has 2 responses. That means we need to + // wait for two callbacks from vertex b, each message from vertex b can have 2 responses + // we will have to wait for 4 callbacks from vertex c. + for response in current_callback.responses.clone() { + // recursively invoke the downstream vertices of the current vertex, if any + if let Some(edges) = self.dag.get(¤t) { + for edge in edges { + // check if the edge should proceed based on the conditions + // if there are no conditions, we should proceed with the edge + // if there are conditions, we should check the tags of the current callback + // with the tags of the edge and the operator of the tags to decide if we should + // proceed with the edge + let should_proceed = edge + .conditions + .as_ref() + // If the edge has conditions, get the tags + .and_then(|conditions| conditions.tags.as_ref()) + // If there are no conditions or tags, default to true (i.e., proceed with the edge) + // If there are tags, compare the tags with the current callback's tags and the operator + // to decide if we should proceed with the edge. + .map_or(true, |tags| { + response + .tags + .as_ref() + // If the current callback has no tags we should not proceed with the edge for "and" and "or" operators + // because we expect the current callback to have tags specified in the edge. + // If the current callback has no tags we should proceed with the edge for "not" operator. + // because we don't expect the current callback to have tags specified in the edge. + .map_or( + tags.operator.as_ref() == Some(&OperatorType::Not), + |callback_tags| { + tags.operator.as_ref().map_or(false, |operator| { + // If there is no operator, default to false (i.e., do not proceed with the edge) + // If there is an operator, compare the current callback's tags with the edge's tags + compare_slice(operator, callback_tags, &tags.values) + }) + }, + ) + }); + + // if the conditions are not met, then proceed to the next edge + if !should_proceed { + continue; + } + + // proceed to the downstream vertex + // if any of the downstream vertex returns false, then we should return false. + if !self.generate_subgraph( + edge.to.clone(), + current.clone(), + callback_map, + subgraph, + ) { + return false; + } } } + // if there are no downstream vertices, or all the downstream vertices returned true, + // we can return true } - // if there are no downstream vertices, or all the downstream vertices returned true, - // we can return true true } @@ -238,6 +242,7 @@ impl MessageGraph { #[cfg(test)] mod tests { use super::*; + use crate::app::callback::Response; use crate::pipeline::{Conditions, Tag, Vertex}; #[test] @@ -264,12 +269,15 @@ mod tests { callback_map.insert( "a".to_string(), vec![CallbackRequestWrapper { - callback_request: Arc::new(CallbackRequest { + callback_request: Arc::new(Callback { id: "uuid1".to_string(), vertex: "a".to_string(), cb_time: 1, from_vertex: "a".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }), visited: false, }], @@ -313,12 +321,15 @@ mod tests { callback_map.insert( "a".to_string(), vec![CallbackRequestWrapper { - callback_request: Arc::new(CallbackRequest { + callback_request: Arc::new(Callback { id: "uuid1".to_string(), vertex: "a".to_string(), cb_time: 1, from_vertex: "a".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }), visited: false, }], @@ -327,12 +338,15 @@ mod tests { callback_map.insert( "b".to_string(), vec![CallbackRequestWrapper { - callback_request: Arc::new(CallbackRequest { + callback_request: Arc::new(Callback { id: "uuid1".to_string(), vertex: "b".to_string(), cb_time: 1, from_vertex: "a".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }), visited: false, }], @@ -341,12 +355,15 @@ mod tests { callback_map.insert( "c".to_string(), vec![CallbackRequestWrapper { - callback_request: Arc::new(CallbackRequest { + callback_request: Arc::new(Callback { id: "uuid1".to_string(), vertex: "c".to_string(), cb_time: 1, from_vertex: "a".to_string(), - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], }), visited: false, }], @@ -461,77 +478,86 @@ mod tests { let source_vertex = "a".to_string(); let raw_callback = r#"[ - { - "id": "xxxx", - "vertex": "a", - "from_vertex": "a", - "cb_time": 123456789 - }, - { - "id": "xxxx", - "vertex": "b", - "from_vertex": "a", - "cb_time": 123456867 - }, - { - "id": "xxxx", - "vertex": "c", - "from_vertex": "a", - "cb_time": 123456819 - }, - { - "id": "xxxx", - "vertex": "d", - "from_vertex": "b", - "cb_time": 123456840 - }, - { - "id": "xxxx", - "vertex": "e", - "from_vertex": "c", - "cb_time": 123456843 - }, - { - "id": "xxxx", - "vertex": "f", - "from_vertex": "d", - "cb_time": 123456854 - }, - { - "id": "xxxx", - "vertex": "f", - "from_vertex": "e", - "cb_time": 123456886 - }, - { - "id": "xxxx", - "vertex": "g", - "from_vertex": "f", - "tags": ["even"], - "cb_time": 123456885 - }, - { - "id": "xxxx", - "vertex": "g", - "from_vertex": "f", - "tags": ["even"], - "cb_time": 123456888 - }, - { - "id": "xxxx", - "vertex": "h", - "from_vertex": "g", - "cb_time": 123456889 - }, - { - "id": "xxxx", - "vertex": "h", - "from_vertex": "g", - "cb_time": 123456890 - } - ]"#; + { + "id": "xxxx", + "vertex": "a", + "from_vertex": "a", + "cb_time": 123456789, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "b", + "from_vertex": "a", + "cb_time": 123456867, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "c", + "from_vertex": "a", + "cb_time": 123456819, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "d", + "from_vertex": "b", + "cb_time": 123456840, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "e", + "from_vertex": "c", + "cb_time": 123456843, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "f", + "from_vertex": "d", + "cb_time": 123456854, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "f", + "from_vertex": "e", + "cb_time": 123456886, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "g", + "from_vertex": "f", + "cb_time": 123456885, + "responses": [{"index": 0, "tags": ["even"]}] + }, + { + "id": "xxxx", + "vertex": "g", + "from_vertex": "f", + "cb_time": 123456888, + "responses": [{"index": 0, "tags": ["even"]}] + }, + { + "id": "xxxx", + "vertex": "h", + "from_vertex": "g", + "cb_time": 123456889, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "h", + "from_vertex": "g", + "cb_time": 123456890, + "responses": [{"index": 0, "tags": null}] + } + ]"#; - let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); + let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); let mut callback_map: HashMap> = HashMap::new(); for callback in callbacks { @@ -590,23 +616,24 @@ mod tests { let source_vertex = "a".to_string(); let raw_callback = r#" - [ - { - "id": "xxxx", - "vertex": "a", - "from_vertex": "a", - "cb_time": 123456789 - }, - { - "id": "xxxx", - "vertex": "b", - "from_vertex": "a", - "cb_time": 123456867, - "tags": ["U+005C__DROP__"] - } - ]"#; + [ + { + "id": "xxxx", + "vertex": "a", + "from_vertex": "a", + "cb_time": 123456789, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "b", + "from_vertex": "a", + "cb_time": 123456867, + "responses": [] + } + ]"#; - let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); + let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); let mut callback_map: HashMap> = HashMap::new(); for callback in callbacks { @@ -728,54 +755,59 @@ mod tests { let source_vertex = "a".to_string(); let raw_callback = r#" - [ - { - "id": "xxxx", - "vertex": "a", - "from_vertex": "a", - "cb_time": 123456789 - }, - { - "id": "xxxx", - "vertex": "b", - "from_vertex": "a", - "cb_time": 123456867 - }, - { - "id": "xxxx", - "vertex": "c", - "from_vertex": "a", - "cb_time": 123456819, - "tags": ["U+005C__DROP__"] - }, - { - "id": "xxxx", - "vertex": "d", - "from_vertex": "b", - "cb_time": 123456840 - }, - { - "id": "xxxx", - "vertex": "f", - "from_vertex": "d", - "cb_time": 123456854 - }, - { - "id": "xxxx", - "vertex": "g", - "from_vertex": "f", - "tags": ["even"], - "cb_time": 123456885 - }, - { - "id": "xxxx", - "vertex": "h", - "from_vertex": "g", - "cb_time": 123456889 - } - ]"#; + [ + { + "id": "xxxx", + "vertex": "a", + "from_vertex": "a", + "cb_time": 123456789, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "b", + "from_vertex": "a", + "cb_time": 123456867, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "c", + "from_vertex": "a", + "cb_time": 123456819, + "responses": [] + }, + { + "id": "xxxx", + "vertex": "d", + "from_vertex": "b", + "cb_time": 123456840, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "f", + "from_vertex": "d", + "cb_time": 123456854, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "g", + "from_vertex": "f", + "cb_time": 123456885, + "responses": [{"index": 0, "tags": ["even"]}] + }, + { + "id": "xxxx", + "vertex": "h", + "from_vertex": "g", + "cb_time": 123456889, + "responses": [{"index": 0, "tags": null}] + } + ]"#; - let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); + let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); let mut callback_map: HashMap> = HashMap::new(); for callback in callbacks { @@ -844,36 +876,43 @@ mod tests { "id": "xxxx", "vertex": "a", "from_vertex": "a", - "cb_time": 123456789 + "cb_time": 123456789, + "responses": [{"index": 0, "tags": null}] }, { "id": "xxxx", "vertex": "b", "from_vertex": "a", "cb_time": 123456867, - "tags": ["failed"] + "responses": [{"index": 0, "tags": ["failed"]}] }, { "id": "xxxx", "vertex": "a", "from_vertex": "b", - "cb_time": 123456819 + "cb_time": 123456819, + "responses": [{"index": 0, "tags": null}] + }, { "id": "xxxx", "vertex": "b", "from_vertex": "a", - "cb_time": 123456819 + "cb_time": 123456819, + "responses": [{"index": 0, "tags": null}] + }, { "id": "xxxx", "vertex": "c", "from_vertex": "b", - "cb_time": 123456819 + "cb_time": 123456819, + "responses": [{"index": 0, "tags": null}] + } ]"#; - let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); + let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); let mut callback_map: HashMap> = HashMap::new(); for callback in callbacks { @@ -915,12 +954,15 @@ mod tests { let message_graph = MessageGraph { dag }; // Create a callback with an invalid vertex - let callbacks = vec![Arc::new(CallbackRequest { + let callbacks = vec![Arc::new(Callback { id: "test".to_string(), vertex: "invalid_vertex".to_string(), from_vertex: "invalid_vertex".to_string(), cb_time: 1, - tags: None, + responses: vec![Response { + index: 0, + tags: None, + }], })]; // Call the function with the invalid callback @@ -930,4 +972,415 @@ mod tests { assert!(result.is_err()); assert!(matches!(result, Err(Error::SubGraphInvalidInput(_)))); } + + #[test] + fn test_flatmap_operation_with_simple_dag() { + let pipeline = PipelineDCG { + vertices: vec![ + Vertex { + name: "a".to_string(), + }, + Vertex { + name: "b".to_string(), + }, + Vertex { + name: "c".to_string(), + }, + ], + edges: vec![ + Edge { + from: "a".to_string(), + to: "b".to_string(), + conditions: None, + }, + Edge { + from: "b".to_string(), + to: "c".to_string(), + conditions: None, + }, + ], + }; + + let message_graph = MessageGraph::from_pipeline(&pipeline).unwrap(); + let source_vertex = "a".to_string(); + + let raw_callback = r#"[ + { + "id": "xxxx", + "vertex": "a", + "from_vertex": "a", + "cb_time": 123456789, + "responses": [ + {"index": 0, "tags": null}, + {"index": 1, "tags": null} + ] + }, + { + "id": "xxxx", + "vertex": "b", + "from_vertex": "a", + "cb_time": 123456867, + "responses": [ + {"index": 0, "tags": null}, + {"index": 1, "tags": null} + ] + }, + { + "id": "xxxx", + "vertex": "b", + "from_vertex": "a", + "cb_time": 123456868, + "responses": [ + {"index": 0, "tags": null}, + {"index": 1, "tags": null} + ] + }, + { + "id": "xxxx", + "vertex": "c", + "from_vertex": "b", + "cb_time": 123456869, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "c", + "from_vertex": "b", + "cb_time": 123456870, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "c", + "from_vertex": "b", + "cb_time": 123456871, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "c", + "from_vertex": "b", + "cb_time": 123456872, + "responses": [{"index": 0, "tags": null}] + } + ]"#; + + let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); + let mut callback_map: HashMap> = HashMap::new(); + + for callback in callbacks { + callback_map + .entry(callback.vertex.clone()) + .or_default() + .push(CallbackRequestWrapper { + callback_request: Arc::new(callback), + visited: false, + }); + } + + let mut subgraph: Subgraph = Subgraph { + id: "xxxx".to_string(), + blocks: Vec::new(), + }; + let result = message_graph.generate_subgraph( + source_vertex.clone(), + source_vertex, + &mut callback_map, + &mut subgraph, + ); + + assert!(result); + } + + #[test] + fn test_flatmap_operation_with_complex_dag() { + let pipeline = PipelineDCG { + vertices: vec![ + Vertex { + name: "a".to_string(), + }, + Vertex { + name: "b".to_string(), + }, + Vertex { + name: "c".to_string(), + }, + Vertex { + name: "d".to_string(), + }, + Vertex { + name: "e".to_string(), + }, + Vertex { + name: "f".to_string(), + }, + Vertex { + name: "g".to_string(), + }, + Vertex { + name: "h".to_string(), + }, + Vertex { + name: "i".to_string(), + }, + ], + edges: vec![ + Edge { + from: "a".to_string(), + to: "b".to_string(), + conditions: None, + }, + Edge { + from: "a".to_string(), + to: "c".to_string(), + conditions: None, + }, + Edge { + from: "b".to_string(), + to: "d".to_string(), + conditions: None, + }, + Edge { + from: "c".to_string(), + to: "e".to_string(), + conditions: None, + }, + Edge { + from: "d".to_string(), + to: "f".to_string(), + conditions: None, + }, + Edge { + from: "e".to_string(), + to: "f".to_string(), + conditions: None, + }, + Edge { + from: "f".to_string(), + to: "g".to_string(), + conditions: None, + }, + Edge { + from: "g".to_string(), + to: "h".to_string(), + conditions: Some(Conditions { + tags: Some(Tag { + operator: Some(OperatorType::And), + values: vec!["even".to_string()], + }), + }), + }, + Edge { + from: "g".to_string(), + to: "i".to_string(), + conditions: Some(Conditions { + tags: Some(Tag { + operator: Some(OperatorType::Or), + values: vec!["odd".to_string()], + }), + }), + }, + ], + }; + + let message_graph = MessageGraph::from_pipeline(&pipeline).unwrap(); + let source_vertex = "a".to_string(); + + let raw_callback = r#"[ + { + "id": "xxxx", + "vertex": "a", + "from_vertex": "a", + "cb_time": 123456789, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "b", + "from_vertex": "a", + "cb_time": 123456867, + "responses": [ + {"index": 0, "tags": null}, + {"index": 1, "tags": null} + ] + }, + { + "id": "xxxx", + "vertex": "c", + "from_vertex": "a", + "cb_time": 123456819, + "responses": [ + {"index": 0, "tags": null}, + {"index": 1, "tags": null}, + {"index": 2, "tags": null} + ] + }, + { + "id": "xxxx", + "vertex": "d", + "from_vertex": "b", + "cb_time": 123456840, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "d", + "from_vertex": "b", + "cb_time": 123456841, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "e", + "from_vertex": "c", + "cb_time": 123456843, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "e", + "from_vertex": "c", + "cb_time": 123456844, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "e", + "from_vertex": "c", + "cb_time": 123456845, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "f", + "from_vertex": "d", + "cb_time": 123456854, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "f", + "from_vertex": "d", + "cb_time": 123456854, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "f", + "from_vertex": "e", + "cb_time": 123456886, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "f", + "from_vertex": "e", + "cb_time": 123456887, + "responses": [{"index": 1, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "f", + "from_vertex": "e", + "cb_time": 123456888, + "responses": [{"index": 2, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "g", + "from_vertex": "f", + "cb_time": 123456885, + "responses": [{"index": 0, "tags": ["even"]}] + }, + { + "id": "xxxx", + "vertex": "g", + "from_vertex": "f", + "cb_time": 123456886, + "responses": [{"index": 1, "tags": ["even"]}] + }, + { + "id": "xxxx", + "vertex": "g", + "from_vertex": "f", + "cb_time": 123456887, + "responses": [{"index": 2, "tags": ["odd"]}] + }, + { + "id": "xxxx", + "vertex": "g", + "from_vertex": "f", + "cb_time": 123456888, + "responses": [{"index": 3, "tags": ["odd"]}] + }, + { + "id": "xxxx", + "vertex": "g", + "from_vertex": "f", + "cb_time": 123456889, + "responses": [{"index": 4, "tags": ["odd"]}] + }, + { + "id": "xxxx", + "vertex": "h", + "from_vertex": "g", + "cb_time": 123456890, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "h", + "from_vertex": "g", + "cb_time": 123456891, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "i", + "from_vertex": "g", + "cb_time": 123456892, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "i", + "from_vertex": "g", + "cb_time": 123456893, + "responses": [{"index": 0, "tags": null}] + }, + { + "id": "xxxx", + "vertex": "i", + "from_vertex": "g", + "cb_time": 123456894, + "responses": [{"index": 0, "tags": null}] + } + ]"#; + + let callbacks: Vec = serde_json::from_str(raw_callback).unwrap(); + let mut callback_map: HashMap> = HashMap::new(); + + for callback in callbacks { + callback_map + .entry(callback.vertex.clone()) + .or_default() + .push(CallbackRequestWrapper { + callback_request: Arc::new(callback), + visited: false, + }); + } + + let mut subgraph: Subgraph = Subgraph { + id: "xxxx".to_string(), + blocks: Vec::new(), + }; + let result = message_graph.generate_subgraph( + source_vertex.clone(), + source_vertex, + &mut callback_map, + &mut subgraph, + ); + + assert!(result); + } } From 4f9a4788d9c046091e99a8a6da12aa4cdfa9196f Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 15 Jan 2025 10:03:34 +0530 Subject: [PATCH 2/5] update comment Signed-off-by: Yashash H L --- rust/serving/src/app/tracker.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rust/serving/src/app/tracker.rs b/rust/serving/src/app/tracker.rs index 9cdaa71854..311a1edae7 100644 --- a/rust/serving/src/app/tracker.rs +++ b/rust/serving/src/app/tracker.rs @@ -165,9 +165,8 @@ impl MessageGraph { // iterate over the responses of the current callback, for flatmap operation there can // more than one response, so we need to make sure all the responses are processed. - // For example a -> b -> c, lets say vertex a has 2 responses. That means we need to - // wait for two callbacks from vertex b, each message from vertex b can have 2 responses - // we will have to wait for 4 callbacks from vertex c. + // For example a -> b -> c, lets say vertex a has 2 responses. We will have to recursively + // find the subgraph for both the responses. for response in current_callback.responses.clone() { // recursively invoke the downstream vertices of the current vertex, if any if let Some(edges) = self.dag.get(¤t) { From 8728f097b5262f0764e20cda50689837cb4aebae Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 15 Jan 2025 11:31:09 +0530 Subject: [PATCH 3/5] remove index field Signed-off-by: Yashash H L --- rust/serving/src/app/callback.rs | 21 +--- rust/serving/src/app/callback/state.rs | 31 +---- .../src/app/callback/store/memstore.rs | 10 +- .../src/app/callback/store/redisstore.rs | 10 +- rust/serving/src/app/jetstream_proxy.rs | 15 +-- rust/serving/src/app/tracker.rs | 119 ++++++++---------- 6 files changed, 69 insertions(+), 137 deletions(-) diff --git a/rust/serving/src/app/callback.rs b/rust/serving/src/app/callback.rs index 6932aadb5f..61295d52f7 100644 --- a/rust/serving/src/app/callback.rs +++ b/rust/serving/src/app/callback.rs @@ -23,7 +23,6 @@ pub(crate) struct Callback { #[derive(Debug, Serialize, Deserialize, Clone)] pub(crate) struct Response { - pub(crate) index: u16, pub(crate) tags: Option>, } @@ -118,10 +117,7 @@ mod tests { vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }]; let res = Request::builder() @@ -157,30 +153,21 @@ mod tests { vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }, Callback { id: "test_id".to_string(), vertex: "cat".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }, Callback { id: "test_id".to_string(), vertex: "out".to_string(), cb_time: 12345, from_vertex: "cat".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }, ]; diff --git a/rust/serving/src/app/callback/state.rs b/rust/serving/src/app/callback/state.rs index f86b4a45ef..419a40f60f 100644 --- a/rust/serving/src/app/callback/state.rs +++ b/rust/serving/src/app/callback/state.rs @@ -270,10 +270,7 @@ mod tests { vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }, Callback { id: id.clone(), @@ -281,7 +278,6 @@ mod tests { cb_time: 12345, from_vertex: "in".to_string(), responses: vec![Response { - index: 0, tags: Some(vec!["tiger".to_owned(), "asciiart".to_owned()]), }], }, @@ -290,40 +286,28 @@ mod tests { vertex: "tiger".to_string(), cb_time: 12345, from_vertex: "planner".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }, Callback { id: id.clone(), vertex: "asciiart".to_string(), cb_time: 12345, from_vertex: "planner".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }, Callback { id: id.clone(), vertex: "serve-sink".to_string(), cb_time: 12345, from_vertex: "tiger".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }, Callback { id: id.clone(), vertex: "serve-sink".to_string(), cb_time: 12345, from_vertex: "asciiart".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }, ]; state.insert_callback_requests(cbs).await.unwrap(); @@ -362,10 +346,7 @@ mod tests { vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }]; // Try to insert callback requests for an ID that hasn't been registered diff --git a/rust/serving/src/app/callback/store/memstore.rs b/rust/serving/src/app/callback/store/memstore.rs index d5260d4563..4c5c36666d 100644 --- a/rust/serving/src/app/callback/store/memstore.rs +++ b/rust/serving/src/app/callback/store/memstore.rs @@ -109,10 +109,7 @@ mod tests { vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }); // Save a callback @@ -187,10 +184,7 @@ mod tests { vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }); // Try to save a callback with an invalid key diff --git a/rust/serving/src/app/callback/store/redisstore.rs b/rust/serving/src/app/callback/store/redisstore.rs index 16197b387e..d4fe501cdb 100644 --- a/rust/serving/src/app/callback/store/redisstore.rs +++ b/rust/serving/src/app/callback/store/redisstore.rs @@ -230,10 +230,7 @@ mod tests { vertex: String::from("prev_vertex"), cb_time: 1234, from_vertex: String::from("next_vertex"), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }), }; @@ -298,10 +295,7 @@ mod tests { vertex: String::from("vertex"), cb_time: 1234, from_vertex: String::from("next_vertex"), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }); // Save with TTL of 1 second diff --git a/rust/serving/src/app/jetstream_proxy.rs b/rust/serving/src/app/jetstream_proxy.rs index 001cbc421e..4c864c64c4 100644 --- a/rust/serving/src/app/jetstream_proxy.rs +++ b/rust/serving/src/app/jetstream_proxy.rs @@ -360,30 +360,21 @@ mod tests { vertex: "in".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![callback::Response { - index: 0, - tags: None, - }], + responses: vec![callback::Response { tags: None }], }, Callback { id: id.to_string(), vertex: "cat".to_string(), cb_time: 12345, from_vertex: "in".to_string(), - responses: vec![callback::Response { - index: 0, - tags: None, - }], + responses: vec![callback::Response { tags: None }], }, Callback { id: id.to_string(), vertex: "out".to_string(), cb_time: 12345, from_vertex: "cat".to_string(), - responses: vec![callback::Response { - index: 0, - tags: None, - }], + responses: vec![callback::Response { tags: None }], }, ] } diff --git a/rust/serving/src/app/tracker.rs b/rust/serving/src/app/tracker.rs index 311a1edae7..e47d31216f 100644 --- a/rust/serving/src/app/tracker.rs +++ b/rust/serving/src/app/tracker.rs @@ -273,10 +273,7 @@ mod tests { vertex: "a".to_string(), cb_time: 1, from_vertex: "a".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }), visited: false, }], @@ -325,10 +322,7 @@ mod tests { vertex: "a".to_string(), cb_time: 1, from_vertex: "a".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }), visited: false, }], @@ -342,10 +336,7 @@ mod tests { vertex: "b".to_string(), cb_time: 1, from_vertex: "a".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }), visited: false, }], @@ -359,10 +350,7 @@ mod tests { vertex: "c".to_string(), cb_time: 1, from_vertex: "a".to_string(), - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], }), visited: false, }], @@ -482,77 +470,77 @@ mod tests { "vertex": "a", "from_vertex": "a", "cb_time": 123456789, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "b", "from_vertex": "a", "cb_time": 123456867, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "c", "from_vertex": "a", "cb_time": 123456819, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "d", "from_vertex": "b", "cb_time": 123456840, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "e", "from_vertex": "c", "cb_time": 123456843, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "f", "from_vertex": "d", "cb_time": 123456854, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "f", "from_vertex": "e", "cb_time": 123456886, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "g", "from_vertex": "f", "cb_time": 123456885, - "responses": [{"index": 0, "tags": ["even"]}] + "responses": [{"tags": ["even"]}] }, { "id": "xxxx", "vertex": "g", "from_vertex": "f", "cb_time": 123456888, - "responses": [{"index": 0, "tags": ["even"]}] + "responses": [{"tags": ["even"]}] }, { "id": "xxxx", "vertex": "h", "from_vertex": "g", "cb_time": 123456889, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "h", "from_vertex": "g", "cb_time": 123456890, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] } ]"#; @@ -621,7 +609,7 @@ mod tests { "vertex": "a", "from_vertex": "a", "cb_time": 123456789, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", @@ -760,14 +748,14 @@ mod tests { "vertex": "a", "from_vertex": "a", "cb_time": 123456789, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "b", "from_vertex": "a", "cb_time": 123456867, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", @@ -781,28 +769,28 @@ mod tests { "vertex": "d", "from_vertex": "b", "cb_time": 123456840, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "f", "from_vertex": "d", "cb_time": 123456854, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "g", "from_vertex": "f", "cb_time": 123456885, - "responses": [{"index": 0, "tags": ["even"]}] + "responses": [{"tags": ["even"]}] }, { "id": "xxxx", "vertex": "h", "from_vertex": "g", "cb_time": 123456889, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] } ]"#; @@ -876,21 +864,21 @@ mod tests { "vertex": "a", "from_vertex": "a", "cb_time": 123456789, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "b", "from_vertex": "a", "cb_time": 123456867, - "responses": [{"index": 0, "tags": ["failed"]}] + "responses": [{"tags": ["failed"]}] }, { "id": "xxxx", "vertex": "a", "from_vertex": "b", "cb_time": 123456819, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { @@ -898,7 +886,7 @@ mod tests { "vertex": "b", "from_vertex": "a", "cb_time": 123456819, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { @@ -906,7 +894,7 @@ mod tests { "vertex": "c", "from_vertex": "b", "cb_time": 123456819, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] } ]"#; @@ -958,10 +946,7 @@ mod tests { vertex: "invalid_vertex".to_string(), from_vertex: "invalid_vertex".to_string(), cb_time: 1, - responses: vec![Response { - index: 0, - tags: None, - }], + responses: vec![Response { tags: None }], })]; // Call the function with the invalid callback @@ -1010,7 +995,7 @@ mod tests { "from_vertex": "a", "cb_time": 123456789, "responses": [ - {"index": 0, "tags": null}, + {"tags": null}, {"index": 1, "tags": null} ] }, @@ -1020,7 +1005,7 @@ mod tests { "from_vertex": "a", "cb_time": 123456867, "responses": [ - {"index": 0, "tags": null}, + {"tags": null}, {"index": 1, "tags": null} ] }, @@ -1030,7 +1015,7 @@ mod tests { "from_vertex": "a", "cb_time": 123456868, "responses": [ - {"index": 0, "tags": null}, + {"tags": null}, {"index": 1, "tags": null} ] }, @@ -1039,28 +1024,28 @@ mod tests { "vertex": "c", "from_vertex": "b", "cb_time": 123456869, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "c", "from_vertex": "b", "cb_time": 123456870, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "c", "from_vertex": "b", "cb_time": 123456871, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "c", "from_vertex": "b", "cb_time": 123456872, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] } ]"#; @@ -1191,7 +1176,7 @@ mod tests { "vertex": "a", "from_vertex": "a", "cb_time": 123456789, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", @@ -1199,7 +1184,7 @@ mod tests { "from_vertex": "a", "cb_time": 123456867, "responses": [ - {"index": 0, "tags": null}, + {"tags": null}, {"index": 1, "tags": null} ] }, @@ -1209,7 +1194,7 @@ mod tests { "from_vertex": "a", "cb_time": 123456819, "responses": [ - {"index": 0, "tags": null}, + {"tags": null}, {"index": 1, "tags": null}, {"index": 2, "tags": null} ] @@ -1219,56 +1204,56 @@ mod tests { "vertex": "d", "from_vertex": "b", "cb_time": 123456840, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "d", "from_vertex": "b", "cb_time": 123456841, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "e", "from_vertex": "c", "cb_time": 123456843, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "e", "from_vertex": "c", "cb_time": 123456844, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "e", "from_vertex": "c", "cb_time": 123456845, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "f", "from_vertex": "d", "cb_time": 123456854, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "f", "from_vertex": "d", "cb_time": 123456854, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "f", "from_vertex": "e", "cb_time": 123456886, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", @@ -1289,7 +1274,7 @@ mod tests { "vertex": "g", "from_vertex": "f", "cb_time": 123456885, - "responses": [{"index": 0, "tags": ["even"]}] + "responses": [{"tags": ["even"]}] }, { "id": "xxxx", @@ -1324,35 +1309,35 @@ mod tests { "vertex": "h", "from_vertex": "g", "cb_time": 123456890, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "h", "from_vertex": "g", "cb_time": 123456891, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "i", "from_vertex": "g", "cb_time": 123456892, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "i", "from_vertex": "g", "cb_time": 123456893, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] }, { "id": "xxxx", "vertex": "i", "from_vertex": "g", "cb_time": 123456894, - "responses": [{"index": 0, "tags": null}] + "responses": [{"tags": null}] } ]"#; From 9431405d44e23c30a9895d75fc8cbe69e2296e96 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Tue, 14 Jan 2025 22:43:23 -0800 Subject: [PATCH 4/5] doc: comments Signed-off-by: Vigith Maurice --- rust/serving/src/app/callback.rs | 7 +++++++ rust/serving/src/app/tracker.rs | 33 ++++++++++++++++++++------------ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/rust/serving/src/app/callback.rs b/rust/serving/src/app/callback.rs index 61295d52f7..64e007d33a 100644 --- a/rust/serving/src/app/callback.rs +++ b/rust/serving/src/app/callback.rs @@ -12,17 +12,24 @@ use state::State as CallbackState; /// store for storing the state pub(crate) mod store; +/// As message passes through each component (map, transformer, sink, etc.). it emits a beacon via callback +/// to inform that message has been processed by this component. #[derive(Debug, Serialize, Deserialize, Clone)] pub(crate) struct Callback { pub(crate) id: String, pub(crate) vertex: String, pub(crate) cb_time: u64, pub(crate) from_vertex: String, + /// Due to flat-map operation, we can have 0 or more responses. + // TODO: Arc<[T]> pub(crate) responses: Vec, } +/// It contains details about the `To` vertex via tags (conditional forwarding). #[derive(Debug, Serialize, Deserialize, Clone)] pub(crate) struct Response { + /// If tags is None, the message is forwarded to all vertices, if len(Vec) == 0, it means that + /// the message has been dropped. pub(crate) tags: Option>, } diff --git a/rust/serving/src/app/tracker.rs b/rust/serving/src/app/tracker.rs index e47d31216f..5f3b24db7b 100644 --- a/rust/serving/src/app/tracker.rs +++ b/rust/serving/src/app/tracker.rs @@ -16,6 +16,8 @@ fn compare_slice(operator: &OperatorType, a: &[String], b: &[String]) -> bool { } } +/// hash map of vertex and its edges. The key of the HashMap and `Edge.from` are same, `Edge.to` will +/// help find the adjacent vertex. type Graph = HashMap>; #[derive(Serialize, Deserialize, Debug)] @@ -47,10 +49,12 @@ struct CallbackRequestWrapper { } impl MessageGraph { - /// This function generates a sub graph from a list of callbacks. + /// Generates a sub graph from a list of [Callback]. /// It first creates a HashMap to map each vertex to its corresponding callbacks. - /// Then it finds the source vertex by checking if the vertex and from_vertex fields are the same. - /// Finally, it calls the `generate_subgraph` function to generate the subgraph from the source vertex. + /// NOTE: it finds checks whether the callback is from the originating source vertex by checking + /// if the vertex and from_vertex fields are the same. + /// Finally, it calls the [Self::generate_subgraph] function to generate the subgraph from the source + /// vertex. pub(crate) fn generate_subgraph_from_callbacks( &self, id: String, @@ -113,10 +117,13 @@ impl MessageGraph { } } - // generate_subgraph function is a recursive function that generates the sub graph from the source vertex for - // the given list of callbacks. The function returns true if the subgraph is generated successfully(if we are - // able to find a subgraph for the message using the given callbacks), it - // updates the subgraph with the path from the source vertex to the downstream vertices. + /// generate_subgraph function is a recursive function that generates the sub graph from the source + /// vertex for the given list of callbacks. The function returns true if the subgraph is + /// generated successfully (if we are able to find a subgraph for the message using the given + /// callbacks), it updates the subgraph with the path from the source vertex to the downstream + /// vertices. + /// It uses the pipeline DAG [Graph] and the [Callback]'s HashMap to check whether sub-graph is + /// complete. fn generate_subgraph( &self, current: String, @@ -167,7 +174,7 @@ impl MessageGraph { // more than one response, so we need to make sure all the responses are processed. // For example a -> b -> c, lets say vertex a has 2 responses. We will have to recursively // find the subgraph for both the responses. - for response in current_callback.responses.clone() { + for response in current_callback.responses.iter() { // recursively invoke the downstream vertices of the current vertex, if any if let Some(edges) = self.dag.get(¤t) { for edge in edges { @@ -176,7 +183,7 @@ impl MessageGraph { // if there are conditions, we should check the tags of the current callback // with the tags of the edge and the operator of the tags to decide if we should // proceed with the edge - let should_proceed = edge + let will_continue_the_path = edge .conditions .as_ref() // If the edge has conditions, get the tags @@ -204,12 +211,14 @@ impl MessageGraph { ) }); - // if the conditions are not met, then proceed to the next edge - if !should_proceed { + // if the conditions are not met, then proceed to the next edge because conditions + // for forwarding the message did not match the conditional forwarding requirements. + if !will_continue_the_path { + // let's move on to the next edge continue; } - // proceed to the downstream vertex + // recursively proceed to the next downstream vertex // if any of the downstream vertex returns false, then we should return false. if !self.generate_subgraph( edge.to.clone(), From fd44a6bf4afc21bf23279ab771b068dd08d96940 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 15 Jan 2025 12:49:54 +0530 Subject: [PATCH 5/5] remove todo, avoid clone Signed-off-by: Yashash H L --- rust/numaflow-core/src/metrics.rs | 1 - rust/serving/src/app/callback.rs | 7 +++---- rust/serving/src/app/jetstream_proxy.rs | 8 ++++---- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index d63b82de7e..4549ade9fe 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -718,7 +718,6 @@ struct TimestampedPending { #[derive(Clone)] pub(crate) enum LagReader { Source(Source), - // TODO: Arc<[T]> ISB(Vec), // multiple partitions } diff --git a/rust/serving/src/app/callback.rs b/rust/serving/src/app/callback.rs index 64e007d33a..d5708a4e62 100644 --- a/rust/serving/src/app/callback.rs +++ b/rust/serving/src/app/callback.rs @@ -14,22 +14,21 @@ pub(crate) mod store; /// As message passes through each component (map, transformer, sink, etc.). it emits a beacon via callback /// to inform that message has been processed by this component. -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct Callback { pub(crate) id: String, pub(crate) vertex: String, pub(crate) cb_time: u64, pub(crate) from_vertex: String, /// Due to flat-map operation, we can have 0 or more responses. - // TODO: Arc<[T]> pub(crate) responses: Vec, } /// It contains details about the `To` vertex via tags (conditional forwarding). -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct Response { /// If tags is None, the message is forwarded to all vertices, if len(Vec) == 0, it means that - /// the message has been dropped. + /// the message has been dropped. pub(crate) tags: Option>, } diff --git a/rust/serving/src/app/jetstream_proxy.rs b/rust/serving/src/app/jetstream_proxy.rs index 4c864c64c4..6d4d9bb737 100644 --- a/rust/serving/src/app/jetstream_proxy.rs +++ b/rust/serving/src/app/jetstream_proxy.rs @@ -415,10 +415,10 @@ mod tests { let app = jetstream_proxy(app_state).await.unwrap(); tokio::spawn(async move { - let cbs = create_default_callbacks(ID_VALUE); let mut retries = 0; loop { - match callback_state.insert_callback_requests(cbs.clone()).await { + let cbs = create_default_callbacks(ID_VALUE); + match callback_state.insert_callback_requests(cbs).await { Ok(_) => break, Err(e) => { retries += 1; @@ -487,14 +487,14 @@ mod tests { let app = jetstream_proxy(app_state).await.unwrap(); // pipeline is in -> cat -> out, so we will have 3 callback requests - let cbs = create_default_callbacks(ID_VALUE); // spawn a tokio task which will insert the callback requests to the callback state // if it fails, sleep for 10ms and retry tokio::spawn(async move { let mut retries = 0; loop { - match callback_state.insert_callback_requests(cbs.clone()).await { + let cbs = create_default_callbacks(ID_VALUE); + match callback_state.insert_callback_requests(cbs).await { Ok(_) => { // save a test message, we should get this message when serve is invoked // with foobar id