Skip to content

Commit

Permalink
chore: send one EOF response only for reduce stream
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Mar 25, 2024
1 parent 502262f commit af0b061
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,12 @@

/**
* The actor response holds the final EOF response for a particular key set.
* <p>
* The isLast attribute indicates whether the response is globally the last one to be sent to
* the output gRPC stream, if set to true, it means the response is the very last response among
* all key sets. When output stream actor receives an isLast response, it sends the response and immediately
* closes the output stream.
*/
@Getter
@Setter
@AllArgsConstructor
class ActorResponse {
ReduceOuterClass.ReduceResponse response;
boolean isLast;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Receive createReceive() {
}

private void handleResponse(ActorResponse actorResponse) {
if (actorResponse.isLast()) {
if (actorResponse.getResponse().getEOF()) {
// send the very last response.
responseObserver.onNext(actorResponse.getResponse());
// close the output stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ private ActorResponse buildResponse(Message message) {
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(responseBuilder.build(), false);
return new ActorResponse(responseBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ private ActorResponse buildEOFResponse() {
.newBuilder()
.addAllKeys(List.of(this.keys))
.build());
return new ActorResponse(responseBuilder.build(), false);
return new ActorResponse(responseBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ private void handleActorResponse(ActorResponse actorResponse) {
actorsMap.remove(actorResponse.getActorUniqueIdentifier());
if (actorsMap.isEmpty()) {
// since the actors map is empty, this particular actor response is the last response to forward to output gRPC stream.
actorResponse.setLast(true);
this.outputActor.tell(actorResponse, getSelf());
} else {
// for reduce streamer, we only send to output stream one single EOF response, which is the last one.
// we don't care about per-key-set EOFs.
this.outputActor.tell(actorResponse, getSelf());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
@Test
public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then_requestsGetAggregatedSeparately() {
String reduceKey = "reduce-key";
int keyCount = 3;
int keyCount = 10;

Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
Expand Down Expand Up @@ -206,14 +206,15 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then

while (!outputStreamObserver.completed.get()) ;
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the outputStreamObserver should have observed 3*keyCount responses, 2 with real output sum data, one as EOF.
assertEquals(keyCount * 3, result.size());
result.forEach(response -> {
// the outputStreamObserver should have observed (keyCount * 2 + 1) responses, 2 with real output sum data per key, 1 as the final single EOF response.
assertEquals(keyCount * 2 + 1, result.size());
for (int i = 0; i < keyCount * 2; i++) {
ReduceOuterClass.ReduceResponse response = result.get(i);
assertTrue(response.getResult().getValue().equals(expectedFirstResponse) ||
response.getResult().getValue().equals(expectedSecondResponse)
|| response.getEOF());

});
response.getResult().getValue().equals(expectedSecondResponse));
}
// verify the last one is the EOF.
assertTrue(result.get(keyCount * 2).getEOF());
}

public static class ReduceStreamerTestFactory extends ReduceStreamerFactory<ServerTest.ReduceStreamerTestFactory.TestReduceStreamHandler> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-2");
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
int keyCount = 10;

ActorRef shutdownActor = actorSystem
.actorOf(ShutdownActor
Expand All @@ -107,7 +108,7 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
outputActor)
);

for (int i = 1; i <= 10; i++) {
for (int i = 1; i <= keyCount; i++) {
io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest(
ReduceOuterClass.ReduceRequest
.newBuilder()
Expand All @@ -126,15 +127,16 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
ActorRef.noSender());
try {
completableFuture.get();
// each reduce request generates two reduce responses, one containing the data and the other one indicating EOF.
assertEquals(20, reduceOutputStreamObserver.resultDatum.get().size());
for (int i = 0; i < 20; i++) {
// each reduce request generates keyCount number of responses with data, plus one final EOF response.
assertEquals(keyCount + 1, reduceOutputStreamObserver.resultDatum.get().size());
for (int i = 0; i < keyCount; i++) {
ReduceOuterClass.ReduceResponse response = reduceOutputStreamObserver.resultDatum
.get()
.get(i);
assertTrue(response.getResult().getValue().toStringUtf8().equals("1")
|| response.getEOF());
assertTrue(response.getResult().getValue().toStringUtf8().equals("1"));
}
// verify the last one is the EOF.
assertTrue(reduceOutputStreamObserver.resultDatum.get().get(keyCount).getEOF());
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
Expand Down

0 comments on commit af0b061

Please sign in to comment.