From 293ccf6e1c22cf665a1321f6be1e2064c181172e Mon Sep 17 00:00:00 2001 From: Shubham Dixit Date: Thu, 30 May 2024 13:24:42 +0530 Subject: [PATCH] feat: Message To Drop Semantics (#47) Signed-off-by: shubham Signed-off-by: Yashash H L Co-authored-by: Yashash H L --- examples/map-cat/src/main.rs | 9 +- examples/map-tickgen-serde/src/main.rs | 12 +- examples/reduce-counter/src/main.rs | 9 +- examples/source-transformer-now/src/main.rs | 10 +- src/map.rs | 125 ++++++++++++++--- src/reduce.rs | 140 +++++++++++++++++--- src/sourcetransform.rs | 133 +++++++++++++++++-- 7 files changed, 367 insertions(+), 71 deletions(-) diff --git a/examples/map-cat/src/main.rs b/examples/map-cat/src/main.rs index 971d479..4eefaf4 100644 --- a/examples/map-cat/src/main.rs +++ b/examples/map-cat/src/main.rs @@ -11,10 +11,9 @@ struct Cat; #[tonic::async_trait] impl map::Mapper for Cat { async fn map(&self, input: map::MapRequest) -> Vec { - vec![map::Message { - keys: input.keys, - value: input.value, - tags: vec![], - }] + let message = map::Message::new(input.value) + .keys(input.keys) + .tags(vec![]); + vec![message] } } diff --git a/examples/map-tickgen-serde/src/main.rs b/examples/map-tickgen-serde/src/main.rs index e7b7f20..645d215 100644 --- a/examples/map-tickgen-serde/src/main.rs +++ b/examples/map-tickgen-serde/src/main.rs @@ -1,4 +1,5 @@ use numaflow::map; +use numaflow::map::Message; #[tokio::main] async fn main() -> Result<(), Box> { @@ -38,14 +39,15 @@ impl map::Mapper for TickGen { let ts = Utc .timestamp_nanos(payload.created_ts) .to_rfc3339_opts(SecondsFormat::Nanos, true); - vec![map::Message { - keys: input.keys, - value: serde_json::to_vec(&ResultPayload { + let message = map::Message::new( + serde_json::to_vec(&ResultPayload { value: payload.data.value, time: ts, }) .unwrap_or_default(), - tags: vec![], - }] + ) + .keys(input.keys) + .tags(vec![]); + vec![message] } } diff --git a/examples/reduce-counter/src/main.rs b/examples/reduce-counter/src/main.rs index 3bacd1a..7e79882 100644 --- a/examples/reduce-counter/src/main.rs +++ b/examples/reduce-counter/src/main.rs @@ -48,11 +48,10 @@ mod counter { md.start_time(), md.end_time() ); - vec![Message { - keys: keys.clone(), - value: counter.to_string().into_bytes(), - tags: vec![], - }] + let message = reduce::Message::new(counter.to_string().into_bytes()) + .keys(keys.clone()) + .tags(vec![]); + vec![message] } } } diff --git a/examples/source-transformer-now/src/main.rs b/examples/source-transformer-now/src/main.rs index be4bae2..5c4b7ea 100644 --- a/examples/source-transformer-now/src/main.rs +++ b/examples/source-transformer-now/src/main.rs @@ -16,11 +16,9 @@ impl sourcetransform::SourceTransformer for NowCat { &self, input: sourcetransform::SourceTransformRequest, ) -> Vec { - vec![sourcetransform::Message { - keys: input.keys, - value: input.value, - event_time: chrono::offset::Utc::now(), - tags: vec![], - }] + let message = sourcetransform::Message::new(input.value, chrono::offset::Utc::now()) + .keys(input.keys) + .tags(vec![]); + vec![message] } } diff --git a/src/map.rs b/src/map.rs index df562f2..20770c0 100644 --- a/src/map.rs +++ b/src/map.rs @@ -2,15 +2,15 @@ use std::future::Future; use std::path::PathBuf; use chrono::{DateTime, Utc}; +use serde_json::Value; use tonic::{async_trait, Request, Response, Status}; use crate::shared; - const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/map.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info"; - +const DROP: &str = "U+005C__DROP__"; /// Numaflow Map Proto definitions. pub mod proto { tonic::include_proto!("map.v1"); @@ -45,11 +45,9 @@ pub trait Mapper { /// #[tonic::async_trait] /// impl map::Mapper for Cat { /// async fn map(&self, input: map::MapRequest) -> Vec { - /// vec![map::Message { - /// keys: input.keys, - /// value: input.value, - /// tags: vec![], - /// }] + /// use numaflow::map::Message; + /// let message=Message::new(input.value).keys(input.keys).tags(vec![]); + /// vec![message] /// } /// } /// ``` @@ -79,22 +77,115 @@ where } /// Message is the response struct from the [`Mapper::map`] . +#[derive(Debug, PartialEq)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. - pub keys: Vec, + pub keys: Option>, /// Value is the value passed to the next vertex. pub value: Vec, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - pub tags: Vec, + pub tags: Option>, +} +/// Represents a message that can be modified and forwarded. +impl Message { + /// Creates a new message with the specified value. + /// + /// This constructor initializes the message with no keys, tags, or specific event time. + /// + /// # Arguments + /// + /// * `value` - A vector of bytes representing the message's payload. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3, 4]); + /// ``` + pub fn new(value:Vec) -> Self { + Self{ + value, + keys:None, + tags:None + } + } + /// Marks the message to be dropped by creating a new `Message` with an empty value and a special "DROP" tag. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let dropped_message = Message::message_to_drop(); + /// ``` + pub fn message_to_drop() -> Message { + Message { + keys: None, + value: vec![], + tags: Some(vec![DROP.to_string()]), + } + } + + + /// Sets or replaces the keys associated with this message. + /// + /// # Arguments + /// + /// * `keys` - A vector of strings representing the keys. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// ``` + pub fn keys(mut self, keys: Vec) -> Self { + self.keys = Some(keys); + self + } + + /// Sets or replaces the tags associated with this message. + /// + /// # Arguments + /// + /// * `tags` - A vector of strings representing the tags. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// ``` + pub fn tags(mut self, tags: Vec) -> Self { + self.tags = Some(tags); + self + } + + /// Replaces the value of the message. + /// + /// # Arguments + /// + /// * `value` - A new vector of bytes that replaces the current message value. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// ``` + pub fn value(mut self, value: Vec) -> Self { + self.value = value; + self + } + } impl From for proto::map_response::Result { fn from(value: Message) -> Self { proto::map_response::Result { - keys: value.keys, + keys: value.keys.unwrap_or_default(), value: value.value, - tags: value.tags, + tags: value.tags.unwrap_or_default(), } } } @@ -209,14 +300,14 @@ impl Server { #[cfg(test)] mod tests { - use std::{error::Error, time::Duration}; - use tower::service_fn; - use crate::map; use crate::map::proto::map_client::MapClient; + use crate::map::{Message}; + use std::{error::Error, time::Duration}; use tempfile::TempDir; use tokio::sync::oneshot; use tonic::transport::Uri; + use tower::service_fn; #[tokio::test] async fn map_server() -> Result<(), Box> { @@ -225,9 +316,9 @@ mod tests { impl map::Mapper for Cat { async fn map(&self, input: map::MapRequest) -> Vec { vec![map::Message { - keys: input.keys, + keys: Some(input.keys), value: input.value, - tags: vec![], + tags: Some(vec![]), }] } } @@ -284,4 +375,6 @@ mod tests { assert!(task.is_finished(), "gRPC server is still running"); Ok(()) } + + } diff --git a/src/reduce.rs b/src/reduce.rs index fc7bf56..1f2985a 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -8,8 +8,8 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio::task::JoinSet; use tokio_stream::wrappers::ReceiverStream; -use tonic::{async_trait, Request, Response, Status}; use tonic::metadata::MetadataMap; +use tonic::{async_trait, Request, Response, Status}; use crate::shared; @@ -20,6 +20,7 @@ const WIN_END_TIME: &str = "x-numaflow-win-end-time"; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/reduce.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/reducer-server-info"; +const DROP: &str = "U+005C__DROP__"; /// Numaflow Reduce Proto definitions. pub mod proto { @@ -108,7 +109,7 @@ pub trait Reducer { /// impl numaflow::reduce::ReducerCreator for CounterCreator { /// type R = Counter; /// - /// fn create(&self) -> Counter { + /// fn create(&self) -> Self::R { /// Counter::new() /// } /// } @@ -131,11 +132,8 @@ pub trait Reducer { /// while (input.recv().await).is_some() { /// counter += 1; /// } - /// vec![Message { - /// keys: keys.clone(), - /// value: counter.to_string().into_bytes(), - /// tags: vec![], - /// }] + /// let message=Message::new(counter.to_string().into_bytes()).tags(vec![]).keys(keys.clone()); + /// vec![message] /// } /// } /// } @@ -159,7 +157,10 @@ pub struct IntervalWindow { impl IntervalWindow { fn new(start_time: DateTime, end_time: DateTime) -> Self { - Self { start_time, end_time } + Self { + start_time, + end_time, + } } } @@ -175,14 +176,109 @@ pub struct Metadata { } /// Message is the response from the user's [`Reducer::reduce`]. +#[derive(Debug, PartialEq)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. It is mainly used in creating a partition in [`Reducer::reduce`]. - pub keys: Vec, + pub keys: Option>, /// Value is the value passed to the next vertex. pub value: Vec, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - pub tags: Vec, + pub tags:Option>, +} + +/// Represents a message that can be modified and forwarded. +impl Message { + /// Creates a new message with the specified value. + /// + /// This constructor initializes the message with no keys, tags, or specific event time. + /// + /// # Arguments + /// + /// * `value` - A vector of bytes representing the message's payload. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let message = Message::new(vec![1, 2, 3, 4]); + /// ``` + pub fn new(value :Vec) -> Self { + Self{ + value, + keys:None, + tags:None + + } + } + /// Marks the message to be dropped by creating a new `Message` with an empty value and a special "DROP" tag. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let dropped_message = Message::message_to_drop(); + /// ``` + pub fn message_to_drop() -> crate::map::Message { + crate::map::Message { + keys: None, + value: vec![], + tags: Some(vec![DROP.to_string()]), + } + } + + /// Sets or replaces the keys associated with this message. + /// + /// # Arguments + /// + /// * `keys` - A vector of strings representing the keys. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// ``` + pub fn keys(mut self, keys: Vec) -> Self { + self.keys = Some(keys); + self + } + + /// Sets or replaces the tags associated with this message. + /// + /// # Arguments + /// + /// * `tags` - A vector of strings representing the tags. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// ``` + + + pub fn tags(mut self, tags: Vec) -> Self { + self.tags = Some(tags); + self + } + + /// Replaces the value of the message. + /// + /// # Arguments + /// + /// * `value` - A new vector of bytes that replaces the current message value. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// ``` + pub fn value(mut self, value: Vec) -> Self { + self.value = value; + self + } } /// Incoming request into the reducer handler of [`Reducer`]. @@ -237,8 +333,8 @@ fn get_window_details(request: &MetadataMap) -> (DateTime, DateTime) { #[async_trait] impl proto::reduce_server::Reduce for ReduceService - where - C: ReducerCreator + Send + Sync + 'static, +where + C: ReducerCreator + Send + Sync + 'static, { type ReduceFnStream = ReceiverStream>; async fn reduce_fn( @@ -296,17 +392,17 @@ impl proto::reduce_server::Reduce for ReduceService let mut datum_responses = vec![]; for message in messages { datum_responses.push(proto::reduce_response::Result { - keys: message.keys, + keys: message.keys.unwrap_or_default(), value: message.value, - tags: message.tags, + tags: message.tags.unwrap_or_default(), }); } // stream it out to the client tx.send(Ok(proto::ReduceResponse { results: datum_responses, })) - .await - .unwrap(); + .await + .unwrap(); } }); @@ -378,9 +474,9 @@ impl Server { &mut self, shutdown: F, ) -> Result<(), Box> - where - F: Future, - C: ReducerCreator + Send + Sync + 'static, + where + F: Future, + C: ReducerCreator + Send + Sync + 'static, { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let creator = self.creator.take().unwrap(); @@ -397,9 +493,9 @@ impl Server { } /// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the signal arrives. - pub async fn start(&mut self) -> Result<(), Box> - where - C: ReducerCreator + Send + Sync + 'static, + pub async fn start(&mut self) -> Result<(), Box> + where + C: ReducerCreator + Send + Sync + 'static, { self.start_with_shutdown(shared::shutdown_signal()).await } diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 4828af6..87fa9c4 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -10,6 +10,7 @@ const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sourcetransform.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcetransformer-server-info"; +const DROP: &str = "U+005C__DROP__"; /// Numaflow SourceTransformer Proto definitions. pub mod proto { tonic::include_proto!("sourcetransformer.v1"); @@ -49,12 +50,9 @@ pub trait SourceTransformer { /// &self, /// input: sourcetransform::SourceTransformRequest, /// ) -> Vec { - /// vec![sourcetransform::Message { - /// keys: input.keys, - /// value: input.value, - /// event_time: chrono::offset::Utc::now(), - /// tags: vec![], - /// }] + /// use numaflow::sourcetransform::Message; + /// let message=Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys).tags(vec![]); + /// vec![message] /// } /// } /// ``` @@ -62,17 +60,128 @@ pub trait SourceTransformer { } /// Message is the response struct from the [`SourceTransformer::transform`] . +#[derive(Debug, PartialEq)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. - pub keys: Vec, + pub keys: Option>, /// Value is the value passed to the next vertex. pub value: Vec, /// Time for the given event. This will be used for tracking watermarks. If cannot be derived, set it to the incoming /// event_time from the [`Datum`]. pub event_time: DateTime, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - pub tags: Vec, + pub tags: Option>, +} + +/// Represents a message that can be modified and forwarded. +impl Message { + /// Creates a new message with the specified value and event time. + /// + /// This constructor initializes the message with no keys, tags. + /// + /// # Arguments + /// + /// * `value` - A vector of bytes representing the message's payload. + /// * `event_time` - The `DateTime` that specifies when the event occurred. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// use chrono::Utc; + /// let now = Utc::now(); + /// let message = Message::new(vec![1, 2, 3, 4], now); + /// ``` + pub fn new(value: Vec, event_time: DateTime) -> Self { + Self { + value, + event_time, + keys: None, + tags: None, + } + } + /// Marks the message to be dropped by creating a new `Message` with an empty value, a special "DROP" tag, and the specified event time. + /// + /// # Arguments + /// + /// * `event_time` - The `DateTime` that specifies when the event occurred. Event time is required because, even though a message is dropped, + /// it is still considered as being processed, hence the watermark should be updated accordingly using the provided event time. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// use chrono::Utc; + /// let now = Utc::now(); + /// let dropped_message = Message::message_to_drop(now); + /// ``` + pub fn message_to_drop(event_time: DateTime) -> Message { + Message { + keys: None, + value: vec![], + event_time, + tags: Some(vec![DROP.to_string()]), + } + } + + + /// Sets or replaces the keys associated with this message. + /// + /// # Arguments + /// + /// * `keys` - A vector of strings representing the keys. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// use chrono::Utc; + /// let now = Utc::now(); + /// let message = Message::new(vec![1, 2, 3], now).keys(vec!["key1".to_string(), "key2".to_string()]); + /// ``` + pub fn keys(mut self, keys: Vec) -> Self { + self.keys = Some(keys); + self + } + /// Sets or replaces the tags associated with this message. + /// + /// # Arguments + /// + /// * `tags` - A vector of strings representing the tags. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// use chrono::Utc; + /// let now = Utc::now(); + /// let message = Message::new(vec![1, 2, 3], now).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// ``` + + pub fn tags(mut self, tags: Vec) -> Self { + self.tags = Some(tags); + self + } + + /// Replaces the value of the message. + /// + /// # Arguments + /// + /// * `value` - A new vector of bytes that replaces the current message value. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// use chrono::Utc; + /// let now = Utc::now(); + /// let message = Message::new(vec![1, 2, 3], now).value(vec![4, 5, 6]); + /// ``` + pub fn value(mut self, value: Vec) -> Self { + self.value = value; + self + } } /// Incoming request to the Source Transformer. @@ -93,10 +202,10 @@ pub struct SourceTransformRequest { impl From for proto::source_transform_response::Result { fn from(value: Message) -> Self { proto::source_transform_response::Result { - keys: value.keys, + keys: value.keys.unwrap_or_default(), value: value.value, event_time: prost_timestamp_from_utc(value.event_time), - tags: value.tags, + tags: value.tags.unwrap_or_default(), } } } @@ -245,9 +354,9 @@ mod tests { input: sourcetransform::SourceTransformRequest, ) -> Vec { vec![sourcetransform::Message { - keys: input.keys, + keys: Some(input.keys), value: input.value, - tags: vec![], + tags: Some(vec![]), event_time: chrono::offset::Utc::now(), }] }