diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1f5f855 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +target/ +**/target/ \ No newline at end of file diff --git a/src/sink.rs b/src/sink.rs index 2cda073..2e2f3ba 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -18,7 +18,8 @@ const DEFAULT_FB_SOCK_ADDR: &str = "/var/run/numaflow/fb-sink.sock"; const DEFAULT_FB_SERVER_INFO_FILE: &str = "/var/run/numaflow/fb-sinker-server-info"; const ENV_UD_CONTAINER_TYPE: &str = "NUMAFLOW_UD_CONTAINER_TYPE"; const UD_CONTAINER_FB_SINK: &str = "fb-udsink"; - +// TODO: use batch-size, blocked by https://github.com/numaproj/numaflow/issues/2026 +const DEFAULT_CHANNEL_SIZE: usize = 1000; /// Numaflow Sink Proto definitions. pub mod proto { tonic::include_proto!("sink.v1"); @@ -185,8 +186,8 @@ where let sink_handle = self.handler.clone(); let cancellation_token = self.cancellation_token.clone(); let shutdown_tx = self.shutdown_tx.clone(); - // TODO: what should be the idle buffer size? - let (tx, rx) = mpsc::channel::(1); + // FIXME: we should be using the batch size as the channel size + let (tx, rx) = mpsc::channel::(DEFAULT_CHANNEL_SIZE); let reader_shutdown_tx = shutdown_tx.clone(); // spawn a task to read messages from the stream and send them to the user's sink handle diff --git a/src/source.rs b/src/source.rs index b19d1ec..af3cf3e 100644 --- a/src/source.rs +++ b/src/source.rs @@ -87,11 +87,12 @@ where let sr = request.into_inner().request.unwrap(); // tx,rx pair for sending data over to user-defined source - let (stx, mut srx) = mpsc::channel::(1); + let (stx, mut srx) = mpsc::channel::(sr.num_records as usize); // tx,rx pair for gRPC response - let (tx, rx) = mpsc::channel::>(1); + let (tx, rx) = + mpsc::channel::>(sr.num_records as usize); - // start the ud-source rx asynchronously and start populating the gRPC response so it can be streamed to the gRPC client (numaflow). + // start the ud-source rx asynchronously and start populating the gRPC response, so it can be streamed to the gRPC client (numaflow). tokio::spawn(async move { while let Some(resp) = srx.recv().await { tx.send(Ok(proto::ReadResponse {