From 416c4afa16491d96c3f10dcd5ff9aa0a71c3e273 Mon Sep 17 00:00:00 2001
From: Vigith Maurice <vigith@gmail.com>
Date: Tue, 13 Feb 2024 19:05:05 -0800
Subject: [PATCH 1/8] feat: improve the sink SDK experience

Signed-off-by: Vigith Maurice <vigith@gmail.com>
---
 examples/sink-log/Cargo.toml  |   3 +-
 examples/sink-log/src/main.rs |  87 +++++++++++---------------
 src/map.rs                    |  23 +------
 src/shared.rs                 |  18 ++++++
 src/sink.rs                   | 114 ++++++++++++----------------------
 5 files changed, 99 insertions(+), 146 deletions(-)

diff --git a/examples/sink-log/Cargo.toml b/examples/sink-log/Cargo.toml
index 034cb88..d91aba7 100644
--- a/examples/sink-log/Cargo.toml
+++ b/examples/sink-log/Cargo.toml
@@ -10,4 +10,5 @@ path = "src/main.rs"
 [dependencies]
 tonic = "0.9"
 tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
-numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
+#numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
+numaflow={path="/Users/vigith/code/git/numaproj/numaflow-rs"}
diff --git a/examples/sink-log/src/main.rs b/examples/sink-log/src/main.rs
index d92a526..71dac8d 100644
--- a/examples/sink-log/src/main.rs
+++ b/examples/sink-log/src/main.rs
@@ -1,61 +1,46 @@
-use numaflow::sink::start_uds_server;
+use std::error::Error;
 
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let sink_handler = log_sink::Logger::new();
+use numaflow::sink::{self, start_uds_server, Response, SinkRequest};
 
-    start_uds_server(sink_handler).await?;
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
+    start_uds_server(Logger {}).await?;
 
     Ok(())
 }
 
-mod log_sink {
-    use numaflow::sink;
-    use numaflow::sink::{Datum, Response};
-    use tonic::async_trait;
-
-    pub(crate) struct Logger {}
-
-    impl Logger {
-        pub(crate) fn new() -> Self {
-            Self {}
-        }
-    }
-
-    #[async_trait]
-    impl sink::Sinker for Logger {
-        async fn sink<T: Datum + Send + Sync + 'static>(
-            &self,
-            mut input: tokio::sync::mpsc::Receiver<T>,
-        ) -> Vec<Response> {
-            let mut responses: Vec<Response> = Vec::new();
-
-            while let Some(datum) = input.recv().await {
-                // do something better, but for now let's just log it.
-                // please note that `from_utf8` is working because the input in this
-                // example uses utf-8 data.
-                let response = match std::str::from_utf8(datum.value()) {
-                    Ok(v) => {
-                        println!("{}", v);
-                        // record the response
-                        Response {
-                            id: datum.id().to_string(),
-                            success: true,
-                            err: "".to_string(),
-                        }
+struct Logger {}
+
+#[tonic::async_trait]
+impl sink::Sinker for Logger {
+    async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {
+        let mut responses: Vec<Response> = Vec::new();
+
+        while let Some(datum) = input.recv().await {
+            // do something better, but for now let's just log it.
+            // please note that `from_utf8` is working because the input in this
+            // example uses utf-8 data.
+            let response = match std::str::from_utf8(&datum.value) {
+                Ok(v) => {
+                    println!("{}", v);
+                    // record the response
+                    Response {
+                        id: datum.id,
+                        success: true,
+                        err: "".to_string(),
                     }
-                    Err(e) => Response {
-                        id: datum.id().to_string(),
-                        success: true, // there is no point setting success to false as retrying is not going to help
-                        err: format!("Invalid UTF-8 sequence: {}", e),
-                    },
-                };
-
-                // return the responses
-                responses.push(response);
-            }
-
-            responses
+                }
+                Err(e) => Response {
+                    id: datum.id,
+                    success: true, // there is no point setting success to false as retrying is not going to help
+                    err: format!("Invalid UTF-8 sequence: {}", e),
+                },
+            };
+
+            // return the responses
+            responses.push(response);
         }
+
+        responses
     }
 }
diff --git a/src/map.rs b/src/map.rs
index 397e156..dc34367 100644
--- a/src/map.rs
+++ b/src/map.rs
@@ -95,7 +95,7 @@ impl From<Message> for map_response::Result {
     }
 }
 
-/// Incoming request into the map handles of [`Mapper`].
+/// Incoming request into the map handler of [`Mapper`].
 pub struct MapRequest {
     /// Set of keys in the (key, value) terminology of map/reduce paradigm.
     pub keys: Vec<String>,
@@ -127,7 +127,7 @@ pub struct Server<T> {
     map_svc: Option<T>,
 }
 
-impl<T> Server<T> {
+impl<T: Mapper> Server<T> {
     pub fn new(map_svc: T) -> Self {
         let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
             "/var/run/numaflow/server-info"
@@ -208,28 +208,11 @@ impl<T> Server<T> {
         T: Mapper + Send + Sync + 'static,
     {
         let (tx, rx) = oneshot::channel::<()>();
-        tokio::spawn(wait_for_signal(tx));
+        tokio::spawn(shared::wait_for_signal(tx));
         self.start_with_shutdown(rx).await
     }
 }
 
-async fn wait_for_signal(tx: oneshot::Sender<()>) {
-    use tokio::signal::unix::{signal, SignalKind};
-    let mut interrupt =
-        signal(SignalKind::interrupt()).expect("Failed to register SIGINT interrupt handler");
-    let mut termination =
-        signal(SignalKind::terminate()).expect("Failed to register SIGTERM interrupt handler");
-    tokio::select! {
-        _ = interrupt.recv() =>  {
-            tracing::info!("Received SIGINT. Stopping gRPC server")
-        }
-        _ = termination.recv() => {
-            tracing::info!("Received SIGTERM. Stopping gRPC server")
-        }
-    }
-    tx.send(()).expect("Sending shutdown signal to gRPC server");
-}
-
 #[cfg(test)]
 mod tests {
     use std::{error::Error, time::Duration};
diff --git a/src/shared.rs b/src/shared.rs
index a4a407c..c86fe6a 100644
--- a/src/shared.rs
+++ b/src/shared.rs
@@ -4,6 +4,7 @@ use std::{collections::HashMap, io};
 
 use chrono::{DateTime, TimeZone, Timelike, Utc};
 use prost_types::Timestamp;
+use tokio::sync::oneshot;
 use tokio_stream::wrappers::UnixListenerStream;
 use tracing::info;
 
@@ -55,3 +56,20 @@ pub(crate) fn prost_timestamp_from_utc(t: DateTime<Utc>) -> Option<Timestamp> {
         nanos: t.nanosecond() as i32,
     })
 }
+
+pub(crate) async fn wait_for_signal(tx: oneshot::Sender<()>) {
+    use tokio::signal::unix::{signal, SignalKind};
+    let mut interrupt =
+        signal(SignalKind::interrupt()).expect("Failed to register SIGINT interrupt handler");
+    let mut termination =
+        signal(SignalKind::terminate()).expect("Failed to register SIGTERM interrupt handler");
+    tokio::select! {
+        _ = interrupt.recv() =>  {
+            tracing::info!("Received SIGINT. Stopping gRPC server")
+        }
+        _ = termination.recv() => {
+            tracing::info!("Received SIGTERM. Stopping gRPC server")
+        }
+    }
+    tx.send(()).expect("Sending shutdown signal to gRPC server");
+}
diff --git a/src/sink.rs b/src/sink.rs
index 5b58554..fe0e6af 100644
--- a/src/sink.rs
+++ b/src/sink.rs
@@ -3,11 +3,13 @@ use tokio::sync::mpsc;
 use tonic::transport::Server;
 use tonic::{Request, Status, Streaming};
 
-use sinker_grpc::sink_server::SinkServer;
-use sinker_grpc::{ReadyResponse, SinkRequest, SinkResponse};
+use crate::sink::sinker_grpc::{
+    sink_response,
+    sink_server::{Sink, SinkServer},
+    ReadyResponse, SinkRequest as RPCSinkRequest, SinkResponse,
+};
 
 use crate::shared;
-use crate::sink::sinker_grpc::sink_server::Sink;
 
 mod sinker_grpc {
     tonic::include_proto!("sink.v1");
@@ -90,49 +92,25 @@ pub trait Sinker {
     ///     Ok(())
     /// }
     /// ```
-    async fn sink<T: Datum + Send + Sync + 'static>(
-        &self,
-        input: mpsc::Receiver<T>,
-    ) -> Vec<Response>;
+    async fn sink(&self, mut input: mpsc::Receiver<SinkRequest>) -> Vec<Response>;
 }
 
-/// Response is the result returned from the [`Sinker::sink`].
-pub struct Response {
-    /// id is the unique ID of the message.
+/// Incoming request into the  handler of [`Sinker`].
+pub struct SinkRequest {
+    /// Set of keys in the (key, value) terminology of map/reduce paradigm.
+    pub keys: Vec<String>,
+    /// The value in the (key, value) terminology of map/reduce paradigm.
+    pub value: Vec<u8>,
+    /// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this time.
+    pub watermark: DateTime<Utc>,
+    /// Time of the element as seen at source or aligned after a reduce operation.
+    pub eventtime: DateTime<Utc>,
+    /// ID is the unique id of the message to be send to the Sink.
     pub id: String,
-    /// success indicates whether the write to the sink was successful. If set to `false`, it will be
-    /// retried, hence it is better to try till it is successful.
-    pub success: bool,
-    /// err string is used to describe the error if [`Response::success`]  was `false`.
-    pub err: String,
 }
 
-/// Datum trait represents an incoming element into the [`Sinker::sink`].
-pub trait Datum {
-    /// keys are the keys in the (key, value) terminology of map/reduce paradigm.
-    fn keys(&self) -> &Vec<String>;
-    /// value is the value in (key, value) terminology of map/reduce paradigm.
-    fn value(&self) -> &Vec<u8>;
-    /// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this
-    /// time.
-    fn watermark(&self) -> DateTime<Utc>;
-    /// event_time is the time of the element as seen at source or aligned after a reduce operation.
-    fn event_time(&self) -> DateTime<Utc>;
-    /// ID corresponds the unique ID in the message.
-    fn id(&self) -> &str;
-}
-
-/// Owned copy of SinkRequest from tonic.
-struct OwnedSinkRequest {
-    keys: Vec<String>,
-    value: Vec<u8>,
-    watermark: DateTime<Utc>,
-    eventtime: DateTime<Utc>,
-    id: String,
-}
-
-impl OwnedSinkRequest {
-    fn new(sr: SinkRequest) -> Self {
+impl From<RPCSinkRequest> for SinkRequest {
+    fn from(sr: RPCSinkRequest) -> Self {
         Self {
             keys: sr.keys,
             value: sr.value,
@@ -143,25 +121,24 @@ impl OwnedSinkRequest {
     }
 }
 
-impl Datum for OwnedSinkRequest {
-    fn keys(&self) -> &Vec<String> {
-        &self.keys
-    }
-
-    fn value(&self) -> &Vec<u8> {
-        &self.value
-    }
-
-    fn watermark(&self) -> DateTime<Utc> {
-        self.watermark
-    }
-
-    fn event_time(&self) -> DateTime<Utc> {
-        self.eventtime
-    }
+/// Response is the result returned from the [`Sinker::sink`].
+pub struct Response {
+    /// id is the unique ID of the message.
+    pub id: String,
+    /// success indicates whether the write to the sink was successful. If set to `false`, it will be
+    /// retried, hence it is better to try till it is successful.
+    pub success: bool,
+    /// err string is used to describe the error if [`Response::success`]  was `false`.
+    pub err: String,
+}
 
-    fn id(&self) -> &str {
-        &self.id
+impl From<Response> for sink_response::Result {
+    fn from(r: Response) -> Self {
+        Self {
+            id: r.id,
+            success: r.success,
+            err_msg: r.err.to_string(),
+        }
     }
 }
 
@@ -172,12 +149,12 @@ where
 {
     async fn sink_fn(
         &self,
-        request: Request<Streaming<SinkRequest>>,
+        request: Request<Streaming<RPCSinkRequest>>,
     ) -> Result<tonic::Response<SinkResponse>, Status> {
         let mut stream = request.into_inner();
 
         // TODO: what should be the idle buffer size?
-        let (tx, rx) = mpsc::channel::<OwnedSinkRequest>(1);
+        let (tx, rx) = mpsc::channel::<SinkRequest>(1);
 
         // call the user's sink handle
         let sink_handle = self.handler.sink(rx);
@@ -189,9 +166,8 @@ where
                 .await
                 .expect("expected next message from stream")
             {
-                let owned_next_message = OwnedSinkRequest::new(next_message);
                 // panic is good i think!
-                tx.send(owned_next_message)
+                tx.send(next_message.into())
                     .await
                     .expect("send be successfully received!");
             }
@@ -200,18 +176,8 @@ where
         // wait for the sink handle to respond
         let responses = sink_handle.await;
 
-        // build the result
-        let mut sink_responses: Vec<sinker_grpc::sink_response::Result> = Vec::new();
-        for response in responses {
-            sink_responses.push(sinker_grpc::sink_response::Result {
-                id: response.id,
-                success: response.success,
-                err_msg: response.err.to_string(),
-            })
-        }
-
         Ok(tonic::Response::new(SinkResponse {
-            results: sink_responses,
+            results: responses.into_iter().map(|r| r.into()).collect(),
         }))
     }
 

From 5b0df1c8697146c57ef6c65e4f3c4fee67c1558e Mon Sep 17 00:00:00 2001
From: Vigith Maurice <vigith@gmail.com>
Date: Wed, 14 Feb 2024 20:21:23 -0800
Subject: [PATCH 2/8] chore: add tests and update example

Signed-off-by: Vigith Maurice <vigith@gmail.com>
---
 examples/sink-log/src/main.rs |   7 +-
 src/sink.rs                   | 266 +++++++++++++++++++++++++++-------
 2 files changed, 215 insertions(+), 58 deletions(-)

diff --git a/examples/sink-log/src/main.rs b/examples/sink-log/src/main.rs
index 71dac8d..0950d10 100644
--- a/examples/sink-log/src/main.rs
+++ b/examples/sink-log/src/main.rs
@@ -1,12 +1,9 @@
+use numaflow::sink::{self, Response, SinkRequest};
 use std::error::Error;
 
-use numaflow::sink::{self, start_uds_server, Response, SinkRequest};
-
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
-    start_uds_server(Logger {}).await?;
-
-    Ok(())
+    sink::Server::new(Logger {}).start().await
 }
 
 struct Logger {}
diff --git a/src/sink.rs b/src/sink.rs
index fe0e6af..1194c5c 100644
--- a/src/sink.rs
+++ b/src/sink.rs
@@ -1,6 +1,7 @@
+use std::path::PathBuf;
+
 use chrono::{DateTime, Utc};
-use tokio::sync::mpsc;
-use tonic::transport::Server;
+use tokio::sync::{mpsc, oneshot};
 use tonic::{Request, Status, Streaming};
 
 use crate::sink::sinker_grpc::{
@@ -31,43 +32,37 @@ pub trait Sinker {
     /// A simple log sink.
     ///
     /// ```rust,ignore
-    /// use numaflow::sink;
-    /// use numaflow::sink::{Datum, Response};
-    /// use tonic::async_trait;
-    ///
-    /// pub(crate) struct Logger {}
+    /// use numaflow::sink::{self, Response, SinkRequest};
+    /// use std::error::Error;
     ///
-    ///
-    /// impl Logger {
-    ///     pub(crate) fn new() -> Self {
-    ///         Self {}
-    ///     }
+    /// #[tokio::main]
+    /// async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
+    ///     sink::Server::new(Logger {}).start().await
     /// }
     ///
-    /// #[async_trait]
+    /// struct Logger {}
+    ///
+    /// #[tonic::async_trait]
     /// impl sink::Sinker for Logger {
-    ///     async fn sink<T: Datum + Send + Sync + 'static>(
-    ///         &self,
-    ///         mut input: tokio::sync::mpsc::Receiver<T>,
-    ///     ) -> Vec<Response> {
+    ///     async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {
     ///         let mut responses: Vec<Response> = Vec::new();
     ///
     ///         while let Some(datum) = input.recv().await {
     ///             // do something better, but for now let's just log it.
     ///             // please note that `from_utf8` is working because the input in this
     ///             // example uses utf-8 data.
-    ///             let response = match std::str::from_utf8(datum.value()) {
+    ///             let response = match std::str::from_utf8(&datum.value) {
     ///                 Ok(v) => {
     ///                     println!("{}", v);
     ///                     // record the response
     ///                     Response {
-    ///                         id: datum.id().to_string(),
+    ///                         id: datum.id,
     ///                         success: true,
     ///                         err: "".to_string(),
     ///                     }
     ///                 }
     ///                 Err(e) => Response {
-    ///                     id: datum.id().to_string(),
+    ///                     id: datum.id,
     ///                     success: true, // there is no point setting success to false as retrying is not going to help
     ///                     err: format!("Invalid UTF-8 sequence: {}", e),
     ///                 },
@@ -80,17 +75,6 @@ pub trait Sinker {
     ///         responses
     ///     }
     /// }
-    /// #[tokio::main]
-    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    ///     use numaflow::sink::start_uds_server;
-    ///
-    ///     // sink handler
-    ///     let sink_handler = Logger::new();
-    ///
-    ///     start_uds_server(sink_handler).await?;
-    ///
-    ///     Ok(())
-    /// }
     /// ```
     async fn sink(&self, mut input: mpsc::Receiver<SinkRequest>) -> Vec<Response>;
 }
@@ -104,7 +88,7 @@ pub struct SinkRequest {
     /// [watermark](https://numaflow.numaproj.io/core-concepts/watermarks/) represented by time is a guarantee that we will not see an element older than this time.
     pub watermark: DateTime<Utc>,
     /// Time of the element as seen at source or aligned after a reduce operation.
-    pub eventtime: DateTime<Utc>,
+    pub event_time: DateTime<Utc>,
     /// ID is the unique id of the message to be send to the Sink.
     pub id: String,
 }
@@ -115,7 +99,7 @@ impl From<RPCSinkRequest> for SinkRequest {
             keys: sr.keys,
             value: sr.value,
             watermark: shared::utc_from_timestamp(sr.watermark),
-            eventtime: shared::utc_from_timestamp(sr.event_time),
+            event_time: shared::utc_from_timestamp(sr.event_time),
             id: sr.id,
         }
     }
@@ -186,24 +170,200 @@ where
     }
 }
 
-/// start_uds_server starts a gRPC server over an UDS (unix-domain-socket) endpoint.
-pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
-where
-    T: Sinker + Send + Sync + 'static,
-{
-    let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
-        "/var/run/numaflow/server-info"
-    } else {
-        "/tmp/numaflow.server-info"
-    };
-    let socket_file = "/var/run/numaflow/sink.sock";
-    let listener = shared::create_listener_stream(socket_file, server_info_file)?;
-    let sink_service = SinkService { handler: m };
-
-    Server::builder()
-        .add_service(SinkServer::new(sink_service))
-        .serve_with_incoming(listener)
-        .await?;
-
-    Ok(())
+/// gRPC server to start a sink service
+#[derive(Debug)]
+pub struct Server<T> {
+    sock_addr: PathBuf,
+    max_message_size: usize,
+    server_info_file: PathBuf,
+    svc: Option<T>,
+}
+
+impl<T: Sinker> Server<T> {
+    pub fn new(svc: T) -> Self {
+        let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
+            "/var/run/numaflow/server-info"
+        } else {
+            "/tmp/numaflow.server-info"
+        };
+        Self {
+            sock_addr: "/var/run/numaflow/sink.sock".into(),
+            max_message_size: 64 * 1024 * 1024,
+            server_info_file: server_info_file.into(),
+            svc: Some(svc),
+        }
+    }
+
+    /// Set the unix domain socket file path used by the gRPC server to listen for incoming connections. Defaults value is `/var/run/numaflow/map.sock`
+    pub fn with_socket_file(mut self, file: impl Into<PathBuf>) -> Self {
+        self.sock_addr = file.into();
+        self
+    }
+
+    /// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/map.sock`
+    pub fn socket_file(&self) -> &std::path::Path {
+        self.sock_addr.as_path()
+    }
+
+    /// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 4MB.
+    pub fn with_max_message_size(mut self, message_size: usize) -> Self {
+        self.max_message_size = message_size;
+        self
+    }
+
+    /// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 4MB.
+    pub fn max_message_size(&self) -> usize {
+        self.max_message_size
+    }
+
+    /// Change the file in which numflow server information is stored on start up to the new value. Default value is `/tmp/numaflow.server-info`
+    pub fn with_server_info_file(mut self, file: impl Into<PathBuf>) -> Self {
+        self.server_info_file = file.into();
+        self
+    }
+
+    /// Get the path to the file where numaflow server info is stored. Default value is `/tmp/numaflow.server-info`
+    pub fn server_info_file(&self) -> &std::path::Path {
+        self.server_info_file.as_path()
+    }
+
+    /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated.
+    pub async fn start_with_shutdown(
+        &mut self,
+        shutdown: oneshot::Receiver<()>,
+    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
+    where
+        T: Sinker + Send + Sync + 'static,
+    {
+        let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
+        let handler = self.svc.take().unwrap();
+        let svc = SinkService { handler };
+        let svc = SinkServer::new(svc)
+            .max_encoding_message_size(self.max_message_size)
+            .max_decoding_message_size(self.max_message_size);
+
+        let shutdown = async {
+            shutdown
+                .await
+                .expect("Receiving message from shutdown channel");
+        };
+        tonic::transport::Server::builder()
+            .add_service(svc)
+            .serve_with_incoming_shutdown(listener, shutdown)
+            .await
+            .map_err(Into::into)
+    }
+
+    /// Starts the gRPC server. Automatically registers singal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the singal arrives.
+    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
+    where
+        T: Sinker + Send + Sync + 'static,
+    {
+        let (tx, rx) = oneshot::channel::<()>();
+        tokio::spawn(shared::wait_for_signal(tx));
+        self.start_with_shutdown(rx).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{error::Error, time::Duration};
+    use tower::service_fn;
+
+    use crate::sink;
+    use crate::sink::sinker_grpc::sink_client::SinkClient;
+    use tempfile::TempDir;
+    use tokio::sync::oneshot;
+    use tonic::transport::Uri;
+
+    #[tokio::test]
+    async fn sink_server() -> Result<(), Box<dyn Error>> {
+        struct Logger;
+        #[tonic::async_trait]
+        impl sink::Sinker for Logger {
+            async fn sink(
+                &self,
+                mut input: tokio::sync::mpsc::Receiver<sink::SinkRequest>,
+            ) -> Vec<sink::Response> {
+                let mut responses: Vec<sink::Response> = Vec::new();
+
+                while let Some(datum) = input.recv().await {
+                    // do something better, but for now let's just log it.
+                    // please note that `from_utf8` is working because the input in this
+                    // example uses utf-8 data.
+                    let response = match std::str::from_utf8(&datum.value) {
+                        Ok(v) => {
+                            println!("{}", v);
+                            // record the response
+                            sink::Response {
+                                id: datum.id,
+                                success: true,
+                                err: "".to_string(),
+                            }
+                        }
+                        Err(e) => sink::Response {
+                            id: datum.id,
+                            success: true, // there is no point setting success to false as retrying is not going to help
+                            err: format!("Invalid UTF-8 sequence: {}", e),
+                        },
+                    };
+
+                    // return the responses
+                    responses.push(response);
+                }
+
+                responses
+            }
+        }
+
+        let tmp_dir = TempDir::new()?;
+        let sock_file = tmp_dir.path().join("map.sock");
+        let server_info_file = tmp_dir.path().join("server_info");
+
+        let mut server = sink::Server::new(Logger)
+            .with_server_info_file(&server_info_file)
+            .with_socket_file(&sock_file)
+            .with_max_message_size(10240);
+
+        assert_eq!(server.max_message_size(), 10240);
+        assert_eq!(server.server_info_file(), server_info_file);
+        assert_eq!(server.socket_file(), sock_file);
+
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+        let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await });
+
+        tokio::time::sleep(Duration::from_millis(50)).await;
+
+        // https://github.com/hyperium/tonic/blob/master/examples/src/uds/client.rs
+        let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")?
+            .connect_with_connector(service_fn(move |_: Uri| {
+                // Connect to a Uds socket
+                let sock_file = sock_file.clone();
+                tokio::net::UnixStream::connect(sock_file)
+            }))
+            .await?;
+
+        let mut client = SinkClient::new(channel);
+        let request = sink::sinker_grpc::SinkRequest {
+            keys: vec!["first".into(), "second".into()],
+            value: "hello".into(),
+            watermark: Some(prost_types::Timestamp::default()),
+            event_time: Some(prost_types::Timestamp::default()),
+            id: "1".to_string(),
+        };
+
+        let resp = client.sink_fn(tokio_stream::iter(vec![request])).await?;
+        let resp = resp.into_inner();
+        assert_eq!(resp.results.len(), 1, "Expected single message from server");
+        let msg = &resp.results[0];
+        assert_eq!(msg.err_msg, "");
+        assert_eq!(msg.id, "1");
+
+        shutdown_tx
+            .send(())
+            .expect("Sending shutdown signal to gRPC server");
+        tokio::time::sleep(Duration::from_millis(50)).await;
+        assert!(task.is_finished(), "gRPC server is still running");
+        Ok(())
+    }
 }

From d00aae2bd0e7384f4c47d909880d1df921f5fedd Mon Sep 17 00:00:00 2001
From: Vigith Maurice <vigith@gmail.com>
Date: Wed, 14 Feb 2024 20:31:47 -0800
Subject: [PATCH 3/8] chore: typos

Signed-off-by: Vigith Maurice <vigith@gmail.com>
---
 examples/sink-log/Cargo.toml | 3 +--
 src/map.rs                   | 3 ++-
 src/sink.rs                  | 3 ++-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/examples/sink-log/Cargo.toml b/examples/sink-log/Cargo.toml
index d91aba7..034cb88 100644
--- a/examples/sink-log/Cargo.toml
+++ b/examples/sink-log/Cargo.toml
@@ -10,5 +10,4 @@ path = "src/main.rs"
 [dependencies]
 tonic = "0.9"
 tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
-#numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
-numaflow={path="/Users/vigith/code/git/numaproj/numaflow-rs"}
+numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
diff --git a/src/map.rs b/src/map.rs
index dc34367..7f68dc7 100644
--- a/src/map.rs
+++ b/src/map.rs
@@ -142,7 +142,8 @@ impl<T: Mapper> Server<T> {
         }
     }
 
-    /// Set the unix domain socket file path used by the gRPC server to listen for incoming connections. Defaults value is `/var/run/numaflow/map.sock`
+    /// Set the unix domain socket file path used by the gRPC server to listen for incoming connections.
+    /// Default value is `/var/run/numaflow/map.sock`
     pub fn with_socket_file(mut self, file: impl Into<PathBuf>) -> Self {
         self.sock_addr = file.into();
         self
diff --git a/src/sink.rs b/src/sink.rs
index 1194c5c..cebbf1b 100644
--- a/src/sink.rs
+++ b/src/sink.rs
@@ -194,7 +194,8 @@ impl<T: Sinker> Server<T> {
         }
     }
 
-    /// Set the unix domain socket file path used by the gRPC server to listen for incoming connections. Defaults value is `/var/run/numaflow/map.sock`
+    /// Set the unix domain socket file path used by the gRPC server to listen for incoming connections.
+    /// Default value is `/var/run/numaflow/sink.sock`
     pub fn with_socket_file(mut self, file: impl Into<PathBuf>) -> Self {
         self.sock_addr = file.into();
         self

From a088e33c31d791ee6cb6b569400be11688babad8 Mon Sep 17 00:00:00 2001
From: Vigith Maurice <vigith@gmail.com>
Date: Sun, 18 Feb 2024 11:30:49 -0800
Subject: [PATCH 4/8] chore: code review

Signed-off-by: Vigith Maurice <vigith@gmail.com>
---
 examples/sink-log/src/main.rs |  2 +-
 src/map.rs                    |  2 +-
 src/sink.rs                   | 10 +++++-----
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/examples/sink-log/src/main.rs b/examples/sink-log/src/main.rs
index 0950d10..3406180 100644
--- a/examples/sink-log/src/main.rs
+++ b/examples/sink-log/src/main.rs
@@ -6,7 +6,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
     sink::Server::new(Logger {}).start().await
 }
 
-struct Logger {}
+struct Logger;
 
 #[tonic::async_trait]
 impl sink::Sinker for Logger {
diff --git a/src/map.rs b/src/map.rs
index 7f68dc7..3effc99 100644
--- a/src/map.rs
+++ b/src/map.rs
@@ -127,7 +127,7 @@ pub struct Server<T> {
     map_svc: Option<T>,
 }
 
-impl<T: Mapper> Server<T> {
+impl<T> Server<T> {
     pub fn new(map_svc: T) -> Self {
         let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
             "/var/run/numaflow/server-info"
diff --git a/src/sink.rs b/src/sink.rs
index cebbf1b..efe8515 100644
--- a/src/sink.rs
+++ b/src/sink.rs
@@ -20,18 +20,18 @@ struct SinkService<T: Sinker> {
     pub handler: T,
 }
 
-/// Sinker trait implements the user defined sink handle.
+/// Sinker trait for implementing user defined sinks.
 ///
 /// Types implementing this trait can be passed as user-defined sink handle.
 #[tonic::async_trait]
 pub trait Sinker {
-    /// The sink handle is given a stream of [`Datum`]. The result is [`Response`].
+    /// The sink handle is given a stream of [`SinkRequest`] and the result is [`Response`].
     ///
     /// # Example
     ///
     /// A simple log sink.
     ///
-    /// ```rust,ignore
+    /// ```no_run
     /// use numaflow::sink::{self, Response, SinkRequest};
     /// use std::error::Error;
     ///
@@ -40,7 +40,7 @@ pub trait Sinker {
     ///     sink::Server::new(Logger {}).start().await
     /// }
     ///
-    /// struct Logger {}
+    /// struct Logger;
     ///
     /// #[tonic::async_trait]
     /// impl sink::Sinker for Logger {
@@ -105,7 +105,7 @@ impl From<RPCSinkRequest> for SinkRequest {
     }
 }
 
-/// Response is the result returned from the [`Sinker::sink`].
+/// The result of the call to [`Sinker::sink`] method.
 pub struct Response {
     /// id is the unique ID of the message.
     pub id: String,

From b96abfe4b5a44b77ebfe7f497c59af52c81bf402 Mon Sep 17 00:00:00 2001
From: Vigith Maurice <vigith@gmail.com>
Date: Sun, 18 Feb 2024 11:33:18 -0800
Subject: [PATCH 5/8] chore: dockerignore

Signed-off-by: Vigith Maurice <vigith@gmail.com>
---
 examples/sink-log/.dockerignore | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 examples/sink-log/.dockerignore

diff --git a/examples/sink-log/.dockerignore b/examples/sink-log/.dockerignore
new file mode 100644
index 0000000..9f97022
--- /dev/null
+++ b/examples/sink-log/.dockerignore
@@ -0,0 +1 @@
+target/
\ No newline at end of file

From 46ef44cff4ab66f6f1f016fe140c76d639c8a718 Mon Sep 17 00:00:00 2001
From: Vigith Maurice <vigith@gmail.com>
Date: Sun, 18 Feb 2024 11:42:02 -0800
Subject: [PATCH 6/8] chore: unit struct

Signed-off-by: Vigith Maurice <vigith@gmail.com>
---
 examples/sink-log/src/main.rs | 2 +-
 src/sink.rs                   | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/examples/sink-log/src/main.rs b/examples/sink-log/src/main.rs
index 3406180..f1d6da4 100644
--- a/examples/sink-log/src/main.rs
+++ b/examples/sink-log/src/main.rs
@@ -3,7 +3,7 @@ use std::error::Error;
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
-    sink::Server::new(Logger {}).start().await
+    sink::Server::new(Logger).start().await
 }
 
 struct Logger;
diff --git a/src/sink.rs b/src/sink.rs
index efe8515..5824b2a 100644
--- a/src/sink.rs
+++ b/src/sink.rs
@@ -35,13 +35,13 @@ pub trait Sinker {
     /// use numaflow::sink::{self, Response, SinkRequest};
     /// use std::error::Error;
     ///
+    /// struct Logger;
+    ///
     /// #[tokio::main]
     /// async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
-    ///     sink::Server::new(Logger {}).start().await
+    ///     sink::Server::new(Logger).start().await
     /// }
     ///
-    /// struct Logger;
-    ///
     /// #[tonic::async_trait]
     /// impl sink::Sinker for Logger {
     ///     async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {

From 8c37b27389c1360da325d221cdd839ff83a84bca Mon Sep 17 00:00:00 2001
From: Vigith Maurice <vigith@gmail.com>
Date: Sun, 18 Feb 2024 21:13:35 -0800
Subject: [PATCH 7/8] chore: add FIXME

Signed-off-by: Vigith Maurice <vigith@gmail.com>
---
 src/sink.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sink.rs b/src/sink.rs
index 5824b2a..4ff8b9f 100644
--- a/src/sink.rs
+++ b/src/sink.rs
@@ -150,7 +150,7 @@ where
                 .await
                 .expect("expected next message from stream")
             {
-                // panic is good i think!
+                // FIXME: panic is very bad idea!
                 tx.send(next_message.into())
                     .await
                     .expect("send be successfully received!");

From f7501871bff0493da7777474bf843755a54fa4c9 Mon Sep 17 00:00:00 2001
From: Vigith Maurice <vigith@gmail.com>
Date: Sun, 18 Feb 2024 21:20:13 -0800
Subject: [PATCH 8/8] chore: remove trait bounds

Signed-off-by: Vigith Maurice <vigith@gmail.com>
---
 src/sink.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sink.rs b/src/sink.rs
index 4ff8b9f..57985f0 100644
--- a/src/sink.rs
+++ b/src/sink.rs
@@ -179,7 +179,7 @@ pub struct Server<T> {
     svc: Option<T>,
 }
 
-impl<T: Sinker> Server<T> {
+impl<T> Server<T> {
     pub fn new(svc: T) -> Self {
         let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
             "/var/run/numaflow/server-info"