Skip to content

Commit

Permalink
small refactors
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 11, 2024
1 parent 43be654 commit 5b0b9f8
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 61 deletions.
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<dependency>
<groupId>io.numaproj.numaflow</groupId>
<artifactId>numaflow-java</artifactId>
<version>0.6.0</version>
<version>0.6.3</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.numaproj.numaflow</groupId>
<artifactId>numaflow-java</artifactId>
<version>0.6.0</version>
<version>0.6.3</version>
<packaging>jar</packaging>

<name>numaflow-java</name>
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducer/ActorRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.numaproj.numaflow.reducer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* ActorRequest is to store the request sent to ReduceActors.
*/
@Getter
@AllArgsConstructor
class ActorRequest {
ReduceOuterClass.ReduceRequest request;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getRequest().getPayload().getKeysList().toArray(new String[0]));
}

public String[] getKeySet() {
return this.getRequest().getPayload().getKeysList().toArray(new String[0]);
}
}
12 changes: 10 additions & 2 deletions src/main/java/io/numaproj/numaflow/reducer/ActorResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@
@Getter
@AllArgsConstructor
class ActorResponse {
// FIXME - with the latest proto update, each response has a single set of keys, hence we can remove the keys field here.
String[] keys;
ReduceOuterClass.ReduceResponse response;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getResponse().getResult().getKeysList().toArray(new String[0]));
}
}
12 changes: 8 additions & 4 deletions src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private void getResult(String eof) {
resultMessages.getMessages().forEach(message -> {
getSender().tell(buildActorResponse(message), getSelf());
});
// send EOF
// send a response back with EOF set to true, indicating the reducer has finished the data aggregation.
getSender().tell(buildEOFActorResponse(), getSelf());
}

Expand All @@ -72,7 +72,7 @@ private ActorResponse buildActorResponse(Message message) {
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(this.keys, responseBuilder.build());
return new ActorResponse(responseBuilder.build());
}

private ActorResponse buildEOFActorResponse() {
Expand All @@ -86,7 +86,11 @@ private ActorResponse buildEOFActorResponse() {
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(true);
return new ActorResponse(this.keys, responseBuilder.build());
// set a dummy result with the keys.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.addAllKeys(List.of(this.keys))
.build());
return new ActorResponse(responseBuilder.build());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void postStop() {
public Receive createReceive() {
return ReceiveBuilder
.create()
.match(ReduceOuterClass.ReduceRequest.class, this::invokeActors)
.match(ActorRequest.class, this::invokeActors)
.match(String.class, this::sendEOF)
.match(ActorResponse.class, this::responseListener)
.build();
Expand All @@ -89,22 +89,18 @@ public Receive createReceive() {
if there is no actor for an incoming set of keys, create a new actor
track all the child actors using actors map
*/
private void invokeActors(ReduceOuterClass.ReduceRequest reduceRequest) {
ReduceOuterClass.ReduceRequest.Payload payload = reduceRequest.getPayload();
String[] keys = payload.getKeysList().toArray(new String[0]);
// TODO - do we need to include window information in the keyStr?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
String keyStr = String.join(Constants.DELIMITER, keys);
if (!actorsMap.containsKey(keyStr)) {
private void invokeActors(ActorRequest actorRequest) {
String[] keys = actorRequest.getKeySet();
String uniqueId = actorRequest.getUniqueIdentifier();
if (!actorsMap.containsKey(uniqueId)) {
Reducer reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
.actorOf(ReduceActor.props(keys, md, reduceHandler));
actorsMap.put(keyStr, actorRef);
actorsMap.put(uniqueId, actorRef);
}

HandlerDatum handlerDatum = constructHandlerDatum(payload);
actorsMap.get(keyStr).tell(handlerDatum, getSelf());
HandlerDatum handlerDatum = constructHandlerDatum(actorRequest.getRequest().getPayload());
actorsMap.get(uniqueId).tell(handlerDatum, getSelf());
}

private void sendEOF(String EOF) {
Expand All @@ -115,24 +111,21 @@ private void sendEOF(String EOF) {

// listen to child actors for the result.
private void responseListener(ActorResponse actorResponse) {
// TODO - do we need to include window information for aligned windows?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
if (actorResponse.getResponse().getEOF()) {
actorsMap.remove(String.join(Constants.DELIMITER, actorResponse.getKeys()));
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
return;
}
/*
send the result back to the client
remove the child entry from the map after getting result.
if there are no entries in the map, that means processing is
done we can close the stream.
*/
responseObserver.onNext(actorResponse.getResponse());
String uniqueId = actorResponse.getUniqueIdentifier();
if (actorResponse.getResponse().getEOF()) {
actorsMap.remove(uniqueId);
if (actorsMap.isEmpty()) {
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
}
}

private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/reducer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void start() throws Exception {
server.start();

log.info(
"(keran-test)Server started, listening on socket path: "
"Server started, listening on socket path: "
+ grpcConfig.getSocketPath());

// register shutdown hook
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/reducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
public void onNext(ReduceOuterClass.ReduceRequest datum) {
// send the message to parent actor, which takes care of distribution.
if (!supervisorActor.isTerminated()) {
supervisorActor.tell(datum, ActorRef.noSender());
supervisorActor.tell(new ActorRequest(datum), ActorRef.noSender());
} else {
responseObserver.onError(new Throwable("Supervisor actor was terminated"));
}
Expand Down
1 change: 1 addition & 0 deletions src/main/proto/reduce/v1/reduce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ message ReduceResponse {
Window window = 2;

// EOF represents the end of the response for a window.
// When it's set to true, the platform considers the response as an indicator which doesn't contain real data.
bool EOF = 3;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class ReduceSupervisorActorTest {
Expand All @@ -37,28 +38,33 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
.props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver));

for (int i = 1; i <= 10; i++) {
ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest
ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
// all reduce requests share same set of keys.
.addAllKeys(Arrays.asList("key-1", "key-2"))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
.build());
supervisor.tell(reduceRequest, ActorRef.noSender());
}
supervisor.tell(Constants.EOF, ActorRef.noSender());

try {
completableFuture.get();
assertEquals(1, outputStreamObserver.resultDatum.get().size());
// the observer should receive 2 messages, one is the aggregated result, the other is the EOF response.
assertEquals(2, outputStreamObserver.resultDatum.get().size());
assertEquals("10", outputStreamObserver.resultDatum
.get()
.get(0)
.getResult()
.getValue()
.toStringUtf8());
assertEquals(true, outputStreamObserver.resultDatum
.get()
.get(1)
.getEOF());
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
Expand Down Expand Up @@ -87,30 +93,29 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
);

for (int i = 1; i <= 10; i++) {
ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest
ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
// each request contain a unique set of keys.
.addAllKeys(Arrays.asList("shared-key", "unique-key-" + i))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
.build());
supervisor.tell(reduceRequest, ActorRef.noSender());
}

supervisor.tell(Constants.EOF, ActorRef.noSender());
try {
completableFuture.get();
// the outputStreamObserver should get updated 10 times, each time with value 1.
assertEquals(10, outputStreamObserver.resultDatum.get().size());
for (int i = 0; i < 10; i++) {
assertEquals("1", outputStreamObserver.resultDatum
// each reduce request generates two reduce response, one containing the data and the other one indicating EOF.
assertEquals(20, outputStreamObserver.resultDatum.get().size());
for (int i = 0; i < 20; i++) {
ReduceOuterClass.ReduceResponse response = outputStreamObserver.resultDatum
.get()
.get(i)
.getResult()
.getValue()
.toStringUtf8());
.get(i);
assertTrue(response.getResult().getValue().toStringUtf8().equals("1")
|| response.getEOF());
}
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
Expand Down
25 changes: 16 additions & 9 deletions src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicReference;

import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -94,22 +96,23 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
// create an output stream observer
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();

AtomicReference<Throwable> exceptionInThread = new AtomicReference<>();

Thread t = new Thread(() -> {
while (outputStreamObserver.t == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO - FIXME - if reaching here, fail the test.
System.out.println("kerantest - should never reach here.");
e.printStackTrace();
exceptionInThread.set(e);
}
}
System.out.println("kerantest - should reach here.");
// TODO - FIXME - the assertion happens within a thread so even if it fails, the test method can still pass.
// nit: also make "unknown exception" a shared const between test factory and test.
assertEquals(
"UNKNOWN: java.lang.RuntimeException: unknown exception",
outputStreamObserver.t.getMessage());
try {
assertEquals(
"UNKNOWN: java.lang.RuntimeException: unknown exception",
outputStreamObserver.t.getMessage());
} catch (Throwable e) {
exceptionInThread.set(e);
}
});
t.start();

Expand Down Expand Up @@ -137,5 +140,9 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
} catch (InterruptedException e) {
fail("Thread got interrupted before test assertion.");
}
// Fail the test if any exception caught in the thread
if (exceptionInThread.get() != null) {
fail("Assertion failed in the thread: " + exceptionInThread.get().getMessage());
}
}
}
11 changes: 7 additions & 4 deletions src/test/java/io/numaproj/numaflow/reducer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ServerTest {
public static final Metadata.Key<String> DATUM_METADATA_WIN_START = io.grpc.Metadata.Key.of(
Expand Down Expand Up @@ -123,7 +124,8 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55));
while (!outputStreamObserver.completed.get()) ;

assertEquals(1, outputStreamObserver.resultDatum.get().size());
// Expect 2 responses, one containing the aggregated data and the other indicating EOF.
assertEquals(2, outputStreamObserver.resultDatum.get().size());
assertEquals(
expectedKeys,
outputStreamObserver.resultDatum
Expand All @@ -139,6 +141,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
.get(0)
.getResult()
.getValue());
assertTrue(outputStreamObserver.resultDatum.get().get(1).getEOF());
}

@Test
Expand Down Expand Up @@ -179,10 +182,10 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then

while (!outputStreamObserver.completed.get()) ;
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the outputStreamObserver should have observed keyCount responses, each of which has value 55.
assertEquals(keyCount, result.size());
// the outputStreamObserver should have observed 2*keyCount responses, because for each key set, one response for the aggregated result, the other for EOF.
assertEquals(keyCount * 2, result.size());
result.forEach(response -> {
assertEquals(expectedValue, response.getResult().getValue());
assertTrue(response.getResult().getValue().equals(expectedValue) || response.getEOF());
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public void testFailure() {
shutdownActor,
new ReduceOutputStreamObserver()));

ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest.newBuilder()
ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest.newBuilder()
.setPayload(payloadBuilder
.addKeys("reduce-test")
.setValue(ByteString.copyFromUtf8(String.valueOf(1)))
.build())
.build();
.build());
supervisor.tell(reduceRequest, ActorRef.noSender());

try {
Expand Down

0 comments on commit 5b0b9f8

Please sign in to comment.