-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement Source trait and use it for user-defined source #2114
Conversation
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2114 +/- ##
==========================================
- Coverage 64.25% 63.96% -0.30%
==========================================
Files 324 325 +1
Lines 30650 31186 +536
==========================================
+ Hits 19695 19947 +252
- Misses 9913 10200 +287
+ Partials 1042 1039 -3 ☔ View full report in Codecov by Sentry. |
use crate::monovertex::source_pb::{read_response, AckRequest}; | ||
use crate::monovertex::sourcetransform_pb::SourceTransformRequest; | ||
use crate::monovertex::{source_pb, sourcetransform_pb}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, I saw a lot of formatting changes in the PR, how did you generate those? asking because I ran cargo fmt
in my last PR and it didn't generate your changes. is it because we are using different formatting scheme?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i do "cargo fmt" right before committing.
@@ -365,8 +366,8 @@ struct TimestampedPending { | |||
/// `LagReader` is responsible for periodically checking the lag of the source client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please update comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
absolutely 😄
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
rust/numaflow-core/src/monovertex.rs
Outdated
let (source_reader, lag_reader) = new_source(source_grpc_client.clone(), 500, 100).await?; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment about the numbers 500 and 100? use constants.
|
||
let mut source = Source::new(client) | ||
let (mut source, mut lagreader) = new_source(client, 5, 1000) | ||
.await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is confusing, method name is new_source but its creating a source as well as lag_reader
500, | ||
100, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to make it a constant, since its used in multiple places
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we fetch it from the config?
rust/numaflow-core/src/monovertex.rs
Outdated
// start the lag reader to publish lag metrics | ||
let mut lag_reader = utils::create_lag_reader(source_grpc_client.clone()).await; | ||
let mut lag_reader = utils::create_lag_reader(lag_reader).await; | ||
lag_reader.start().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we pass lag_reader inside create_lag_reader method, which is confusing
pub(crate) struct PendingReader<T> { | ||
lag_reader: T, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a lag_reader inside PendingReader which is confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lag_reader is the trait. any recommendation?
Signed-off-by: Vigith Maurice <[email protected]> Co-authored-by: Yashash H L <[email protected]>
@@ -117,7 +143,7 @@ impl Source { | |||
.await | |||
.map_err(|e| SourceError(e.to_string()))?; | |||
|
|||
let mut messages = Vec::with_capacity(num_records as usize); | |||
let mut messages = Vec::with_capacity(self.num_records as usize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.num_records
is usize
Signed-off-by: Vigith Maurice <[email protected]> Co-authored-by: Sreekanth <[email protected]>
let ud_src = UserDefinedSource::new(client.clone(), num_records, timeout_in_ms) | ||
.await | ||
.map_err(|e| e)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let ud_src = UserDefinedSource::new(client.clone(), num_records, timeout_in_ms) | |
.await | |
.map_err(|e| e)?; | |
let ud_src = UserDefinedSource::new(client.clone(), num_records, timeout_in_ms).await?; |
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
…proj#2114) Signed-off-by: Vigith Maurice <[email protected]> Signed-off-by: Yashash H L <[email protected]> Co-authored-by: Yashash H L <[email protected]> Co-authored-by: Sreekanth <[email protected]>
user-defined source over Source Trait