Skip to content

Commit

Permalink
feat: Use gRPC Bidirectional Streaming for Map
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Oct 4, 2024
1 parent 36ad0a7 commit d4c8494
Show file tree
Hide file tree
Showing 5 changed files with 429 additions and 175 deletions.
28 changes: 22 additions & 6 deletions proto/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package map.v1;

service Map {
// MapFn applies a function to each map request element.
rpc MapFn(MapRequest) returns (MapResponse);
rpc MapFn(stream MapRequest) returns (stream MapResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
Expand All @@ -17,12 +17,25 @@ service Map {
* MapRequest represents a request element.
*/
message MapRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}
Request request = 1;
// This ID is used to uniquely identify a map request
string id = 2;
optional Handshake handshake = 3;
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/**
Expand All @@ -35,6 +48,9 @@ message MapResponse {
repeated string tags = 3;
}
repeated Result results = 1;
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
}

/**
Expand Down
25 changes: 11 additions & 14 deletions src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub struct Message {
}

/// Represents a message that can be modified and forwarded.
impl crate::batchmap::Message {
impl Message {
/// Creates a new message with the specified value.
///
/// This constructor initializes the message with no keys, tags, or specific event time.
Expand Down Expand Up @@ -148,11 +148,11 @@ impl crate::batchmap::Message {
/// use numaflow::batchmap::Message;
/// let dropped_message = Message::message_to_drop();
/// ```
pub fn message_to_drop() -> crate::batchmap::Message {
crate::batchmap::Message {
pub fn message_to_drop() -> Message {
Message {
keys: None,
value: vec![],
tags: Some(vec![crate::batchmap::DROP.to_string()]),
tags: Some(vec![DROP.to_string()]),
}
}

Expand Down Expand Up @@ -245,11 +245,8 @@ impl<T> BatchMap for BatchMapService<T>
where
T: BatchMapper + Send + Sync + 'static,
{
async fn is_ready(
&self,
_: Request<()>,
) -> Result<tonic::Response<proto::ReadyResponse>, Status> {
Ok(tonic::Response::new(proto::ReadyResponse { ready: true }))
async fn is_ready(&self, _: Request<()>) -> Result<Response<proto::ReadyResponse>, Status> {
Ok(Response::new(proto::ReadyResponse { ready: true }))
}

type BatchMapFnStream = ReceiverStream<Result<proto::BatchMapResponse, Status>>;
Expand All @@ -261,7 +258,7 @@ where
let mut stream = request.into_inner();

// Create a channel to send the messages to the user defined function.
let (tx, rx) = mpsc::channel::<Datum>(1);
let (tx, rx) = channel::<Datum>(1);

// Create a channel to send the response back to the grpc client.
let (grpc_response_tx, grpc_response_rx) =
Expand Down Expand Up @@ -418,9 +415,9 @@ pub struct Server<T> {
server_info_file: PathBuf,
svc: Option<T>,
}
impl<T> crate::batchmap::Server<T> {
impl<T> Server<T> {
pub fn new(batch_map_svc: T) -> Self {
crate::batchmap::Server {
Server {
sock_addr: DEFAULT_SOCK_ADDR.into(),
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
server_info_file: DEFAULT_SERVER_INFO_FILE.into(),
Expand Down Expand Up @@ -478,8 +475,8 @@ impl<T> crate::batchmap::Server<T> {
let cln_token = CancellationToken::new();

// Create a channel to send shutdown signal to the server to do graceful shutdown in case of non retryable errors.
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
let map_svc = crate::batchmap::BatchMapService {
let (internal_shutdown_tx, internal_shutdown_rx) = channel(1);
let map_svc = BatchMapService {
handler: Arc::new(handler),
_shutdown_tx: internal_shutdown_tx,
cancellation_token: cln_token.clone(),
Expand Down
Loading

0 comments on commit d4c8494

Please sign in to comment.