Skip to content

Commit

Permalink
add handshake for readFn
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Sep 17, 2024
1 parent 5c37e7c commit f3061b0
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 16 deletions.
5 changes: 2 additions & 3 deletions examples/simple-source/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ update:

.PHONY: image
image: update
cd ../../ && docker build \
cd ../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
-t ${IMAGE_REGISTRY} . --platform linux/amd64,linux/arm64 --push

.PHONY: clean
clean:
Expand Down
11 changes: 4 additions & 7 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let source_handle = simple_source::SimpleSource::new("Hello World!".to_string());
let source_handle = simple_source::SimpleSource::new();
numaflow::source::Server::new(source_handle).start().await
}

pub(crate) mod simple_source {
use std::{collections::HashSet, sync::RwLock};

use chrono::Utc;
use std::{collections::HashSet, sync::RwLock};
use tokio::sync::mpsc::Sender;

use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
Expand All @@ -18,14 +17,12 @@ pub(crate) mod simple_source {
/// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK
/// does not provide a mutable reference as explained in [`Sourcer`]
pub(crate) struct SimpleSource {
payload: String,
yet_to_ack: RwLock<HashSet<String>>,
}

impl SimpleSource {
pub(crate) fn new(payload: String) -> Self {
pub(crate) fn new() -> Self {
Self {
payload,
yet_to_ack: RwLock::new(HashSet::new()),
}
}
Expand All @@ -44,7 +41,7 @@ pub(crate) mod simple_source {
let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i);
transmitter
.send(Message {
value: format!("{}-{}", self.payload, event_time).into_bytes(),
value: format!("{}", i).into_bytes(),
event_time,
offset: Offset {
offset: offset.clone().into_bytes(),
Expand Down
12 changes: 11 additions & 1 deletion proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ service Source {
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/*
* ReadRequest is the request for reading datum stream from user defined source.
*/
Expand All @@ -43,6 +51,7 @@ message ReadRequest {
}
// Required field indicating the request.
Request request = 1;
optional Handshake handshake = 2;
}

/*
Expand Down Expand Up @@ -82,14 +91,15 @@ message ReadResponse {
// End of transmission flag.
bool eot = 1;
Code code = 2;
Error error = 3;
optional Error error = 3;
optional string msg = 4;
}
// Required field holding the result.
Result result = 1;
// Status of the response. Holds the end of transmission flag and the status code.
//
Status status = 2;
optional Handshake handshake = 3;
}

/*
Expand Down
43 changes: 38 additions & 5 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ where
headers: Default::default(),
}),
status: None,
handshake: None,
}))
.await
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?;
Expand All @@ -119,9 +120,10 @@ where
status: Some(proto::read_response::Status {
eot: true,
code: 0,
error: 0,
error: None,
msg: None,
}),
handshake: None,
}))
.await
.map_err(|e| SourceError(ErrorKind::InternalError(e.to_string())))?;
Expand All @@ -140,7 +142,7 @@ where
let (stx, srx) = mpsc::channel::<Message>(DEFAULT_CHANNEL_SIZE);

// spawn the rx side so that when the handler is invoked, we can stream the handler's read data
// to the gprc response stream.
// to the grpc response stream.
let grpc_writer_handle: JoinHandle<Result<(), Error>> =
tokio::spawn(async move { Self::write_a_batch(grpc_resp_tx, srx).await });

Expand Down Expand Up @@ -189,6 +191,33 @@ where

let cln_token = self.cancellation_token.clone();

// Handle the handshake first
let handshake_request = sr
.message()
.await
.map_err(|e| Status::internal(format!("handshake failed {}", e)))?
.ok_or_else(|| Status::internal("stream closed before handshake"))?;

if let Some(handshake) = handshake_request.handshake {
grpc_tx
.send(Ok(ReadResponse {
result: None,
status: Some(proto::read_response::Status {
eot: false,
code: 0,
error: None,
msg: None,
}),
handshake: Some(handshake),
}))
.await
.map_err(|e| {
Status::internal(format!("failed to send handshake response {}", e))
})?;
} else {
return Err(Status::invalid_argument("Handshake not present"));
}

// this is the top-level stream consumer and this task will only exit when stream is closed (which
// will happen when server and client are shutting down).
let grpc_read_handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
Expand Down Expand Up @@ -528,11 +557,8 @@ mod tests {

tokio::time::sleep(Duration::from_millis(50)).await;

// https://github.com/hyperium/tonic/blob/master/examples/src/uds/client.rs
// https://github.com/hyperium/tonic/blob/master/examples/src/uds/client.rs
let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(service_fn(move |_: Uri| {
// https://rust-lang.github.io/async-book/03_async_await/01_chapter.html#async-lifetimes
let sock_file = sock_file.clone();
async move {
Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(
Expand All @@ -546,11 +572,18 @@ mod tests {

// Test read_fn with bidirectional streaming
let (read_tx, read_rx) = mpsc::channel(4);
let handshake_request = proto::ReadRequest {
request: None,
handshake: Some(proto::Handshake { sot: true }),
};
read_tx.send(handshake_request).await.unwrap();

let read_request = proto::ReadRequest {
request: Some(proto::read_request::Request {
num_records: 5,
timeout_in_ms: 1000,
}),
handshake: None,
};
read_tx.send(read_request).await.unwrap();
drop(read_tx); // Close the sender to indicate no more requests
Expand Down

0 comments on commit f3061b0

Please sign in to comment.