Skip to content

Commit

Permalink
fix shutdown actor tests
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 10, 2024
1 parent d207d6b commit 7a8f856
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private void responseListener(ActorResponse actorResponse) {
*/

responseObserver.onNext(actorResponse.getResponse());
// TODO - do we need to include window information for aligned windows?
actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/reducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
responseObserver));


return new StreamObserver<ReduceOuterClass.ReduceRequest>() {
return new StreamObserver<>() {
@Override
public void onNext(ReduceOuterClass.ReduceRequest datum) {
// send the message to parent actor, which takes care of distribution.
Expand Down
21 changes: 15 additions & 6 deletions src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
package io.numaproj.numaflow.reducer;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import akka.actor.DeadLetter;
import com.google.protobuf.ByteString;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
import org.junit.Test;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;


public class ShutDownActorTest {

@Test
public void testFailure() {
/*
final ActorSystem actorSystem = ActorSystem.create("test-system-1");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
CompletableFuture<Void> completableFuture = new CompletableFuture<>();

String reduceKey = "reduce-key";
ReduceOuterClass.ReduceRequest.Payload.Builder payloadBuilder = ReduceOuterClass.ReduceRequest.Payload
Expand Down Expand Up @@ -45,12 +58,10 @@ public void testFailure() {
} catch (Exception e) {
assertEquals(e.getMessage(), "java.lang.RuntimeException: UDF Failure");
}
*/
}

@Test
public void testDeadLetterHandling() {
/*
final ActorSystem actorSystem = ActorSystem.create("test-system-2");
CompletableFuture<Void> completableFuture = new CompletableFuture<>();

Expand Down Expand Up @@ -80,8 +91,6 @@ public void testDeadLetterHandling() {
} catch (Exception e) {
assertEquals(e.getMessage(), "java.lang.Throwable: dead letters");
}
*/
}


Expand Down

0 comments on commit 7a8f856

Please sign in to comment.