Skip to content

Commit

Permalink
send eot message
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Oct 13, 2024
1 parent 2f40e4b commit f748f94
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
11 changes: 8 additions & 3 deletions proto/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ message MapRequest {
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}
message Status {
bool eot = 1;
}
Request request = 1;
// This ID is used to uniquely identify a map request
string id = 2;
Expand All @@ -42,6 +39,13 @@ message Handshake {
bool sot = 1;
}

/*
* Status message to indicate the status of the message.
*/
message Status {
bool eot = 1;
}

/**
* MapResponse represents a response element.
*/
Expand All @@ -55,6 +59,7 @@ message MapResponse {
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
}

/**
Expand Down
19 changes: 16 additions & 3 deletions src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,22 @@ where
.collect::<Vec<proto::map_response::Result>>(),
id: response.id,
handshake: None,
status: None,
}))
.await
.expect("Sending response to channel");
}

// send the eot message to the client
resp_tx
.send(Ok(MapResponse {
results: vec![],
id: "".to_string(),
handshake: None,
status: Some(proto::Status { eot: true }),
}))
.await
.expect("Sending response to channel");
});

let mut global_stream_ended = false;
Expand Down Expand Up @@ -441,6 +453,7 @@ where
results: vec![],
id: "".to_string(),
handshake: Some(handshake),
status: None,
}))
.await
.map_err(|e| {
Expand Down Expand Up @@ -664,7 +677,7 @@ mod tests {
request: None,
id: "3".to_string(),
handshake: None,
status: Some(batchmap::proto::map_request::Status { eot: true }),
status: Some(batchmap::proto::Status { eot: true }),
};

let resp = client
Expand All @@ -682,7 +695,7 @@ mod tests {
responses.push(response);
}

assert_eq!(responses.len(), 3, "Expected three message from server");
assert_eq!(responses.len(), 5, "Expected five message from server");
assert!(responses[0].handshake.is_some());
assert_eq!(&responses[1].id, "1");
assert_eq!(&responses[2].id, "2");
Expand Down Expand Up @@ -765,7 +778,7 @@ mod tests {
request: None,
id: "11".to_string(),
handshake: None,
status: Some(batchmap::proto::map_request::Status { eot: true }),
status: Some(batchmap::proto::Status { eot: true }),
};
requests.push(eot_request);

Expand Down
2 changes: 2 additions & 0 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ async fn run_map<T>(
results: messages.into_iter().map(|msg| msg.into()).collect(),
id: message_id,
handshake: None,
status: None,
}))
.await;

Expand Down Expand Up @@ -420,6 +421,7 @@ async fn perform_handshake(
results: vec![],
id: "".to_string(),
handshake: Some(handshake),
status: None,
}))
.await
.map_err(|e| Status::internal(format!("Failed to send handshake response: {}", e)))?;
Expand Down
16 changes: 10 additions & 6 deletions src/servers/map.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct MapRequest {
#[prost(message, optional, tag = "3")]
pub handshake: ::core::option::Option<Handshake>,
#[prost(message, optional, tag = "4")]
pub status: ::core::option::Option<map_request::Status>,
pub status: ::core::option::Option<Status>,
}
/// Nested message and enum types in `MapRequest`.
pub mod map_request {
Expand All @@ -31,11 +31,6 @@ pub mod map_request {
::prost::alloc::string::String,
>,
}
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct Status {
#[prost(bool, tag = "1")]
pub eot: bool,
}
}
///
/// Handshake message between client and server to indicate the start of transmission.
Expand All @@ -45,6 +40,13 @@ pub struct Handshake {
#[prost(bool, tag = "1")]
pub sot: bool,
}
///
/// Status message to indicate the status of the message.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct Status {
#[prost(bool, tag = "1")]
pub eot: bool,
}
/// *
/// MapResponse represents a response element.
#[derive(Clone, PartialEq, ::prost::Message)]
Expand All @@ -56,6 +58,8 @@ pub struct MapResponse {
pub id: ::prost::alloc::string::String,
#[prost(message, optional, tag = "3")]
pub handshake: ::core::option::Option<Handshake>,
#[prost(message, optional, tag = "4")]
pub status: ::core::option::Option<Status>,
}
/// Nested message and enum types in `MapResponse`.
pub mod map_response {
Expand Down

0 comments on commit f748f94

Please sign in to comment.