Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 28, 2024
1 parent 31c7e1f commit 720c0db
Show file tree
Hide file tree
Showing 12 changed files with 31 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ReduceSupervisorActor extends AbstractActor {
private final ActorRef shutdownActor;
private final StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;
private final Map<String, ActorRef> actorsMap = new HashMap<>();

public ReduceSupervisorActor(
ReducerFactory<? extends Reducer> reducerFactory,
Metadata md,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.numaproj.numaflow.reducestreamer.model;

/**
* OutputStreamObserver sends to the output stream, the messages generate by the session reducer.
* OutputStreamObserver sends to the output stream, the messages generate by the reduce streamer.
*/
public interface OutputStreamObserver {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,24 @@
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

/**
* ActorRequest is used by the supervisor actor to distribute session reduce operations to
* individual session reducer actors. One actor request is sent to only one session reducer actor.
*/
@Getter
@Setter
class ActorRequest {
private ActorRequestType type;
private final ActorRequestType type;
// the window of the target session the actor request is sent to
private Sessionreduce.KeyedWindow keyedWindow;
private final Sessionreduce.KeyedWindow keyedWindow;
// the new keyed window the target session is to be expanded to
// it is specified only when the actor request is an EXPAND
private Sessionreduce.KeyedWindow newKeyedWindow;
private final Sessionreduce.KeyedWindow newKeyedWindow;
// the payload of the request
private Sessionreduce.SessionReduceRequest.Payload payload;
private final Sessionreduce.SessionReduceRequest.Payload payload;
// the id of the merge task this request belongs to
// it is specified only when the actor request is a GET_ACCUMULATOR
private String mergeTaskId;
private final String mergeTaskId;

@Builder
private ActorRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,20 @@
import lombok.Setter;

/**
* The actor response holds the session reduce response for a particular session window.
* The actor response holds the session reduce response from a particular session window.
*/
@Getter
@Setter
class ActorResponse {
Sessionreduce.SessionReduceResponse response;
/**
* The isLast attribute indicates whether the response is globally the last one to be sent to
* the output gRPC stream, if set to true, it means the response is the very last response among
* all windows. When output actor receives an isLast response, it sends the response and immediately
* closes the output stream.
*/
// The isLast attribute indicates whether the response is globally the last one to be sent to
// the output gRPC stream, if set to true, it means the response is the very last response among
// all windows. When output actor receives an isLast response, it sends the response to and immediately
// closes the output stream.
boolean isLast;

// The accumulator attribute holds the accumulator of the session.
byte[] accumulator;
// The mergeTaskId attribute holds the merge task if this session is to be merged in.
// The mergeTaskId attribute holds the merge task id that this session is to be merged into.
String mergeTaskId;

@Builder
Expand All @@ -40,7 +37,8 @@ private ActorResponse(

static class ActorResponseBuilder {
ActorResponse build() {
if (accumulator != null && mergeTaskId == null) {
if ((accumulator != null && mergeTaskId == null) || (accumulator == null
&& mergeTaskId != null)) {
throw new IllegalStateException(
"attributes accumulator and mergeTaskId should be either both null or both non-null.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
* the window is to be merged with other windows.
* <p>
* "I am working on a merge task (mergeTaskId),
* and you are one of the windows to be merged. Please give me your accumulator."
* and you are one of the windows to be merged.
* Please give me your accumulator."
*/
@AllArgsConstructor
@Getter
class GetAccumulatorRequest {
String mergeTaskId;
private final String mergeTaskId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
@AllArgsConstructor
@Getter
class GetAccumulatorResponse {
Sessionreduce.KeyedWindow fromKeyedWindow;
String mergeTaskId;
byte[] accumulator;
private final Sessionreduce.KeyedWindow fromKeyedWindow;
private final String mergeTaskId;
private final byte[] accumulator;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
@AllArgsConstructor
@Getter
class MergeAccumulatorRequest {
boolean isLast;
byte[] accumulator;
private final boolean isLast;
private final byte[] accumulator;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
import java.util.List;

/**
* OutputStreamObserverImpl uses its assigned window to transform a message to
* an ActorResponse. The send method sends the ActorResponse to the output actor to
* forward to output gRPC stream.
* OutputStreamObserverImpl transforms a message to an ActorResponse.
* The send method sends the ActorResponse to the output actor to forward to output gRPC stream.
*/
@AllArgsConstructor
class OutputStreamObserverImpl implements OutputStreamObserver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void stop() throws InterruptedException {
/**
* Set server builder for testing.
*
* @param serverBuilder
* @param serverBuilder for building the server
*/
@VisibleForTesting
void setServerBuilder(ServerBuilder<?> serverBuilder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
class Service extends SessionReduceGrpc.SessionReduceImplBase {
public static final ActorSystem sessionReduceActorSystem = ActorSystem.create("sessionreduce");

private SessionReducerFactory<? extends SessionReducer> sessionReducerFactory;
private final SessionReducerFactory<? extends SessionReducer> sessionReducerFactory;

public Service(SessionReducerFactory<? extends SessionReducer> sessionReducerFactory) {
this.sessionReducerFactory = sessionReducerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void handleMergeAccumulatorRequest(MergeAccumulatorRequest mergeAccumula
"received a merge accumulator request but the session is not in a merging process.");
}
this.sessionReducer.mergeAccumulator(mergeAccumulatorRequest.getAccumulator());
if (mergeAccumulatorRequest.isLast) {
if (mergeAccumulatorRequest.isLast()) {
// I have merged the last accumulator, I am no longer in a MERGE process.
this.isMerging = false;
if (this.eofPending) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private void handleReduceRequest(Sessionreduce.SessionReduceRequest request) {
throw new RuntimeException(
"merge operation error: session not found for id: " + id);
}
// mergedWindow will be the largest window which contains all the windows.
// merged window will be the largest window which contains all the windows.
if (Instant
.ofEpochSecond(
window.getStart().getSeconds(),
Expand Down Expand Up @@ -231,7 +231,8 @@ private void handleReduceRequest(Sessionreduce.SessionReduceRequest request) {
// it's possible that merged keyed window is the same as one of the existing windows,
// in this case, since we already send out the GET_ACCUMULATOR request, it's ok to replace
// the existing window with the new one.
// the accumulator of the old window will get merged to the new window eventually.
// the accumulator of the old window will get merged to the new window eventually,
// when the supervisor receives the get accumulator response.
ActorRequest mergeOpenRequest = ActorRequest.builder()
.type(ActorRequestType.MERGE_OPEN)
.keyedWindow(mergedWindow)
Expand Down

0 comments on commit 720c0db

Please sign in to comment.