-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: Map Stream to Support Concurrent Requests #160
Conversation
Signed-off-by: Yashash H L <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #160 +/- ##
=======================================
Coverage ? 57.90%
Complexity ? 390
=======================================
Files ? 129
Lines ? 2910
Branches ? 181
=======================================
Hits ? 1685
Misses ? 1108
Partials ? 117 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
|
||
private void handleFailure(Exception e) { | ||
getContext().getSystem().log().error("Encountered error in mapStreamFn", e); | ||
if (userException == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: && e != null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
handleFailure
will only be invoked when there is an exception in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but I don't think we shouldn't assume caller behaviour when we implement a method.
src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java
Outdated
Show resolved
Hide resolved
src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java
Outdated
Show resolved
Hide resolved
src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java
Outdated
Show resolved
Hide resolved
src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java
Outdated
Show resolved
Hide resolved
src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java
Outdated
Show resolved
Hide resolved
String[] keys = mapRequest.getRequest().getKeysList().toArray(new String[0]); | ||
|
||
try { | ||
OutputObserverImpl outputObserver = new OutputObserverImpl( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OutputObserverImpl outputObserver = new OutputObserverImpl( | |
OutputObserver outputObserver = new OutputObserverImpl( |
We can move sendEOF()
to MapStreamerActor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MapStreamerActor
won't have that context, I prefer to do it here because the processMessage
is a blocking call and immediately after the processing is done we are sending EOF. I don't anything wrong here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
won't have that context.
Do you mean requestId
and supervisor actor ref
? if so, I think MapStreamerActor
has the context of both.
I was proposing that ObserverImpl only implements methods the interface defines but I am also ok with having an extra sendEOF.
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Keran Yang <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
No description provided.