Skip to content

Commit

Permalink
use read batch size as the channel size
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Sep 4, 2024
1 parent c456a31 commit 9c893db
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
3 changes: 1 addition & 2 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ const DEFAULT_FB_SERVER_INFO_FILE: &str = "/var/run/numaflow/fb-sinker-server-in
const ENV_UD_CONTAINER_TYPE: &str = "NUMAFLOW_UD_CONTAINER_TYPE";
const UD_CONTAINER_FB_SINK: &str = "fb-udsink";
const DEFAULT_CHANNEL_SIZE: usize = 1000;

/// Numaflow Sink Proto definitions.
pub mod proto {
tonic::include_proto!("sink.v1");
Expand Down Expand Up @@ -186,7 +185,7 @@ where
let sink_handle = self.handler.clone();
let cancellation_token = self.cancellation_token.clone();
let shutdown_tx = self.shutdown_tx.clone();
// TODO: what should be the idle buffer size?
// FIXME: we should be using the batch size as the channel size
let (tx, rx) = mpsc::channel::<SinkRequest>(DEFAULT_CHANNEL_SIZE);

let reader_shutdown_tx = shutdown_tx.clone();
Expand Down
6 changes: 3 additions & 3 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use tonic::{async_trait, Request, Response, Status};
const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/source.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info";
const DEFAULT_CHANNEL_SIZE: usize = 1000;

/// Source Proto definitions.
pub mod proto {
Expand Down Expand Up @@ -88,9 +87,10 @@ where
let sr = request.into_inner().request.unwrap();

// tx,rx pair for sending data over to user-defined source
let (stx, mut srx) = mpsc::channel::<Message>(DEFAULT_CHANNEL_SIZE);
let (stx, mut srx) = mpsc::channel::<Message>(sr.num_records as usize);
// tx,rx pair for gRPC response
let (tx, rx) = mpsc::channel::<Result<proto::ReadResponse, Status>>(DEFAULT_CHANNEL_SIZE);
let (tx, rx) =
mpsc::channel::<Result<proto::ReadResponse, Status>>(sr.num_records as usize);

// start the ud-source rx asynchronously and start populating the gRPC response, so it can be streamed to the gRPC client (numaflow).
tokio::spawn(async move {
Expand Down

0 comments on commit 9c893db

Please sign in to comment.