From f748f942b45a6d483ee7e81b6a5b522173425fc3 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Sun, 13 Oct 2024 16:25:31 +0530 Subject: [PATCH] send eot message Signed-off-by: Yashash H L --- proto/map.proto | 11 ++++++++--- src/batchmap.rs | 19 ++++++++++++++++--- src/map.rs | 2 ++ src/servers/map.v1.rs | 16 ++++++++++------ 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/proto/map.proto b/proto/map.proto index 33c7009..780acb1 100644 --- a/proto/map.proto +++ b/proto/map.proto @@ -24,9 +24,6 @@ message MapRequest { google.protobuf.Timestamp watermark = 4; map headers = 5; } - message Status { - bool eot = 1; - } Request request = 1; // This ID is used to uniquely identify a map request string id = 2; @@ -42,6 +39,13 @@ message Handshake { bool sot = 1; } +/* + * Status message to indicate the status of the message. + */ +message Status { + bool eot = 1; +} + /** * MapResponse represents a response element. */ @@ -55,6 +59,7 @@ message MapResponse { // This ID is used to refer the responses to the request it corresponds to. string id = 2; optional Handshake handshake = 3; + optional Status status = 4; } /** diff --git a/src/batchmap.rs b/src/batchmap.rs index d7340eb..ff43e37 100644 --- a/src/batchmap.rs +++ b/src/batchmap.rs @@ -332,10 +332,22 @@ where .collect::>(), id: response.id, handshake: None, + status: None, })) .await .expect("Sending response to channel"); } + + // send the eot message to the client + resp_tx + .send(Ok(MapResponse { + results: vec![], + id: "".to_string(), + handshake: None, + status: Some(proto::Status { eot: true }), + })) + .await + .expect("Sending response to channel"); }); let mut global_stream_ended = false; @@ -441,6 +453,7 @@ where results: vec![], id: "".to_string(), handshake: Some(handshake), + status: None, })) .await .map_err(|e| { @@ -664,7 +677,7 @@ mod tests { request: None, id: "3".to_string(), handshake: None, - status: Some(batchmap::proto::map_request::Status { eot: true }), + status: Some(batchmap::proto::Status { eot: true }), }; let resp = client @@ -682,7 +695,7 @@ mod tests { responses.push(response); } - assert_eq!(responses.len(), 3, "Expected three message from server"); + assert_eq!(responses.len(), 5, "Expected five message from server"); assert!(responses[0].handshake.is_some()); assert_eq!(&responses[1].id, "1"); assert_eq!(&responses[2].id, "2"); @@ -765,7 +778,7 @@ mod tests { request: None, id: "11".to_string(), handshake: None, - status: Some(batchmap::proto::map_request::Status { eot: true }), + status: Some(batchmap::proto::Status { eot: true }), }; requests.push(eot_request); diff --git a/src/map.rs b/src/map.rs index db8ecf9..c9dd0c0 100644 --- a/src/map.rs +++ b/src/map.rs @@ -389,6 +389,7 @@ async fn run_map( results: messages.into_iter().map(|msg| msg.into()).collect(), id: message_id, handshake: None, + status: None, })) .await; @@ -420,6 +421,7 @@ async fn perform_handshake( results: vec![], id: "".to_string(), handshake: Some(handshake), + status: None, })) .await .map_err(|e| Status::internal(format!("Failed to send handshake response: {}", e)))?; diff --git a/src/servers/map.v1.rs b/src/servers/map.v1.rs index 634c121..d9d4275 100644 --- a/src/servers/map.v1.rs +++ b/src/servers/map.v1.rs @@ -11,7 +11,7 @@ pub struct MapRequest { #[prost(message, optional, tag = "3")] pub handshake: ::core::option::Option, #[prost(message, optional, tag = "4")] - pub status: ::core::option::Option, + pub status: ::core::option::Option, } /// Nested message and enum types in `MapRequest`. pub mod map_request { @@ -31,11 +31,6 @@ pub mod map_request { ::prost::alloc::string::String, >, } - #[derive(Clone, Copy, PartialEq, ::prost::Message)] - pub struct Status { - #[prost(bool, tag = "1")] - pub eot: bool, - } } /// /// Handshake message between client and server to indicate the start of transmission. @@ -45,6 +40,13 @@ pub struct Handshake { #[prost(bool, tag = "1")] pub sot: bool, } +/// +/// Status message to indicate the status of the message. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Status { + #[prost(bool, tag = "1")] + pub eot: bool, +} /// * /// MapResponse represents a response element. #[derive(Clone, PartialEq, ::prost::Message)] @@ -56,6 +58,8 @@ pub struct MapResponse { pub id: ::prost::alloc::string::String, #[prost(message, optional, tag = "3")] pub handshake: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub status: ::core::option::Option, } /// Nested message and enum types in `MapResponse`. pub mod map_response {