Skip to content

Commit

Permalink
Merge pull request #1 from numaproj/minor
Browse files Browse the repository at this point in the history
chore: minor changes
  • Loading branch information
shubhamdixit863 authored May 30, 2024
2 parents 719aa4a + c6ffade commit 3611e51
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 72 deletions.
5 changes: 2 additions & 3 deletions examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ impl sourcetransform::SourceTransformer for NowCat {
&self,
input: sourcetransform::SourceTransformRequest,
) -> Vec<sourcetransform::Message> {
let message = sourcetransform::Message::new(input.value)
let message = sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
.keys(input.keys)
.tags(vec![])
.event_time(chrono::offset::Utc::now());
.tags(vec![]);
vec![message]
}
}
17 changes: 7 additions & 10 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,20 @@ impl Message {
tags:None
}
}
/// Marks the message to be dropped by adding a special "DROP" tag.
///
/// This function guarantees that the tags vector is initialized if it was previously `None`.
/// Marks the message to be dropped by creating a new `Message` with an empty value and a special "DROP" tag.
///
/// # Examples
///
/// ```
/// use numaflow::map::Message;
/// let mut message = Message::new(vec![1, 2, 3]);
/// let dropped_message = Message::message_to_drop(message);
/// let dropped_message = Message::message_to_drop();
/// ```
pub fn message_to_drop(mut message: Message) -> Message {
if message.tags.is_none() {
message.tags = Some(Vec::new());
pub fn message_to_drop() -> Message {
Message {
keys: None,
value: vec![],
tags: Some(vec![DROP.to_string()]),
}
message.tags.as_mut().unwrap().push(DROP.parse().unwrap());
message
}


Expand Down
17 changes: 7 additions & 10 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,23 +211,20 @@ impl Message {

}
}
/// Marks the message to be dropped by adding a special "DROP" tag.
///
/// This function guarantees that the tags vector is initialized if it was previously `None`.
/// Marks the message to be dropped by creating a new `Message` with an empty value and a special "DROP" tag.
///
/// # Examples
///
/// ```
/// use numaflow::reduce::Message;
/// let mut message = Message::new(vec![1, 2, 3]);
/// let dropped_message = Message::message_to_drop(message);
/// let dropped_message = Message::message_to_drop();
/// ```
pub fn message_to_drop(mut message:Message) -> Message {
if message.tags.is_none() {
message.tags = Some(Vec::new());
pub fn message_to_drop() -> crate::map::Message {
crate::map::Message {
keys: None,
value: vec![],
tags: Some(vec![DROP.to_string()]),
}
message.tags.as_mut().unwrap().push(DROP.parse().unwrap());
message
}

/// Sets or replaces the keys associated with this message.
Expand Down
91 changes: 42 additions & 49 deletions src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub trait SourceTransformer {
/// input: sourcetransform::SourceTransformRequest,
/// ) -> Vec<sourcetransform::Message> {
/// use numaflow::sourcetransform::Message;
/// let message=Message::new(input.value).keys(input.keys).tags(vec![]).event_time(chrono::offset::Utc::now());
/// let message=Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys).tags(vec![]);
/// vec![message]
/// }
/// }
Expand All @@ -69,52 +69,60 @@ pub struct Message {
pub value: Vec<u8>,
/// Time for the given event. This will be used for tracking watermarks. If cannot be derived, set it to the incoming
/// event_time from the [`Datum`].
pub event_time: Option<DateTime<Utc>>,
pub event_time: DateTime<Utc>,
/// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/).
pub tags: Option<Vec<String>>,
}

/// Represents a message that can be modified and forwarded.
impl Message {
/// Creates a new message with the specified value.
/// Creates a new message with the specified value and event time.
///
/// This constructor initializes the message with no keys, tags, or specific event time.
/// This constructor initializes the message with no keys, tags.
///
/// # Arguments
///
/// * `value` - A vector of bytes representing the message's payload.
/// * `event_time` - The `DateTime<Utc>` that specifies when the event occurred.
///
/// # Examples
///
/// ```
/// use numaflow::sourcetransform::Message;
/// let message = Message::new(vec![1, 2, 3, 4]);
/// use chrono::Utc;
/// let now = Utc::now();
/// let message = Message::new(vec![1, 2, 3, 4], now);
/// ```
pub fn new(value:Vec<u8>) -> Self {
Self{
value,
event_time:None,
keys:None,
tags:None
}
pub fn new(value: Vec<u8>, event_time: DateTime<Utc>) -> Self {
Self {
value,
event_time,
keys: None,
tags: None,
}
}
/// Marks the message to be dropped by adding a special "DROP" tag.
/// Marks the message to be dropped by creating a new `Message` with an empty value, a special "DROP" tag, and the specified event time.
///
/// This function guarantees that the tags vector is initialized if it was previously `None`.
/// # Arguments
///
/// * `event_time` - The `DateTime<Utc>` that specifies when the event occurred. Event time is required because, even though a message is dropped,
/// it is still considered as being processed, hence the watermark should be updated accordingly using the provided event time.
///
/// # Examples
///
/// ```
/// use numaflow::sourcetransform::Message;
/// let mut message = Message::new(vec![1, 2, 3]);
/// let dropped_message = Message::message_to_drop(message);
/// use chrono::Utc;
/// let now = Utc::now();
/// let dropped_message = Message::message_to_drop(now);
/// ```
pub fn message_to_drop(mut message:Message) -> Message {
if message.tags.is_none() {
message.tags = Some(Vec::new());
pub fn message_to_drop(event_time: DateTime<Utc>) -> Message {
Message {
keys: None,
value: vec![],
event_time,
tags: Some(vec![DROP.to_string()]),
}
message.tags.as_mut().unwrap().push(DROP.parse().unwrap());
message
}


Expand All @@ -127,8 +135,10 @@ impl Message {
/// # Examples
///
/// ```
/// use numaflow::sourcetransform::Message;
/// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]);
/// use numaflow::sourcetransform::Message;
/// use chrono::Utc;
/// let now = Utc::now();
/// let message = Message::new(vec![1, 2, 3], now).keys(vec!["key1".to_string(), "key2".to_string()]);
/// ```
pub fn keys(mut self, keys: Vec<String>) -> Self {
self.keys = Some(keys);
Expand All @@ -143,8 +153,10 @@ impl Message {
/// # Examples
///
/// ```
/// use numaflow::sourcetransform::Message;
/// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// use numaflow::sourcetransform::Message;
/// use chrono::Utc;
/// let now = Utc::now();
/// let message = Message::new(vec![1, 2, 3], now).tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// ```
pub fn tags(mut self, tags: Vec<String>) -> Self {
Expand All @@ -162,33 +174,14 @@ impl Message {
///
/// ```
/// use numaflow::sourcetransform::Message;
/// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]);
/// ```
pub fn value(mut self, value: Vec<u8>) -> Self {
self.value = value;
self
}

/// Sets the event time for the message.
///
/// # Arguments
///
/// * `event_time` - The `DateTime<Utc>` that specifies when the event occurred.
///
/// # Examples
///
/// ```
/// use numaflow::sourcetransform::Message;
/// use chrono::Utc;
/// let now = Utc::now();
/// let message = Message::new(vec![1, 2, 3]).event_time(now);
/// let message = Message::new(vec![1, 2, 3], now).value(vec![4, 5, 6]);
/// ```
pub fn event_time(mut self, event_time: DateTime<Utc>) -> Self {
self.event_time = Some(event_time);
pub fn value(mut self, value: Vec<u8>) -> Self {
self.value = value;
self
}

}

/// Incoming request to the Source Transformer.
Expand All @@ -211,7 +204,7 @@ impl From<Message> for proto::source_transform_response::Result {
proto::source_transform_response::Result {
keys: value.keys.unwrap_or_default(),
value: value.value,
event_time: prost_timestamp_from_utc(value.event_time.unwrap_or_default()),
event_time: prost_timestamp_from_utc(value.event_time),
tags: value.tags.unwrap_or_default(),
}
}
Expand Down Expand Up @@ -364,7 +357,7 @@ mod tests {
keys: Some(input.keys),
value: input.value,
tags: Some(vec![]),
event_time: Some(chrono::offset::Utc::now()),
event_time: chrono::offset::Utc::now(),
}]
}
}
Expand Down

0 comments on commit 3611e51

Please sign in to comment.