From cd19b148ce642b6c69370ea7bfeb8ff28ff88fc5 Mon Sep 17 00:00:00 2001 From: shubham Date: Tue, 21 May 2024 16:02:44 +0530 Subject: [PATCH 01/13] drop message Signed-off-by: shubham --- src/map.rs | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/map.rs b/src/map.rs index df562f2..2209392 100644 --- a/src/map.rs +++ b/src/map.rs @@ -10,7 +10,7 @@ use crate::shared; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/map.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info"; - +const DROP: &str ="U+005C__DROP__"; /// Numaflow Map Proto definitions. pub mod proto { tonic::include_proto!("map.v1"); @@ -79,6 +79,7 @@ where } /// Message is the response struct from the [`Mapper::map`] . +#[derive(Default)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. @@ -89,6 +90,64 @@ pub struct Message { pub tags: Vec, } +impl Message{ + pub fn new_message(value: Vec)->Self{ + Message{ + value, + ..Default::default() // Use default values for keys and tags + } + } + + pub fn message_to_drop()->Self{ + Message{ + tags:vec![DROP.parse().unwrap()], + ..Default::default() // Use default values for keys and tags + } + } + + pub fn with_keys(mut self,keys:Vec)-> Self{ + self.keys=keys; + self + } + + pub fn with_tags(mut self,tags:Vec)-> Self{ + self.tags=tags; + self + } + + pub fn keys(mut self) ->Vec{ + self.keys + } + pub fn value(mut self) ->Vec{ + self.value + } + + pub fn tags(mut self) ->Vec{ + self.tags + } + +} + +pub struct Messages{ + messages:Vec +} + +impl Messages{ + fn message_builder() -> Self { + Messages { + messages: Vec::new(), + } + } + fn append(mut self, msg: Message) -> Self { + self.messages.push(msg); + self + } + + fn items( &self) ->&Vec{ + &self.messages + } +} + impl From for proto::map_response::Result { fn from(value: Message) -> Self { proto::map_response::Result { From e2bddab1297604962e6b1f12c3a0c86a1014d7f0 Mon Sep 17 00:00:00 2001 From: shubham Date: Wed, 22 May 2024 13:50:47 +0530 Subject: [PATCH 02/13] Added messages drop to reducer and source transform Signed-off-by: shubham --- src/reduce.rs | 61 ++++++++++++++++++++++++++++++++++++++++++ src/sourcetransform.rs | 60 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/src/reduce.rs b/src/reduce.rs index fc7bf56..4dd09be 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -20,6 +20,7 @@ const WIN_END_TIME: &str = "x-numaflow-win-end-time"; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/reduce.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/reducer-server-info"; +const DROP: &str ="U+005C__DROP__"; /// Numaflow Reduce Proto definitions. pub mod proto { @@ -175,6 +176,7 @@ pub struct Metadata { } /// Message is the response from the user's [`Reducer::reduce`]. +#[derive(Default)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. It is mainly used in creating a partition in [`Reducer::reduce`]. @@ -185,6 +187,65 @@ pub struct Message { pub tags: Vec, } + +impl Message { + pub fn new_message(value: Vec)->Self{ + Message { + value, + ..Default::default() // Use default values for keys and tags + } + } + + pub fn message_to_drop()->Self{ + Message { + tags:vec![DROP.parse().unwrap()], + ..Default::default() // Use default values for keys and tags + } + } + + pub fn with_keys(mut self,keys:Vec)-> Self{ + self.keys=keys; + self + } + + pub fn with_tags(mut self,tags:Vec)-> Self{ + self.tags=tags; + self + } + + pub fn keys(mut self) ->Vec{ + self.keys + } + pub fn value(mut self) ->Vec{ + self.value + } + + pub fn tags(mut self) ->Vec{ + self.tags + } + +} + +pub struct Messages{ + messages:Vec +} + +impl Messages { + fn message_builder() -> Self { + Messages { + messages: Vec::new(), + } + } + fn append(mut self, msg: Message) -> Self { + self.messages.push(msg); + self + } + + fn items( &self) ->&Vec{ + &self.messages + } +} + /// Incoming request into the reducer handler of [`Reducer`]. pub struct ReduceRequest { /// Set of keys in the (key, value) terminology of map/reduce paradigm. diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 4828af6..129aa16 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -10,6 +10,7 @@ const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sourcetransform.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcetransformer-server-info"; +const DROP: &str ="U+005C__DROP__"; /// Numaflow SourceTransformer Proto definitions. pub mod proto { tonic::include_proto!("sourcetransformer.v1"); @@ -62,6 +63,7 @@ pub trait SourceTransformer { } /// Message is the response struct from the [`SourceTransformer::transform`] . +#[derive(Default)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. @@ -75,6 +77,64 @@ pub struct Message { pub tags: Vec, } +impl Message { + pub fn new_message(value: Vec)->Self{ + Message { + value, + ..Default::default() // Use default values for keys and tags + } + } + + pub fn message_to_drop()->Self{ + Message { + tags:vec![DROP.parse().unwrap()], + ..Default::default() // Use default values for keys and tags + } + } + + pub fn with_keys(mut self,keys:Vec)-> Self{ + self.keys=keys; + self + } + + pub fn with_tags(mut self,tags:Vec)-> Self{ + self.tags=tags; + self + } + + pub fn keys(mut self) ->Vec{ + self.keys + } + pub fn value(mut self) ->Vec{ + self.value + } + + pub fn tags(mut self) ->Vec{ + self.tags + } + +} + +pub struct Messages{ + messages:Vec +} + +impl Messages { + fn message_builder() -> Self { + Messages { + messages: Vec::new(), + } + } + fn append(mut self, msg: crate::reduce::Message) -> Self { + self.messages.push(msg); + self + } + + fn items( &self) ->&Vec{ + &self.messages + } +} + /// Incoming request to the Source Transformer. pub struct SourceTransformRequest { /// keys are the keys in the (key, value) terminology of map/reduce paradigm. From d15690f82c94d014163dac2d9ab9ddc7415f71a9 Mon Sep 17 00:00:00 2001 From: shubham Date: Wed, 22 May 2024 18:37:28 +0530 Subject: [PATCH 03/13] fixes for the reduce Signed-off-by: shubham --- src/reduce.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reduce.rs b/src/reduce.rs index 4dd09be..43893ff 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -109,7 +109,7 @@ pub trait Reducer { /// impl numaflow::reduce::ReducerCreator for CounterCreator { /// type R = Counter; /// - /// fn create(&self) -> Counter { + /// fn create(&self) -> Self::R { /// Counter::new() /// } /// } From 16706041fd3bb913b3a17fa04504ee37c5e1a2fb Mon Sep 17 00:00:00 2001 From: shubham Date: Thu, 23 May 2024 15:02:53 +0530 Subject: [PATCH 04/13] Added builder pattern to message struct Signed-off-by: shubham --- src/map.rs | 82 +++++++++++++++------------------------ src/reduce.rs | 83 ++++++++++++++------------------------- src/sourcetransform.rs | 88 +++++++++++++++++------------------------- 3 files changed, 98 insertions(+), 155 deletions(-) diff --git a/src/map.rs b/src/map.rs index 2209392..aa66606 100644 --- a/src/map.rs +++ b/src/map.rs @@ -2,6 +2,7 @@ use std::future::Future; use std::path::PathBuf; use chrono::{DateTime, Utc}; +use serde_json::Value; use tonic::{async_trait, Request, Response, Status}; use crate::shared; @@ -45,11 +46,9 @@ pub trait Mapper { /// #[tonic::async_trait] /// impl map::Mapper for Cat { /// async fn map(&self, input: map::MapRequest) -> Vec { - /// vec![map::Message { - /// keys: input.keys, - /// value: input.value, - /// tags: vec![], - /// }] + /// use numaflow::map::MessageBuilder; + /// let message=MessageBuilder::new().keys(input.keys).values(input.value).tags(vec![]).build(); + /// vec![message] /// } /// } /// ``` @@ -79,74 +78,57 @@ where } /// Message is the response struct from the [`Mapper::map`] . -#[derive(Default)] + pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. - pub keys: Vec, + keys: Vec, /// Value is the value passed to the next vertex. - pub value: Vec, + value: Vec, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - pub tags: Vec, + tags: Vec, } -impl Message{ - pub fn new_message(value: Vec)->Self{ - Message{ - value, - ..Default::default() // Use default values for keys and tags - } +#[derive(Default)] +pub struct MessageBuilder{ + keys: Vec, + value: Vec, + tags: Vec, +} +impl MessageBuilder{ + pub fn new()->Self{ + Default::default() } - - pub fn message_to_drop()->Self{ - Message{ - tags:vec![DROP.parse().unwrap()], - ..Default::default() // Use default values for keys and tags - } + pub fn message_to_drop(mut self) -> Self { + self.tags.push(DROP.parse().unwrap()); + self } - - pub fn with_keys(mut self,keys:Vec)-> Self{ + pub fn keys(mut self,keys:Vec)-> Self{ self.keys=keys; self } - pub fn with_tags(mut self,tags:Vec)-> Self{ + pub fn tags(mut self,tags:Vec)-> Self{ self.tags=tags; self } - pub fn keys(mut self) ->Vec{ - self.keys - } - pub fn value(mut self) ->Vec{ - self.value + pub fn values( mut self,value: Vec)->Self{ + self.value=value; + self } - - pub fn tags(mut self) ->Vec{ - self.tags + pub fn build(self)->Message{ + Message{ + keys: self.keys, + value:self.value, + tags: self.tags, + } } - } -pub struct Messages{ - messages:Vec -} -impl Messages{ - fn message_builder() -> Self { - Messages { - messages: Vec::new(), - } - } - fn append(mut self, msg: Message) -> Self { - self.messages.push(msg); - self - } - fn items( &self) ->&Vec{ - &self.messages - } -} + impl From for proto::map_response::Result { fn from(value: Message) -> Self { diff --git a/src/reduce.rs b/src/reduce.rs index 43893ff..24af611 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -97,7 +97,7 @@ pub trait Reducer { /// Ok(()) /// } /// mod counter { - /// use numaflow::reduce::{Message, ReduceRequest}; + /// use numaflow::reduce::{Message, MessageBuilder, ReduceRequest}; /// use numaflow::reduce::{Reducer, Metadata}; /// use tokio::sync::mpsc::Receiver; /// use tonic::async_trait; @@ -132,11 +132,8 @@ pub trait Reducer { /// while (input.recv().await).is_some() { /// counter += 1; /// } - /// vec![Message { - /// keys: keys.clone(), - /// value: counter.to_string().into_bytes(), - /// tags: vec![], - /// }] + /// let message=MessageBuilder::new().tags(vec![]).keys(keys.clone()).values(counter.to_string().into_bytes()).build(); + /// vec![message] /// } /// } /// } @@ -176,76 +173,56 @@ pub struct Metadata { } /// Message is the response from the user's [`Reducer::reduce`]. -#[derive(Default)] + pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. It is mainly used in creating a partition in [`Reducer::reduce`]. - pub keys: Vec, + keys: Vec, /// Value is the value passed to the next vertex. - pub value: Vec, + value: Vec, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - pub tags: Vec, + tags: Vec, } -impl Message { - pub fn new_message(value: Vec)->Self{ - Message { - value, - ..Default::default() // Use default values for keys and tags - } +#[derive(Default)] +pub struct MessageBuilder{ + keys: Vec, + value: Vec, + tags: Vec, +} +impl MessageBuilder { + pub fn new()->Self{ + Default::default() } - - pub fn message_to_drop()->Self{ - Message { - tags:vec![DROP.parse().unwrap()], - ..Default::default() // Use default values for keys and tags - } + pub fn message_to_drop(mut self) -> Self { + self.tags.push(DROP.parse().unwrap()); + self } - - pub fn with_keys(mut self,keys:Vec)-> Self{ + pub fn keys(mut self,keys:Vec)-> Self{ self.keys=keys; self } - pub fn with_tags(mut self,tags:Vec)-> Self{ + pub fn tags(mut self,tags:Vec)-> Self{ self.tags=tags; self } - pub fn keys(mut self) ->Vec{ - self.keys - } - pub fn value(mut self) ->Vec{ - self.value - } - - pub fn tags(mut self) ->Vec{ - self.tags - } - -} - -pub struct Messages{ - messages:Vec -} - -impl Messages { - fn message_builder() -> Self { - Messages { - messages: Vec::new(), - } - } - fn append(mut self, msg: Message) -> Self { - self.messages.push(msg); + pub fn values( mut self,value: Vec)->Self{ + self.value=value; self } - - fn items( &self) ->&Vec{ - &self.messages + pub fn build(self)-> Message { + Message { + keys: self.keys, + value:self.value, + tags: self.tags, + } } } + /// Incoming request into the reducer handler of [`Reducer`]. pub struct ReduceRequest { /// Set of keys in the (key, value) terminology of map/reduce paradigm. diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 129aa16..dcc2ddc 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -50,12 +50,9 @@ pub trait SourceTransformer { /// &self, /// input: sourcetransform::SourceTransformRequest, /// ) -> Vec { - /// vec![sourcetransform::Message { - /// keys: input.keys, - /// value: input.value, - /// event_time: chrono::offset::Utc::now(), - /// tags: vec![], - /// }] + /// use numaflow::sourcetransform::MessageBuilder; + /// let message=MessageBuilder::new().keys(input.keys).values(input.value).tags(vec![]).event_time(chrono::offset::Utc::now()).build(); + /// vec![message] /// } /// } /// ``` @@ -63,78 +60,65 @@ pub trait SourceTransformer { } /// Message is the response struct from the [`SourceTransformer::transform`] . -#[derive(Default)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. - pub keys: Vec, + keys: Vec, /// Value is the value passed to the next vertex. - pub value: Vec, + value: Vec, /// Time for the given event. This will be used for tracking watermarks. If cannot be derived, set it to the incoming /// event_time from the [`Datum`]. - pub event_time: DateTime, + event_time: DateTime, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - pub tags: Vec, + tags: Vec, } -impl Message { - pub fn new_message(value: Vec)->Self{ - Message { - value, - ..Default::default() // Use default values for keys and tags - } +#[derive(Default)] +pub struct MessageBuilder{ + keys: Vec, + value: Vec, + tags: Vec, + event_time:DateTime +} +impl MessageBuilder { + pub fn new()->Self{ + Default::default() } - - pub fn message_to_drop()->Self{ - Message { - tags:vec![DROP.parse().unwrap()], - ..Default::default() // Use default values for keys and tags - } + pub fn message_to_drop(mut self) -> Self { + self.tags.push(DROP.parse().unwrap()); + self } - - pub fn with_keys(mut self,keys:Vec)-> Self{ + pub fn keys(mut self,keys:Vec)-> Self{ self.keys=keys; self } - pub fn with_tags(mut self,tags:Vec)-> Self{ + pub fn tags(mut self,tags:Vec)-> Self{ self.tags=tags; self } - pub fn keys(mut self) ->Vec{ - self.keys - } - pub fn value(mut self) ->Vec{ - self.value - } - - pub fn tags(mut self) ->Vec{ - self.tags + pub fn values( mut self,value: Vec)->Self{ + self.value=value; + self } -} - -pub struct Messages{ - messages:Vec -} - -impl Messages { - fn message_builder() -> Self { - Messages { - messages: Vec::new(), - } - } - fn append(mut self, msg: crate::reduce::Message) -> Self { - self.messages.push(msg); + pub fn event_time(mut self,event_time:DateTime)->Self{ + self.event_time=event_time; self - } - fn items( &self) ->&Vec{ - &self.messages + } + pub fn build(self)-> Message { + Message { + keys: self.keys, + value:self.value, + tags: self.tags, + event_time:self.event_time + } } } + /// Incoming request to the Source Transformer. pub struct SourceTransformRequest { /// keys are the keys in the (key, value) terminology of map/reduce paradigm. From a8180759b76a6cf124c1e657bc84d52194daf000 Mon Sep 17 00:00:00 2001 From: shubham Date: Thu, 23 May 2024 15:16:28 +0530 Subject: [PATCH 05/13] Added test cases Signed-off-by: shubham --- examples/map-cat/src/main.rs | 7 ++----- src/map.rs | 24 ++++++++++++++++++++++-- src/reduce.rs | 2 +- src/sourcetransform.rs | 1 + 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/examples/map-cat/src/main.rs b/examples/map-cat/src/main.rs index 971d479..d6e9550 100644 --- a/examples/map-cat/src/main.rs +++ b/examples/map-cat/src/main.rs @@ -11,10 +11,7 @@ struct Cat; #[tonic::async_trait] impl map::Mapper for Cat { async fn map(&self, input: map::MapRequest) -> Vec { - vec![map::Message { - keys: input.keys, - value: input.value, - tags: vec![], - }] + let message=MessageBuilder::new().keys(input.keys).values(input.value).tags(vec![]).build(); + vec![message] } } diff --git a/src/map.rs b/src/map.rs index aa66606..7cdca2d 100644 --- a/src/map.rs +++ b/src/map.rs @@ -78,7 +78,7 @@ where } /// Message is the response struct from the [`Mapper::map`] . - +#[derive(Debug, PartialEq)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. @@ -252,7 +252,7 @@ impl Server { mod tests { use std::{error::Error, time::Duration}; use tower::service_fn; - + use crate::map::{Message, MessageBuilder}; use crate::map; use crate::map::proto::map_client::MapClient; use tempfile::TempDir; @@ -325,4 +325,24 @@ mod tests { assert!(task.is_finished(), "gRPC server is still running"); Ok(()) } + + + // builder test + #[test] + fn builder_test(){ + let message=Message{ + tags:vec![], + keys:vec![], + value:vec![] + }; + + let message_builder=MessageBuilder::new() + .keys(vec![]) + .values(vec![]) + .tags(vec![]) + .build(); + + assert_eq!(message,message_builder) + + } } diff --git a/src/reduce.rs b/src/reduce.rs index 24af611..28d5474 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -173,7 +173,7 @@ pub struct Metadata { } /// Message is the response from the user's [`Reducer::reduce`]. - +#[derive(Debug, PartialEq)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. It is mainly used in creating a partition in [`Reducer::reduce`]. diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index dcc2ddc..e56fe95 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -60,6 +60,7 @@ pub trait SourceTransformer { } /// Message is the response struct from the [`SourceTransformer::transform`] . +#[derive(Debug, PartialEq)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. From 61d61cc44846c8732d4b12c7be7403c7a6f6f761 Mon Sep 17 00:00:00 2001 From: shubham Date: Thu, 23 May 2024 15:34:29 +0530 Subject: [PATCH 06/13] Updated examples Signed-off-by: shubham --- examples/map-cat/src/main.rs | 3 ++- examples/map-tickgen-serde/src/main.rs | 15 ++++++--------- examples/reduce-counter/src/main.rs | 7 ++----- examples/source-transformer-now/src/main.rs | 14 ++++++++------ 4 files changed, 18 insertions(+), 21 deletions(-) diff --git a/examples/map-cat/src/main.rs b/examples/map-cat/src/main.rs index d6e9550..a3dc0aa 100644 --- a/examples/map-cat/src/main.rs +++ b/examples/map-cat/src/main.rs @@ -1,6 +1,7 @@ use numaflow::map; use std::error::Error; + #[tokio::main] async fn main() -> Result<(), Box> { map::Server::new(Cat).start().await @@ -11,7 +12,7 @@ struct Cat; #[tonic::async_trait] impl map::Mapper for Cat { async fn map(&self, input: map::MapRequest) -> Vec { - let message=MessageBuilder::new().keys(input.keys).values(input.value).tags(vec![]).build(); + let message=map::MessageBuilder::new().keys(input.keys).values(input.value).tags(vec![]).build(); vec![message] } } diff --git a/examples/map-tickgen-serde/src/main.rs b/examples/map-tickgen-serde/src/main.rs index e7b7f20..0253f6d 100644 --- a/examples/map-tickgen-serde/src/main.rs +++ b/examples/map-tickgen-serde/src/main.rs @@ -1,4 +1,5 @@ use numaflow::map; +use numaflow::map::{MessageBuilder}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -38,14 +39,10 @@ impl map::Mapper for TickGen { let ts = Utc .timestamp_nanos(payload.created_ts) .to_rfc3339_opts(SecondsFormat::Nanos, true); - vec![map::Message { - keys: input.keys, - value: serde_json::to_vec(&ResultPayload { - value: payload.data.value, - time: ts, - }) - .unwrap_or_default(), - tags: vec![], - }] + let message=map::MessageBuilder::new().keys(input.keys).values(serde_json::to_vec(&ResultPayload { + value: payload.data.value, + time: ts, + }).unwrap_or_default()).tags(vec![]).build(); + vec![message] } } diff --git a/examples/reduce-counter/src/main.rs b/examples/reduce-counter/src/main.rs index 3bacd1a..ffbc33e 100644 --- a/examples/reduce-counter/src/main.rs +++ b/examples/reduce-counter/src/main.rs @@ -48,11 +48,8 @@ mod counter { md.start_time(), md.end_time() ); - vec![Message { - keys: keys.clone(), - value: counter.to_string().into_bytes(), - tags: vec![], - }] + let message=reduce::MessageBuilder::new().keys(keys.clone()).values(counter.to_string().into_bytes()).tags(vec![]).build(); + vec![message] } } } diff --git a/examples/source-transformer-now/src/main.rs b/examples/source-transformer-now/src/main.rs index be4bae2..5e50f2b 100644 --- a/examples/source-transformer-now/src/main.rs +++ b/examples/source-transformer-now/src/main.rs @@ -16,11 +16,13 @@ impl sourcetransform::SourceTransformer for NowCat { &self, input: sourcetransform::SourceTransformRequest, ) -> Vec { - vec![sourcetransform::Message { - keys: input.keys, - value: input.value, - event_time: chrono::offset::Utc::now(), - tags: vec![], - }] + let message=sourcetransform::MessageBuilder::new() + .keys(input.keys) + .values(input.value) + .tags(vec![]) + .event_time(chrono::offset::Utc::now()) + .build(); + + vec![message] } } From dae8e8232ee196ccfb11e8ef8a934108a84740a8 Mon Sep 17 00:00:00 2001 From: shubham Date: Thu, 23 May 2024 20:43:10 +0530 Subject: [PATCH 07/13] formatted files Signed-off-by: shubham --- examples/map-cat/src/main.rs | 7 ++- examples/map-tickgen-serde/src/main.rs | 17 ++++-- examples/reduce-counter/src/main.rs | 6 ++- examples/source-transformer-now/src/main.rs | 2 +- src/map.rs | 57 +++++++++------------ src/reduce.rs | 57 +++++++++++---------- src/sourcetransform.rs | 36 ++++++------- 7 files changed, 94 insertions(+), 88 deletions(-) diff --git a/examples/map-cat/src/main.rs b/examples/map-cat/src/main.rs index a3dc0aa..b84c9b3 100644 --- a/examples/map-cat/src/main.rs +++ b/examples/map-cat/src/main.rs @@ -1,7 +1,6 @@ use numaflow::map; use std::error::Error; - #[tokio::main] async fn main() -> Result<(), Box> { map::Server::new(Cat).start().await @@ -12,7 +11,11 @@ struct Cat; #[tonic::async_trait] impl map::Mapper for Cat { async fn map(&self, input: map::MapRequest) -> Vec { - let message=map::MessageBuilder::new().keys(input.keys).values(input.value).tags(vec![]).build(); + let message = map::MessageBuilder::new() + .keys(input.keys) + .values(input.value) + .tags(vec![]) + .build(); vec![message] } } diff --git a/examples/map-tickgen-serde/src/main.rs b/examples/map-tickgen-serde/src/main.rs index 0253f6d..7bae200 100644 --- a/examples/map-tickgen-serde/src/main.rs +++ b/examples/map-tickgen-serde/src/main.rs @@ -1,5 +1,5 @@ use numaflow::map; -use numaflow::map::{MessageBuilder}; +use numaflow::map::MessageBuilder; #[tokio::main] async fn main() -> Result<(), Box> { @@ -39,10 +39,17 @@ impl map::Mapper for TickGen { let ts = Utc .timestamp_nanos(payload.created_ts) .to_rfc3339_opts(SecondsFormat::Nanos, true); - let message=map::MessageBuilder::new().keys(input.keys).values(serde_json::to_vec(&ResultPayload { - value: payload.data.value, - time: ts, - }).unwrap_or_default()).tags(vec![]).build(); + let message = map::MessageBuilder::new() + .keys(input.keys) + .values( + serde_json::to_vec(&ResultPayload { + value: payload.data.value, + time: ts, + }) + .unwrap_or_default(), + ) + .tags(vec![]) + .build(); vec![message] } } diff --git a/examples/reduce-counter/src/main.rs b/examples/reduce-counter/src/main.rs index ffbc33e..ac1ae93 100644 --- a/examples/reduce-counter/src/main.rs +++ b/examples/reduce-counter/src/main.rs @@ -48,7 +48,11 @@ mod counter { md.start_time(), md.end_time() ); - let message=reduce::MessageBuilder::new().keys(keys.clone()).values(counter.to_string().into_bytes()).tags(vec![]).build(); + let message = reduce::MessageBuilder::new() + .keys(keys.clone()) + .values(counter.to_string().into_bytes()) + .tags(vec![]) + .build(); vec![message] } } diff --git a/examples/source-transformer-now/src/main.rs b/examples/source-transformer-now/src/main.rs index 5e50f2b..3322236 100644 --- a/examples/source-transformer-now/src/main.rs +++ b/examples/source-transformer-now/src/main.rs @@ -16,7 +16,7 @@ impl sourcetransform::SourceTransformer for NowCat { &self, input: sourcetransform::SourceTransformRequest, ) -> Vec { - let message=sourcetransform::MessageBuilder::new() + let message = sourcetransform::MessageBuilder::new() .keys(input.keys) .values(input.value) .tags(vec![]) diff --git a/src/map.rs b/src/map.rs index 7cdca2d..c4a1226 100644 --- a/src/map.rs +++ b/src/map.rs @@ -7,11 +7,10 @@ use tonic::{async_trait, Request, Response, Status}; use crate::shared; - const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/map.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info"; -const DROP: &str ="U+005C__DROP__"; +const DROP: &str = "U+005C__DROP__"; /// Numaflow Map Proto definitions. pub mod proto { tonic::include_proto!("map.v1"); @@ -82,54 +81,50 @@ where pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. - keys: Vec, + keys: Vec, /// Value is the value passed to the next vertex. - value: Vec, + value: Vec, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). tags: Vec, } #[derive(Default)] -pub struct MessageBuilder{ +pub struct MessageBuilder { keys: Vec, value: Vec, tags: Vec, } -impl MessageBuilder{ - pub fn new()->Self{ +impl MessageBuilder { + pub fn new() -> Self { Default::default() } pub fn message_to_drop(mut self) -> Self { self.tags.push(DROP.parse().unwrap()); self } - pub fn keys(mut self,keys:Vec)-> Self{ - self.keys=keys; + pub fn keys(mut self, keys: Vec) -> Self { + self.keys = keys; self } - pub fn tags(mut self,tags:Vec)-> Self{ - self.tags=tags; + pub fn tags(mut self, tags: Vec) -> Self { + self.tags = tags; self } - pub fn values( mut self,value: Vec)->Self{ - self.value=value; + pub fn values(mut self, value: Vec) -> Self { + self.value = value; self } - pub fn build(self)->Message{ - Message{ + pub fn build(self) -> Message { + Message { keys: self.keys, - value:self.value, + value: self.value, tags: self.tags, } } } - - - - impl From for proto::map_response::Result { fn from(value: Message) -> Self { proto::map_response::Result { @@ -250,14 +245,14 @@ impl Server { #[cfg(test)] mod tests { - use std::{error::Error, time::Duration}; - use tower::service_fn; - use crate::map::{Message, MessageBuilder}; use crate::map; use crate::map::proto::map_client::MapClient; + use crate::map::{Message, MessageBuilder}; + use std::{error::Error, time::Duration}; use tempfile::TempDir; use tokio::sync::oneshot; use tonic::transport::Uri; + use tower::service_fn; #[tokio::test] async fn map_server() -> Result<(), Box> { @@ -326,23 +321,21 @@ mod tests { Ok(()) } - // builder test #[test] - fn builder_test(){ - let message=Message{ - tags:vec![], - keys:vec![], - value:vec![] + fn builder_test() { + let message = Message { + tags: vec![], + keys: vec![], + value: vec![], }; - let message_builder=MessageBuilder::new() + let message_builder = MessageBuilder::new() .keys(vec![]) .values(vec![]) .tags(vec![]) .build(); - assert_eq!(message,message_builder) - + assert_eq!(message, message_builder) } } diff --git a/src/reduce.rs b/src/reduce.rs index 28d5474..e5589ed 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -8,8 +8,8 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio::task::JoinSet; use tokio_stream::wrappers::ReceiverStream; -use tonic::{async_trait, Request, Response, Status}; use tonic::metadata::MetadataMap; +use tonic::{async_trait, Request, Response, Status}; use crate::shared; @@ -20,7 +20,7 @@ const WIN_END_TIME: &str = "x-numaflow-win-end-time"; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/reduce.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/reducer-server-info"; -const DROP: &str ="U+005C__DROP__"; +const DROP: &str = "U+005C__DROP__"; /// Numaflow Reduce Proto definitions. pub mod proto { @@ -157,7 +157,10 @@ pub struct IntervalWindow { impl IntervalWindow { fn new(start_time: DateTime, end_time: DateTime) -> Self { - Self { start_time, end_time } + Self { + start_time, + end_time, + } } } @@ -177,52 +180,50 @@ pub struct Metadata { pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. It is mainly used in creating a partition in [`Reducer::reduce`]. - keys: Vec, + keys: Vec, /// Value is the value passed to the next vertex. - value: Vec, + value: Vec, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - tags: Vec, + tags: Vec, } - #[derive(Default)] -pub struct MessageBuilder{ +pub struct MessageBuilder { keys: Vec, value: Vec, tags: Vec, } impl MessageBuilder { - pub fn new()->Self{ + pub fn new() -> Self { Default::default() } pub fn message_to_drop(mut self) -> Self { self.tags.push(DROP.parse().unwrap()); self } - pub fn keys(mut self,keys:Vec)-> Self{ - self.keys=keys; + pub fn keys(mut self, keys: Vec) -> Self { + self.keys = keys; self } - pub fn tags(mut self,tags:Vec)-> Self{ - self.tags=tags; + pub fn tags(mut self, tags: Vec) -> Self { + self.tags = tags; self } - pub fn values( mut self,value: Vec)->Self{ - self.value=value; + pub fn values(mut self, value: Vec) -> Self { + self.value = value; self } - pub fn build(self)-> Message { + pub fn build(self) -> Message { Message { keys: self.keys, - value:self.value, + value: self.value, tags: self.tags, } } } - /// Incoming request into the reducer handler of [`Reducer`]. pub struct ReduceRequest { /// Set of keys in the (key, value) terminology of map/reduce paradigm. @@ -275,8 +276,8 @@ fn get_window_details(request: &MetadataMap) -> (DateTime, DateTime) { #[async_trait] impl proto::reduce_server::Reduce for ReduceService - where - C: ReducerCreator + Send + Sync + 'static, +where + C: ReducerCreator + Send + Sync + 'static, { type ReduceFnStream = ReceiverStream>; async fn reduce_fn( @@ -343,8 +344,8 @@ impl proto::reduce_server::Reduce for ReduceService tx.send(Ok(proto::ReduceResponse { results: datum_responses, })) - .await - .unwrap(); + .await + .unwrap(); } }); @@ -416,9 +417,9 @@ impl Server { &mut self, shutdown: F, ) -> Result<(), Box> - where - F: Future, - C: ReducerCreator + Send + Sync + 'static, + where + F: Future, + C: ReducerCreator + Send + Sync + 'static, { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let creator = self.creator.take().unwrap(); @@ -435,9 +436,9 @@ impl Server { } /// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the signal arrives. - pub async fn start(&mut self) -> Result<(), Box> - where - C: ReducerCreator + Send + Sync + 'static, + pub async fn start(&mut self) -> Result<(), Box> + where + C: ReducerCreator + Send + Sync + 'static, { self.start_with_shutdown(shared::shutdown_signal()).await } diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index e56fe95..4b7c6f7 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -10,7 +10,7 @@ const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sourcetransform.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcetransformer-server-info"; -const DROP: &str ="U+005C__DROP__"; +const DROP: &str = "U+005C__DROP__"; /// Numaflow SourceTransformer Proto definitions. pub mod proto { tonic::include_proto!("sourcetransformer.v1"); @@ -64,9 +64,9 @@ pub trait SourceTransformer { pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. - keys: Vec, + keys: Vec, /// Value is the value passed to the next vertex. - value: Vec, + value: Vec, /// Time for the given event. This will be used for tracking watermarks. If cannot be derived, set it to the incoming /// event_time from the [`Datum`]. event_time: DateTime, @@ -75,51 +75,49 @@ pub struct Message { } #[derive(Default)] -pub struct MessageBuilder{ +pub struct MessageBuilder { keys: Vec, value: Vec, tags: Vec, - event_time:DateTime + event_time: DateTime, } impl MessageBuilder { - pub fn new()->Self{ + pub fn new() -> Self { Default::default() } pub fn message_to_drop(mut self) -> Self { self.tags.push(DROP.parse().unwrap()); self } - pub fn keys(mut self,keys:Vec)-> Self{ - self.keys=keys; + pub fn keys(mut self, keys: Vec) -> Self { + self.keys = keys; self } - pub fn tags(mut self,tags:Vec)-> Self{ - self.tags=tags; + pub fn tags(mut self, tags: Vec) -> Self { + self.tags = tags; self } - pub fn values( mut self,value: Vec)->Self{ - self.value=value; + pub fn values(mut self, value: Vec) -> Self { + self.value = value; self } - pub fn event_time(mut self,event_time:DateTime)->Self{ - self.event_time=event_time; + pub fn event_time(mut self, event_time: DateTime) -> Self { + self.event_time = event_time; self - } - pub fn build(self)-> Message { + pub fn build(self) -> Message { Message { keys: self.keys, - value:self.value, + value: self.value, tags: self.tags, - event_time:self.event_time + event_time: self.event_time, } } } - /// Incoming request to the Source Transformer. pub struct SourceTransformRequest { /// keys are the keys in the (key, value) terminology of map/reduce paradigm. From bfc8ad42f4bd7903341d012fc20c6f9f74339e12 Mon Sep 17 00:00:00 2001 From: shubham Date: Wed, 29 May 2024 14:14:40 +0530 Subject: [PATCH 08/13] Modified Message builder pattern Signed-off-by: shubham --- examples/map-cat/src/main.rs | 6 +- examples/map-tickgen-serde/src/main.rs | 22 +++---- examples/reduce-counter/src/main.rs | 6 +- examples/source-transformer-now/src/main.rs | 7 +- src/map.rs | 71 ++++++++------------- src/reduce.rs | 48 +++++++------- src/sourcetransform.rs | 56 +++++++--------- 7 files changed, 87 insertions(+), 129 deletions(-) diff --git a/examples/map-cat/src/main.rs b/examples/map-cat/src/main.rs index b84c9b3..4eefaf4 100644 --- a/examples/map-cat/src/main.rs +++ b/examples/map-cat/src/main.rs @@ -11,11 +11,9 @@ struct Cat; #[tonic::async_trait] impl map::Mapper for Cat { async fn map(&self, input: map::MapRequest) -> Vec { - let message = map::MessageBuilder::new() + let message = map::Message::new(input.value) .keys(input.keys) - .values(input.value) - .tags(vec![]) - .build(); + .tags(vec![]); vec![message] } } diff --git a/examples/map-tickgen-serde/src/main.rs b/examples/map-tickgen-serde/src/main.rs index 7bae200..645d215 100644 --- a/examples/map-tickgen-serde/src/main.rs +++ b/examples/map-tickgen-serde/src/main.rs @@ -1,5 +1,5 @@ use numaflow::map; -use numaflow::map::MessageBuilder; +use numaflow::map::Message; #[tokio::main] async fn main() -> Result<(), Box> { @@ -39,17 +39,15 @@ impl map::Mapper for TickGen { let ts = Utc .timestamp_nanos(payload.created_ts) .to_rfc3339_opts(SecondsFormat::Nanos, true); - let message = map::MessageBuilder::new() - .keys(input.keys) - .values( - serde_json::to_vec(&ResultPayload { - value: payload.data.value, - time: ts, - }) - .unwrap_or_default(), - ) - .tags(vec![]) - .build(); + let message = map::Message::new( + serde_json::to_vec(&ResultPayload { + value: payload.data.value, + time: ts, + }) + .unwrap_or_default(), + ) + .keys(input.keys) + .tags(vec![]); vec![message] } } diff --git a/examples/reduce-counter/src/main.rs b/examples/reduce-counter/src/main.rs index ac1ae93..7e79882 100644 --- a/examples/reduce-counter/src/main.rs +++ b/examples/reduce-counter/src/main.rs @@ -48,11 +48,9 @@ mod counter { md.start_time(), md.end_time() ); - let message = reduce::MessageBuilder::new() + let message = reduce::Message::new(counter.to_string().into_bytes()) .keys(keys.clone()) - .values(counter.to_string().into_bytes()) - .tags(vec![]) - .build(); + .tags(vec![]); vec![message] } } diff --git a/examples/source-transformer-now/src/main.rs b/examples/source-transformer-now/src/main.rs index 3322236..fdce790 100644 --- a/examples/source-transformer-now/src/main.rs +++ b/examples/source-transformer-now/src/main.rs @@ -16,13 +16,10 @@ impl sourcetransform::SourceTransformer for NowCat { &self, input: sourcetransform::SourceTransformRequest, ) -> Vec { - let message = sourcetransform::MessageBuilder::new() + let message = sourcetransform::Message::new(input.value) .keys(input.keys) - .values(input.value) .tags(vec![]) - .event_time(chrono::offset::Utc::now()) - .build(); - + .event_time(chrono::offset::Utc::now()); vec![message] } } diff --git a/src/map.rs b/src/map.rs index c4a1226..dd32f5f 100644 --- a/src/map.rs +++ b/src/map.rs @@ -45,8 +45,8 @@ pub trait Mapper { /// #[tonic::async_trait] /// impl map::Mapper for Cat { /// async fn map(&self, input: map::MapRequest) -> Vec { - /// use numaflow::map::MessageBuilder; - /// let message=MessageBuilder::new().keys(input.keys).values(input.value).tags(vec![]).build(); + /// use numaflow::map::Message; + /// let message=Message::new(input.value).keys(input.keys).tags(vec![]); /// vec![message] /// } /// } @@ -81,56 +81,51 @@ where pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. - keys: Vec, + pub keys: Option>, /// Value is the value passed to the next vertex. - value: Vec, + pub value: Vec, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - tags: Vec, + pub tags: Option>, } -#[derive(Default)] -pub struct MessageBuilder { - keys: Vec, - value: Vec, - tags: Vec, -} -impl MessageBuilder { - pub fn new() -> Self { - Default::default() +impl Message { + pub fn new(value:Vec) -> Self { + Self{ + value, + keys:None, + tags:None + } } pub fn message_to_drop(mut self) -> Self { - self.tags.push(DROP.parse().unwrap()); + if self.tags.is_none(){ + self.tags=Some(Vec::new()); + } + self.tags.as_mut().unwrap().push(DROP.parse().unwrap()); self } pub fn keys(mut self, keys: Vec) -> Self { - self.keys = keys; + self.keys = Some(keys); self } pub fn tags(mut self, tags: Vec) -> Self { - self.tags = tags; + self.tags = Some(tags); self } - pub fn values(mut self, value: Vec) -> Self { + pub fn value(mut self, value: Vec) -> Self { self.value = value; self } - pub fn build(self) -> Message { - Message { - keys: self.keys, - value: self.value, - tags: self.tags, - } - } + } impl From for proto::map_response::Result { fn from(value: Message) -> Self { proto::map_response::Result { - keys: value.keys, + keys: value.keys.unwrap_or_default(), value: value.value, - tags: value.tags, + tags: value.tags.unwrap_or_default(), } } } @@ -247,7 +242,7 @@ impl Server { mod tests { use crate::map; use crate::map::proto::map_client::MapClient; - use crate::map::{Message, MessageBuilder}; + use crate::map::{Message}; use std::{error::Error, time::Duration}; use tempfile::TempDir; use tokio::sync::oneshot; @@ -261,9 +256,9 @@ mod tests { impl map::Mapper for Cat { async fn map(&self, input: map::MapRequest) -> Vec { vec![map::Message { - keys: input.keys, + keys: Some(input.keys), value: input.value, - tags: vec![], + tags: Some(vec![]), }] } } @@ -321,21 +316,5 @@ mod tests { Ok(()) } - // builder test - #[test] - fn builder_test() { - let message = Message { - tags: vec![], - keys: vec![], - value: vec![], - }; - let message_builder = MessageBuilder::new() - .keys(vec![]) - .values(vec![]) - .tags(vec![]) - .build(); - - assert_eq!(message, message_builder) - } } diff --git a/src/reduce.rs b/src/reduce.rs index e5589ed..ad790ee 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -97,7 +97,7 @@ pub trait Reducer { /// Ok(()) /// } /// mod counter { - /// use numaflow::reduce::{Message, MessageBuilder, ReduceRequest}; + /// use numaflow::reduce::{Message, ReduceRequest}; /// use numaflow::reduce::{Reducer, Metadata}; /// use tokio::sync::mpsc::Receiver; /// use tonic::async_trait; @@ -132,7 +132,7 @@ pub trait Reducer { /// while (input.recv().await).is_some() { /// counter += 1; /// } - /// let message=MessageBuilder::new().tags(vec![]).keys(keys.clone()).values(counter.to_string().into_bytes()).build(); + /// let message=Message::new(counter.to_string().into_bytes()).tags(vec![]).keys(keys.clone()).build(); /// vec![message] /// } /// } @@ -180,34 +180,37 @@ pub struct Metadata { pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. It is mainly used in creating a partition in [`Reducer::reduce`]. - keys: Vec, + pub keys: Option>, /// Value is the value passed to the next vertex. - value: Vec, + pub value: Vec, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - tags: Vec, + pub tags:Option>, } -#[derive(Default)] -pub struct MessageBuilder { - keys: Vec, - value: Vec, - tags: Vec, -} -impl MessageBuilder { - pub fn new() -> Self { - Default::default() + +impl Message { + pub fn new(value :Vec) -> Self { + Self{ + value, + keys:None, + tags:None + + } } pub fn message_to_drop(mut self) -> Self { - self.tags.push(DROP.parse().unwrap()); + if self.tags.is_none(){ + self.tags=Some(Vec::new()) + } + self.tags.as_mut().unwrap().push(DROP.parse().unwrap()); self } pub fn keys(mut self, keys: Vec) -> Self { - self.keys = keys; + self.keys = Some(keys); self } pub fn tags(mut self, tags: Vec) -> Self { - self.tags = tags; + self.tags = Some(tags); self } @@ -215,13 +218,6 @@ impl MessageBuilder { self.value = value; self } - pub fn build(self) -> Message { - Message { - keys: self.keys, - value: self.value, - tags: self.tags, - } - } } /// Incoming request into the reducer handler of [`Reducer`]. @@ -335,9 +331,9 @@ where let mut datum_responses = vec![]; for message in messages { datum_responses.push(proto::reduce_response::Result { - keys: message.keys, + keys: message.keys.unwrap_or_default(), value: message.value, - tags: message.tags, + tags: message.tags.unwrap_or_default(), }); } // stream it out to the client diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 4b7c6f7..9bd2813 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -50,8 +50,8 @@ pub trait SourceTransformer { /// &self, /// input: sourcetransform::SourceTransformRequest, /// ) -> Vec { - /// use numaflow::sourcetransform::MessageBuilder; - /// let message=MessageBuilder::new().keys(input.keys).values(input.value).tags(vec![]).event_time(chrono::offset::Utc::now()).build(); + /// use numaflow::sourcetransform::Message; + /// let message=Message::new(input.value).keys(input.keys).tags(vec![]).event_time(chrono::offset::Utc::now()).build(); /// vec![message] /// } /// } @@ -64,38 +64,37 @@ pub trait SourceTransformer { pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. - keys: Vec, + keys: Option>, /// Value is the value passed to the next vertex. value: Vec, /// Time for the given event. This will be used for tracking watermarks. If cannot be derived, set it to the incoming /// event_time from the [`Datum`]. - event_time: DateTime, + event_time: Option>, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - tags: Vec, + tags: Option>, } -#[derive(Default)] -pub struct MessageBuilder { - keys: Vec, - value: Vec, - tags: Vec, - event_time: DateTime, -} -impl MessageBuilder { - pub fn new() -> Self { - Default::default() + +impl Message { + pub fn new(value:Vec) -> Self { + Self{ + value, + event_time:None, + keys:None, + tags:None + } } pub fn message_to_drop(mut self) -> Self { self.tags.push(DROP.parse().unwrap()); self } pub fn keys(mut self, keys: Vec) -> Self { - self.keys = keys; + self.keys = Some(keys); self } pub fn tags(mut self, tags: Vec) -> Self { - self.tags = tags; + self.tags = Some(tags); self } @@ -105,17 +104,10 @@ impl MessageBuilder { } pub fn event_time(mut self, event_time: DateTime) -> Self { - self.event_time = event_time; + self.event_time = Some(event_time); self } - pub fn build(self) -> Message { - Message { - keys: self.keys, - value: self.value, - tags: self.tags, - event_time: self.event_time, - } - } + } /// Incoming request to the Source Transformer. @@ -136,10 +128,10 @@ pub struct SourceTransformRequest { impl From for proto::source_transform_response::Result { fn from(value: Message) -> Self { proto::source_transform_response::Result { - keys: value.keys, + keys: value.keys.unwrap_or_default(), value: value.value, - event_time: prost_timestamp_from_utc(value.event_time), - tags: value.tags, + event_time: prost_timestamp_from_utc(value.event_time.unwrap_or_default()), + tags: value.tags.unwrap_or_default(), } } } @@ -288,10 +280,10 @@ mod tests { input: sourcetransform::SourceTransformRequest, ) -> Vec { vec![sourcetransform::Message { - keys: input.keys, + keys: Some(input.keys), value: input.value, - tags: vec![], - event_time: chrono::offset::Utc::now(), + tags: Some(vec![]), + event_time: Some(chrono::offset::Utc::now()), }] } } From 444104f5a3b49f0597a86394c1bda32f4392fc2f Mon Sep 17 00:00:00 2001 From: shubham Date: Wed, 29 May 2024 14:17:35 +0530 Subject: [PATCH 09/13] fixed tests Signed-off-by: shubham --- src/reduce.rs | 2 +- src/sourcetransform.rs | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/reduce.rs b/src/reduce.rs index ad790ee..dbd8c11 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -132,7 +132,7 @@ pub trait Reducer { /// while (input.recv().await).is_some() { /// counter += 1; /// } - /// let message=Message::new(counter.to_string().into_bytes()).tags(vec![]).keys(keys.clone()).build(); + /// let message=Message::new(counter.to_string().into_bytes()).tags(vec![]).keys(keys.clone()); /// vec![message] /// } /// } diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 9bd2813..a19fc9b 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -51,7 +51,7 @@ pub trait SourceTransformer { /// input: sourcetransform::SourceTransformRequest, /// ) -> Vec { /// use numaflow::sourcetransform::Message; - /// let message=Message::new(input.value).keys(input.keys).tags(vec![]).event_time(chrono::offset::Utc::now()).build(); + /// let message=Message::new(input.value).keys(input.keys).tags(vec![]).event_time(chrono::offset::Utc::now()); /// vec![message] /// } /// } @@ -85,7 +85,10 @@ impl Message { } } pub fn message_to_drop(mut self) -> Self { - self.tags.push(DROP.parse().unwrap()); + if self.tags.is_none(){ + self.tags=Some(Vec::new()) + } + self.tags.as_mut().unwrap().push(DROP.parse().unwrap()); self } pub fn keys(mut self, keys: Vec) -> Self { From 90aea4d2409b93ee2a7245103813246c642ac82d Mon Sep 17 00:00:00 2001 From: shubham Date: Wed, 29 May 2024 15:57:41 +0530 Subject: [PATCH 10/13] Added docs for the message Signed-off-by: shubham --- src/map.rs | 63 ++++++++++++++++++++++++++++++++- src/reduce.rs | 67 +++++++++++++++++++++++++++++++++-- src/sourcetransform.rs | 80 ++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 205 insertions(+), 5 deletions(-) diff --git a/src/map.rs b/src/map.rs index dd32f5f..68ec358 100644 --- a/src/map.rs +++ b/src/map.rs @@ -87,8 +87,22 @@ pub struct Message { /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). pub tags: Option>, } - +/// Represents a message that can be modified and forwarded. impl Message { + /// Creates a new message with the specified value. + /// + /// This constructor initializes the message with no keys, tags, or specific event time. + /// + /// # Arguments + /// + /// * `value` - A vector of bytes representing the message's payload. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3, 4]); + /// ``` pub fn new(value:Vec) -> Self { Self{ value, @@ -96,6 +110,16 @@ impl Message { tags:None } } + /// Marks the message to be dropped by adding a special "DROP" tag. + /// + /// This method guarantees that the tags vector is initialized if it was previously `None`. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let dropped_message = Message::new(vec![1, 2, 3]).message_to_drop(); + /// ``` pub fn message_to_drop(mut self) -> Self { if self.tags.is_none(){ self.tags=Some(Vec::new()); @@ -103,16 +127,53 @@ impl Message { self.tags.as_mut().unwrap().push(DROP.parse().unwrap()); self } + + /// Sets or replaces the keys associated with this message. + /// + /// # Arguments + /// + /// * `keys` - A vector of strings representing the keys. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// ``` pub fn keys(mut self, keys: Vec) -> Self { self.keys = Some(keys); self } + /// Sets or replaces the tags associated with this message. + /// + /// # Arguments + /// + /// * `tags` - A vector of strings representing the tags. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// ``` pub fn tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self } + /// Replaces the value of the message. + /// + /// # Arguments + /// + /// * `value` - A new vector of bytes that replaces the current message value. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// ``` pub fn value(mut self, value: Vec) -> Self { self.value = value; self diff --git a/src/reduce.rs b/src/reduce.rs index dbd8c11..73cd106 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -187,8 +187,22 @@ pub struct Message { pub tags:Option>, } - +/// Represents a message that can be modified and forwarded. impl Message { + /// Creates a new message with the specified value. + /// + /// This constructor initializes the message with no keys, tags, or specific event time. + /// + /// # Arguments + /// + /// * `value` - A vector of bytes representing the message's payload. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let message = Message::new(vec![1, 2, 3, 4]); + /// ``` pub fn new(value :Vec) -> Self { Self{ value, @@ -197,6 +211,16 @@ impl Message { } } + /// Marks the message to be dropped by adding a special "DROP" tag. + /// + /// This method guarantees that the tags vector is initialized if it was previously `None`. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let dropped_message = Message::new(vec![1, 2, 3]).message_to_drop(); + /// ``` pub fn message_to_drop(mut self) -> Self { if self.tags.is_none(){ self.tags=Some(Vec::new()) @@ -204,17 +228,56 @@ impl Message { self.tags.as_mut().unwrap().push(DROP.parse().unwrap()); self } + + /// Sets or replaces the keys associated with this message. + /// + /// # Arguments + /// + /// * `keys` - A vector of strings representing the keys. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// ``` pub fn keys(mut self, keys: Vec) -> Self { self.keys = Some(keys); self } + /// Sets or replaces the tags associated with this message. + /// + /// # Arguments + /// + /// * `tags` - A vector of strings representing the tags. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// ``` + + pub fn tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self } - pub fn values(mut self, value: Vec) -> Self { + /// Replaces the value of the message. + /// + /// # Arguments + /// + /// * `value` - A new vector of bytes that replaces the current message value. + /// + /// # Examples + /// + /// ``` + /// use numaflow::reduce::Message; + /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// ``` + pub fn value(mut self, value: Vec) -> Self { self.value = value; self } diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index a19fc9b..523f3ad 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -74,8 +74,22 @@ pub struct Message { tags: Option>, } - +/// Represents a message that can be modified and forwarded. impl Message { + /// Creates a new message with the specified value. + /// + /// This constructor initializes the message with no keys, tags, or specific event time. + /// + /// # Arguments + /// + /// * `value` - A vector of bytes representing the message's payload. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// let message = Message::new(vec![1, 2, 3, 4]); + /// ``` pub fn new(value:Vec) -> Self { Self{ value, @@ -84,6 +98,16 @@ impl Message { tags:None } } + /// Marks the message to be dropped by adding a special "DROP" tag. + /// + /// This method guarantees that the tags vector is initialized if it was previously `None`. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// let dropped_message = Message::new(vec![1, 2, 3]).message_to_drop(); + /// ``` pub fn message_to_drop(mut self) -> Self { if self.tags.is_none(){ self.tags=Some(Vec::new()) @@ -91,21 +115,73 @@ impl Message { self.tags.as_mut().unwrap().push(DROP.parse().unwrap()); self } + + /// Sets or replaces the keys associated with this message. + /// + /// # Arguments + /// + /// * `keys` - A vector of strings representing the keys. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// ``` pub fn keys(mut self, keys: Vec) -> Self { self.keys = Some(keys); self } + /// Sets or replaces the tags associated with this message. + /// + /// # Arguments + /// + /// * `tags` - A vector of strings representing the tags. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// ``` pub fn tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self } - pub fn values(mut self, value: Vec) -> Self { + /// Replaces the value of the message. + /// + /// # Arguments + /// + /// * `value` - A new vector of bytes that replaces the current message value. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// ``` + pub fn value(mut self, value: Vec) -> Self { self.value = value; self } + /// Sets the event time for the message. + /// + /// # Arguments + /// + /// * `event_time` - The `DateTime` that specifies when the event occurred. + /// + /// # Examples + /// + /// ``` + /// use numaflow::sourcetransform::Message; + /// use chrono::Utc; + /// let now = Utc::now(); + /// let message = Message::new(vec![1, 2, 3]).event_time(now); + /// ``` + pub fn event_time(mut self, event_time: DateTime) -> Self { self.event_time = Some(event_time); self From c56d9abe42c02a8c3ae70a54bcc0902b1245d223 Mon Sep 17 00:00:00 2001 From: shubham Date: Wed, 29 May 2024 16:00:22 +0530 Subject: [PATCH 11/13] made struct values public Signed-off-by: shubham --- src/sourcetransform.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 523f3ad..6842989 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -64,14 +64,14 @@ pub trait SourceTransformer { pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can /// be an empty collection. - keys: Option>, + pub keys: Option>, /// Value is the value passed to the next vertex. - value: Vec, + pub value: Vec, /// Time for the given event. This will be used for tracking watermarks. If cannot be derived, set it to the incoming /// event_time from the [`Datum`]. - event_time: Option>, + pub event_time: Option>, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). - tags: Option>, + pub tags: Option>, } /// Represents a message that can be modified and forwarded. From 719aa4ab5c0c8c28546ac0349c5a2d64c50c3680 Mon Sep 17 00:00:00 2001 From: shubham Date: Thu, 30 May 2024 12:53:36 +0530 Subject: [PATCH 12/13] made message_to drop standalone Signed-off-by: shubham --- src/map.rs | 20 +++++++++++--------- src/reduce.rs | 15 ++++++++------- src/sourcetransform.rs | 16 +++++++++------- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/map.rs b/src/map.rs index 68ec358..dd085e2 100644 --- a/src/map.rs +++ b/src/map.rs @@ -45,7 +45,7 @@ pub trait Mapper { /// #[tonic::async_trait] /// impl map::Mapper for Cat { /// async fn map(&self, input: map::MapRequest) -> Vec { - /// use numaflow::map::Message; + /// use numaflow::map::Message; /// let message=Message::new(input.value).keys(input.keys).tags(vec![]); /// vec![message] /// } @@ -112,22 +112,24 @@ impl Message { } /// Marks the message to be dropped by adding a special "DROP" tag. /// - /// This method guarantees that the tags vector is initialized if it was previously `None`. + /// This function guarantees that the tags vector is initialized if it was previously `None`. /// /// # Examples /// /// ``` /// use numaflow::map::Message; - /// let dropped_message = Message::new(vec![1, 2, 3]).message_to_drop(); + /// let mut message = Message::new(vec![1, 2, 3]); + /// let dropped_message = Message::message_to_drop(message); /// ``` - pub fn message_to_drop(mut self) -> Self { - if self.tags.is_none(){ - self.tags=Some(Vec::new()); + pub fn message_to_drop(mut message: Message) -> Message { + if message.tags.is_none() { + message.tags = Some(Vec::new()); } - self.tags.as_mut().unwrap().push(DROP.parse().unwrap()); - self + message.tags.as_mut().unwrap().push(DROP.parse().unwrap()); + message } + /// Sets or replaces the keys associated with this message. /// /// # Arguments @@ -154,7 +156,7 @@ impl Message { /// # Examples /// /// ``` - /// use numaflow::map::Message; + /// use numaflow::map::Message; /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` pub fn tags(mut self, tags: Vec) -> Self { diff --git a/src/reduce.rs b/src/reduce.rs index 73cd106..6f50a3c 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -213,20 +213,21 @@ impl Message { } /// Marks the message to be dropped by adding a special "DROP" tag. /// - /// This method guarantees that the tags vector is initialized if it was previously `None`. + /// This function guarantees that the tags vector is initialized if it was previously `None`. /// /// # Examples /// /// ``` /// use numaflow::reduce::Message; - /// let dropped_message = Message::new(vec![1, 2, 3]).message_to_drop(); + /// let mut message = Message::new(vec![1, 2, 3]); + /// let dropped_message = Message::message_to_drop(message); /// ``` - pub fn message_to_drop(mut self) -> Self { - if self.tags.is_none(){ - self.tags=Some(Vec::new()) + pub fn message_to_drop(mut message:Message) -> Message { + if message.tags.is_none() { + message.tags = Some(Vec::new()); } - self.tags.as_mut().unwrap().push(DROP.parse().unwrap()); - self + message.tags.as_mut().unwrap().push(DROP.parse().unwrap()); + message } /// Sets or replaces the keys associated with this message. diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 6842989..f742c01 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -100,22 +100,24 @@ impl Message { } /// Marks the message to be dropped by adding a special "DROP" tag. /// - /// This method guarantees that the tags vector is initialized if it was previously `None`. + /// This function guarantees that the tags vector is initialized if it was previously `None`. /// /// # Examples /// /// ``` /// use numaflow::sourcetransform::Message; - /// let dropped_message = Message::new(vec![1, 2, 3]).message_to_drop(); + /// let mut message = Message::new(vec![1, 2, 3]); + /// let dropped_message = Message::message_to_drop(message); /// ``` - pub fn message_to_drop(mut self) -> Self { - if self.tags.is_none(){ - self.tags=Some(Vec::new()) + pub fn message_to_drop(mut message:Message) -> Message { + if message.tags.is_none() { + message.tags = Some(Vec::new()); } - self.tags.as_mut().unwrap().push(DROP.parse().unwrap()); - self + message.tags.as_mut().unwrap().push(DROP.parse().unwrap()); + message } + /// Sets or replaces the keys associated with this message. /// /// # Arguments From c6ffade8dfd15fe539478fe997016ff71d59c546 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 30 May 2024 13:18:08 +0530 Subject: [PATCH 13/13] minor changes Signed-off-by: Yashash H L --- examples/source-transformer-now/src/main.rs | 5 +- src/map.rs | 17 ++-- src/reduce.rs | 17 ++-- src/sourcetransform.rs | 91 ++++++++++----------- 4 files changed, 58 insertions(+), 72 deletions(-) diff --git a/examples/source-transformer-now/src/main.rs b/examples/source-transformer-now/src/main.rs index fdce790..5c4b7ea 100644 --- a/examples/source-transformer-now/src/main.rs +++ b/examples/source-transformer-now/src/main.rs @@ -16,10 +16,9 @@ impl sourcetransform::SourceTransformer for NowCat { &self, input: sourcetransform::SourceTransformRequest, ) -> Vec { - let message = sourcetransform::Message::new(input.value) + let message = sourcetransform::Message::new(input.value, chrono::offset::Utc::now()) .keys(input.keys) - .tags(vec![]) - .event_time(chrono::offset::Utc::now()); + .tags(vec![]); vec![message] } } diff --git a/src/map.rs b/src/map.rs index dd085e2..20770c0 100644 --- a/src/map.rs +++ b/src/map.rs @@ -110,23 +110,20 @@ impl Message { tags:None } } - /// Marks the message to be dropped by adding a special "DROP" tag. - /// - /// This function guarantees that the tags vector is initialized if it was previously `None`. + /// Marks the message to be dropped by creating a new `Message` with an empty value and a special "DROP" tag. /// /// # Examples /// /// ``` /// use numaflow::map::Message; - /// let mut message = Message::new(vec![1, 2, 3]); - /// let dropped_message = Message::message_to_drop(message); + /// let dropped_message = Message::message_to_drop(); /// ``` - pub fn message_to_drop(mut message: Message) -> Message { - if message.tags.is_none() { - message.tags = Some(Vec::new()); + pub fn message_to_drop() -> Message { + Message { + keys: None, + value: vec![], + tags: Some(vec![DROP.to_string()]), } - message.tags.as_mut().unwrap().push(DROP.parse().unwrap()); - message } diff --git a/src/reduce.rs b/src/reduce.rs index 6f50a3c..1f2985a 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -211,23 +211,20 @@ impl Message { } } - /// Marks the message to be dropped by adding a special "DROP" tag. - /// - /// This function guarantees that the tags vector is initialized if it was previously `None`. + /// Marks the message to be dropped by creating a new `Message` with an empty value and a special "DROP" tag. /// /// # Examples /// /// ``` /// use numaflow::reduce::Message; - /// let mut message = Message::new(vec![1, 2, 3]); - /// let dropped_message = Message::message_to_drop(message); + /// let dropped_message = Message::message_to_drop(); /// ``` - pub fn message_to_drop(mut message:Message) -> Message { - if message.tags.is_none() { - message.tags = Some(Vec::new()); + pub fn message_to_drop() -> crate::map::Message { + crate::map::Message { + keys: None, + value: vec![], + tags: Some(vec![DROP.to_string()]), } - message.tags.as_mut().unwrap().push(DROP.parse().unwrap()); - message } /// Sets or replaces the keys associated with this message. diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index f742c01..87fa9c4 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -51,7 +51,7 @@ pub trait SourceTransformer { /// input: sourcetransform::SourceTransformRequest, /// ) -> Vec { /// use numaflow::sourcetransform::Message; - /// let message=Message::new(input.value).keys(input.keys).tags(vec![]).event_time(chrono::offset::Utc::now()); + /// let message=Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys).tags(vec![]); /// vec![message] /// } /// } @@ -69,52 +69,60 @@ pub struct Message { pub value: Vec, /// Time for the given event. This will be used for tracking watermarks. If cannot be derived, set it to the incoming /// event_time from the [`Datum`]. - pub event_time: Option>, + pub event_time: DateTime, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). pub tags: Option>, } /// Represents a message that can be modified and forwarded. impl Message { - /// Creates a new message with the specified value. + /// Creates a new message with the specified value and event time. /// - /// This constructor initializes the message with no keys, tags, or specific event time. + /// This constructor initializes the message with no keys, tags. /// /// # Arguments /// /// * `value` - A vector of bytes representing the message's payload. + /// * `event_time` - The `DateTime` that specifies when the event occurred. /// /// # Examples /// /// ``` /// use numaflow::sourcetransform::Message; - /// let message = Message::new(vec![1, 2, 3, 4]); + /// use chrono::Utc; + /// let now = Utc::now(); + /// let message = Message::new(vec![1, 2, 3, 4], now); /// ``` - pub fn new(value:Vec) -> Self { - Self{ - value, - event_time:None, - keys:None, - tags:None - } + pub fn new(value: Vec, event_time: DateTime) -> Self { + Self { + value, + event_time, + keys: None, + tags: None, + } } - /// Marks the message to be dropped by adding a special "DROP" tag. + /// Marks the message to be dropped by creating a new `Message` with an empty value, a special "DROP" tag, and the specified event time. /// - /// This function guarantees that the tags vector is initialized if it was previously `None`. + /// # Arguments + /// + /// * `event_time` - The `DateTime` that specifies when the event occurred. Event time is required because, even though a message is dropped, + /// it is still considered as being processed, hence the watermark should be updated accordingly using the provided event time. /// /// # Examples /// /// ``` /// use numaflow::sourcetransform::Message; - /// let mut message = Message::new(vec![1, 2, 3]); - /// let dropped_message = Message::message_to_drop(message); + /// use chrono::Utc; + /// let now = Utc::now(); + /// let dropped_message = Message::message_to_drop(now); /// ``` - pub fn message_to_drop(mut message:Message) -> Message { - if message.tags.is_none() { - message.tags = Some(Vec::new()); + pub fn message_to_drop(event_time: DateTime) -> Message { + Message { + keys: None, + value: vec![], + event_time, + tags: Some(vec![DROP.to_string()]), } - message.tags.as_mut().unwrap().push(DROP.parse().unwrap()); - message } @@ -127,8 +135,10 @@ impl Message { /// # Examples /// /// ``` - /// use numaflow::sourcetransform::Message; - /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// use numaflow::sourcetransform::Message; + /// use chrono::Utc; + /// let now = Utc::now(); + /// let message = Message::new(vec![1, 2, 3], now).keys(vec!["key1".to_string(), "key2".to_string()]); /// ``` pub fn keys(mut self, keys: Vec) -> Self { self.keys = Some(keys); @@ -143,8 +153,10 @@ impl Message { /// # Examples /// /// ``` - /// use numaflow::sourcetransform::Message; - /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// use numaflow::sourcetransform::Message; + /// use chrono::Utc; + /// let now = Utc::now(); + /// let message = Message::new(vec![1, 2, 3], now).tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` pub fn tags(mut self, tags: Vec) -> Self { @@ -162,33 +174,14 @@ impl Message { /// /// ``` /// use numaflow::sourcetransform::Message; - /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); - /// ``` - pub fn value(mut self, value: Vec) -> Self { - self.value = value; - self - } - - /// Sets the event time for the message. - /// - /// # Arguments - /// - /// * `event_time` - The `DateTime` that specifies when the event occurred. - /// - /// # Examples - /// - /// ``` - /// use numaflow::sourcetransform::Message; /// use chrono::Utc; /// let now = Utc::now(); - /// let message = Message::new(vec![1, 2, 3]).event_time(now); + /// let message = Message::new(vec![1, 2, 3], now).value(vec![4, 5, 6]); /// ``` - - pub fn event_time(mut self, event_time: DateTime) -> Self { - self.event_time = Some(event_time); + pub fn value(mut self, value: Vec) -> Self { + self.value = value; self } - } /// Incoming request to the Source Transformer. @@ -211,7 +204,7 @@ impl From for proto::source_transform_response::Result { proto::source_transform_response::Result { keys: value.keys.unwrap_or_default(), value: value.value, - event_time: prost_timestamp_from_utc(value.event_time.unwrap_or_default()), + event_time: prost_timestamp_from_utc(value.event_time), tags: value.tags.unwrap_or_default(), } } @@ -364,7 +357,7 @@ mod tests { keys: Some(input.keys), value: input.value, tags: Some(vec![]), - event_time: Some(chrono::offset::Utc::now()), + event_time: chrono::offset::Utc::now(), }] } }