Skip to content

Commit

Permalink
fix: cover the case when EOF received but no active window
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Feb 15, 2024
1 parent 144a380 commit 7b56abe
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 14 deletions.
27 changes: 16 additions & 11 deletions src/main/java/io/numaproj/numaflow/sessionreducer/OutputActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,27 @@ public static Props props(
public Receive createReceive() {
return receiveBuilder()
.match(ActorResponse.class, this::handleResponse)
.match(String.class, this::handleEOF)
.build();
}

private void handleResponse(ActorResponse actorResponse) {
log.info("sending to the output: " + actorResponse.getResponse().toString());
responseObserver.onNext(actorResponse.getResponse());
if (actorResponse.isLast()) {
// send the very last response.
responseObserver.onNext(actorResponse.getResponse());
// close the output stream.
responseObserver.onCompleted();
// stop the AKKA system right after we close the output stream.
// note: could make more sense if the supervisor actor stops the system,
// but it requires an extra tell.
getContext().getSystem().stop(getSender());
} else {
responseObserver.onNext(actorResponse.getResponse());
this.closeSystem();
}
}

private void handleEOF(String eof) {
this.closeSystem();
}

private void closeSystem() {
// close the output stream.
responseObserver.onCompleted();
// stop the AKKA system right after we close the output stream.
// note: could make more sense if the supervisor actor stops the system,
// but it requires an extra tell.
getContext().getSystem().stop(getSender());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ private void updateKeyedWindow(Sessionreduce.KeyedWindow newKeyedWindow) {
// update the output stream to use the new keyed window
OutputStreamObserverImpl newOutputStream = (OutputStreamObserverImpl) this.outputStream;
newOutputStream.setKeyedWindow(newKeyedWindow);
this.outputStream = newOutputStream;
}

// when receiving a message, process it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public Receive createReceive() {

private void handleEOF(String EOF) {
this.isInputStreamClosed = true;
if (actorsMap.isEmpty()) {
this.outputActor.tell(EOF, getSelf());
return;
}
for (Map.Entry<String, ActorRef> entry : actorsMap.entrySet()) {
entry.getValue().tell(EOF, getSelf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public void open_expand_close() {
}

@Test
public void open_merge_close() {
public void open_merge_close() throws InterruptedException {
// create an output stream observer
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
StreamObserver<Sessionreduce.SessionReduceRequest> inputStreamObserver = SessionReduceGrpc
Expand Down Expand Up @@ -643,7 +643,7 @@ public void open_merge_close() {
}

@Test
public void open_expand_append_merge_close() {
public void open_expand_append_merge_close() throws InterruptedException {
// create an output stream observer
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
StreamObserver<Sessionreduce.SessionReduceRequest> inputStreamObserver = SessionReduceGrpc
Expand Down Expand Up @@ -925,6 +925,8 @@ public void open_expand_append_merge_close() {
for (Sessionreduce.SessionReduceRequest request : requests) {
inputStreamObserver.onNext(request);
}
// This sleep statement tests a case when there is no active windows and an EOF is received.
Thread.sleep(1000);
inputStreamObserver.onCompleted();

while (!outputStreamObserver.completed.get()) ;
Expand Down

0 comments on commit 7b56abe

Please sign in to comment.