Skip to content

Commit

Permalink
introduce handshake
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Sep 17, 2024
1 parent bd95156 commit 19326f8
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 97 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.17
github.com/nats-io/nats.go v1.36.0
github.com/numaproj/numaflow-go v0.8.2-0.20240916060625-c2c5d1798c2e
github.com/numaproj/numaflow-go v0.8.2-0.20240917052911-ee2f3086d64e
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDm
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.8.2-0.20240916060625-c2c5d1798c2e h1:hHugY5YX3IoguvCzFw4MopRgXklJ7wq1Rgclw3LCdxQ=
github.com/numaproj/numaflow-go v0.8.2-0.20240916060625-c2c5d1798c2e/go.mod h1:g4JZOyUPhjfhv+kR0sX5d8taw/dasgKPXLvQBi39mJ4=
github.com/numaproj/numaflow-go v0.8.2-0.20240917052911-ee2f3086d64e h1:F3iujbel8y5X20bVMY0Am6XDyL5eDOC/6kxyI8uxfpg=
github.com/numaproj/numaflow-go v0.8.2-0.20240917052911-ee2f3086d64e/go.mod h1:g4JZOyUPhjfhv+kR0sX5d8taw/dasgKPXLvQBi39mJ4=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
168 changes: 89 additions & 79 deletions pkg/apis/proto/source/v1/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,92 +24,102 @@ import "google/protobuf/empty.proto";

package source.v1;

// Source is the gRPC service exposed by a user defined source (UDSource).
service Source {
// ReadFn returns a stream of datum responses.
// The size of the returned ReadResponse is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned ReadResponse will contain all the datums that have been read so far (which could be an empty list).
// The server will continue to read and respond to subsequent ReadRequests until the client closes the stream.
rpc ReadFn(stream ReadRequest) returns (stream ReadResponse);

// AckFn acknowledges a stream of datum offsets.
// When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex.
// The caller (numa) expects the AckFn to be successful, and it does not expect any errors.
// If the callee (UDSource) hits an irrecoverable error while processing the AckFn request,
// it is best to crash, because no other retry mechanism is available.
rpc AckFn(stream AckRequest) returns (AckResponse);

// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);

// PartitionsFn returns the list of partitions for the user defined source.
rpc PartitionsFn(google.protobuf.Empty) returns (PartitionsResponse);

// IsReady is the heartbeat endpoint for user defined source gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
* ReadRequest is the request for reading datum stream from user defined source.
*/
message ReadRequest {
message Request {
// Required field indicating the number of records to read.
uint64 num_records = 1;
// Required field indicating the request timeout in milliseconds.
// uint32 can represent 2^32 milliseconds, which is about 49 days.
// We don't use uint64 because time.Duration takes int64 as nano seconds. Using uint64 for milli will cause overflow.
uint32 timeout_in_ms = 2;
// Source is the gRPC service exposed by a user defined source (UDSource).
service Source {
// ReadFn returns a stream of datum responses.
// The size of the returned ReadResponse is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned ReadResponse will contain all the datums that have been read so far (which could be an empty list).
// The server will continue to read and respond to subsequent ReadRequests until the client closes the stream.
rpc ReadFn(stream ReadRequest) returns (stream ReadResponse);

// AckFn acknowledges a stream of datum offsets.
// When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex.
// The caller (numa) expects the AckFn to be successful, and it does not expect any errors.
// If the callee (UDSource) hits an irrecoverable error while processing the AckFn request,
// it is best to crash, because no other retry mechanism is available.
rpc AckFn(stream AckRequest) returns (AckResponse);

// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);

// PartitionsFn returns the list of partitions for the user defined source.
rpc PartitionsFn(google.protobuf.Empty) returns (PartitionsResponse);

// IsReady is the heartbeat endpoint for user defined source gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}
// Required field indicating the request.
Request request = 1;
}

/*
* ReadResponse is the response for reading datum stream from user defined source.
*/
message ReadResponse {
message Result {
// Required field holding the payload of the datum.
bytes payload = 1;
// Required field indicating the offset information of the datum.
Offset offset = 2;
// Required field representing the time associated with each datum. It is used for watermarking.
google.protobuf.Timestamp event_time = 3;
// Optional list of keys associated with the datum.
// Key is the "key" attribute in (key,value) as in the map-reduce paradigm.
// We add this optional field to support the use case where the user defined source can provide keys for the datum.
// e.g. Kafka and Redis Stream message usually include information about the keys.
repeated string keys = 4;
// Optional list of headers associated with the datum.
// Headers are the metadata associated with the datum.
// e.g. Kafka and Redis Stream message usually include information about the headers.
map<string, string> headers = 5;
/*
* Handshake message between client and server to indicate the start of transmission.
* It is carried as the optional `handshake` field of both ReadRequest and ReadResponse.
* The Go SDK client sends a ReadRequest containing only a Handshake with sot = true
* right after creating its streams, and treats a response whose handshake is missing
* or has sot = false as an invalid handshake.
*/
message Handshake {
// Required field indicating the start of transmission (sot).
bool sot = 1;
}
message Status {
// Code to indicate the status of the response.
enum Code {
SUCCESS = 0;
FAILURE = 1;
}

// Error to indicate the error type. If the code is FAILURE, then the error field will be populated.
enum Error {
UNACKED = 0;
OTHER = 1;
/*
* ReadRequest is the request for reading datum stream from user defined source.
* A ReadRequest carries either a read request or a handshake: the Go SDK client
* sends a handshake-only ReadRequest (no `request` set) before any read.
*/
message ReadRequest {
message Request {
// Required field indicating the number of records to read.
uint64 num_records = 1;
// Required field indicating the request timeout in milliseconds.
// uint32 can represent 2^32 milliseconds, which is about 49 days.
// We don't use uint64 because time.Duration takes int64 as nanoseconds. Using uint64 for milliseconds could overflow.
uint32 timeout_in_ms = 2;
}
// Field indicating the request.
// NOTE(review): previously documented as required, but the handshake message sent by the
// Go SDK client leaves it unset — confirm whether it is required for all non-handshake messages.
Request request = 1;
// Optional handshake, set (with sot = true) on the handshake message that the client sends
// to indicate the start of transmission.
optional Handshake handshake = 2;
}

// End of transmission flag.
bool eot = 1;
Code code = 2;
Error error = 3;
optional string msg = 4;
/*
* ReadResponse is the response for reading datum stream from user defined source.
* It has three fields: the datum (result), the status of the response, and an optional handshake.
*/
message ReadResponse {
message Result {
// Required field holding the payload of the datum.
bytes payload = 1;
// Required field indicating the offset information of the datum.
Offset offset = 2;
// Required field representing the time associated with each datum. It is used for watermarking.
google.protobuf.Timestamp event_time = 3;
// Optional list of keys associated with the datum.
// Key is the "key" attribute in (key,value) as in the map-reduce paradigm.
// We add this optional field to support the use case where the user defined source can provide keys for the datum.
// e.g. Kafka and Redis Stream message usually include information about the keys.
repeated string keys = 4;
// Optional list of headers associated with the datum.
// Headers are the metadata associated with the datum.
// e.g. Kafka and Redis Stream message usually include information about the headers.
map<string, string> headers = 5;
}
message Status {
// Code to indicate the status of the response.
enum Code {
SUCCESS = 0;
FAILURE = 1;
}

// Error to indicate the error type. If the code is FAILURE, then the error field will be populated.
enum Error {
UNACKED = 0;
OTHER = 1;
}

// End of transmission flag.
bool eot = 1;
// Status code of the response (SUCCESS or FAILURE).
Code code = 2;
// Optional error type; per the comment on Error, populated when code is FAILURE.
optional Error error = 3;
// Optional message accompanying the status.
// NOTE(review): presumably a human-readable detail for FAILURE — confirm against the server implementation.
optional string msg = 4;
}
// Required field holding the result.
// NOTE(review): confirm whether this is set on the handshake response; the Go SDK client only inspects `handshake` there.
Result result = 1;
// Status of the response. Holds the end of transmission flag and the status code.
Status status = 2;
// Optional handshake. The Go SDK client requires the response to its handshake request
// to carry a handshake with sot = true; otherwise it fails with "invalid handshake response".
optional Handshake handshake = 3;
}
// Required field holding the result.
Result result = 1;
// Status of the response. Holds the end of transmission flag and the status code.
//
Status status = 2;
}

/*
* AckRequest is the request for acknowledging datum.
Expand Down
19 changes: 19 additions & 0 deletions pkg/sdkclient/source/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,25 @@ waitUntilReady:
return nil, fmt.Errorf("failed to create ack stream: %v", err)
}

// Send handshake request
handshakeRequest := &sourcepb.ReadRequest{
Handshake: &sourcepb.Handshake{
Sot: true,
},
}
if err := c.readStream.Send(handshakeRequest); err != nil {
return nil, fmt.Errorf("failed to send handshake request: %v", err)
}

// Wait for handshake response
handshakeResponse, err := c.readStream.Recv()
if err != nil {
return nil, fmt.Errorf("failed to receive handshake response: %v", err)
}
if handshakeResponse.GetHandshake() == nil || !handshakeResponse.GetHandshake().GetSot() {
return nil, fmt.Errorf("invalid handshake response")
}

return c, nil
}

Expand Down
26 changes: 24 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/monovertex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ log = "0.4.22"

[dev-dependencies]
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "source-streaming" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "handshake" }

[build-dependencies]
tonic-build = "0.12.1"
13 changes: 11 additions & 2 deletions rust/monovertex/proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ service Source {
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
* Handshake message between client and server to indicate the start of transmission.
* It is carried as the optional `handshake` field of both ReadRequest (field 2)
* and ReadResponse (field 3).
*/
message Handshake {
// Required field indicating the start of transmission (sot).
bool sot = 1;
}

/*
* ReadRequest is the request for reading datum stream from user defined source.
*/
Expand All @@ -43,6 +51,7 @@ message ReadRequest {
}
// Required field indicating the request.
Request request = 1;
optional Handshake handshake = 2;
}

/*
Expand Down Expand Up @@ -82,14 +91,14 @@ message ReadResponse {
// End of transmission flag.
bool eot = 1;
Code code = 2;
Error error = 3;
optional Error error = 3;
optional string msg = 4;
}
// Required field holding the result.
Result result = 1;
// Status of the response. Holds the end of transmission flag and the status code.
//
Status status = 2;
optional Handshake handshake = 3;
}

/*
Expand Down
27 changes: 23 additions & 4 deletions rust/monovertex/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,17 @@ impl Forwarder {
let messages = self
.source_reader
.read(config().batch_size, config().timeout_in_ms)
.await?;
.await
.map_err(|e| {
Error::ForwarderError(format!("Failed to read messages from source {:?}", e))
})?;

debug!(
"Read batch size: {} and latency - {}ms",
messages.len(),
start_time.elapsed().as_millis()
);

forward_metrics()
.read_time
.get_or_create(&self.common_labels)
Expand Down Expand Up @@ -161,13 +166,27 @@ impl Forwarder {
.inc_by(bytes_count);

// Apply transformation if transformer is present
let transformed_messages = self.apply_transformer(messages).await?;
let transformed_messages = self.apply_transformer(messages).await.map_err(|e| {
Error::ForwarderError(format!(
"Failed to apply transformation to messages {:?}",
e
))
})?;

// Write the messages to the sink
self.write_to_sink(transformed_messages).await?;
self.write_to_sink(transformed_messages)
.await
.map_err(|e| {
Error::ForwarderError(format!("Failed to write messages to sink {:?}", e))
})?;

// Acknowledge the messages back to the source
self.acknowledge_messages(offsets).await?;
self.acknowledge_messages(offsets).await.map_err(|e| {
Error::ForwarderError(format!(
"Failed to acknowledge messages back to source {:?}",
e
))
})?;

Ok(msg_count as usize)
}
Expand Down
1 change: 1 addition & 0 deletions rust/monovertex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ async fn start_forwarder(cln_token: CancellationToken) -> Result<()> {

// readiness check for all the ud containers
startup::wait_until_ready(
cln_token.clone(),
&mut source_grpc_client,
&mut sink_grpc_client,
&mut transformer_grpc_client,
Expand Down
Loading

0 comments on commit 19326f8

Please sign in to comment.