diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java
index d8e522db..3a73509c 100644
--- a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java
+++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java
@@ -7,8 +7,11 @@
/**
* The actor response holds the final EOF response for a particular key set.
- * The isLast attribute indicates whether the response is the last one to be sent to
- * the output gRPC stream.
+ *
+ * The isLast attribute indicates whether the response is globally the last one to be sent to
+ * the output gRPC stream, if set to true, it means the response is the very last response among
+ * all key sets. When output stream actor receives an isLast response, it sends the response and immediately
+ * closes the output stream.
*/
@Getter
@Setter
diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ResponseStreamActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java
similarity index 93%
rename from src/main/java/io/numaproj/numaflow/reducestreamer/ResponseStreamActor.java
rename to src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java
index 170d1f23..fc12ee02 100644
--- a/src/main/java/io/numaproj/numaflow/reducestreamer/ResponseStreamActor.java
+++ b/src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java
@@ -16,7 +16,7 @@
import java.util.List;
/**
- * Response stream actor is a wrapper around the gRPC output stream.
+ * Output actor is a wrapper around the gRPC output stream.
* It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream.
* ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream.
*
@@ -24,14 +24,14 @@
*/
@Slf4j
@AllArgsConstructor
-class ResponseStreamActor extends AbstractActor {
+class OutputActor extends AbstractActor {
StreamObserver responseObserver;
Metadata md;
public static Props props(
StreamObserver responseObserver,
Metadata md) {
- return Props.create(ResponseStreamActor.class, responseObserver, md);
+ return Props.create(OutputActor.class, responseObserver, md);
}
@Override
@@ -48,7 +48,7 @@ private void sendMessage(Message message) {
private void sendEOF(ActorResponse actorResponse) {
if (actorResponse.isLast()) {
- // handle the very last response.
+ // send the very last response.
responseObserver.onNext(actorResponse.getResponse());
// close the output stream.
responseObserver.onCompleted();
diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java
index 0533b3a7..5b0b0745 100644
--- a/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java
+++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Service.java
@@ -71,26 +71,26 @@ public StreamObserver reduceFn(final StreamObser
// create a shutdown actor that listens to exceptions.
ActorRef shutdownActorRef = reduceActorSystem.
- actorOf(ReduceShutdownActor.props(failureFuture));
+ actorOf(ShutdownActor.props(failureFuture));
// subscribe for dead letters
reduceActorSystem.getEventStream().subscribe(shutdownActorRef, AllDeadLetters.class);
handleFailure(failureFuture, responseObserver);
- // create a response stream actor that ensures synchronized delivery of reduce responses.
- ActorRef responseStreamActor = reduceActorSystem.
- actorOf(ResponseStreamActor.props(responseObserver, md));
+ // create an output actor that ensures synchronized delivery of reduce responses.
+ ActorRef outputActor = reduceActorSystem.
+ actorOf(OutputActor.props(responseObserver, md));
/*
create a supervisor actor which assign the tasks to child actors.
we create a child actor for every unique set of keys in a window.
*/
ActorRef supervisorActor = reduceActorSystem
- .actorOf(ReduceSupervisorActor.props(
+ .actorOf(SupervisorActor.props(
reduceStreamerFactory,
md,
shutdownActorRef,
- responseStreamActor));
+ outputActor));
return new StreamObserver<>() {
diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java
similarity index 91%
rename from src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java
rename to src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java
index 010c0862..6c690fcf 100644
--- a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java
+++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ShutdownActor.java
@@ -11,16 +11,16 @@
import java.util.concurrent.CompletableFuture;
/**
- * Reduce shutdown actor, listens to exceptions and handles shutdown.
+ * Shutdown actor, listens to exceptions and handles shutdown.
*/
@Slf4j
@AllArgsConstructor
-class ReduceShutdownActor extends AbstractActor {
+class ShutdownActor extends AbstractActor {
private final CompletableFuture failureFuture;
public static Props props(
CompletableFuture failureFuture) {
- return Props.create(ReduceShutdownActor.class, failureFuture);
+ return Props.create(ShutdownActor.class, failureFuture);
}
@Override
diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java
similarity index 96%
rename from src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java
rename to src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java
index f74da750..a4d782b4 100644
--- a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java
+++ b/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java
@@ -22,17 +22,17 @@
import java.util.Optional;
/**
- * Reduce supervisor actor distributes the messages to other actors and handles failures.
+ * Supervisor actor distributes the messages to other actors and handles failures.
*/
@Slf4j
-class ReduceSupervisorActor extends AbstractActor {
+class SupervisorActor extends AbstractActor {
private final ReduceStreamerFactory extends ReduceStreamer> reduceStreamerFactory;
private final Metadata md;
private final ActorRef shutdownActor;
private final ActorRef responseStreamActor;
private final Map actorsMap = new HashMap<>();
- public ReduceSupervisorActor(
+ public SupervisorActor(
ReduceStreamerFactory extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
@@ -49,7 +49,7 @@ public static Props props(
ActorRef shutdownActor,
ActorRef responseStreamActor) {
return Props.create(
- ReduceSupervisorActor.class,
+ SupervisorActor.class,
reduceStreamerFactory,
md,
shutdownActor,
diff --git a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java
index b3bc7e7d..897872b8 100644
--- a/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java
+++ b/src/test/java/io/numaproj/numaflow/reducer/ShutDownActorTest.java
@@ -36,7 +36,7 @@ public void testFailure() {
Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));
- ActorRef supervisor = actorSystem
+ ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestExceptionFactory(),
@@ -50,7 +50,7 @@ public void testFailure() {
.setValue(ByteString.copyFromUtf8(String.valueOf(1)))
.build())
.build());
- supervisor.tell(reduceRequest, ActorRef.noSender());
+ supervisorActor.tell(reduceRequest, ActorRef.noSender());
try {
completableFuture.get();
@@ -74,7 +74,7 @@ public void testDeadLetterHandling() {
Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));
- ActorRef supervisor = actorSystem
+ ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestExceptionFactory(),
@@ -82,8 +82,8 @@ public void testDeadLetterHandling() {
shutdownActor,
new ReduceOutputStreamObserver()));
- DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisor);
- supervisor.tell(deadLetter, ActorRef.noSender());
+ DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisorActor);
+ supervisorActor.tell(deadLetter, ActorRef.noSender());
try {
completableFuture.get();
diff --git a/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java
similarity index 93%
rename from src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java
rename to src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java
index e33cbd21..d98760ff 100644
--- a/src/test/java/io/numaproj/numaflow/reducer/ReduceSupervisorActorTest.java
+++ b/src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java
@@ -17,7 +17,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class ReduceSupervisorActorTest {
+public class SupervisorActorTest {
@Test
public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException {
@@ -33,7 +33,7 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
- ActorRef supervisor = actorSystem
+ ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver));
@@ -47,9 +47,9 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build());
- supervisor.tell(reduceRequest, ActorRef.noSender());
+ supervisorActor.tell(reduceRequest, ActorRef.noSender());
}
- supervisor.tell(Constants.EOF, ActorRef.noSender());
+ supervisorActor.tell(Constants.EOF, ActorRef.noSender());
try {
completableFuture.get();
@@ -83,7 +83,7 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
new IntervalWindowImpl(Instant.now(), Instant.now()));
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
- ActorRef supervisor = actorSystem
+ ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestReducerFactory(),
@@ -102,10 +102,10 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build());
- supervisor.tell(reduceRequest, ActorRef.noSender());
+ supervisorActor.tell(reduceRequest, ActorRef.noSender());
}
- supervisor.tell(Constants.EOF, ActorRef.noSender());
+ supervisorActor.tell(Constants.EOF, ActorRef.noSender());
try {
completableFuture.get();
// each reduce request generates two reduce responses, one containing the data and the other one indicating EOF.
diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java
similarity index 83%
rename from src/test/java/io/numaproj/numaflow/reducestreamer/ShutDownActorTest.java
rename to src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java
index 1b3bba6f..8e51e814 100644
--- a/src/test/java/io/numaproj/numaflow/reducestreamer/ShutDownActorTest.java
+++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ShutdownActorTest.java
@@ -21,7 +21,7 @@
import static org.junit.Assert.fail;
-public class ShutDownActorTest {
+public class ShutdownActorTest {
@Test
public void testFailure() {
final ActorSystem actorSystem = ActorSystem.create("test-system-1");
@@ -33,7 +33,7 @@ public void testFailure() {
.addKeys(reduceKey);
ActorRef shutdownActor = actorSystem
- .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor
+ .actorOf(ShutdownActor
.props(completableFuture));
Metadata md = new MetadataImpl(
@@ -41,16 +41,16 @@ public void testFailure() {
io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver();
- ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor
+ ActorRef outputActor = actorSystem.actorOf(OutputActor
.props(reduceOutputStreamObserver, md));
- ActorRef supervisor = actorSystem
- .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor
+ ActorRef supervisorActor = actorSystem
+ .actorOf(SupervisorActor
.props(
new TestExceptionFactory(),
md,
shutdownActor,
- responseStreamActor));
+ outputActor));
io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest(
ReduceOuterClass.ReduceRequest.newBuilder()
@@ -59,7 +59,7 @@ public void testFailure() {
.setValue(ByteString.copyFromUtf8(String.valueOf(1)))
.build())
.build());
- supervisor.tell(reduceRequest, ActorRef.noSender());
+ supervisorActor.tell(reduceRequest, ActorRef.noSender());
try {
completableFuture.get();
@@ -75,7 +75,7 @@ public void testDeadLetterHandling() {
CompletableFuture completableFuture = new CompletableFuture<>();
ActorRef shutdownActor = actorSystem
- .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor
+ .actorOf(ShutdownActor
.props(completableFuture));
actorSystem.eventStream().subscribe(shutdownActor, AllDeadLetters.class);
@@ -85,19 +85,19 @@ public void testDeadLetterHandling() {
io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver();
- ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor
+ ActorRef outputActor = actorSystem.actorOf(OutputActor
.props(reduceOutputStreamObserver, md));
- ActorRef supervisor = actorSystem
- .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor
+ ActorRef supervisorActor = actorSystem
+ .actorOf(SupervisorActor
.props(
new TestExceptionFactory(),
md,
shutdownActor,
- responseStreamActor));
+ outputActor));
- DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisor);
- supervisor.tell(deadLetter, ActorRef.noSender());
+ DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisorActor);
+ supervisorActor.tell(deadLetter, ActorRef.noSender());
try {
completableFuture.get();
diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java
similarity index 84%
rename from src/test/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActorTest.java
rename to src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java
index 84ff65d0..14518e36 100644
--- a/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActorTest.java
+++ b/src/test/java/io/numaproj/numaflow/reducestreamer/SupervisorActorTest.java
@@ -21,14 +21,14 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class ReduceSupervisorActorTest {
+public class SupervisorActorTest {
@Test
public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-1");
- CompletableFuture completableFuture = new CompletableFuture();
+ CompletableFuture completableFuture = new CompletableFuture<>();
ActorRef shutdownActor = actorSystem
- .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor
+ .actorOf(ShutdownActor
.props(completableFuture));
Metadata md = new MetadataImpl(
@@ -36,16 +36,16 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
io.numaproj.numaflow.reducer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducer.ReduceOutputStreamObserver();
- ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor
+ ActorRef outputActor = actorSystem.actorOf(OutputActor
.props(reduceOutputStreamObserver, md));
- ActorRef supervisor = actorSystem
- .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor
+ ActorRef supervisorActor = actorSystem
+ .actorOf(SupervisorActor
.props(
new TestReduceStreamerFactory(),
md,
shutdownActor,
- responseStreamActor));
+ outputActor));
for (int i = 1; i <= 10; i++) {
io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest(
@@ -58,9 +58,11 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build());
- supervisor.tell(reduceRequest, ActorRef.noSender());
+ supervisorActor.tell(reduceRequest, ActorRef.noSender());
}
- supervisor.tell(io.numaproj.numaflow.reducestreamer.Constants.EOF, ActorRef.noSender());
+ supervisorActor.tell(
+ io.numaproj.numaflow.reducestreamer.Constants.EOF,
+ ActorRef.noSender());
try {
completableFuture.get();
@@ -72,7 +74,7 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
.getResult()
.getValue()
.toStringUtf8());
- assertEquals(true, reduceOutputStreamObserver.resultDatum
+ assertTrue(reduceOutputStreamObserver.resultDatum
.get()
.get(1)
.getEOF());
@@ -84,25 +86,25 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
@Test
public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-2");
- CompletableFuture completableFuture = new CompletableFuture();
+ CompletableFuture completableFuture = new CompletableFuture<>();
ActorRef shutdownActor = actorSystem
- .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor
+ .actorOf(ShutdownActor
.props(completableFuture));
Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));
io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new ReduceOutputStreamObserver();
- ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor
+ ActorRef outputActor = actorSystem.actorOf(OutputActor
.props(reduceOutputStreamObserver, md));
- ActorRef supervisor = actorSystem
- .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor
+ ActorRef supervisorActor = actorSystem
+ .actorOf(SupervisorActor
.props(
new TestReduceStreamerFactory(),
md,
shutdownActor,
- responseStreamActor)
+ outputActor)
);
for (int i = 1; i <= 10; i++) {
@@ -116,10 +118,12 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build());
- supervisor.tell(reduceRequest, ActorRef.noSender());
+ supervisorActor.tell(reduceRequest, ActorRef.noSender());
}
- supervisor.tell(io.numaproj.numaflow.reducestreamer.Constants.EOF, ActorRef.noSender());
+ supervisorActor.tell(
+ io.numaproj.numaflow.reducestreamer.Constants.EOF,
+ ActorRef.noSender());
try {
completableFuture.get();
// each reduce request generates two reduce responses, one containing the data and the other one indicating EOF.