From b5c89d5dcf5b25fe75cb4e169c52463b2f1a43d3 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 19 Dec 2024 18:27:09 +0530 Subject: [PATCH 01/10] feat: Map Streamer Signed-off-by: Yashash H L --- numaflow/src/lib.rs | 3 + numaflow/src/map.rs | 4 + numaflow/src/mapstream.rs | 859 ++++++++++++++++++++++++++++++++++++++ numaflow/src/shared.rs | 1 + 4 files changed, 867 insertions(+) create mode 100644 numaflow/src/mapstream.rs diff --git a/numaflow/src/lib.rs b/numaflow/src/lib.rs index 9948098..f0220dc 100644 --- a/numaflow/src/lib.rs +++ b/numaflow/src/lib.rs @@ -37,6 +37,9 @@ pub mod sideinput; /// batchmap is for writing the [batch map mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/batchmap/) handlers. pub mod batchmap; +/// mapstream is for writing the [mapstream](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/mapstream/) handlers. +pub mod mapstream; + mod servers; // Error handling on Numaflow SDKs! diff --git a/numaflow/src/map.rs b/numaflow/src/map.rs index c9dd0c0..8a28b3e 100644 --- a/numaflow/src/map.rs +++ b/numaflow/src/map.rs @@ -538,6 +538,7 @@ impl Drop for Server { // same address. UnixListener doesn't implement Drop trait, so we have to manually remove the socket file. fn drop(&mut self) { let _ = fs::remove_file(&self.sock_addr); + let _ = fs::remove_file(&self.server_info_file); } } @@ -830,6 +831,9 @@ mod tests { }; tx.send(request).await.unwrap(); + let resp = stream.message().await; + assert!(resp.is_err()); + // server should shut down gracefully because there was a panic in the handler. for _ in 0..10 { tokio::time::sleep(Duration::from_millis(10)).await; diff --git a/numaflow/src/mapstream.rs b/numaflow/src/mapstream.rs new file mode 100644 index 0000000..45e1f2e --- /dev/null +++ b/numaflow/src/mapstream.rs @@ -0,0 +1,859 @@ +use crate::servers::map as proto; +use chrono::{DateTime, Utc}; +use std::collections::HashMap; +use std::fs; +use std::path::PathBuf; +use tokio::sync::mpsc::Sender; + +use crate::error::{Error, ErrorKind}; +use crate::shared::{self, shutdown_signal, ContainerType}; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::sync::CancellationToken; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{error, info}; + +const DEFAULT_CHANNEL_SIZE: usize = 1000; +const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; +const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/mapstream.sock"; +const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info"; + +const DROP: &str = "U+005C__DROP__"; + +/// MapStreamer trait for implementing MapStream handler. +#[async_trait] +pub trait MapStreamer { + /// The `map_stream` function processes each incoming message and streams the result back using a channel. + /// + /// # Arguments + /// + /// * `input` - The input request containing keys, value, event time, watermark, and headers. + /// * `tx` - The channel to send the resulting messages. + /// + /// # Example + /// + /// ```no_run + /// use tokio::sync::mpsc; + /// use tonic::async_trait; + /// use std::collections::HashMap; + /// use chrono::{DateTime, Utc}; + /// use tokio::sync::mpsc::Sender; + /// use numaflow::mapstream::{MapStreamRequest, MapStreamer, Message}; + /// + /// struct ExampleMapStreamer; + /// + /// #[async_trait] + /// impl MapStreamer for ExampleMapStreamer { + /// async fn map_stream(&self, input: MapStreamRequest, tx: Sender) { + /// let payload_str = String::from_utf8(input.value).unwrap_or_default(); + /// let splits: Vec<&str> = payload_str.split(',').collect(); + /// + /// for split in splits { + /// let message = Message::new(split.as_bytes().to_vec()) + /// .keys(input.keys.clone()) + /// .tags(vec![]); + /// if tx.send(message).await.is_err() { + /// break; + /// } + /// } + /// } + /// } + /// ``` + async fn map_stream(&self, input: MapStreamRequest, tx: Sender); +} + +/// Message is the response struct from the [`MapStreamer::map_stream`] . +#[derive(Debug, PartialEq)] +pub struct Message { + /// Keys are a collection of strings which will be passed on to the next vertex as is. It can + /// be an empty collection. + pub keys: Option>, + /// Value is the value passed to the next vertex. + pub value: Vec, + /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). + pub tags: Option>, +} + +/// Represents a message that can be modified and forwarded. +impl Message { + /// Creates a new message with the specified value. + /// + /// This constructor initializes the message with no keys, tags, or specific event time. + /// + /// # Arguments + /// + /// * `value` - A vector of bytes representing the message's payload. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3, 4]); + /// ``` + pub fn new(value: Vec) -> Self { + Self { + value, + keys: None, + tags: None, + } + } + /// Marks the message to be dropped by creating a new `Message` with an empty value and a special "DROP" tag. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let dropped_message = Message::message_to_drop(); + /// ``` + pub fn message_to_drop() -> Message { + Message { + keys: None, + value: vec![], + tags: Some(vec![DROP.to_string()]), + } + } + + /// Sets or replaces the keys associated with this message. + /// + /// # Arguments + /// + /// * `keys` - A vector of strings representing the keys. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// ``` + pub fn keys(mut self, keys: Vec) -> Self { + self.keys = Some(keys); + self + } + + /// Sets or replaces the tags associated with this message. + /// + /// # Arguments + /// + /// * `tags` - A vector of strings representing the tags. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// ``` + pub fn tags(mut self, tags: Vec) -> Self { + self.tags = Some(tags); + self + } + + /// Replaces the value of the message. + /// + /// # Arguments + /// + /// * `value` - A new vector of bytes that replaces the current message value. + /// + /// # Examples + /// + /// ``` + /// use numaflow::map::Message; + /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// ``` + pub fn value(mut self, value: Vec) -> Self { + self.value = value; + self + } +} + +impl From for proto::map_response::Result { + fn from(value: Message) -> Self { + proto::map_response::Result { + keys: value.keys.unwrap_or_default(), + value: value.value, + tags: value.tags.unwrap_or_default(), + } + } +} + +/// Incoming request into the map handler of [`MapStreamer`]. +pub struct MapStreamRequest { + /// Set of keys in the (key, value) terminology of map/reduce paradigm. + pub keys: Vec, + /// The value in the (key, value) terminology of map/reduce paradigm. + pub value: Vec, + /// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this time. + pub watermark: DateTime, + /// Time of the element as seen at source or aligned after a reduce operation. + pub eventtime: DateTime, + /// Headers for the message. + pub headers: HashMap, +} + +impl From for MapStreamRequest { + fn from(value: proto::map_request::Request) -> Self { + Self { + keys: value.keys, + value: value.value, + watermark: shared::utc_from_timestamp(value.watermark), + eventtime: shared::utc_from_timestamp(value.event_time), + headers: value.headers, + } + } +} + +struct MapStreamService { + handler: Arc, + shutdown_tx: mpsc::Sender<()>, + cancellation_token: CancellationToken, +} + +#[async_trait] +impl proto::map_server::Map for MapStreamService +where + T: MapStreamer + Send + Sync + 'static, +{ + type MapFnStream = ReceiverStream>; + + async fn map_fn( + &self, + request: Request>, + ) -> Result, Status> { + let mut stream = request.into_inner(); + let handler = Arc::clone(&self.handler); + + let (stream_response_tx, stream_response_rx) = + mpsc::channel::>(DEFAULT_CHANNEL_SIZE); + + // Perform handshake + perform_handshake(&mut stream, &stream_response_tx).await?; + + let (error_tx, error_rx) = mpsc::channel::(1); + + // Spawn a task to handle incoming stream requests + let handle: JoinHandle<()> = tokio::spawn(handle_stream_requests( + handler.clone(), + stream, + stream_response_tx.clone(), + error_tx.clone(), + self.cancellation_token.child_token(), + )); + + tokio::spawn(manage_grpc_stream( + handle, + self.cancellation_token.clone(), + stream_response_tx, + error_rx, + self.shutdown_tx.clone(), + )); + + Ok(Response::new(ReceiverStream::new(stream_response_rx))) + } + + async fn is_ready(&self, _: Request<()>) -> Result, Status> { + Ok(Response::new(proto::ReadyResponse { ready: true })) + } +} + +/// Handles incoming stream requests and processes them and +/// sends the response back to the client. +async fn handle_stream_requests( + handler: Arc, + mut stream: Streaming, + stream_response_tx: Sender>, + error_tx: Sender, + token: CancellationToken, +) where + T: MapStreamer + Send + Sync + 'static, +{ + let mut stream_open = true; + while stream_open { + stream_open = tokio::select! { + map_request = stream.message() => handle_request( + handler.clone(), + map_request, + stream_response_tx.clone(), + error_tx.clone(), + token.clone(), + ).await, + _ = token.cancelled() => { + info!("Cancellation token is cancelled, shutting down"); + break; + } + } + } +} + +/// Handles a single request from the client. If the request is invalid or the stream is closed, +/// it returns false. Otherwise, it spawns a task to handle the request, and +/// streams the response back to the client. +async fn handle_request( + handler: Arc, + map_request: Result, Status>, + stream_response_tx: mpsc::Sender>, + error_tx: mpsc::Sender, + token: CancellationToken, +) -> bool +where + T: MapStreamer + Send + Sync + 'static, +{ + let map_request = match map_request { + Ok(None) => return false, + Ok(Some(val)) => val, + Err(val) => { + error!("Received gRPC error from sender: {val:?}"); + return false; + } + }; + tokio::spawn(run_map_stream( + handler, + map_request, + stream_response_tx, + error_tx, + token, + )); + true +} + +/// Runs the map_stream function of the handler and streams the response back to the client. +async fn run_map_stream( + handler: Arc, + map_request: proto::MapRequest, + stream_response_tx: Sender>, + error_tx: Sender, + token: CancellationToken, +) where + T: MapStreamer + Send + Sync + 'static, +{ + let Some(request) = map_request.request else { + error_tx + .send(Error::MapError(ErrorKind::InternalError( + "Request not present".to_string(), + ))) + .await + .expect("Sending error on channel"); + return; + }; + + let message_id = map_request.id.clone(); + + let (tx, mut rx) = mpsc::channel::(DEFAULT_CHANNEL_SIZE); + + // Spawn a task to run the map_stream function + let token = token.child_token(); + let map_stream_task = tokio::spawn({ + let handler = handler.clone(); + let token = token.clone(); + async move { + tokio::select! { + _ = token.cancelled() => { + info!("Task was cancelled"); + } + _ = handler.map_stream(request.into(), tx) => {} + } + } + }); + + // spawn a task to catch the panics from the map_stream task + tokio::spawn({ + let error_tx = error_tx.clone(); + async move { + if let Err(e) = map_stream_task.await { + error_tx + .send(Error::MapError(ErrorKind::InternalError(format!( + "Task panicked: {e:?}" + )))) + .await + .expect("Sending error on channel"); + } + } + }); + + while let Some(message) = rx.recv().await { + let send_response_result = stream_response_tx + .send(Ok(proto::MapResponse { + results: vec![message.into()], + id: message_id.clone(), + handshake: None, + status: None, + })) + .await; + + if let Err(e) = send_response_result { + error_tx + .send(Error::MapError(ErrorKind::InternalError(format!( + "Failed to send response: {e:?}" + )))) + .await + .expect("Sending error on channel"); + return; + } + } +} + +/// Manages the gRPC stream. If the request handler is finished, it cancels the token. +async fn manage_grpc_stream( + request_handler: JoinHandle<()>, + token: CancellationToken, + stream_response_tx: Sender>, + mut error_rx: mpsc::Receiver, + server_shutdown_tx: Sender<()>, +) { + let err = tokio::select! { + _ = request_handler => { + token.cancel(); + return; + }, + err = error_rx.recv() => err, + }; + + token.cancel(); + let Some(err) = err else { + return; + }; + error!("Shutting down gRPC channel: {err:?}"); + stream_response_tx + .send(Err(Status::internal(err.to_string()))) + .await + .expect("Sending error message to gRPC response channel"); + server_shutdown_tx + .send(()) + .await + .expect("Writing to shutdown channel"); +} + +/// Performs the handshake with the client. +async fn perform_handshake( + stream: &mut Streaming, + stream_response_tx: &mpsc::Sender>, +) -> Result<(), Status> { + let handshake_request = stream + .message() + .await + .map_err(|e| Status::internal(format!("Handshake failed: {}", e)))? + .ok_or_else(|| Status::internal("Stream closed before handshake"))?; + + if let Some(handshake) = handshake_request.handshake { + stream_response_tx + .send(Ok(proto::MapResponse { + results: vec![], + id: "".to_string(), + handshake: Some(handshake), + status: None, + })) + .await + .map_err(|e| Status::internal(format!("Failed to send handshake response: {}", e)))?; + Ok(()) + } else { + Err(Status::invalid_argument("Handshake not present")) + } +} + +/// gRPC server to start a map stream service +#[derive(Debug)] +pub struct Server { + sock_addr: PathBuf, + max_message_size: usize, + server_info_file: PathBuf, + svc: Option, +} + +impl Server { + pub fn new(map_stream_svc: T) -> Self { + Server { + sock_addr: DEFAULT_SOCK_ADDR.into(), + max_message_size: DEFAULT_MAX_MESSAGE_SIZE, + server_info_file: DEFAULT_SERVER_INFO_FILE.into(), + svc: Some(map_stream_svc), + } + } + + /// Set the unix domain socket file path used by the gRPC server to listen for incoming connections. + /// Default value is `/var/run/numaflow/map.sock` + pub fn with_socket_file(mut self, file: impl Into) -> Self { + self.sock_addr = file.into(); + self + } + + /// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/map.sock` + pub fn socket_file(&self) -> &std::path::Path { + self.sock_addr.as_path() + } + + /// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 64MB. + pub fn with_max_message_size(mut self, message_size: usize) -> Self { + self.max_message_size = message_size; + self + } + + /// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 64MB. + pub fn max_message_size(&self) -> usize { + self.max_message_size + } + + /// Change the file in which numaflow server information is stored on start up to the new value. Default value is `/var/run/numaflow/mapper-server-info` + pub fn with_server_info_file(mut self, file: impl Into) -> Self { + self.server_info_file = file.into(); + self + } + + /// Get the path to the file where numaflow server info is stored. Default value is `/var/run/numaflow/mapper-server-info` + pub fn server_info_file(&self) -> &std::path::Path { + self.server_info_file.as_path() + } + + /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. + pub async fn start_with_shutdown( + &mut self, + shutdown_rx: oneshot::Receiver<()>, + ) -> Result<(), Box> + where + T: MapStreamer + Send + Sync + 'static, + { + let info = shared::ServerInfo::new(ContainerType::MapStream); + let listener = + shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?; + let handler = self.svc.take().unwrap(); + let cln_token = CancellationToken::new(); + + let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); + let map_stream_svc = MapStreamService { + handler: Arc::new(handler), + shutdown_tx: internal_shutdown_tx, + cancellation_token: cln_token.clone(), + }; + + let map_stream_svc = proto::map_server::MapServer::new(map_stream_svc) + .max_encoding_message_size(self.max_message_size) + .max_decoding_message_size(self.max_message_size); + + let shutdown = shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); + + let _drop_guard = cln_token.drop_guard(); + + tonic::transport::Server::builder() + .add_service(map_stream_svc) + .serve_with_incoming_shutdown(listener, shutdown) + .await?; + + Ok(()) + } + + pub async fn start(&mut self) -> Result<(), Box> + where + T: MapStreamer + Send + Sync + 'static, + { + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + self.start_with_shutdown(shutdown_rx).await + } +} + +/// Remove the socket file and server info file +impl Drop for Server { + fn drop(&mut self) { + let _ = fs::remove_file(&self.sock_addr); + let _ = fs::remove_file(&self.server_info_file); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::servers::map::map_client::MapClient; + use std::{error::Error, time::Duration}; + use tempfile::TempDir; + use tokio::net::UnixStream; + use tokio::sync::{mpsc, oneshot}; + use tokio_stream::wrappers::ReceiverStream; + use tonic::transport::Uri; + use tower::service_fn; + + #[tokio::test] + async fn map_stream_single_response() -> Result<(), Box> { + struct Cat; + #[async_trait] + impl MapStreamer for Cat { + async fn map_stream(&self, input: MapStreamRequest, tx: Sender) { + let message = Message::new(input.value).keys(input.keys).tags(vec![]); + tx.send(message).await.unwrap(); + } + } + + let tmp_dir = TempDir::new()?; + let sock_file = tmp_dir.path().join("mapstream.sock"); + let server_info_file = tmp_dir.path().join("mapstream-server-info"); + + let mut server = Server::new(Cat) + .with_server_info_file(&server_info_file) + .with_socket_file(&sock_file) + .with_max_message_size(10240); + + assert_eq!(server.max_message_size(), 10240); + assert_eq!(server.server_info_file(), server_info_file); + assert_eq!(server.socket_file(), sock_file); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")? + .connect_with_connector(service_fn(move |_: Uri| { + let sock_file = sock_file.clone(); + async move { + Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new( + UnixStream::connect(sock_file).await?, + )) + } + })) + .await?; + + let mut client = MapClient::new(channel); + let request = proto::MapRequest { + request: Some(proto::map_request::Request { + keys: vec!["map".into(), "stream".into()], + value: "hello".into(), + watermark: Some(prost_types::Timestamp::default()), + event_time: Some(prost_types::Timestamp::default()), + headers: Default::default(), + }), + id: "".to_string(), + handshake: None, + status: None, + }; + let (tx, rx) = mpsc::channel(2); + let handshake_request = proto::MapRequest { + request: None, + id: "".to_string(), + handshake: Some(proto::Handshake { sot: true }), + status: None, + }; + + tx.send(handshake_request).await?; + tx.send(request).await?; + + let resp = client.map_fn(ReceiverStream::new(rx)).await?; + let mut resp = resp.into_inner(); + + let handshake_response = resp.message().await?; + assert!(handshake_response.is_some()); + + let handshake_response = handshake_response.unwrap(); + assert!(handshake_response.handshake.is_some()); + + let actual_response = resp.message().await?; + assert!(actual_response.is_some()); + + let actual_response = actual_response.unwrap(); + assert_eq!( + actual_response.results.len(), + 1, + "Expected single message from server" + ); + let msg = &actual_response.results[0]; + assert_eq!(msg.keys.first(), Some(&"map".to_owned())); + assert_eq!(msg.value, "hello".as_bytes()); + + drop(tx); + shutdown_tx + .send(()) + .expect("Sending shutdown signal to gRPC server"); + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(task.is_finished(), "gRPC server is still running"); + Ok(()) + } + + #[tokio::test] + async fn map_stream_multi_response() -> Result<(), Box> { + struct StreamCat; + + #[async_trait] + impl MapStreamer for StreamCat { + async fn map_stream(&self, request: MapStreamRequest, sender: Sender) { + let value = String::from_utf8(request.value).unwrap(); + for part in value.split(',') { + let message = Message::new(part.as_bytes().to_vec()); + sender.send(message).await.unwrap(); + } + } + } + + let tmp_dir = TempDir::new()?; + let sock_file = tmp_dir.path().join("map_stream.sock"); + let server_info_file = tmp_dir.path().join("mapper-stream-server-info"); + + let mut server = Server::new(StreamCat) + .with_server_info_file(&server_info_file) + .with_socket_file(&sock_file) + .with_max_message_size(10240); + + assert_eq!(server.max_message_size(), 10240); + assert_eq!(server.server_info_file(), server_info_file); + assert_eq!(server.socket_file(), sock_file); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")? + .connect_with_connector(service_fn(move |_: Uri| { + let sock_file = sock_file.clone(); + async move { + Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new( + UnixStream::connect(sock_file).await?, + )) + } + })) + .await?; + + let mut client = MapClient::new(channel); + let request = proto::MapRequest { + request: Some(proto::map_request::Request { + keys: vec!["first".into()], + value: "test,map,stream".into(), + watermark: Some(prost_types::Timestamp::default()), + event_time: Some(prost_types::Timestamp::default()), + headers: Default::default(), + }), + id: "".to_string(), + handshake: None, + status: None, + }; + + let (tx, rx) = mpsc::channel(2); + let handshake_request = proto::MapRequest { + request: None, + id: "".to_string(), + handshake: Some(proto::Handshake { sot: true }), + status: None, + }; + + tx.send(handshake_request).await?; + tx.send(request).await?; + + let resp = client.map_fn(ReceiverStream::new(rx)).await?; + let mut resp = resp.into_inner(); + + let handshake_response = resp.message().await?; + assert!(handshake_response.is_some()); + + let handshake_response = handshake_response.unwrap(); + assert!(handshake_response.handshake.is_some()); + + for expected_value in &["test", "map", "stream"] { + let actual_response = resp.message().await?; + assert!(actual_response.is_some()); + + let actual_response = actual_response.unwrap(); + assert_eq!( + actual_response.results.len(), + 1, + "Expected single message from server" + ); + let msg = &actual_response.results[0]; + assert_eq!(msg.value, expected_value.as_bytes()); + } + + drop(tx); + shutdown_tx + .send(()) + .expect("Sending shutdown signal to gRPC server"); + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(task.is_finished(), "gRPC server is still running"); + Ok(()) + } + + #[tokio::test] + async fn map_stream_server_panic() -> Result<(), Box> { + struct PanicStreamer; + #[async_trait] + impl MapStreamer for PanicStreamer { + async fn map_stream(&self, _: MapStreamRequest, _tx: Sender) { + panic!("Panic in Map Stream"); + } + } + + let tmp_dir = TempDir::new()?; + let sock_file = tmp_dir.path().join("mapstream.sock"); + let server_info_file = tmp_dir.path().join("mapstream-server-info"); + + let mut server = Server::new(PanicStreamer) + .with_server_info_file(&server_info_file) + .with_socket_file(&sock_file) + .with_max_message_size(10240); + + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); + + tokio::time::sleep(Duration::from_millis(50)).await; + + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")? + .connect_with_connector(service_fn(move |_: Uri| { + let sock_file = sock_file.clone(); + async move { + Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new( + UnixStream::connect(sock_file).await?, + )) + } + })) + .await?; + + let mut client = MapClient::new(channel); + + let (tx, rx) = mpsc::channel(2); + let handshake_request = proto::MapRequest { + request: None, + id: "".to_string(), + handshake: Some(proto::Handshake { sot: true }), + status: None, + }; + tx.send(handshake_request).await.unwrap(); + + let mut stream = tokio::time::timeout( + Duration::from_secs(2), + client.map_fn(ReceiverStream::new(rx)), + ) + .await + .map_err(|_| "timeout while getting stream for map_fn")?? + .into_inner(); + + let handshake_resp = stream.message().await?.unwrap(); + assert!( + handshake_resp.handshake.is_some(), + "Not a valid response for handshake request" + ); + + let request = proto::MapRequest { + request: Some(proto::map_request::Request { + keys: vec!["panic".into(), "stream".into()], + value: "hello".into(), + watermark: Some(prost_types::Timestamp::default()), + event_time: Some(prost_types::Timestamp::default()), + headers: Default::default(), + }), + id: "".to_string(), + handshake: None, + status: None, + }; + + tx.send(request).await.unwrap(); + + if let Err(e) = stream.message().await { + assert_eq!(e.code(), tonic::Code::Internal); + assert!(e.message().contains("panicked")); + } else { + return Err("Expected error from server".into()); + } + + for _ in 0..10 { + tokio::time::sleep(Duration::from_millis(10)).await; + if task.is_finished() { + break; + } + } + assert!(task.is_finished(), "gRPC server is still running"); + Ok(()) + } +} diff --git a/numaflow/src/shared.rs b/numaflow/src/shared.rs index 158d945..ee20f3b 100644 --- a/numaflow/src/shared.rs +++ b/numaflow/src/shared.rs @@ -20,6 +20,7 @@ pub(crate) const BATCH_MAP: &str = "batch-map"; pub(crate) enum ContainerType { Map, BatchMap, + MapStream, Reduce, Sink, Source, From 8814d8b9e1d5d07758e3b9829813798672fb4e26 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 19 Dec 2024 18:32:36 +0530 Subject: [PATCH 02/10] chore: add example Signed-off-by: Yashash H L --- examples/flatmap-stream/Cargo.toml | 9 ++++++ examples/flatmap-stream/Dockerfile | 20 +++++++++++++ examples/flatmap-stream/Makefile | 20 +++++++++++++ .../manifests/simple-flatmap-stream.yaml | 29 +++++++++++++++++++ examples/flatmap-stream/src/main.rs | 27 +++++++++++++++++ 5 files changed, 105 insertions(+) create mode 100644 examples/flatmap-stream/Cargo.toml create mode 100644 examples/flatmap-stream/Dockerfile create mode 100644 examples/flatmap-stream/Makefile create mode 100644 examples/flatmap-stream/manifests/simple-flatmap-stream.yaml create mode 100644 examples/flatmap-stream/src/main.rs diff --git a/examples/flatmap-stream/Cargo.toml b/examples/flatmap-stream/Cargo.toml new file mode 100644 index 0000000..279eb29 --- /dev/null +++ b/examples/flatmap-stream/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "flatmap-stream" +version = "0.1.0" +edition = "2021" + +[dependencies] +tonic = "0.12.0" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +numaflow = { path = "../../numaflow" } diff --git a/examples/flatmap-stream/Dockerfile b/examples/flatmap-stream/Dockerfile new file mode 100644 index 0000000..50ea3a1 --- /dev/null +++ b/examples/flatmap-stream/Dockerfile @@ -0,0 +1,20 @@ +FROM rust:1.82-bullseye AS build + +RUN apt-get update +RUN apt-get install protobuf-compiler -y + +WORKDIR /numaflow-rs +COPY ./ ./ +WORKDIR /numaflow-rs/examples/flatmap-stream + +# build for release +RUN cargo build --release + +# our final base +FROM debian:bullseye AS flatmap-stream + +# copy the build artifact from the build stage +COPY --from=build /numaflow-rs/target/release/flatmap-stream . + +# set the startup command to run your binary +CMD ["./flatmap-stream"] diff --git a/examples/flatmap-stream/Makefile b/examples/flatmap-stream/Makefile new file mode 100644 index 0000000..ab916dd --- /dev/null +++ b/examples/flatmap-stream/Makefile @@ -0,0 +1,20 @@ +TAG ?= stable +PUSH ?= false +IMAGE_REGISTRY = quay.io/numaio/numaflow-rs/flatmap-stream:${TAG} +DOCKER_FILE_PATH = examples/flatmap-stream/Dockerfile + +.PHONY: update +update: + cargo check + cargo update + +.PHONY: image +image: update + cd ../../ && docker build \ + -f ${DOCKER_FILE_PATH} \ + -t ${IMAGE_REGISTRY} . + @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi + +.PHONY: clean +clean: + -rm -rf target diff --git a/examples/flatmap-stream/manifests/simple-flatmap-stream.yaml b/examples/flatmap-stream/manifests/simple-flatmap-stream.yaml new file mode 100644 index 0000000..c2a02af --- /dev/null +++ b/examples/flatmap-stream/manifests/simple-flatmap-stream.yaml @@ -0,0 +1,29 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: simple-flatmap-stream +spec: + vertices: + - name: in + source: + # A self data generating source + generator: + rpu: 300 + duration: 1s + keyCount: 5 + value: 5 + - name: cat + scale: + min: 1 + udf: + container: + image: quay.io/numaio/numaflow-rs/flatmap-stream:stable + - name: out + sink: + # A simple log printing sink + log: { } + edges: + - from: in + to: cat + - from: cat + to: out \ No newline at end of file diff --git a/examples/flatmap-stream/src/main.rs b/examples/flatmap-stream/src/main.rs new file mode 100644 index 0000000..24da52f --- /dev/null +++ b/examples/flatmap-stream/src/main.rs @@ -0,0 +1,27 @@ +use numaflow::mapstream; +use numaflow::mapstream::Message; +use tokio::sync::mpsc::Sender; + +#[tokio::main] +async fn main() -> Result<(), Box> { + mapstream::Server::new(Cat).start().await +} + +struct Cat; + +#[tonic::async_trait] +impl mapstream::MapStreamer for Cat { + async fn map_stream(&self, input: mapstream::MapStreamRequest, tx: Sender) { + let payload_str = String::from_utf8(input.value).unwrap_or_default(); + let splits: Vec<&str> = payload_str.split(',').collect(); + + for split in splits { + let message = Message::new(split.as_bytes().to_vec()) + .keys(input.keys.clone()) + .tags(vec![]); + if tx.send(message).await.is_err() { + break; + } + } + } +} From 037a6676ee1fc8082147760de2971957c85c8fac Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 19 Dec 2024 18:35:47 +0530 Subject: [PATCH 03/10] fix docs Signed-off-by: Yashash H L --- numaflow/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/numaflow/src/lib.rs b/numaflow/src/lib.rs index f0220dc..aadec84 100644 --- a/numaflow/src/lib.rs +++ b/numaflow/src/lib.rs @@ -34,10 +34,10 @@ pub mod sink; /// building [side input](https://numaflow.numaproj.io/user-guide/reference/side-inputs/) pub mod sideinput; -/// batchmap is for writing the [batch map mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/batchmap/) handlers. +/// batchmap is for writing the map in [batch mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#batch-map-mode) handlers. pub mod batchmap; -/// mapstream is for writing the [mapstream](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/mapstream/) handlers. +/// mapstream is for writing the map in [stream mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/#streaming-mode) handlers. pub mod mapstream; mod servers; From 00db37e86c2c17e45e0386361df141a01ae55cdc Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 19 Dec 2024 18:39:11 +0530 Subject: [PATCH 04/10] lint Signed-off-by: Yashash H L --- .rustfmt.toml | 2 +- examples/mapt-event-time-filter/src/main.rs | 6 ++++-- examples/sideinput/src/main.rs | 2 +- examples/simple-source/src/main.rs | 4 ++-- numaflow/src/batchmap.rs | 13 +++++++------ numaflow/src/mapstream.rs | 19 +++++++++++-------- numaflow/src/reduce.rs | 1 - numaflow/src/sideinput.rs | 1 - numaflow/src/sink.rs | 11 ++++++----- numaflow/src/sourcetransform.rs | 1 - 10 files changed, 32 insertions(+), 28 deletions(-) diff --git a/.rustfmt.toml b/.rustfmt.toml index 3a26366..36c419b 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -1 +1 @@ -edition = "2021" +edition = "2021" \ No newline at end of file diff --git a/examples/mapt-event-time-filter/src/main.rs b/examples/mapt-event-time-filter/src/main.rs index 1b76145..2033719 100644 --- a/examples/mapt-event-time-filter/src/main.rs +++ b/examples/mapt-event-time-filter/src/main.rs @@ -1,7 +1,8 @@ +use std::error::Error; + use filter_impl::filter_event_time; use numaflow::sourcetransform; use numaflow::sourcetransform::{Message, SourceTransformRequest}; -use std::error::Error; #[tokio::main] async fn main() -> Result<(), Box> { @@ -41,9 +42,10 @@ mod filter_impl { #[cfg(test)] mod tests { - use crate::filter_impl::filter_event_time; use chrono::{TimeZone, Utc}; use numaflow::sourcetransform::SourceTransformRequest; + + use crate::filter_impl::filter_event_time; /// Tests that events from 2022 are tagged as within the year 2022. #[test] fn test_filter_event_time_should_return_within_year_2022() { diff --git a/examples/sideinput/src/main.rs b/examples/sideinput/src/main.rs index 39af6dd..65b9ec2 100644 --- a/examples/sideinput/src/main.rs +++ b/examples/sideinput/src/main.rs @@ -1,7 +1,7 @@ -use numaflow::sideinput::{self, SideInputer}; use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; +use numaflow::sideinput::{self, SideInputer}; use tonic::async_trait; struct SideInputHandler { diff --git a/examples/simple-source/src/main.rs b/examples/simple-source/src/main.rs index dd842c8..bfb1d49 100644 --- a/examples/simple-source/src/main.rs +++ b/examples/simple-source/src/main.rs @@ -7,12 +7,12 @@ async fn main() -> Result<(), Box> { } pub(crate) mod simple_source { - use chrono::Utc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{collections::HashSet, sync::RwLock}; - use tokio::sync::mpsc::Sender; + use chrono::Utc; use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer}; + use tokio::sync::mpsc::Sender; /// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks /// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK diff --git a/numaflow/src/batchmap.rs b/numaflow/src/batchmap.rs index f602037..7642df4 100644 --- a/numaflow/src/batchmap.rs +++ b/numaflow/src/batchmap.rs @@ -3,12 +3,6 @@ use std::fs; use std::path::PathBuf; use std::sync::Arc; -use crate::error::Error; -use crate::error::ErrorKind::{InternalError, UserDefinedError}; -use crate::servers::map as proto; -use crate::servers::map::map_server::Map; -use crate::servers::map::{MapRequest, MapResponse, ReadyResponse}; -use crate::shared::{self, shutdown_signal, ContainerType}; use chrono::{DateTime, Utc}; use tokio::sync::mpsc::channel; use tokio::sync::{mpsc, oneshot}; @@ -18,6 +12,13 @@ use tokio_util::sync::CancellationToken; use tonic::{Request, Response, Status, Streaming}; use tracing::{debug, info}; +use crate::error::Error; +use crate::error::ErrorKind::{InternalError, UserDefinedError}; +use crate::servers::map as proto; +use crate::servers::map::map_server::Map; +use crate::servers::map::{MapRequest, MapResponse, ReadyResponse}; +use crate::shared::{self, shutdown_signal, ContainerType}; + const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info"; diff --git a/numaflow/src/mapstream.rs b/numaflow/src/mapstream.rs index 45e1f2e..5d11eca 100644 --- a/numaflow/src/mapstream.rs +++ b/numaflow/src/mapstream.rs @@ -1,13 +1,10 @@ -use crate::servers::map as proto; -use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::fs; use std::path::PathBuf; -use tokio::sync::mpsc::Sender; - -use crate::error::{Error, ErrorKind}; -use crate::shared::{self, shutdown_signal, ContainerType}; use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; @@ -15,6 +12,10 @@ use tokio_util::sync::CancellationToken; use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{error, info}; +use crate::error::{Error, ErrorKind}; +use crate::servers::map as proto; +use crate::shared::{self, shutdown_signal, ContainerType}; + const DEFAULT_CHANNEL_SIZE: usize = 1000; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/mapstream.sock"; @@ -559,9 +560,8 @@ impl Drop for Server { #[cfg(test)] mod tests { - use super::*; - use crate::servers::map::map_client::MapClient; use std::{error::Error, time::Duration}; + use tempfile::TempDir; use tokio::net::UnixStream; use tokio::sync::{mpsc, oneshot}; @@ -569,6 +569,9 @@ mod tests { use tonic::transport::Uri; use tower::service_fn; + use super::*; + use crate::servers::map::map_client::MapClient; + #[tokio::test] async fn map_stream_single_response() -> Result<(), Box> { struct Cat; diff --git a/numaflow/src/reduce.rs b/numaflow/src/reduce.rs index 6f58972..9c1b82d 100644 --- a/numaflow/src/reduce.rs +++ b/numaflow/src/reduce.rs @@ -255,7 +255,6 @@ impl Message { /// use numaflow::reduce::Message; /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` - pub fn tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self diff --git a/numaflow/src/sideinput.rs b/numaflow/src/sideinput.rs index 9980653..fd5cc24 100644 --- a/numaflow/src/sideinput.rs +++ b/numaflow/src/sideinput.rs @@ -81,7 +81,6 @@ pub trait SideInputer { /// ``` /// /// The `retrieve_sideinput` method is implemented to return an `Option>`. In this example, the method returns a message containing the current time if the counter is odd, and `None` if the counter is even. - async fn retrieve_sideinput(&self) -> Option>; } diff --git a/numaflow/src/sink.rs b/numaflow/src/sink.rs index 1171256..0034e24 100644 --- a/numaflow/src/sink.rs +++ b/numaflow/src/sink.rs @@ -524,17 +524,18 @@ impl Drop for Server { mod tests { use std::{error::Error, time::Duration}; - use crate::servers::sink::TransmissionStatus; - use crate::sink; - use crate::sink::sink_pb::sink_client::SinkClient; - use crate::sink::sink_pb::sink_request::Request; - use crate::sink::sink_pb::Handshake; use tempfile::TempDir; use tokio::net::UnixStream; use tokio::sync::oneshot; use tonic::transport::Uri; use tower::service_fn; + use crate::servers::sink::TransmissionStatus; + use crate::sink; + use crate::sink::sink_pb::sink_client::SinkClient; + use crate::sink::sink_pb::sink_request::Request; + use crate::sink::sink_pb::Handshake; + #[tokio::test] async fn sink_server() -> Result<(), Box> { struct Logger; diff --git a/numaflow/src/sourcetransform.rs b/numaflow/src/sourcetransform.rs index 1895312..2e6c6a4 100644 --- a/numaflow/src/sourcetransform.rs +++ b/numaflow/src/sourcetransform.rs @@ -167,7 +167,6 @@ impl Message { /// let now = Utc::now(); /// let message = Message::new(vec![1, 2, 3], now).tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` - pub fn tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self From 9ca9362ad511084501520e5a37d40cdcd0cdc9d9 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 19 Dec 2024 20:23:44 +0530 Subject: [PATCH 05/10] send eof response Signed-off-by: Yashash H L --- numaflow/src/mapstream.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/numaflow/src/mapstream.rs b/numaflow/src/mapstream.rs index 5d11eca..6619f83 100644 --- a/numaflow/src/mapstream.rs +++ b/numaflow/src/mapstream.rs @@ -14,6 +14,7 @@ use tracing::{error, info}; use crate::error::{Error, ErrorKind}; use crate::servers::map as proto; +use crate::servers::map::TransmissionStatus; use crate::shared::{self, shutdown_signal, ContainerType}; const DEFAULT_CHANNEL_SIZE: usize = 1000; @@ -356,8 +357,8 @@ async fn run_map_stream( } }); - // spawn a task to catch the panics from the map_stream task - tokio::spawn({ + // Wait for the map_stream_task to complete and handle any errors + let panic_listener = tokio::spawn({ let error_tx = error_tx.clone(); async move { if let Err(e) = map_stream_task.await { @@ -367,9 +368,12 @@ async fn run_map_stream( )))) .await .expect("Sending error on channel"); + return Err(e); } + Ok(()) } - }); + }) + .await; while let Some(message) = rx.recv().await { let send_response_result = stream_response_tx @@ -391,6 +395,22 @@ async fn run_map_stream( return; } } + + // we should not end eof message if the map stream panicked + if panic_listener.is_err() { + return; + } + + // send eof message to indicate end of stream + stream_response_tx + .send(Ok(proto::MapResponse { + results: vec![], + id: message_id, + handshake: None, + status: Some(TransmissionStatus { eot: true }), + })) + .await + .expect("Sending eof message to gRPC response channel"); } /// Manages the gRPC stream. If the request handler is finished, it cancels the token. @@ -759,6 +779,11 @@ mod tests { assert_eq!(msg.value, expected_value.as_bytes()); } + // read eof response + let eof_response = resp.message().await?; + assert!(eof_response.is_some()); + assert!(eof_response.unwrap().status.is_some()); + drop(tx); shutdown_tx .send(()) From f45f4588a01369855a7492cd8fb4e8a933260ab3 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 20 Dec 2024 08:55:25 +0530 Subject: [PATCH 06/10] fix error kind Signed-off-by: Yashash H L --- numaflow/src/map.rs | 2 +- numaflow/src/mapstream.rs | 4 ++-- numaflow/src/shared.rs | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/numaflow/src/map.rs b/numaflow/src/map.rs index 8a28b3e..0b1e165 100644 --- a/numaflow/src/map.rs +++ b/numaflow/src/map.rs @@ -370,7 +370,7 @@ async fn run_map( Err(e) => { error!("Failed to run map function: {e:?}"); error_tx - .send(Error::MapError(ErrorKind::InternalError(format!( + .send(Error::MapError(ErrorKind::UserDefinedError(format!( "panicked: {e:?}" )))) .await diff --git a/numaflow/src/mapstream.rs b/numaflow/src/mapstream.rs index 6619f83..f4a0558 100644 --- a/numaflow/src/mapstream.rs +++ b/numaflow/src/mapstream.rs @@ -363,8 +363,8 @@ async fn run_map_stream( async move { if let Err(e) = map_stream_task.await { error_tx - .send(Error::MapError(ErrorKind::InternalError(format!( - "Task panicked: {e:?}" + .send(Error::MapError(ErrorKind::UserDefinedError(format!( + "panicked: {e:?}" )))) .await .expect("Sending error on channel"); diff --git a/numaflow/src/shared.rs b/numaflow/src/shared.rs index ee20f3b..7d9ecab 100644 --- a/numaflow/src/shared.rs +++ b/numaflow/src/shared.rs @@ -15,6 +15,7 @@ use tracing::info; pub(crate) const MAP_MODE_KEY: &str = "MAP_MODE"; pub(crate) const UNARY_MAP: &str = "unary-map"; pub(crate) const BATCH_MAP: &str = "batch-map"; +pub(crate) const STREAM_MAP: &str = "stream-map"; #[derive(Eq, PartialEq, Hash)] pub(crate) enum ContainerType { @@ -83,6 +84,7 @@ impl ServerInfo { match container_type { ContainerType::Map => UNARY_MAP.to_string(), ContainerType::BatchMap => BATCH_MAP.to_string(), + ContainerType::MapStream => STREAM_MAP.to_string(), _ => "".to_string(), }, ); From c2ce6b44ed7eac1e1d7a162441925d452afbf999 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 20 Dec 2024 08:58:48 +0530 Subject: [PATCH 07/10] Apply suggestions from code review Signed-off-by: Yashash H L Co-authored-by: Vigith Maurice --- numaflow/src/mapstream.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/numaflow/src/mapstream.rs b/numaflow/src/mapstream.rs index f4a0558..648545c 100644 --- a/numaflow/src/mapstream.rs +++ b/numaflow/src/mapstream.rs @@ -398,6 +398,7 @@ async fn run_map_stream( // we should not end eof message if the map stream panicked if panic_listener.is_err() { + error!(err=?panic_listener, "there was a panic invoked"); return; } @@ -491,13 +492,13 @@ impl Server { } /// Set the unix domain socket file path used by the gRPC server to listen for incoming connections. - /// Default value is `/var/run/numaflow/map.sock` + /// Default value is `/var/run/numaflow/mapstream.sock` pub fn with_socket_file(mut self, file: impl Into) -> Self { self.sock_addr = file.into(); self } - /// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/map.sock` + /// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/mapstream.sock` pub fn socket_file(&self) -> &std::path::Path { self.sock_addr.as_path() } From 8ca43a7b679d9c136488d6cf7478b06995282215 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 20 Dec 2024 09:06:47 +0530 Subject: [PATCH 08/10] address review comments Signed-off-by: Yashash H L --- examples/batchmap-cat/src/main.rs | 2 +- examples/batchmap-flatmap/src/main.rs | 2 +- examples/flatmap-stream/src/main.rs | 4 +-- examples/map-cat/src/main.rs | 2 +- examples/map-tickgen-serde/src/main.rs | 2 +- examples/mapt-event-time-filter/src/main.rs | 4 +-- examples/reduce-counter/src/main.rs | 2 +- examples/source-transformer-now/src/main.rs | 2 +- numaflow/src/batchmap.rs | 12 ++++----- numaflow/src/map.rs | 14 +++++------ numaflow/src/mapstream.rs | 28 +++++++++++---------- numaflow/src/reduce.rs | 14 +++++------ numaflow/src/sourcetransform.rs | 14 +++++------ 13 files changed, 52 insertions(+), 50 deletions(-) diff --git a/examples/batchmap-cat/src/main.rs b/examples/batchmap-cat/src/main.rs index b54bfa0..a9ae80e 100644 --- a/examples/batchmap-cat/src/main.rs +++ b/examples/batchmap-cat/src/main.rs @@ -14,7 +14,7 @@ impl batchmap::BatchMapper for Cat { let mut responses: Vec = Vec::new(); while let Some(datum) = input.recv().await { let mut response = BatchResponse::from_id(datum.id); - response.append(Message::new(datum.value).keys(datum.keys.clone())); + response.append(Message::new(datum.value).with_keys(datum.keys.clone())); responses.push(response); } responses diff --git a/examples/batchmap-flatmap/src/main.rs b/examples/batchmap-flatmap/src/main.rs index 75ac063..c8430c6 100644 --- a/examples/batchmap-flatmap/src/main.rs +++ b/examples/batchmap-flatmap/src/main.rs @@ -23,7 +23,7 @@ impl batchmap::BatchMapper for Flatmap { // return the resulting parts for part in parts { - response.append(Message::new(Vec::from(part)).keys(datum.keys.clone())); + response.append(Message::new(Vec::from(part)).with_keys(datum.keys.clone())); } responses.push(response); } diff --git a/examples/flatmap-stream/src/main.rs b/examples/flatmap-stream/src/main.rs index 24da52f..41d7cc8 100644 --- a/examples/flatmap-stream/src/main.rs +++ b/examples/flatmap-stream/src/main.rs @@ -17,8 +17,8 @@ impl mapstream::MapStreamer for Cat { for split in splits { let message = Message::new(split.as_bytes().to_vec()) - .keys(input.keys.clone()) - .tags(vec![]); + .with_keys(input.keys.clone()) + .with_tags(vec![]); if tx.send(message).await.is_err() { break; } diff --git a/examples/map-cat/src/main.rs b/examples/map-cat/src/main.rs index 389de43..1d124f2 100644 --- a/examples/map-cat/src/main.rs +++ b/examples/map-cat/src/main.rs @@ -10,6 +10,6 @@ struct Cat; #[tonic::async_trait] impl map::Mapper for Cat { async fn map(&self, input: map::MapRequest) -> Vec { - vec![map::Message::new(input.value).keys(input.keys.clone())] + vec![map::Message::new(input.value).with_keys(input.keys.clone())] } } diff --git a/examples/map-tickgen-serde/src/main.rs b/examples/map-tickgen-serde/src/main.rs index ccc9f2a..d3855a5 100644 --- a/examples/map-tickgen-serde/src/main.rs +++ b/examples/map-tickgen-serde/src/main.rs @@ -45,7 +45,7 @@ impl map::Mapper for TickGen { }) .unwrap_or_default(), ) - .keys(input.keys.clone()); + .with_keys(input.keys.clone()); vec![message] } } diff --git a/examples/mapt-event-time-filter/src/main.rs b/examples/mapt-event-time-filter/src/main.rs index 2033719..72c708d 100644 --- a/examples/mapt-event-time-filter/src/main.rs +++ b/examples/mapt-event-time-filter/src/main.rs @@ -32,10 +32,10 @@ mod filter_impl { vec![Message::message_to_drop(input.eventtime)] } else if input.eventtime < jan_first_2023 { vec![Message::new(input.value, jan_first_2022) - .tags(vec![String::from("within_year_2022")])] + .with_tags(vec![String::from("within_year_2022")])] } else { vec![Message::new(input.value, jan_first_2023) - .tags(vec![String::from("after_year_2022")])] + .with_tags(vec![String::from("after_year_2022")])] } } } diff --git a/examples/reduce-counter/src/main.rs b/examples/reduce-counter/src/main.rs index 3f8e988..59b16f8 100644 --- a/examples/reduce-counter/src/main.rs +++ b/examples/reduce-counter/src/main.rs @@ -44,7 +44,7 @@ mod counter { while input.recv().await.is_some() { counter += 1; } - vec![Message::new(counter.to_string().into_bytes()).keys(keys.clone())] + vec![Message::new(counter.to_string().into_bytes()).with_keys(keys.clone())] } } } diff --git a/examples/source-transformer-now/src/main.rs b/examples/source-transformer-now/src/main.rs index 106a9f9..bbd36cd 100644 --- a/examples/source-transformer-now/src/main.rs +++ b/examples/source-transformer-now/src/main.rs @@ -17,7 +17,7 @@ impl sourcetransform::SourceTransformer for NowCat { ) -> Vec { vec![ sourcetransform::Message::new(input.value, chrono::offset::Utc::now()) - .keys(input.keys.clone()), + .with_keys(input.keys.clone()), ] } } diff --git a/numaflow/src/batchmap.rs b/numaflow/src/batchmap.rs index 7642df4..1237aa7 100644 --- a/numaflow/src/batchmap.rs +++ b/numaflow/src/batchmap.rs @@ -173,9 +173,9 @@ impl Message { /// /// ``` /// use numaflow::batchmap::Message; - /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// let message = Message::new(vec![1, 2, 3]).with_keys(vec!["key1".to_string(), "key2".to_string()]); /// ``` - pub fn keys(mut self, keys: Vec) -> Self { + pub fn with_keys(mut self, keys: Vec) -> Self { self.keys = Some(keys); self } @@ -190,9 +190,9 @@ impl Message { /// /// ``` /// use numaflow::batchmap::Message; - /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// let message = Message::new(vec![1, 2, 3]).with_tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` - pub fn tags(mut self, tags: Vec) -> Self { + pub fn with_tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self } @@ -207,9 +207,9 @@ impl Message { /// /// ``` /// use numaflow::batchmap::Message; - /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// let message = Message::new(vec![1, 2, 3]).with_value(vec![4, 5, 6]); /// ``` - pub fn value(mut self, value: Vec) -> Self { + pub fn with_value(mut self, value: Vec) -> Self { self.value = value; self } diff --git a/numaflow/src/map.rs b/numaflow/src/map.rs index 0b1e165..9395311 100644 --- a/numaflow/src/map.rs +++ b/numaflow/src/map.rs @@ -53,7 +53,7 @@ pub trait Mapper { /// impl map::Mapper for Cat { /// async fn map(&self, input: map::MapRequest) -> Vec { /// use numaflow::map::Message; - /// let message=Message::new(input.value).keys(input.keys).tags(vec![]); + /// let message=Message::new(input.value).with_keys(input.keys).with_tags(vec![]); /// vec![message] /// } /// } @@ -122,9 +122,9 @@ impl Message { /// /// ``` /// use numaflow::map::Message; - /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// let message = Message::new(vec![1, 2, 3]).with_keys(vec!["key1".to_string(), "key2".to_string()]); /// ``` - pub fn keys(mut self, keys: Vec) -> Self { + pub fn with_keys(mut self, keys: Vec) -> Self { self.keys = Some(keys); self } @@ -139,9 +139,9 @@ impl Message { /// /// ``` /// use numaflow::map::Message; - /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// let message = Message::new(vec![1, 2, 3]).with_tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` - pub fn tags(mut self, tags: Vec) -> Self { + pub fn with_tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self } @@ -156,9 +156,9 @@ impl Message { /// /// ``` /// use numaflow::map::Message; - /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// let message = Message::new(vec![1, 2, 3]).with_value(vec![4, 5, 6]); /// ``` - pub fn value(mut self, value: Vec) -> Self { + pub fn with_value(mut self, value: Vec) -> Self { self.value = value; self } diff --git a/numaflow/src/mapstream.rs b/numaflow/src/mapstream.rs index f4a0558..ad86765 100644 --- a/numaflow/src/mapstream.rs +++ b/numaflow/src/mapstream.rs @@ -54,8 +54,8 @@ pub trait MapStreamer { /// /// for split in splits { /// let message = Message::new(split.as_bytes().to_vec()) - /// .keys(input.keys.clone()) - /// .tags(vec![]); + /// .with_keys(input.keys.clone()) + /// .with_tags(vec![]); /// if tx.send(message).await.is_err() { /// break; /// } @@ -106,7 +106,7 @@ impl Message { /// # Examples /// /// ``` - /// use numaflow::map::Message; + /// use numaflow::mapstream::Message; /// let dropped_message = Message::message_to_drop(); /// ``` pub fn message_to_drop() -> Message { @@ -126,10 +126,10 @@ impl Message { /// # Examples /// /// ``` - /// use numaflow::map::Message; - /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// use numaflow::mapstream::Message; + /// let message = Message::new(vec![1, 2, 3]).with_keys(vec!["key1".to_string(), "key2".to_string()]); /// ``` - pub fn keys(mut self, keys: Vec) -> Self { + pub fn with_keys(mut self, keys: Vec) -> Self { self.keys = Some(keys); self } @@ -143,10 +143,10 @@ impl Message { /// # Examples /// /// ``` - /// use numaflow::map::Message; - /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// use numaflow::mapstream::Message; + /// let message = Message::new(vec![1, 2, 3]).with_tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` - pub fn tags(mut self, tags: Vec) -> Self { + pub fn with_tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self } @@ -160,10 +160,10 @@ impl Message { /// # Examples /// /// ``` - /// use numaflow::map::Message; - /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// use numaflow::mapstream::Message; + /// let message = Message::new(vec![1, 2, 3]).with_value(vec![4, 5, 6]); /// ``` - pub fn value(mut self, value: Vec) -> Self { + pub fn with_value(mut self, value: Vec) -> Self { self.value = value; self } @@ -598,7 +598,9 @@ mod tests { #[async_trait] impl MapStreamer for Cat { async fn map_stream(&self, input: MapStreamRequest, tx: Sender) { - let message = Message::new(input.value).keys(input.keys).tags(vec![]); + let message = Message::new(input.value) + .with_keys(input.keys) + .with_value(vec![]); tx.send(message).await.unwrap(); } } diff --git a/numaflow/src/reduce.rs b/numaflow/src/reduce.rs index 9c1b82d..2de6d5e 100644 --- a/numaflow/src/reduce.rs +++ b/numaflow/src/reduce.rs @@ -130,7 +130,7 @@ pub trait Reducer { /// while input.recv().await.is_some() { /// counter += 1; /// } - /// let message=Message::new(counter.to_string().into_bytes()).tags(vec![]).keys(keys.clone()); + /// let message=Message::new(counter.to_string().into_bytes()).with_tags(vec![]).with_keys(keys.clone()); /// vec![message] /// } /// } @@ -236,9 +236,9 @@ impl Message { /// /// ``` /// use numaflow::reduce::Message; - /// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); + /// let message = Message::new(vec![1, 2, 3]).with_keys(vec!["key1".to_string(), "key2".to_string()]); /// ``` - pub fn keys(mut self, keys: Vec) -> Self { + pub fn with_keys(mut self, keys: Vec) -> Self { self.keys = Some(keys); self } @@ -253,9 +253,9 @@ impl Message { /// /// ``` /// use numaflow::reduce::Message; - /// let message = Message::new(vec![1, 2, 3]).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// let message = Message::new(vec![1, 2, 3]).with_tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` - pub fn tags(mut self, tags: Vec) -> Self { + pub fn with_tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self } @@ -270,9 +270,9 @@ impl Message { /// /// ``` /// use numaflow::reduce::Message; - /// let message = Message::new(vec![1, 2, 3]).value(vec![4, 5, 6]); + /// let message = Message::new(vec![1, 2, 3]).with_value(vec![4, 5, 6]); /// ``` - pub fn value(mut self, value: Vec) -> Self { + pub fn with_value(mut self, value: Vec) -> Self { self.value = value; self } diff --git a/numaflow/src/sourcetransform.rs b/numaflow/src/sourcetransform.rs index 2e6c6a4..acf93e7 100644 --- a/numaflow/src/sourcetransform.rs +++ b/numaflow/src/sourcetransform.rs @@ -61,7 +61,7 @@ pub trait SourceTransformer { /// input: sourcetransform::SourceTransformRequest, /// ) -> Vec { /// use numaflow::sourcetransform::Message; - /// let message=Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys).tags(vec![]); + /// let message=Message::new(input.value, chrono::offset::Utc::now()).with_keys(input.keys).with_tags(vec![]); /// vec![message] /// } /// } @@ -147,9 +147,9 @@ impl Message { /// use numaflow::sourcetransform::Message; /// use chrono::Utc; /// let now = Utc::now(); - /// let message = Message::new(vec![1, 2, 3], now).keys(vec!["key1".to_string(), "key2".to_string()]); + /// let message = Message::new(vec![1, 2, 3], now).with_keys(vec!["key1".to_string(), "key2".to_string()]); /// ``` - pub fn keys(mut self, keys: Vec) -> Self { + pub fn with_keys(mut self, keys: Vec) -> Self { self.keys = Some(keys); self } @@ -165,9 +165,9 @@ impl Message { /// use numaflow::sourcetransform::Message; /// use chrono::Utc; /// let now = Utc::now(); - /// let message = Message::new(vec![1, 2, 3], now).tags(vec!["tag1".to_string(), "tag2".to_string()]); + /// let message = Message::new(vec![1, 2, 3], now).with_tags(vec!["tag1".to_string(), "tag2".to_string()]); /// ``` - pub fn tags(mut self, tags: Vec) -> Self { + pub fn with_tags(mut self, tags: Vec) -> Self { self.tags = Some(tags); self } @@ -184,9 +184,9 @@ impl Message { /// use numaflow::sourcetransform::Message; /// use chrono::Utc; /// let now = Utc::now(); - /// let message = Message::new(vec![1, 2, 3], now).value(vec![4, 5, 6]); + /// let message = Message::new(vec![1, 2, 3], now).with_value(vec![4, 5, 6]); /// ``` - pub fn value(mut self, value: Vec) -> Self { + pub fn with_value(mut self, value: Vec) -> Self { self.value = value; self } From f9b1b86554d1f46cbfff130357a9f073aca993c6 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 20 Dec 2024 09:19:12 +0530 Subject: [PATCH 09/10] remove with_value Signed-off-by: Yashash H L --- numaflow/src/batchmap.rs | 17 ----------------- numaflow/src/map.rs | 17 ----------------- numaflow/src/mapstream.rs | 19 +------------------ numaflow/src/reduce.rs | 17 ----------------- numaflow/src/sourcetransform.rs | 19 ------------------- 5 files changed, 1 insertion(+), 88 deletions(-) diff --git a/numaflow/src/batchmap.rs b/numaflow/src/batchmap.rs index 1237aa7..e5f43c7 100644 --- a/numaflow/src/batchmap.rs +++ b/numaflow/src/batchmap.rs @@ -196,23 +196,6 @@ impl Message { self.tags = Some(tags); self } - - /// Replaces the value of the message. - /// - /// # Arguments - /// - /// * `value` - A new vector of bytes that replaces the current message value. - /// - /// # Examples - /// - /// ``` - /// use numaflow::batchmap::Message; - /// let message = Message::new(vec![1, 2, 3]).with_value(vec![4, 5, 6]); - /// ``` - pub fn with_value(mut self, value: Vec) -> Self { - self.value = value; - self - } } /// The result of the call to [`BatchMapper::batchmap`] method. pub struct BatchResponse { diff --git a/numaflow/src/map.rs b/numaflow/src/map.rs index 9395311..89c70de 100644 --- a/numaflow/src/map.rs +++ b/numaflow/src/map.rs @@ -145,23 +145,6 @@ impl Message { self.tags = Some(tags); self } - - /// Replaces the value of the message. - /// - /// # Arguments - /// - /// * `value` - A new vector of bytes that replaces the current message value. - /// - /// # Examples - /// - /// ``` - /// use numaflow::map::Message; - /// let message = Message::new(vec![1, 2, 3]).with_value(vec![4, 5, 6]); - /// ``` - pub fn with_value(mut self, value: Vec) -> Self { - self.value = value; - self - } } impl From for proto::map_response::Result { diff --git a/numaflow/src/mapstream.rs b/numaflow/src/mapstream.rs index 094f2b5..8c4ade0 100644 --- a/numaflow/src/mapstream.rs +++ b/numaflow/src/mapstream.rs @@ -150,23 +150,6 @@ impl Message { self.tags = Some(tags); self } - - /// Replaces the value of the message. - /// - /// # Arguments - /// - /// * `value` - A new vector of bytes that replaces the current message value. - /// - /// # Examples - /// - /// ``` - /// use numaflow::mapstream::Message; - /// let message = Message::new(vec![1, 2, 3]).with_value(vec![4, 5, 6]); - /// ``` - pub fn with_value(mut self, value: Vec) -> Self { - self.value = value; - self - } } impl From for proto::map_response::Result { @@ -601,7 +584,7 @@ mod tests { async fn map_stream(&self, input: MapStreamRequest, tx: Sender) { let message = Message::new(input.value) .with_keys(input.keys) - .with_value(vec![]); + .with_tags(vec![]); tx.send(message).await.unwrap(); } } diff --git a/numaflow/src/reduce.rs b/numaflow/src/reduce.rs index 2de6d5e..1f27822 100644 --- a/numaflow/src/reduce.rs +++ b/numaflow/src/reduce.rs @@ -259,23 +259,6 @@ impl Message { self.tags = Some(tags); self } - - /// Replaces the value of the message. - /// - /// # Arguments - /// - /// * `value` - A new vector of bytes that replaces the current message value. - /// - /// # Examples - /// - /// ``` - /// use numaflow::reduce::Message; - /// let message = Message::new(vec![1, 2, 3]).with_value(vec![4, 5, 6]); - /// ``` - pub fn with_value(mut self, value: Vec) -> Self { - self.value = value; - self - } } /// Incoming request into the reducer handler of [`Reducer`]. diff --git a/numaflow/src/sourcetransform.rs b/numaflow/src/sourcetransform.rs index acf93e7..43475d6 100644 --- a/numaflow/src/sourcetransform.rs +++ b/numaflow/src/sourcetransform.rs @@ -171,25 +171,6 @@ impl Message { self.tags = Some(tags); self } - - /// Replaces the value of the message. - /// - /// # Arguments - /// - /// * `value` - A new vector of bytes that replaces the current message value. - /// - /// # Examples - /// - /// ``` - /// use numaflow::sourcetransform::Message; - /// use chrono::Utc; - /// let now = Utc::now(); - /// let message = Message::new(vec![1, 2, 3], now).with_value(vec![4, 5, 6]); - /// ``` - pub fn with_value(mut self, value: Vec) -> Self { - self.value = value; - self - } } /// Incoming request to the Source Transformer. From 8caf385b9b61d412ff7dc5d232be4114e227811a Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 20 Dec 2024 15:45:19 +0530 Subject: [PATCH 10/10] fix server info check Signed-off-by: Yashash H L --- numaflow/src/shared.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/numaflow/src/shared.rs b/numaflow/src/shared.rs index 7d9ecab..a940f8a 100644 --- a/numaflow/src/shared.rs +++ b/numaflow/src/shared.rs @@ -78,7 +78,10 @@ pub(crate) struct ServerInfo { impl ServerInfo { pub fn new(container_type: ContainerType) -> Self { let mut metadata: HashMap = HashMap::new(); - if container_type == ContainerType::Map || container_type == ContainerType::BatchMap { + if container_type == ContainerType::Map + || container_type == ContainerType::BatchMap + || container_type == ContainerType::MapStream + { metadata.insert( MAP_MODE_KEY.to_string(), match container_type {