From cd078115f17e1d02b771c264ecfb02b96f2a5770 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 22 Jan 2024 14:24:18 -0500 Subject: [PATCH] rename Signed-off-by: Keran Yang --- .../reducestreamer/ActorResponse.java | 7 +++- ...ponseStreamActor.java => OutputActor.java} | 8 ++-- .../numaflow/reducestreamer/Service.java | 12 +++--- ...eShutdownActor.java => ShutdownActor.java} | 6 +-- ...ervisorActor.java => SupervisorActor.java} | 8 ++-- .../numaflow/reducer/ShutDownActorTest.java | 10 ++--- ...ctorTest.java => SupervisorActorTest.java} | 14 +++---- ...nActorTest.java => ShutdownActorTest.java} | 28 ++++++------- ...ctorTest.java => SupervisorActorTest.java} | 40 ++++++++++--------- 9 files changed, 70 insertions(+), 63 deletions(-) rename src/main/java/io/numaproj/numaflow/reducestreamer/{ResponseStreamActor.java => OutputActor.java} (93%) rename src/main/java/io/numaproj/numaflow/reducestreamer/{ReduceShutdownActor.java => ShutdownActor.java} (91%) rename src/main/java/io/numaproj/numaflow/reducestreamer/{ReduceSupervisorActor.java => SupervisorActor.java} (96%) rename src/test/java/io/numaproj/numaflow/reducer/{ReduceSupervisorActorTest.java => SupervisorActorTest.java} (93%) rename src/test/java/io/numaproj/numaflow/reducestreamer/{ShutDownActorTest.java => ShutdownActorTest.java} (83%) rename src/test/java/io/numaproj/numaflow/reducestreamer/{ReduceSupervisorActorTest.java => SupervisorActorTest.java} (84%) diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java index d8e522db..3a73509c 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java @@ -7,8 +7,11 @@ /** * The actor response holds the final EOF response for a particular key set. - * The isLast attribute indicates whether the response is the last one to be sent to - * the output gRPC stream. + *

+ * The isLast attribute indicates whether the response is globally the last one to be sent to + * the output gRPC stream, if set to true, it means the response is the very last response among + * all key sets. When output stream actor receives an isLast response, it sends the response and immediately + * closes the output stream. */ @Getter @Setter diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ResponseStreamActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java similarity index 93% rename from src/main/java/io/numaproj/numaflow/reducestreamer/ResponseStreamActor.java rename to src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java index 170d1f23..fc12ee02 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ResponseStreamActor.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java @@ -16,7 +16,7 @@ import java.util.List; /** - * Response stream actor is a wrapper around the gRPC output stream. + * Output actor is a wrapper around the gRPC output stream. * It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream. * ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream. *

@@ -24,14 +24,14 @@ */ @Slf4j @AllArgsConstructor -class ResponseStreamActor extends AbstractActor { +class OutputActor extends AbstractActor { StreamObserver responseObserver; Metadata md; public static Props props( StreamObserver responseObserver, Metadata md) { - return Props.create(ResponseStreamActor.class, responseObserver, md); + return Props.create(OutputActor.class, responseObserver, md); } @Override @@ -48,7 +48,7 @@ private void sendMessage(Message message) { private void sendEOF(ActorResponse actorResponse) { if (actorResponse.isLast()) { - // handle the very last response. + // send the very last response. responseObserver.onNext(actorResponse.getResponse()); // close the output stream. responseObserver.onCompleted(); diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java index 0533b3a7..5b0b0745 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java @@ -71,26 +71,26 @@ public StreamObserver reduceFn(final StreamObser // create a shutdown actor that listens to exceptions. ActorRef shutdownActorRef = reduceActorSystem. - actorOf(ReduceShutdownActor.props(failureFuture)); + actorOf(ShutdownActor.props(failureFuture)); // subscribe for dead letters reduceActorSystem.getEventStream().subscribe(shutdownActorRef, AllDeadLetters.class); handleFailure(failureFuture, responseObserver); - // create a response stream actor that ensures synchronized delivery of reduce responses. - ActorRef responseStreamActor = reduceActorSystem. - actorOf(ResponseStreamActor.props(responseObserver, md)); + // create an output actor that ensures synchronized delivery of reduce responses. + ActorRef outputActor = reduceActorSystem. + actorOf(OutputActor.props(responseObserver, md)); /* create a supervisor actor which assign the tasks to child actors. we create a child actor for every unique set of keys in a window. */ ActorRef supervisorActor = reduceActorSystem - .actorOf(ReduceSupervisorActor.props( + .actorOf(SupervisorActor.props( reduceStreamerFactory, md, shutdownActorRef, - responseStreamActor)); + outputActor)); return new StreamObserver<>() { diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java similarity index 91% rename from src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java rename to src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java index 010c0862..6c690fcf 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java @@ -11,16 +11,16 @@ import java.util.concurrent.CompletableFuture; /** - * Reduce shutdown actor, listens to exceptions and handles shutdown. + * Shutdown actor, listens to exceptions and handles shutdown. */ @Slf4j @AllArgsConstructor -class ReduceShutdownActor extends AbstractActor { +class ShutdownActor extends AbstractActor { private final CompletableFuture failureFuture; public static Props props( CompletableFuture failureFuture) { - return Props.create(ReduceShutdownActor.class, failureFuture); + return Props.create(ShutdownActor.class, failureFuture); } @Override diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java similarity index 96% rename from src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java rename to src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java index f74da750..a4d782b4 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java @@ -22,17 +22,17 @@ import java.util.Optional; /** - * Reduce supervisor actor distributes the messages to other actors and handles failures. + * Supervisor actor distributes the messages to other actors and handles failures. */ @Slf4j -class ReduceSupervisorActor extends AbstractActor { +class SupervisorActor extends AbstractActor { private final ReduceStreamerFactory reduceStreamerFactory; private final Metadata md; private final ActorRef shutdownActor; private final ActorRef responseStreamActor; private final Map actorsMap = new HashMap<>(); - public ReduceSupervisorActor( + public SupervisorActor( ReduceStreamerFactory reduceStreamerFactory, Metadata md, ActorRef shutdownActor, @@ -49,7 +49,7 @@ public static Props props( ActorRef shutdownActor, ActorRef responseStreamActor) { return Props.create( - ReduceSupervisorActor.class, + SupervisorActor.class, reduceStreamerFactory, md, shutdownActor, diff --git a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java index b3bc7e7d..897872b8 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java @@ -36,7 +36,7 @@ public void testFailure() { Metadata md = new MetadataImpl( new IntervalWindowImpl(Instant.now(), Instant.now())); - ActorRef supervisor = actorSystem + ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestExceptionFactory(), @@ -50,7 +50,7 @@ public void testFailure() { .setValue(ByteString.copyFromUtf8(String.valueOf(1))) .build()) .build()); - supervisor.tell(reduceRequest, ActorRef.noSender()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); try { completableFuture.get(); @@ -74,7 +74,7 @@ public void testDeadLetterHandling() { Metadata md = new MetadataImpl( new IntervalWindowImpl(Instant.now(), Instant.now())); - ActorRef supervisor = actorSystem + ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestExceptionFactory(), @@ -82,8 +82,8 @@ public void testDeadLetterHandling() { shutdownActor, new ReduceOutputStreamObserver())); - DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisor); - supervisor.tell(deadLetter, ActorRef.noSender()); + DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisorActor); + supervisorActor.tell(deadLetter, ActorRef.noSender()); try { completableFuture.get(); diff --git a/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java similarity index 93% rename from src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java rename to src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java index e33cbd21..d98760ff 100644 --- a/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java @@ -17,7 +17,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class ReduceSupervisorActorTest { +public class SupervisorActorTest { @Test public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException { @@ -33,7 +33,7 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); - ActorRef supervisor = actorSystem + ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver)); @@ -47,9 +47,9 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) .build()); - supervisor.tell(reduceRequest, ActorRef.noSender()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); } - supervisor.tell(Constants.EOF, ActorRef.noSender()); + supervisorActor.tell(Constants.EOF, ActorRef.noSender()); try { completableFuture.get(); @@ -83,7 +83,7 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas new IntervalWindowImpl(Instant.now(), Instant.now())); ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); - ActorRef supervisor = actorSystem + ActorRef supervisorActor = actorSystem .actorOf(ReduceSupervisorActor .props( new TestReducerFactory(), @@ -102,10 +102,10 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) .build()); - supervisor.tell(reduceRequest, ActorRef.noSender()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); } - supervisor.tell(Constants.EOF, ActorRef.noSender()); + supervisorActor.tell(Constants.EOF, ActorRef.noSender()); try { completableFuture.get(); // each reduce request generates two reduce responses, one containing the data and the other one indicating EOF. diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java similarity index 83% rename from src/test/java/io/numaproj/numaflow/reducestreamer/ShutDownActorTest.java rename to src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java index 1b3bba6f..8e51e814 100644 --- a/src/test/java/io/numaproj/numaflow/reducestreamer/ShutDownActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java @@ -21,7 +21,7 @@ import static org.junit.Assert.fail; -public class ShutDownActorTest { +public class ShutdownActorTest { @Test public void testFailure() { final ActorSystem actorSystem = ActorSystem.create("test-system-1"); @@ -33,7 +33,7 @@ public void testFailure() { .addKeys(reduceKey); ActorRef shutdownActor = actorSystem - .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor + .actorOf(ShutdownActor .props(completableFuture)); Metadata md = new MetadataImpl( @@ -41,16 +41,16 @@ public void testFailure() { io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver(); - ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor + ActorRef outputActor = actorSystem.actorOf(OutputActor .props(reduceOutputStreamObserver, md)); - ActorRef supervisor = actorSystem - .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor + ActorRef supervisorActor = actorSystem + .actorOf(SupervisorActor .props( new TestExceptionFactory(), md, shutdownActor, - responseStreamActor)); + outputActor)); io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest( ReduceOuterClass.ReduceRequest.newBuilder() @@ -59,7 +59,7 @@ public void testFailure() { .setValue(ByteString.copyFromUtf8(String.valueOf(1))) .build()) .build()); - supervisor.tell(reduceRequest, ActorRef.noSender()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); try { completableFuture.get(); @@ -75,7 +75,7 @@ public void testDeadLetterHandling() { CompletableFuture completableFuture = new CompletableFuture<>(); ActorRef shutdownActor = actorSystem - .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor + .actorOf(ShutdownActor .props(completableFuture)); actorSystem.eventStream().subscribe(shutdownActor, AllDeadLetters.class); @@ -85,19 +85,19 @@ public void testDeadLetterHandling() { io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver(); - ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor + ActorRef outputActor = actorSystem.actorOf(OutputActor .props(reduceOutputStreamObserver, md)); - ActorRef supervisor = actorSystem - .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor + ActorRef supervisorActor = actorSystem + .actorOf(SupervisorActor .props( new TestExceptionFactory(), md, shutdownActor, - responseStreamActor)); + outputActor)); - DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisor); - supervisor.tell(deadLetter, ActorRef.noSender()); + DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisorActor); + supervisorActor.tell(deadLetter, ActorRef.noSender()); try { completableFuture.get(); diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java similarity index 84% rename from src/test/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActorTest.java rename to src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java index 84ff65d0..14518e36 100644 --- a/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActorTest.java +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java @@ -21,14 +21,14 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class ReduceSupervisorActorTest { +public class SupervisorActorTest { @Test public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException { final ActorSystem actorSystem = ActorSystem.create("test-system-1"); - CompletableFuture completableFuture = new CompletableFuture(); + CompletableFuture completableFuture = new CompletableFuture<>(); ActorRef shutdownActor = actorSystem - .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor + .actorOf(ShutdownActor .props(completableFuture)); Metadata md = new MetadataImpl( @@ -36,16 +36,16 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then io.numaproj.numaflow.reducer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducer.ReduceOutputStreamObserver(); - ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor + ActorRef outputActor = actorSystem.actorOf(OutputActor .props(reduceOutputStreamObserver, md)); - ActorRef supervisor = actorSystem - .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor + ActorRef supervisorActor = actorSystem + .actorOf(SupervisorActor .props( new TestReduceStreamerFactory(), md, shutdownActor, - responseStreamActor)); + outputActor)); for (int i = 1; i <= 10; i++) { io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest( @@ -58,9 +58,11 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) .build()); - supervisor.tell(reduceRequest, ActorRef.noSender()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); } - supervisor.tell(io.numaproj.numaflow.reducestreamer.Constants.EOF, ActorRef.noSender()); + supervisorActor.tell( + io.numaproj.numaflow.reducestreamer.Constants.EOF, + ActorRef.noSender()); try { completableFuture.get(); @@ -72,7 +74,7 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then .getResult() .getValue() .toStringUtf8()); - assertEquals(true, reduceOutputStreamObserver.resultDatum + assertTrue(reduceOutputStreamObserver.resultDatum .get() .get(1) .getEOF()); @@ -84,25 +86,25 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then @Test public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException { final ActorSystem actorSystem = ActorSystem.create("test-system-2"); - CompletableFuture completableFuture = new CompletableFuture(); + CompletableFuture completableFuture = new CompletableFuture<>(); ActorRef shutdownActor = actorSystem - .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor + .actorOf(ShutdownActor .props(completableFuture)); Metadata md = new MetadataImpl( new IntervalWindowImpl(Instant.now(), Instant.now())); io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new ReduceOutputStreamObserver(); - ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor + ActorRef outputActor = actorSystem.actorOf(OutputActor .props(reduceOutputStreamObserver, md)); - ActorRef supervisor = actorSystem - .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor + ActorRef supervisorActor = actorSystem + .actorOf(SupervisorActor .props( new TestReduceStreamerFactory(), md, shutdownActor, - responseStreamActor) + outputActor) ); for (int i = 1; i <= 10; i++) { @@ -116,10 +118,12 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas .setValue(ByteString.copyFromUtf8(String.valueOf(i))) .build()) .build()); - supervisor.tell(reduceRequest, ActorRef.noSender()); + supervisorActor.tell(reduceRequest, ActorRef.noSender()); } - supervisor.tell(io.numaproj.numaflow.reducestreamer.Constants.EOF, ActorRef.noSender()); + supervisorActor.tell( + io.numaproj.numaflow.reducestreamer.Constants.EOF, + ActorRef.noSender()); try { completableFuture.get(); // each reduce request generates two reduce responses, one containing the data and the other one indicating EOF.