Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 22, 2024
1 parent c67de10 commit cd07811
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@

/**
* The actor response holds the final EOF response for a particular key set.
* The isLast attribute indicates whether the response is the last one to be sent to
* the output gRPC stream.
* <p>
* The isLast attribute indicates whether the response is globally the last one to be sent to
* the output gRPC stream, if set to true, it means the response is the very last response among
* all key sets. When output stream actor receives an isLast response, it sends the response and immediately
* closes the output stream.
*/
@Getter
@Setter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@
import java.util.List;

/**
* Response stream actor is a wrapper around the gRPC output stream.
* Output actor is a wrapper around the gRPC output stream.
* It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream.
* ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream.
* <p>
* More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html
*/
@Slf4j
@AllArgsConstructor
class ResponseStreamActor extends AbstractActor {
class OutputActor extends AbstractActor {
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;
Metadata md;

public static Props props(
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver,
Metadata md) {
return Props.create(ResponseStreamActor.class, responseObserver, md);
return Props.create(OutputActor.class, responseObserver, md);
}

@Override
Expand All @@ -48,7 +48,7 @@ private void sendMessage(Message message) {

private void sendEOF(ActorResponse actorResponse) {
if (actorResponse.isLast()) {
// handle the very last response.
// send the very last response.
responseObserver.onNext(actorResponse.getResponse());
// close the output stream.
responseObserver.onCompleted();
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/numaproj/numaflow/reducestreamer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,26 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser

// create a shutdown actor that listens to exceptions.
ActorRef shutdownActorRef = reduceActorSystem.
actorOf(ReduceShutdownActor.props(failureFuture));
actorOf(ShutdownActor.props(failureFuture));

// subscribe for dead letters
reduceActorSystem.getEventStream().subscribe(shutdownActorRef, AllDeadLetters.class);

handleFailure(failureFuture, responseObserver);

// create a response stream actor that ensures synchronized delivery of reduce responses.
ActorRef responseStreamActor = reduceActorSystem.
actorOf(ResponseStreamActor.props(responseObserver, md));
// create an output actor that ensures synchronized delivery of reduce responses.
ActorRef outputActor = reduceActorSystem.
actorOf(OutputActor.props(responseObserver, md));
/*
create a supervisor actor which assign the tasks to child actors.
we create a child actor for every unique set of keys in a window.
*/
ActorRef supervisorActor = reduceActorSystem
.actorOf(ReduceSupervisorActor.props(
.actorOf(SupervisorActor.props(
reduceStreamerFactory,
md,
shutdownActorRef,
responseStreamActor));
outputActor));


return new StreamObserver<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@
import java.util.concurrent.CompletableFuture;

/**
* Reduce shutdown actor, listens to exceptions and handles shutdown.
* Shutdown actor, listens to exceptions and handles shutdown.
*/
@Slf4j
@AllArgsConstructor
class ReduceShutdownActor extends AbstractActor {
class ShutdownActor extends AbstractActor {
private final CompletableFuture<Void> failureFuture;

public static Props props(
CompletableFuture<Void> failureFuture) {
return Props.create(ReduceShutdownActor.class, failureFuture);
return Props.create(ShutdownActor.class, failureFuture);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@
import java.util.Optional;

/**
* Reduce supervisor actor distributes the messages to other actors and handles failures.
* Supervisor actor distributes the messages to other actors and handles failures.
*/
@Slf4j
class ReduceSupervisorActor extends AbstractActor {
class SupervisorActor extends AbstractActor {
private final ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory;
private final Metadata md;
private final ActorRef shutdownActor;
private final ActorRef responseStreamActor;
private final Map<String, ActorRef> actorsMap = new HashMap<>();

public ReduceSupervisorActor(
public SupervisorActor(
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
Expand All @@ -49,7 +49,7 @@ public static Props props(
ActorRef shutdownActor,
ActorRef responseStreamActor) {
return Props.create(
ReduceSupervisorActor.class,
SupervisorActor.class,
reduceStreamerFactory,
md,
shutdownActor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void testFailure() {
Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

ActorRef supervisor = actorSystem
ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestExceptionFactory(),
Expand All @@ -50,7 +50,7 @@ public void testFailure() {
.setValue(ByteString.copyFromUtf8(String.valueOf(1)))
.build())
.build());
supervisor.tell(reduceRequest, ActorRef.noSender());
supervisorActor.tell(reduceRequest, ActorRef.noSender());

try {
completableFuture.get();
Expand All @@ -74,16 +74,16 @@ public void testDeadLetterHandling() {
Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

ActorRef supervisor = actorSystem
ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestExceptionFactory(),
md,
shutdownActor,
new ReduceOutputStreamObserver()));

DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisor);
supervisor.tell(deadLetter, ActorRef.noSender());
DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisorActor);
supervisorActor.tell(deadLetter, ActorRef.noSender());

try {
completableFuture.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class ReduceSupervisorActorTest {
public class SupervisorActorTest {

@Test
public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException {
Expand All @@ -33,7 +33,7 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then

ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();

ActorRef supervisor = actorSystem
ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver));

Expand All @@ -47,9 +47,9 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build());
supervisor.tell(reduceRequest, ActorRef.noSender());
supervisorActor.tell(reduceRequest, ActorRef.noSender());
}
supervisor.tell(Constants.EOF, ActorRef.noSender());
supervisorActor.tell(Constants.EOF, ActorRef.noSender());

try {
completableFuture.get();
Expand Down Expand Up @@ -83,7 +83,7 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
new IntervalWindowImpl(Instant.now(), Instant.now()));

ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
ActorRef supervisor = actorSystem
ActorRef supervisorActor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestReducerFactory(),
Expand All @@ -102,10 +102,10 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build());
supervisor.tell(reduceRequest, ActorRef.noSender());
supervisorActor.tell(reduceRequest, ActorRef.noSender());
}

supervisor.tell(Constants.EOF, ActorRef.noSender());
supervisorActor.tell(Constants.EOF, ActorRef.noSender());
try {
completableFuture.get();
// each reduce request generates two reduce responses, one containing the data and the other one indicating EOF.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static org.junit.Assert.fail;


public class ShutDownActorTest {
public class ShutdownActorTest {
@Test
public void testFailure() {
final ActorSystem actorSystem = ActorSystem.create("test-system-1");
Expand All @@ -33,24 +33,24 @@ public void testFailure() {
.addKeys(reduceKey);

ActorRef shutdownActor = actorSystem
.actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor
.actorOf(ShutdownActor
.props(completableFuture));

Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver();

ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor
ActorRef outputActor = actorSystem.actorOf(OutputActor
.props(reduceOutputStreamObserver, md));

ActorRef supervisor = actorSystem
.actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor
ActorRef supervisorActor = actorSystem
.actorOf(SupervisorActor
.props(
new TestExceptionFactory(),
md,
shutdownActor,
responseStreamActor));
outputActor));

io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest(
ReduceOuterClass.ReduceRequest.newBuilder()
Expand All @@ -59,7 +59,7 @@ public void testFailure() {
.setValue(ByteString.copyFromUtf8(String.valueOf(1)))
.build())
.build());
supervisor.tell(reduceRequest, ActorRef.noSender());
supervisorActor.tell(reduceRequest, ActorRef.noSender());

try {
completableFuture.get();
Expand All @@ -75,7 +75,7 @@ public void testDeadLetterHandling() {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();

ActorRef shutdownActor = actorSystem
.actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor
.actorOf(ShutdownActor
.props(completableFuture));

actorSystem.eventStream().subscribe(shutdownActor, AllDeadLetters.class);
Expand All @@ -85,19 +85,19 @@ public void testDeadLetterHandling() {

io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver reduceOutputStreamObserver = new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver();

ActorRef responseStreamActor = actorSystem.actorOf(io.numaproj.numaflow.reducestreamer.ResponseStreamActor
ActorRef outputActor = actorSystem.actorOf(OutputActor
.props(reduceOutputStreamObserver, md));

ActorRef supervisor = actorSystem
.actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor
ActorRef supervisorActor = actorSystem
.actorOf(SupervisorActor
.props(
new TestExceptionFactory(),
md,
shutdownActor,
responseStreamActor));
outputActor));

DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisor);
supervisor.tell(deadLetter, ActorRef.noSender());
DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisorActor);
supervisorActor.tell(deadLetter, ActorRef.noSender());

try {
completableFuture.get();
Expand Down
Loading

0 comments on commit cd07811

Please sign in to comment.