Skip to content

Commit

Permalink
chore: Send EOT message after the responses are forwarded from Sink (#99
Browse files Browse the repository at this point in the history
)

Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Oct 14, 2024
1 parent 40c0550 commit 9fb3c0a
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 27 deletions.
6 changes: 3 additions & 3 deletions proto/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ message MapRequest {
// This ID is used to uniquely identify a map request
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/*
Expand All @@ -42,7 +42,7 @@ message Handshake {
/*
* Status message to indicate the status of the message.
*/
message Status {
message TransmissionStatus {
bool eot = 1;
}

Expand All @@ -59,7 +59,7 @@ message MapResponse {
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
optional Handshake handshake = 3;
optional Status status = 4;
optional TransmissionStatus status = 4;
}

/**
Expand Down
13 changes: 9 additions & 4 deletions proto/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,11 @@ message SinkRequest {
string id = 5;
map<string, string> headers = 6;
}
message Status {
bool eot = 1;
}
// Required field indicating the request.
Request request = 1;
// Required field indicating the status of the request.
// If eot is set to true, it indicates the end of transmission.
Status status = 2;
TransmissionStatus status = 2;
// optional field indicating the handshake message.
optional Handshake handshake = 3;
}
Expand All @@ -52,6 +49,13 @@ message ReadyResponse {
bool ready = 1;
}

/**
* TransmissionStatus is the status of the transmission.
*/
message TransmissionStatus {
bool eot = 1;
}

/*
* Status is the status of the response.
*/
Expand All @@ -75,4 +79,5 @@ message SinkResponse {
}
Result result = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
6 changes: 3 additions & 3 deletions src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ where
results: vec![],
id: "".to_string(),
handshake: None,
status: Some(proto::Status { eot: true }),
status: Some(proto::TransmissionStatus { eot: true }),
}))
.await
.expect("Sending response to channel");
Expand Down Expand Up @@ -677,7 +677,7 @@ mod tests {
request: None,
id: "3".to_string(),
handshake: None,
status: Some(batchmap::proto::Status { eot: true }),
status: Some(batchmap::proto::TransmissionStatus { eot: true }),
};

let resp = client
Expand Down Expand Up @@ -778,7 +778,7 @@ mod tests {
request: None,
id: "11".to_string(),
handshake: None,
status: Some(batchmap::proto::Status { eot: true }),
status: Some(batchmap::proto::TransmissionStatus { eot: true }),
};
requests.push(eot_request);

Expand Down
6 changes: 3 additions & 3 deletions src/servers/map.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct MapRequest {
#[prost(message, optional, tag = "3")]
pub handshake: ::core::option::Option<Handshake>,
#[prost(message, optional, tag = "4")]
pub status: ::core::option::Option<Status>,
pub status: ::core::option::Option<TransmissionStatus>,
}
/// Nested message and enum types in `MapRequest`.
pub mod map_request {
Expand Down Expand Up @@ -43,7 +43,7 @@ pub struct Handshake {
///
/// Status message to indicate the status of the message.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct Status {
pub struct TransmissionStatus {
#[prost(bool, tag = "1")]
pub eot: bool,
}
Expand All @@ -59,7 +59,7 @@ pub struct MapResponse {
#[prost(message, optional, tag = "3")]
pub handshake: ::core::option::Option<Handshake>,
#[prost(message, optional, tag = "4")]
pub status: ::core::option::Option<Status>,
pub status: ::core::option::Option<TransmissionStatus>,
}
/// Nested message and enum types in `MapResponse`.
pub mod map_response {
Expand Down
16 changes: 10 additions & 6 deletions src/servers/sink.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct SinkRequest {
/// Required field indicating the status of the request.
/// If eot is set to true, it indicates the end of transmission.
#[prost(message, optional, tag = "2")]
pub status: ::core::option::Option<sink_request::Status>,
pub status: ::core::option::Option<TransmissionStatus>,
/// optional field indicating the handshake message.
#[prost(message, optional, tag = "3")]
pub handshake: ::core::option::Option<Handshake>,
Expand All @@ -34,11 +34,6 @@ pub mod sink_request {
::prost::alloc::string::String,
>,
}
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct Status {
#[prost(bool, tag = "1")]
pub eot: bool,
}
}
///
/// Handshake message between client and server to indicate the start of transmission.
Expand All @@ -56,13 +51,22 @@ pub struct ReadyResponse {
pub ready: bool,
}
/// *
/// TransmissionStatus is the status of the transmission.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct TransmissionStatus {
#[prost(bool, tag = "1")]
pub eot: bool,
}
/// *
/// SinkResponse is the individual response of each message written to the sink.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkResponse {
#[prost(message, optional, tag = "1")]
pub result: ::core::option::Option<sink_response::Result>,
#[prost(message, optional, tag = "2")]
pub handshake: ::core::option::Option<Handshake>,
#[prost(message, optional, tag = "3")]
pub status: ::core::option::Option<TransmissionStatus>,
}
/// Nested message and enum types in `SinkResponse`.
pub mod sink_response {
Expand Down
42 changes: 34 additions & 8 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,20 @@ where
.send(Ok(SinkResponse {
result: Some(response.into()),
handshake: None,
status: None,
}))
.await
.expect("Sending response to channel");
}
// send an EOT message to the client to indicate the end of transmission for this batch
resp_tx
.send(Ok(SinkResponse {
result: None,
handshake: None,
status: Some(sink_pb::TransmissionStatus { eot: true }),
}))
.await
.expect("Sending EOT message to channel");
});

let mut global_stream_ended = false;
Expand Down Expand Up @@ -377,6 +387,7 @@ where
.send(Ok(SinkResponse {
result: None,
handshake: Some(handshake),
status: None,
}))
.await
.map_err(|e| {
Expand Down Expand Up @@ -512,17 +523,17 @@ impl<C> Drop for Server<C> {
mod tests {
use std::{error::Error, time::Duration};

use crate::servers::sink::TransmissionStatus;
use crate::sink;
use crate::sink::sink_pb::sink_client::SinkClient;
use crate::sink::sink_pb::sink_request::Request;
use crate::sink::sink_pb::Handshake;
use tempfile::TempDir;
use tokio::net::UnixStream;
use tokio::sync::oneshot;
use tonic::transport::Uri;
use tower::service_fn;

use crate::sink;
use crate::sink::sink_pb::sink_client::SinkClient;
use crate::sink::sink_pb::sink_request::{Request, Status};
use crate::sink::sink_pb::Handshake;

#[tokio::test]
async fn sink_server() -> Result<(), Box<dyn Error>> {
struct Logger;
Expand Down Expand Up @@ -600,7 +611,7 @@ mod tests {

let eot_request = sink::sink_pb::SinkRequest {
request: None,
status: Some(Status { eot: true }),
status: Some(TransmissionStatus { eot: true }),
handshake: None,
};

Expand All @@ -621,8 +632,9 @@ mod tests {
.sink_fn(tokio_stream::iter(vec![
handshake_request,
request,
eot_request,
eot_request.clone(),
request_two,
eot_request,
]))
.await?;

Expand All @@ -638,13 +650,27 @@ mod tests {
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "1");

// eot for first request
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.handshake.is_none());
let msg = &resp.status.unwrap();
assert!(msg.eot);

let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_some());
assert!(resp.handshake.is_none());
let msg = &resp.result.unwrap();
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "2");

// eot for second request
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(resp.result.is_none());
assert!(resp.handshake.is_none());
let msg = &resp.status.unwrap();
assert!(msg.eot);

shutdown_tx
.send(())
.expect("Sending shutdown signal to gRPC server");
Expand Down Expand Up @@ -735,7 +761,7 @@ mod tests {

requests.push(sink::sink_pb::SinkRequest {
request: None,
status: Some(Status { eot: true }),
status: Some(TransmissionStatus { eot: true }),
handshake: None,
});

Expand Down

0 comments on commit 9fb3c0a

Please sign in to comment.