Skip to content

Commit

Permalink
make it thread-safe
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 18, 2024
1 parent c2937a9 commit 461610a
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
/**
* ActorRequest is a wrapper of the gRpc input request.
* It is constructed by the service when service receives an input request and then sent to
* the supervisor actor.
* the supervisor actor, to be distributed to reduce stream actors.
*/
@Getter
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@
import lombok.Getter;

/**
* ActorEOFResponse is to store the EOF signal from a ReduceStreamerActor.
* ReduceStreamerActor sends it back to the supervisor actor to indicate that
* the streamer actor itself has finished processing the data and is ready to be
* released.
* ActorResponse is for child actors to report back to the supervisor actor about the status of the data processing.
* It serves two purposes:
* 1. Send to the supervisor an EOF response, which is to be sent to the gRPC output stream.
* 2. Send to the supervisor a signal, indicating that the actor has finished all its processing work,
* and it's ready to be cleaned up by the supervisor actor.
*/
@Getter
@AllArgsConstructor
class ActorEOFResponse {
class ActorResponse {
ReduceOuterClass.ReduceResponse response;
ActorResponseType type;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
public String getActorUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getResponse().getResult().getKeysList().toArray(new String[0]));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.numaproj.numaflow.reducestreamer;

/**
 * ActorResponseType enumerates the kinds of {@code ActorResponse} exchanged between actors.
 */
public enum ActorResponseType {
    /**
     * The actor response contains an EOF reduce response without real data.
     */
    EOF_RESPONSE,

    /**
     * The actor has finished sending responses and is now ready to be cleaned up.
     */
    READY_FOR_CLEAN_UP_SIGNAL
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducestreamer.model.HandlerDatum;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
Expand All @@ -31,14 +31,13 @@ public class ReduceStreamerActor extends AbstractActor {
private OutputStreamObserver outputStream;

public static Props props(
String[] keys, Metadata md, ReduceStreamer groupBy,
StreamObserver<ReduceOuterClass.ReduceResponse> responseStreamObserver) {
String[] keys, Metadata md, ReduceStreamer groupBy, ActorRef responseStreamActor) {
return Props.create(
ReduceStreamerActor.class,
keys,
md,
groupBy,
new OutputStreamObserverImpl(md, responseStreamObserver));
new OutputStreamObserverImpl(responseStreamActor));
}

@Override
Expand All @@ -55,11 +54,13 @@ private void invokeHandler(HandlerDatum handlerDatum) {
}

private void sendEOF(String EOF) {
// constructing final responses based on the messages processed so far and sending them out.
this.groupBy.handleEndOfStream(keys, outputStream, md);
// constructing an EOF response and sending it back to the supervisor actor.
getSender().tell(buildEOFResponse(), getSelf());
}

private ActorEOFResponse buildEOFResponse() {
private ActorResponse buildEOFResponse() {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
Expand All @@ -75,6 +76,6 @@ private ActorEOFResponse buildEOFResponse() {
.newBuilder()
.addAllKeys(List.of(this.keys))
.build());
return new ActorEOFResponse(responseBuilder.build());
return new ActorResponse(responseBuilder.build(), ActorResponseType.EOF_RESPONSE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,35 @@ class ReduceSupervisorActor extends AbstractActor {
private final ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory;
private final Metadata md;
private final ActorRef shutdownActor;
private final ActorRef responseStreamActor;
private final StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;
private final Map<String, ActorRef> actorsMap = new HashMap<>();

public ReduceSupervisorActor(
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
ActorRef responseStreamActor,
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) {
this.reduceStreamerFactory = reduceStreamerFactory;
this.md = md;
this.shutdownActor = shutdownActor;
this.responseStreamActor = responseStreamActor;
this.responseObserver = responseObserver;
}

public static Props props(
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
ActorRef responseStreamActor,
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) {
return Props.create(
ReduceSupervisorActor.class,
reduceStreamerFactory,
md,
shutdownActor,
responseStreamActor,
responseObserver);
}

Expand Down Expand Up @@ -84,27 +89,26 @@ public Receive createReceive() {
.create()
.match(ActorRequest.class, this::invokeActors)
.match(String.class, this::sendEOF)
.match(ActorEOFResponse.class, this::eofResponseListener)
.match(ActorResponse.class, this::handleActorResponse)
.build();
}

/*
based on the keys of the input message invoke the right actor
if there is no actor for an incoming set of keys, create a new actor
based on the keys of the input message invoke the right reduce streamer actor
if there is no actor for an incoming set of keys, create a new reduce streamer actor
track all the child actors using actors map
*/
private void invokeActors(ActorRequest actorRequest) {
String[] keys = actorRequest.getKeySet();
String uniqueId = actorRequest.getUniqueIdentifier();
if (!actorsMap.containsKey(uniqueId)) {
ReduceStreamer reduceStreamerHandler = reduceStreamerFactory.createReduceStreamer();
// FIXME - the responseObserver is NOT thread-safe but multiple actors are sharing it.
ActorRef actorRef = getContext()
.actorOf(ReduceStreamerActor.props(
keys,
md,
this.md,
reduceStreamerHandler,
responseObserver));
this.responseStreamActor));
actorsMap.put(uniqueId, actorRef);
}

Expand All @@ -118,19 +122,24 @@ private void sendEOF(String EOF) {
}
}

// listen to child actors for the result.
private void eofResponseListener(ActorEOFResponse actorEOFResponse) {
/*
send the result back to the client
remove the child entry from the map after getting result.
if there are no entries in the map, that means processing is
done we can close the stream.
*/
responseObserver.onNext(actorEOFResponse.getResponse());
actorsMap.remove(actorEOFResponse.getUniqueIdentifier());
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
private void handleActorResponse(ActorResponse actorResponse) {
if (actorResponse.getType() == ActorResponseType.EOF_RESPONSE) {
// forward the response to the response stream actor to send back to gRPC output stream.
this.responseStreamActor.tell(actorResponse, getSelf());
} else if (actorResponse.getType() == ActorResponseType.READY_FOR_CLEAN_UP_SIGNAL) {
// the corresponding actor is ready to be cleaned up.
// remove the child entry from the map.
// if there are no entries in the map, that means processing is
// done we can close the entire stream.
actorsMap.remove(actorResponse.getActorUniqueIdentifier());
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
} else {
throw new RuntimeException(
"Supervisor actor received an actor response with unsupported type: "
+ actorResponse.getType());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.AbstractActor;
import akka.actor.Props;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* ResponseStreamActor is dedicated to ensure synchronized calls to the responseObserver onNext().
* ALL the responses are sent to ResponseStreamActor before getting sent to output gRPC stream.
* <p>
* More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html
*/
@Slf4j
@AllArgsConstructor
public class ResponseStreamActor extends AbstractActor {
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;
Metadata md;

public static Props props(
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver,
Metadata md) {
return Props.create(ResponseStreamActor.class, responseObserver, md);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(Message.class, this::sendMessage)
.match(ActorResponse.class, this::sendEOF)
.build();
}

private void sendMessage(Message message) {
// Synchronized access to the output stream
synchronized (responseObserver) {
responseObserver.onNext(this.buildResponse(message));
}
}

private void sendEOF(ActorResponse actorResponse) {
if (actorResponse.getType() != ActorResponseType.EOF_RESPONSE) {
throw new RuntimeException(
"Unexpected behavior - Response Stream actor received a non-eof response. Response type is: "
+ actorResponse.getType());
}
// Synchronized access to the output stream
synchronized (responseObserver) {
responseObserver.onNext(actorResponse.getResponse());
}
// After the EOF response gets sent to gRPC output stream,
// tell the supervisor that the actor is ready to be cleaned up.
getSender().tell(
new ActorResponse(
actorResponse.getResponse(),
ActorResponseType.READY_FOR_CLEAN_UP_SIGNAL),
getSelf());
}

private ReduceOuterClass.ReduceResponse buildResponse(Message message) {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
// set the window using the actor metadata.
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(false);
// set the result.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>():Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return responseBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ static void handleFailure(
*/
@Override
public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) {

if (this.reduceStreamerFactory == null) {
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(
getReduceFnMethod(),
Expand Down Expand Up @@ -81,15 +80,20 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
reduceActorSystem.getEventStream().subscribe(shutdownActorRef, AllDeadLetters.class);

handleFailure(failureFuture, responseObserver);

// create a response stream actor that ensures synchronized delivery of reduce responses.
ActorRef responseStreamActor = reduceActorSystem.
actorOf(ResponseStreamActor.props(responseObserver, md));
/*
create a supervisor actor which assign the tasks to child actors.
we create a child actor for every unique set of keys in a window
we create a child actor for every unique set of keys in a window.
*/
ActorRef supervisorActor = reduceActorSystem
.actorOf(ReduceSupervisorActor.props(
reduceStreamerFactory,
md,
shutdownActorRef,
responseStreamActor,
responseObserver));


Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,16 @@
package io.numaproj.numaflow.reducestreamer.user;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass.ReduceResponse;
import akka.actor.ActorRef;
import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import lombok.AllArgsConstructor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@AllArgsConstructor
public
class OutputStreamObserverImpl implements OutputStreamObserver {
Metadata md;
StreamObserver<ReduceResponse> responseObserver;
private final ActorRef responseStreamActor;

@Override
public void send(Message message) {
ReduceResponse response = buildResponse(message);
responseObserver.onNext(response);
}

private ReduceResponse buildResponse(Message message) {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
// set the window using the actor metadata.
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(false);
// set the result.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>():Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());

return responseBuilder.build();
this.responseStreamActor.tell(message, ActorRef.noSender());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public abstract void processMessage(
/**
* handleEndOfStream handles the closure of the reduce input stream.
* This method is invoked when the input reduce stream is closed.
* It provides the capability of constructing a final response based on the messages processed so far.
* It provides the capability of constructing final responses based on the messages processed so far.
*
* @param keys message keys
* @param outputStreamObserver observer of the reduce result, which is used to send back reduce responses
Expand Down
Loading

0 comments on commit 461610a

Please sign in to comment.