Skip to content

Commit

Permalink
feat: Message To Drop Semantics (#47)
Browse files Browse the repository at this point in the history
Signed-off-by: shubham <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Co-authored-by: Yashash H L <[email protected]>
  • Loading branch information
shubhamdixit863 and yhl25 authored May 30, 2024
1 parent e993a18 commit 293ccf6
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 71 deletions.
9 changes: 4 additions & 5 deletions examples/map-cat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ struct Cat;
#[tonic::async_trait]
impl map::Mapper for Cat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
vec![map::Message {
keys: input.keys,
value: input.value,
tags: vec![],
}]
let message = map::Message::new(input.value)
.keys(input.keys)
.tags(vec![]);
vec![message]
}
}
12 changes: 7 additions & 5 deletions examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use numaflow::map;
use numaflow::map::Message;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Expand Down Expand Up @@ -38,14 +39,15 @@ impl map::Mapper for TickGen {
let ts = Utc
.timestamp_nanos(payload.created_ts)
.to_rfc3339_opts(SecondsFormat::Nanos, true);
vec![map::Message {
keys: input.keys,
value: serde_json::to_vec(&ResultPayload {
let message = map::Message::new(
serde_json::to_vec(&ResultPayload {
value: payload.data.value,
time: ts,
})
.unwrap_or_default(),
tags: vec![],
}]
)
.keys(input.keys)
.tags(vec![]);
vec![message]
}
}
9 changes: 4 additions & 5 deletions examples/reduce-counter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ mod counter {
md.start_time(),
md.end_time()
);
vec![Message {
keys: keys.clone(),
value: counter.to_string().into_bytes(),
tags: vec![],
}]
let message = reduce::Message::new(counter.to_string().into_bytes())
.keys(keys.clone())
.tags(vec![]);
vec![message]
}
}
}
10 changes: 4 additions & 6 deletions examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ impl sourcetransform::SourceTransformer for NowCat {
&self,
input: sourcetransform::SourceTransformRequest,
) -> Vec<sourcetransform::Message> {
vec![sourcetransform::Message {
keys: input.keys,
value: input.value,
event_time: chrono::offset::Utc::now(),
tags: vec![],
}]
let message = sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
.keys(input.keys)
.tags(vec![]);
vec![message]
}
}
125 changes: 109 additions & 16 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use std::future::Future;
use std::path::PathBuf;

use chrono::{DateTime, Utc};
use serde_json::Value;
use tonic::{async_trait, Request, Response, Status};

use crate::shared;


const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/map.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";

const DROP: &str = "U+005C__DROP__";
/// Numaflow Map Proto definitions.
pub mod proto {
tonic::include_proto!("map.v1");
Expand Down Expand Up @@ -45,11 +45,9 @@ pub trait Mapper {
/// #[tonic::async_trait]
/// impl map::Mapper for Cat {
/// async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
/// vec![map::Message {
/// keys: input.keys,
/// value: input.value,
/// tags: vec![],
/// }]
/// use numaflow::map::Message;
/// let message=Message::new(input.value).keys(input.keys).tags(vec![]);
/// vec![message]
/// }
/// }
/// ```
Expand Down Expand Up @@ -79,22 +77,115 @@ where
}

/// Message is the response struct from the [`Mapper::map`] .
#[derive(Debug, PartialEq)]
pub struct Message {
/// Keys are a collection of strings which will be passed on to the next vertex as is. It can
/// be an empty collection.
pub keys: Vec<String>,
pub keys: Option<Vec<String>>,
/// Value is the value passed to the next vertex.
pub value: Vec<u8>,
/// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/).
pub tags: Vec<String>,
pub tags: Option<Vec<String>>,
}
/// Represents a message that can be modified and forwarded.
impl Message {
/// Creates a new message with the specified value.
///
/// This constructor initializes the message with no keys, tags, or specific event time.
///
/// # Arguments
///
/// * `value` - A vector of bytes representing the message's payload.
///
/// # Examples
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3, 4]);
/// ```
pub fn new(value:Vec<u8>) -> Self {
Self{
value,
keys:None,
tags:None
}
}
/// Marks the message to be dropped by creating a new `Message` with an empty value and a special "DROP" tag.
///
/// # Examples
///
/// ```
/// use numaflow::map::Message;
/// let dropped_message = Message::message_to_drop();
/// ```
pub fn message_to_drop() -> Message {
Message {
keys: None,
value: vec![],
tags: Some(vec![DROP.to_string()]),
}
}


/// Sets or replaces the keys associated with this message.
///
/// # Arguments
///
/// * `keys` - A vector of strings representing the keys.
///
/// # Examples
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]);
/// ```
pub fn keys(mut self, keys: Vec<String>) -> Self {
self.keys = Some(keys);
self
}

/// Sets or replaces the tags associated with this message.
///
/// # Arguments
///
/// * `tags` - A vector of strings representing the tags.
///
/// # Examples
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]);
/// ```
pub fn tags(mut self, tags: Vec<String>) -> Self {
self.tags = Some(tags);
self
}

/// Replaces the value of the message.
///
/// # Arguments
///
/// * `value` - A new vector of bytes that replaces the current message value.
///
/// # Examples
///
/// ```
/// use numaflow::map::Message;
/// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]);
/// ```
pub fn value(mut self, value: Vec<u8>) -> Self {
self.value = value;
self
}

}

impl From<Message> for proto::map_response::Result {
fn from(value: Message) -> Self {
proto::map_response::Result {
keys: value.keys,
keys: value.keys.unwrap_or_default(),
value: value.value,
tags: value.tags,
tags: value.tags.unwrap_or_default(),
}
}
}
Expand Down Expand Up @@ -209,14 +300,14 @@ impl<T> Server<T> {

#[cfg(test)]
mod tests {
use std::{error::Error, time::Duration};
use tower::service_fn;

use crate::map;
use crate::map::proto::map_client::MapClient;
use crate::map::{Message};
use std::{error::Error, time::Duration};
use tempfile::TempDir;
use tokio::sync::oneshot;
use tonic::transport::Uri;
use tower::service_fn;

#[tokio::test]
async fn map_server() -> Result<(), Box<dyn Error>> {
Expand All @@ -225,9 +316,9 @@ mod tests {
impl map::Mapper for Cat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
vec![map::Message {
keys: input.keys,
keys: Some(input.keys),
value: input.value,
tags: vec![],
tags: Some(vec![]),
}]
}
}
Expand Down Expand Up @@ -284,4 +375,6 @@ mod tests {
assert!(task.is_finished(), "gRPC server is still running");
Ok(())
}


}
Loading

0 comments on commit 293ccf6

Please sign in to comment.