Skip to content

Commit

Permalink
fixed all unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 10, 2024
1 parent d63d88b commit d207d6b
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
@Getter
@AllArgsConstructor
class ActorResponse {
// TODO - do we need keys? they seem already present in the ReduceResponse
String[] keys;
ReduceOuterClass.ReduceResponse response;
}
41 changes: 16 additions & 25 deletions src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
@Slf4j
@AllArgsConstructor
class ReduceActor extends AbstractActor {

private String[] keys;
private Metadata md;
private Reducer groupBy;
Expand All @@ -45,11 +44,15 @@ private void invokeHandler(HandlerDatum handlerDatum) {
private void getResult(String eof) {
MessageList resultMessages = this.groupBy.getOutput(keys, md);
// send the result back to sender(parent actor)
getSender().tell(buildDatumListResponse(resultMessages), getSelf());
resultMessages.getMessages().forEach(message -> {
getSender().tell(buildActorResponse(message), getSelf());
});
}

private ActorResponse buildDatumListResponse(MessageList messageList) {
private ActorResponse buildActorResponse(Message message) {
// TODO(nit) - this transformation is complex, we can create a local model of ReduceResponse and handle transformation between gRPC response and ReduceResponse at a central place.
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
// set the window using the actor metadata.
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
Expand All @@ -58,29 +61,17 @@ private ActorResponse buildDatumListResponse(MessageList messageList) {
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
// for aligned window, if we start building the response, it means we already reached EOF.
// if we start building the response, it means we already reached EOF.
responseBuilder.setEOF(true);

ReduceOuterClass.ReduceResponse.Result.Builder resultBuilder = ReduceOuterClass.ReduceResponse.Result.newBuilder();
messageList.getMessages().forEach(message -> {
responseBuilder.set
resultBuilder.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
.build();
});


// set result

responseBuilder.setResult()


messageList.getMessages().forEach(message -> {



});
// set the result.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>():Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(this.keys, responseBuilder.build());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
/**
* ReduceSupervisorActor actor distributes the messages to actors and handles failure.
*/

@Slf4j
class ReduceSupervisorActor extends AbstractActor {
private final ReducerFactory<? extends Reducer> reducerFactory;
Expand Down Expand Up @@ -90,19 +89,21 @@ public Receive createReceive() {
if there is no actor for an incoming set of keys, create a new actor
track all the child actors using actors map
*/
private void invokeActors(ReduceOuterClass.ReduceRequest datumRequest) {
String[] keys = datumRequest.getKeysList().toArray(new String[0]);
private void invokeActors(ReduceOuterClass.ReduceRequest reduceRequest) {
ReduceOuterClass.ReduceRequest.Payload payload = reduceRequest.getPayload();
String[] keys = payload.getKeysList().toArray(new String[0]);
String keyStr = String.join(Constants.DELIMITER, keys);
System.out.println("kerantest keyStr: " + keyStr);
if (!actorsMap.containsKey(keyStr)) {
Reducer reduceHandler = reducerFactory.createReducer();
ActorRef actorRef = getContext()
.actorOf(ReduceActor.props(keys, md, reduceHandler));

actorsMap.put(keyStr, actorRef);
}

HandlerDatum handlerDatum = constructHandlerDatum(datumRequest);
HandlerDatum handlerDatum = constructHandlerDatum(payload);
actorsMap.get(keyStr).tell(handlerDatum, getSelf());
System.out.println("kerantest number of actors: " + actorsMap.size());
}

private void sendEOF(String EOF) {
Expand All @@ -128,15 +129,15 @@ private void responseListener(ActorResponse actorResponse) {
}
}

private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest datumRequest) {
private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payload payload) {
return new HandlerDatum(
datumRequest.getValue().toByteArray(),
payload.getValue().toByteArray(),
Instant.ofEpochSecond(
datumRequest.getWatermark().getSeconds(),
datumRequest.getWatermark().getNanos()),
payload.getWatermark().getSeconds(),
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
datumRequest.getEventTime().getSeconds(),
datumRequest.getEventTime().getNanos())
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* This is a dummy implementation of reduce output stream observer for testing purpose.
*/
@Slf4j
public class ReduceOutputStreamObserver implements StreamObserver<ReduceOuterClass.ReduceResponse> {
public AtomicReference<Boolean> completed = new AtomicReference<>(false);
public AtomicReference<ReduceOuterClass.ReduceResponse> resultDatum = new AtomicReference<>(
ReduceOuterClass.ReduceResponse.newBuilder().build());
public AtomicReference<List<ReduceOuterClass.ReduceResponse>> resultDatum = new AtomicReference<>(
new ArrayList<>());
public Throwable t;

@Override
public void onNext(ReduceOuterClass.ReduceResponse datum) {
resultDatum.set(resultDatum
.get()
.toBuilder()
.addAllResults(datum.getResultsList())
.build());
public void onNext(ReduceOuterClass.ReduceResponse response) {
List<ReduceOuterClass.ReduceResponse> receivedResponses = resultDatum.get();
receivedResponses.add(response);
resultDatum.set(receivedResponses);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,20 @@
import org.junit.Test;

import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class ReduceSupervisorActorTest {

@Test
public void invokeSingleActor() throws RuntimeException {
public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-1");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();

String reduceKey = "reduce-key";
ReduceOuterClass.ReduceRequest.Builder requestBuilder = ReduceOuterClass.ReduceRequest.
newBuilder().addKeys(reduceKey);

ActorRef shutdownActor = actorSystem
.actorOf(ReduceShutdownActor
.props(completableFuture));
Expand All @@ -39,24 +37,35 @@ public void invokeSingleActor() throws RuntimeException {
.props(new TestReducerFactory(), md, shutdownActor, outputStreamObserver));

for (int i = 1; i <= 10; i++) {
ReduceOuterClass.ReduceRequest reduceRequest = requestBuilder
.addKeys("reduce-test")
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
// all reduce requests share same set of keys.
.addAllKeys(Arrays.asList("key-1", "key-2"))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
supervisor.tell(reduceRequest, ActorRef.noSender());
}

supervisor.tell(Constants.EOF, ActorRef.noSender());

try {
completableFuture.get();
assertEquals(1, outputStreamObserver.resultDatum.get().size());
assertEquals("10", outputStreamObserver.resultDatum
.get()
.get(0)
.getResult()
.getValue()
.toStringUtf8());
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
}

@Test
public void invokeMultipleActors() throws RuntimeException {
public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-2");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();

Expand All @@ -67,26 +76,42 @@ public void invokeMultipleActors() throws RuntimeException {
Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
ActorRef supervisor = actorSystem
.actorOf(ReduceSupervisorActor
.props(
new TestReducerFactory(),
md,
shutdownActor,
new ReduceOutputStreamObserver()));
outputStreamObserver)
);

for (int i = 1; i <= 10; i++) {
ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest
.newBuilder()
.addKeys("reduce-test" + i)
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
// each request contain a unique set of keys.
.addAllKeys(Arrays.asList("shared-key", "unique-key-" + i))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
supervisor.tell(reduceRequest, ActorRef.noSender());
}

supervisor.tell(Constants.EOF, ActorRef.noSender());
try {
completableFuture.get();
// the outputStreamObserver should get updated 10 times, each time with value 1.
assertEquals(10, outputStreamObserver.resultDatum.get().size());
for (int i = 0; i < 10; i++) {
assertEquals("1", outputStreamObserver.resultDatum
.get()
.get(i)
.getResult()
.getValue()
.toStringUtf8());
}
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public void addMessage(String[] keys, Datum datum, Metadata md) {

@Override
public MessageList getOutput(String[] keys, Metadata md) {
System.out.println("returning number: " + sum);
String[] updatedKeys = Arrays
.stream(keys)
.map(c -> c + "-processed")
Expand Down
25 changes: 15 additions & 10 deletions src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import static org.junit.Assert.fail;

public class ServerErrTest {


public static final Metadata.Key<String> DATUM_METADATA_WIN_START = io.grpc.Metadata.Key.of(
WIN_START_KEY,
Metadata.ASCII_STRING_MARSHALLER);
Expand All @@ -42,7 +40,6 @@ public class ServerErrTest {

@Before
public void setUp() throws Exception {

ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
Expand Down Expand Up @@ -89,24 +86,27 @@ public void tearDown() throws Exception {
}

@Test
public void TestReducerErr() {
String reduceKey = "reduce-key";

public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowable() {
Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
metadata.put(Metadata.Key.of(WIN_END_KEY, Metadata.ASCII_STRING_MARSHALLER), "120000");

//create an output stream observer
// create an output stream observer
ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();

Thread t = new Thread(() -> {
while (outputStreamObserver.t == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO - FIXME - if reaching here, fail the test.
System.out.println("kerantest - should never reach here.");
e.printStackTrace();
}
}
System.out.println("kerantest - should reach here.");
// TODO - FIXME - the assertion happens within a thread so even if it fails, the test method can still pass.
// nit: also make "unknown exception" a shared const between test factory and test.
assertEquals(
"UNKNOWN: java.lang.RuntimeException: unknown exception",
outputStreamObserver.t.getMessage());
Expand All @@ -121,8 +121,11 @@ public void TestReducerErr() {
for (int i = 1; i <= 10; i++) {
ReduceOuterClass.ReduceRequest reduceRequest = ReduceOuterClass.ReduceRequest
.newBuilder()
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.addKeys(reduceKey)
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
.addKeys("reduce-key")
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
inputStreamObserver.onNext(reduceRequest);
}
Expand All @@ -132,7 +135,9 @@ public void TestReducerErr() {
try {
t.join();
} catch (InterruptedException e) {
fail("Thread interrupted");
// the thread should always finish successfully with test assertion passing.
// if not, fail the test.
fail("Thread got interrupted before test assertion.");
}
}
}
Loading

0 comments on commit d207d6b

Please sign in to comment.