Skip to content

Commit

Permalink
this is working
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 11, 2024
1 parent 07f977b commit 43be654
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 12 deletions.
19 changes: 17 additions & 2 deletions src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ private void getResult(String eof) {
resultMessages.getMessages().forEach(message -> {
getSender().tell(buildActorResponse(message), getSelf());
});
// send EOF
getSender().tell(buildEOFActorResponse(), getSelf());
}

private ActorResponse buildActorResponse(Message message) {
Expand All @@ -60,8 +62,7 @@ private ActorResponse buildActorResponse(Message message) {
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
// if we start building the response, it means we already reached EOF.
responseBuilder.setEOF(true);
responseBuilder.setEOF(false);
// set the result.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
Expand All @@ -73,5 +74,19 @@ private ActorResponse buildActorResponse(Message message) {
.build());
return new ActorResponse(this.keys, responseBuilder.build());
}

private ActorResponse buildEOFActorResponse() {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(true);
return new ActorResponse(this.keys, responseBuilder.build());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,24 @@ private void sendEOF(String EOF) {

// listen to child actors for the result.
private void responseListener(ActorResponse actorResponse) {
// TODO - do we need to include window information for aligned windows?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
if (actorResponse.getResponse().getEOF()) {
actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
return;
}
/*
send the result back to the client
remove the child entry from the map after getting result.
if there are no entries in the map, that means processing is
done we can close the stream.
*/

responseObserver.onNext(actorResponse.getResponse());
// TODO - do we need to include window information for aligned windows?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
}

private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/reducer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public void start() throws Exception {
server.start();

log.info(
"Server started, listening on socket path: " + grpcConfig.getSocketPath());
"(keran-test)Server started, listening on socket path: "
+ grpcConfig.getSocketPath());

// register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand Down

0 comments on commit 43be654

Please sign in to comment.