From 43be654d863e618f9393781f11493e6a37e2ab7e Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Thu, 11 Jan 2024 15:18:29 -0500 Subject: [PATCH] this is working Signed-off-by: Keran Yang --- .../numaflow/reducer/ReduceActor.java | 19 ++++++++++++++++-- .../reducer/ReduceSupervisorActor.java | 20 ++++++++++--------- .../io/numaproj/numaflow/reducer/Server.java | 3 ++- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java index 0d6e045b..204c5e1b 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java @@ -47,6 +47,8 @@ private void getResult(String eof) { resultMessages.getMessages().forEach(message -> { getSender().tell(buildActorResponse(message), getSelf()); }); + // send EOF + getSender().tell(buildEOFActorResponse(), getSelf()); } private ActorResponse buildActorResponse(Message message) { @@ -60,8 +62,7 @@ private ActorResponse buildActorResponse(Message message) { .setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()) .setNanos(this.md.getIntervalWindow().getEndTime().getNano())) .setSlot("slot-0").build()); - // if we start building the response, it means we already reached EOF. - responseBuilder.setEOF(true); + responseBuilder.setEOF(false); // set the result. responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result .newBuilder() @@ -73,5 +74,19 @@ private ActorResponse buildActorResponse(Message message) { .build()); return new ActorResponse(this.keys, responseBuilder.build()); } + + private ActorResponse buildEOFActorResponse() { + ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder(); + responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder() + .setStart(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getStartTime().getNano())) + .setEnd(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getEndTime().getNano())) + .setSlot("slot-0").build()); + responseBuilder.setEOF(true); + return new ActorResponse(this.keys, responseBuilder.build()); + } } diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java index f39fd6fc..215fae1a 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java @@ -115,22 +115,24 @@ private void sendEOF(String EOF) { // listen to child actors for the result. private void responseListener(ActorResponse actorResponse) { + // TODO - do we need to include window information for aligned windows? + // for aligned reducer, there is always single window. + // but at the same time, would like to be consistent with GO SDK implementation. + if (actorResponse.getResponse().getEOF()) { + actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys())); + if (actorsMap.isEmpty()) { + responseObserver.onCompleted(); + getContext().getSystem().stop(getSelf()); + } + return; + } /* send the result back to the client remove the child entry from the map after getting result. if there are no entries in the map, that means processing is done we can close the stream. */ - responseObserver.onNext(actorResponse.getResponse()); - // TODO - do we need to include window information for aligned windows? - // for aligned reducer, there is always single window. - // but at the same time, would like to be consistent with GO SDK implementation. - actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys())); - if (actorsMap.isEmpty()) { - responseObserver.onCompleted(); - getContext().getSystem().stop(getSelf()); - } } private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) { diff --git a/src/main/java/io/numaproj/numaflow/reducer/Server.java b/src/main/java/io/numaproj/numaflow/reducer/Server.java index 164c50d1..81950729 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Server.java @@ -67,7 +67,8 @@ public void start() throws Exception { server.start(); log.info( - "Server started, listening on socket path: " + grpcConfig.getSocketPath()); + "(keran-test)Server started, listening on socket path: " + + grpcConfig.getSocketPath()); // register shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> {