Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use gRPC bidirectional streaming for source transformer #2071

Merged
merged 11 commits into from
Oct 2, 2024

Conversation

BulkBeing
Copy link
Contributor

@BulkBeing BulkBeing commented Sep 18, 2024

Copy link

codecov bot commented Sep 18, 2024

Codecov Report

Attention: Patch coverage is 72.25806% with 86 lines in your changes missing coverage. Please review.

Project coverage is 64.49%. Comparing base (ac7b33b) to head (0b9cede).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
pkg/sdkclient/sourcetransformer/client.go 38.23% 34 Missing and 8 partials ⚠️
rust/numaflow-core/src/transformer/user_defined.rs 81.45% 23 Missing ⚠️
pkg/isb/tracker/message_tracker.go 66.66% 8 Missing ⚠️
pkg/udf/rpc/grpc_batch_map.go 20.00% 4 Missing ⚠️
pkg/sources/transformer/grpc_transformer.go 94.82% 2 Missing and 1 partial ⚠️
pkg/sources/forward/applier/sourcetransformer.go 0.00% 2 Missing ⚠️
pkg/webhook/validator/validator.go 33.33% 1 Missing and 1 partial ⚠️
pkg/sources/source.go 0.00% 1 Missing ⚠️
pkg/udf/forward/forward.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2071      +/-   ##
==========================================
+ Coverage   64.44%   64.49%   +0.04%     
==========================================
  Files         324      324              
  Lines       30502    30579      +77     
==========================================
+ Hits        19658    19721      +63     
- Misses       9819     9820       +1     
- Partials     1025     1038      +13     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@@ -25,13 +25,13 @@ import (
// SourceTransformApplier applies the source transform on the read message and gives back a new message. Any UserError will be retried here, while
// InternalErr can be returned and could be retried by the callee.
type SourceTransformApplier interface {
ApplyTransform(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error)
ApplyTransform(ctx context.Context, messages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please help me understand the need for this signature change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we run the below func in goroutines and call ApplyTransform within it:

concurrentApplyTransformer(ctx context.Context, readMessagePair <-chan *isb.ReadWriteMessagePair)

It retrieves message from the readMessagePair channel, apply transformer, and store the WriteMessages in the same element.

With bidrectional streaming, the concurrency is moved to the server side. It might be difficult to have a function signature that takes a message and returns the output since we are not sure of the result order from the stream. Instead, we send all messages, wait for all output and aggregate the result in the new implementation.

func (df *DataForward) concurrentApplyTransformer(ctx context.Context, readMessagePair <-chan *isb.ReadWriteMessagePair) {
for message := range readMessagePair {
start := time.Now()
metrics.SourceTransformerReadMessagesCount.With(map[string]string{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need this metrics?

}
taggedMessages = append(taggedMessages, taggedMessage)
if !tracker.IsEmpty() {
return nil, errors.New("transform response for all requests were not received from UDF")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we dump the tracker state at this point if you think it could help triaging later on?

/// # Returns
/// A string representing the `OnFailureStrategy` enum variant.
fn to_string(&self) -> String {
impl Display for OnFailureStrategy {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you :)

pub(crate) async fn new(client: SourceTransformClient<Channel>) -> Result<Self> {
Ok(Self { client })
pub(crate) async fn new(mut client: SourceTransformClient<Channel>) -> Result<Self> {
let (read_tx, read_rx) = mpsc::channel(config().batch_size as usize);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: referencing config directly can make testing difficult if you want to change the value for testing.

Comment on lines 44 to 52
let transform_fn_with_timeout = tokio::time::timeout(
Duration::from_secs(3),
client.source_transform_fn(Request::new(read_stream)),
);
let Ok(resp_stream) = transform_fn_with_timeout.await else {
return Err(Error::TransformerError(
"connection to transformer gRPC server timed out".to_string(),
));
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only 3 sec timeout? will this whole thing become flaky in a busy system?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need timeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we create the stream at the beginning and we don't create multiple streams, this should succeed immediately. I added the timeout because the tests were stuck when there was some issue with the server (server started,but there was a mismatch in the protobuf files on the client and the server). I will remove the timeouts.

}

Ok(Some(messages))
let sender_task = tokio::time::timeout(Duration::from_millis(2), sender);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please document your thought on why you did this approach.

@@ -6,68 +6,68 @@ import (
"github.com/numaproj/numaflow/pkg/isb"
)

// tracker is used to store a key value pair for string and *isb.ReadMessage
// Tracker is used to store a key value pair for string and *isb.ReadMessage
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this file to pkg/isb/ and rename to message_tracker.go, also rename the struck name?


use crate::config::config;
use crate::error::{Error, Result};
use crate::message::{Message, Offset};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this Message will be moved to a separate package that will be shared by both MonoVertex and Pipeline.

};
for (i, result) in resp.results.into_iter().enumerate() {
if result.tags.iter().any(|x| x == DROP) {
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: metrics

for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("waiting for transformer gRPC server to be ready: %w", context.Cause(ctx))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return nil, fmt.Errorf("waiting for transformer gRPC server to be ready: %w", context.Cause(ctx))
return nil, fmt.Errorf("failed waiting for transformer gRPC server to be ready: %w", ctx.Err())

context.Cause(ctx) returns same error obj as ctx.Err(). let's change it to be consistent with other sdk clients.


c.stream, err = c.grpcClt.SourceTransformFn(ctx)
if err != nil {
return nil, fmt.Errorf("creating a gRPC stream for source transform: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return nil, fmt.Errorf("creating a gRPC stream for source transform: %w", err)
return nil, fmt.Errorf("failed creating a gRPC stream for source transform: %w", err)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen somewhere (Go blog / talks), to not include "failed" in an error message, since error type itself indicates that it failed. If the error gets wrapped again on the caller's side, it could result in a final message of below pattern:

Failed to create user: Failed to create new account: Failed to INSERT new account record: database not connected

Compared to:

Failed to create user: creating new account:  INSERT new account record: database not connected

return c, nil
}
func doHandshake(stream transformpb.SourceTransform_SourceTransformFnClient) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func doHandshake(stream transformpb.SourceTransform_SourceTransformFnClient) error {
func doHandshake(stream transformpb.SourceTransform_SourceTransformFnClient) error {

},
}
if err := stream.Send(handshakeReq); err != nil {
return fmt.Errorf("sending handshake request for source tansform: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("sending handshake request for source tansform: %w", err)
return fmt.Errorf("failed sending handshake request for source tansform: %w", err)


handshakeResp, err := stream.Recv()
if err != nil {
return fmt.Errorf("receiving handshake response from source transform stream: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("receiving handshake response from source transform stream: %w", err)
return fmt.Errorf("failed receiving handshake response from source transform stream: %w", err)

Tags: result.Tags,
}
taggedMessages = append(taggedMessages, taggedMessage)
if !tracker.IsEmpty() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the current implementation, this if statement should never hold true, right? because we only break the loop when messageCount is 0.

m: make(map[string]*isb.ReadMessage),
lock: sync.RWMutex{},
}
}

// addRequest add a new entry for a given message to the tracker.
// AddRequest add a new entry for a given message to the Tracker.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming: please be concise. if we are saving messages, use AddMessage instead of Request.

// to indicate if it does not exist
func (t *tracker) getRequest(id string) (*isb.ReadMessage, bool) {
func (t *Tracker) GetRequest(id string) (*isb.ReadMessage, bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we simply return *isb.RM and use nil to indicate message doesn't exist?

return &tracker{
// NewTracker initializes a new instance of a Tracker
func NewTracker() *Tracker {
return &Tracker{
m: make(map[string]*isb.ReadMessage),
lock: sync.RWMutex{},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a fundamental question. With bidirectional stream, are we trying to improve performance? If so, is having this tracker with RWMutex making introducing extra io latency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mutex operations (unless there is too much contention) are much faster than network calls (nanoseconds vs micro/milli seconds). Unary calls has some overhead. As per the docs:

Use streaming RPCs when handling a long-lived logical flow of data from the client-to-server, server-to-client, or in both directions. Streams can avoid continuous RPC initiation, which includes connection load balancing at the client-side, starting a new HTTP/2 request at the transport layer, and invoking a user-defined method handler on the server side.

// removeRequest will remove the entry for a given id
func (t *tracker) removeRequest(id string) {
// RemoveRequest will remove the entry for a given id
func (t *Tracker) RemoveRequest(id string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove can be merged with Get as one method.

@BulkBeing BulkBeing force-pushed the source-transformer-streaming branch from 8cad234 to 8c96ca7 Compare September 30, 2024 08:26
Copy link
Contributor

@yhl25 yhl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@yhl25 yhl25 merged commit e69551b into numaproj:main Oct 2, 2024
25 checks passed
SaniyaKalamkar pushed a commit to SaniyaKalamkar/numaflow that referenced this pull request Jan 19, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants