-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: impl batch map #66
Conversation
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, you can avoid atomics.
/// # Arguments | ||
/// | ||
/// * `value` - A vector of bytes representing the message's payload. | ||
/// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a standard way of doc? check axum, tokio.
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
src/batchmap.rs
Outdated
let shutdown_tx = self._shutdown_tx.clone(); | ||
|
||
// counter to keep track of the number of messages received | ||
let counter_orig = Arc::new(AtomicUsize::new(0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we dont need this, there is a natural barrier.. make your spawn return the count and compare that in the next
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case where the user returns from the UDF function early before consuming all the messages on the channel, in such a scenario we would need this right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That should be fine, because you are awaiting on the spawned task so it won't exit.. perhaps do a select!
to force the selection since returning early is not right.
/// use numaflow::batchmap::Message; | ||
/// let message = Message::new(vec![1, 2, 3]).keys(vec!["key1".to_string(), "key2".to_string()]); | ||
/// ``` | ||
pub fn keys(mut self, keys: Vec<String>) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The conventions for builder pattern is to have a with_
prefix for methods eg. pub fn with_keys(mut self, keys: Vec<String>) -> Self
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went ahead with the style followed in the current map code
Line 99 in db74f62
impl Message { |
Based on what we want to go forward with it would be good to keep it consistent throughout then.
src/batchmap.rs
Outdated
let shutdown = shutdown_signal( | ||
internal_shutdown_rx, | ||
Some(shutdown_rx), | ||
CancellationToken::new(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The CancellationToken
is intended to cancel the tasks spawned by the BatchMapService
when the Grpc server gets a shutdown signal. It seems only Reducer
implementation uses it as of now. We might want to start implementing it for all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is right, we need to do for rest.
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Fixes numaproj/numaflow#1827
Fixes numaproj/numaflow#1849