diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java index 9a3c7939..4bd2ce57 100644 --- a/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java +++ b/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java @@ -1,10 +1,16 @@ package io.numaproj.numaflow.examples.reducestreamer.sum; import io.numaproj.numaflow.reducestreamer.model.Message; +import io.numaproj.numaflow.reducestreamer.model.Metadata; import io.numaproj.numaflow.reducestreamer.user.OutputStreamObserver; import io.numaproj.numaflow.reducestreamer.user.ReduceStreamer; import lombok.extern.slf4j.Slf4j; +/** + * SumFunction is a User Defined Reduce Stream Function example which sums up the values for the given keys + * and outputs the sum when the sum is greater than 100. + * When the input stream closes, the function outputs the sum no matter what value it holds. + */ @Slf4j public class SumFunction extends ReduceStreamer { @@ -14,7 +20,7 @@ public class SumFunction extends ReduceStreamer { public void processMessage( String[] keys, io.numaproj.numaflow.reducestreamer.model.Datum datum, - OutputStreamObserver outputStream, + OutputStreamObserver outputStreamObserver, io.numaproj.numaflow.reducestreamer.model.Metadata md) { try { sum += Integer.parseInt(new String(datum.getValue())); @@ -22,8 +28,16 @@ public void processMessage( log.info("error while parsing integer - {}", e.getMessage()); } if (sum >= 100) { - outputStream.send(new Message(String.valueOf(sum).getBytes())); + outputStreamObserver.send(new Message(String.valueOf(sum).getBytes())); sum = 0; } } + + @Override + public void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + Metadata md) { + outputStreamObserver.send(new Message(String.valueOf(sum).getBytes())); + } } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorEOFResponse.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorEOFResponse.java index 48ca0622..c5436f2b 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorEOFResponse.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorEOFResponse.java @@ -5,8 +5,10 @@ import lombok.Getter; /** - * ActorResponse is to store the response from ReduceActors. - * TODO - not required. Remove + * ActorEOFResponse is to store the EOF signal from a ReduceStreamerActor. + * ReduceStreamerActor sends it back to the supervisor actor to indicate that + * the streamer actor itself has finished processing the data and is ready to be + * released. */ @Getter @AllArgsConstructor diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorRequest.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorRequest.java index f8b31136..2d94a500 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ActorRequest.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ActorRequest.java @@ -5,7 +5,9 @@ import lombok.Getter; /** - * ActorRequest is to store the request sent to ReduceStreamActors. + * ActorRequest is a wrapper of the gRpc input request. + * It is constructed by the service when service receives an input request and then sent to + * the supervisor actor. */ @Getter @AllArgsConstructor diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java index cb8bf1bb..af25d5b7 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceShutdownActor.java @@ -13,8 +13,6 @@ /** * Shutdown actor, listens to exceptions and handles shutdown. */ - - @Slf4j @AllArgsConstructor class ReduceShutdownActor extends AbstractActor { diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java index fbd6ee8d..6d9770d6 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceStreamerActor.java @@ -17,9 +17,11 @@ import java.util.List; /** - * Reduce stream actor invokes the reducer and returns the result. + * Reduce stream actor invokes user defined functions to handle reduce request. + * When receiving an input request, it invokes the processMessage to handle the datum. + * When receiving an EOF signal from the supervisor, it invokes the handleEndOfStream to execute + * the user-defined end of stream processing logics. */ - @Slf4j @AllArgsConstructor public class ReduceStreamerActor extends AbstractActor { @@ -53,6 +55,7 @@ private void invokeHandler(HandlerDatum handlerDatum) { } private void sendEOF(String EOF) { + this.groupBy.handleEndOfStream(keys, outputStream, md); getSender().tell(buildEOFResponse(), getSelf()); } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java index 89dfea6a..bacd3aec 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActor.java @@ -98,6 +98,7 @@ private void invokeActors(ActorRequest actorRequest) { String uniqueId = actorRequest.getUniqueIdentifier(); if (!actorsMap.containsKey(uniqueId)) { ReduceStreamer reduceStreamerHandler = reduceStreamerFactory.createReduceStreamer(); + // FIXME - the responseObserver is NOT thread-safe but multiple actors are sharing it. ActorRef actorRef = getContext() .actorOf(ReduceStreamerActor.props( keys, diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/user/OutputStreamObserver.java b/src/main/java/io/numaproj/numaflow/reducestreamer/user/OutputStreamObserver.java index 24c3135a..433ae8c6 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/user/OutputStreamObserver.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/user/OutputStreamObserver.java @@ -3,7 +3,7 @@ import io.numaproj.numaflow.reducestreamer.model.Message; /** - * OutputObserver receives messages from the MapStreamer. + * OutputObserver receives messages from the ReduceStreamer. */ public interface OutputStreamObserver { /** diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/user/ReduceStreamer.java b/src/main/java/io/numaproj/numaflow/reducestreamer/user/ReduceStreamer.java index 7be0351a..dde4ff34 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/user/ReduceStreamer.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/user/ReduceStreamer.java @@ -4,12 +4,36 @@ import io.numaproj.numaflow.reducestreamer.model.Metadata; /** - * TODO - add descriptions + * ReduceStreamer exposes methods for performing reduce streaming operations. */ public abstract class ReduceStreamer { + /** + * processMessage is invoked for each reduce input message. + * It reads the input data from the datum and performs reduce operations for the given keys. + * An output stream is provided for sending back the result to the reduce output stream. + * + * @param keys message keys + * @param datum current message to be processed + * @param outputStream observer of the reduce result, which is used to send back reduce responses + * @param md metadata associated with the window + */ public abstract void processMessage( String[] keys, Datum datum, OutputStreamObserver outputStream, Metadata md); + + /** + * handleEndOfStream handles the closure of the reduce input stream. + * This method is invoked when the input reduce stream is closed. + * It provides the capability of constructing a final response based on the messages processed so far. + * + * @param keys message keys + * @param outputStreamObserver observer of the reduce result, which is used to send back reduce responses + * @param md metadata associated with the window + */ + public abstract void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + Metadata md); } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/user/ReduceStreamerFactory.java b/src/main/java/io/numaproj/numaflow/reducestreamer/user/ReduceStreamerFactory.java index d433142b..116d67b3 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/user/ReduceStreamerFactory.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/user/ReduceStreamerFactory.java @@ -2,9 +2,7 @@ /** - * ReducerFactory is the factory for Reducer. - *

- * TODO - do we need this? + * ReduceStreamerFactory is the factory for ReduceStreamer. */ public abstract class ReduceStreamerFactory { diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActorTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActorTest.java new file mode 100644 index 00000000..20ed4fa0 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ReduceSupervisorActorTest.java @@ -0,0 +1,163 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import com.google.protobuf.ByteString; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.Datum; +import io.numaproj.numaflow.reducestreamer.model.IntervalWindowImpl; +import io.numaproj.numaflow.reducestreamer.model.Message; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import io.numaproj.numaflow.reducestreamer.model.MetadataImpl; +import io.numaproj.numaflow.reducestreamer.user.OutputStreamObserver; +import io.numaproj.numaflow.reducestreamer.user.ReduceStreamer; +import io.numaproj.numaflow.reducestreamer.user.ReduceStreamerFactory; +import org.junit.Test; + +import java.time.Instant; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ReduceSupervisorActorTest { + @Test + public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException { + final ActorSystem actorSystem = ActorSystem.create("test-system-1"); + CompletableFuture completableFuture = new CompletableFuture(); + + ActorRef shutdownActor = actorSystem + .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor + .props(completableFuture)); + + Metadata md = new MetadataImpl( + new IntervalWindowImpl(Instant.now(), Instant.now())); + + io.numaproj.numaflow.reducer.ReduceOutputStreamObserver outputStreamObserver = new io.numaproj.numaflow.reducer.ReduceOutputStreamObserver(); + + ActorRef supervisor = actorSystem + .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor + .props( + new TestReduceStreamerFactory(), + md, + shutdownActor, + outputStreamObserver)); + + for (int i = 1; i <= 10; i++) { + io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest( + ReduceOuterClass.ReduceRequest + .newBuilder() + .setPayload(ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + // all reduce requests share same set of keys. + .addAllKeys(Arrays.asList("key-1", "key-2")) + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .build()) + .build()); + supervisor.tell(reduceRequest, ActorRef.noSender()); + } + supervisor.tell(io.numaproj.numaflow.reducestreamer.Constants.EOF, ActorRef.noSender()); + + try { + completableFuture.get(); + // the observer should receive 2 messages, one is the aggregated result, the other is the EOF response. + assertEquals(2, outputStreamObserver.resultDatum.get().size()); + assertEquals("10", outputStreamObserver.resultDatum + .get() + .get(0) + .getResult() + .getValue() + .toStringUtf8()); + assertEquals(true, outputStreamObserver.resultDatum + .get() + .get(1) + .getEOF()); + } catch (InterruptedException | ExecutionException e) { + fail("Expected the future to complete without exception"); + } + } + + @Test + public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException { + final ActorSystem actorSystem = ActorSystem.create("test-system-2"); + CompletableFuture completableFuture = new CompletableFuture(); + + ActorRef shutdownActor = actorSystem + .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor + .props(completableFuture)); + + Metadata md = new MetadataImpl( + new IntervalWindowImpl(Instant.now(), Instant.now())); + + io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver(); + ActorRef supervisor = actorSystem + .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor + .props( + new TestReduceStreamerFactory(), + md, + shutdownActor, + outputStreamObserver) + ); + + for (int i = 1; i <= 10; i++) { + io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest( + ReduceOuterClass.ReduceRequest + .newBuilder() + .setPayload(ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + // each request contain a unique set of keys. + .addAllKeys(Arrays.asList("shared-key", "unique-key-" + i)) + .setValue(ByteString.copyFromUtf8(String.valueOf(i))) + .build()) + .build()); + supervisor.tell(reduceRequest, ActorRef.noSender()); + } + + supervisor.tell(io.numaproj.numaflow.reducestreamer.Constants.EOF, ActorRef.noSender()); + try { + completableFuture.get(); + // each reduce request generates two reduce responses, one containing the data and the other one indicating EOF. + assertEquals(20, outputStreamObserver.resultDatum.get().size()); + for (int i = 0; i < 20; i++) { + ReduceOuterClass.ReduceResponse response = outputStreamObserver.resultDatum + .get() + .get(i); + assertTrue(response.getResult().getValue().toStringUtf8().equals("1") + || response.getEOF()); + } + } catch (InterruptedException | ExecutionException e) { + fail("Expected the future to complete without exception"); + } + } + + public static class TestReduceStreamerFactory extends ReduceStreamerFactory { + @Override + public TestReduceStreamerHandler createReduceStreamer() { + return new TestReduceStreamerHandler(); + } + + public static class TestReduceStreamerHandler extends ReduceStreamer { + int count = 0; + + @Override + public void processMessage( + String[] keys, + Datum datum, + OutputStreamObserver outputStream, + Metadata md) { + count += 1; + } + + @Override + public void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + Metadata md) { + outputStreamObserver.send(new Message(String.valueOf(count).getBytes())); + } + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java index e1d0f1b6..dd14c531 100644 --- a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java @@ -151,13 +151,13 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab } } - public static class ReduceStreamerErrTestFactory extends ReduceStreamerFactory { + public static class ReduceStreamerErrTestFactory extends ReduceStreamerFactory { @Override - public TestReduceHandler createReduceStreamer() { - return new ServerErrTest.ReduceStreamerErrTestFactory.TestReduceHandler(); + public TestReduceStreamHandler createReduceStreamer() { + return new ServerErrTest.ReduceStreamerErrTestFactory.TestReduceStreamHandler(); } - public static class TestReduceHandler extends ReduceStreamer { + public static class TestReduceStreamHandler extends ReduceStreamer { @Override public void processMessage( String[] keys, @@ -166,6 +166,14 @@ public void processMessage( io.numaproj.numaflow.reducestreamer.model.Metadata md) { throw new RuntimeException("unknown exception"); } + + @Override + public void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + io.numaproj.numaflow.reducestreamer.model.Metadata md) { + + } } } } diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java index 8872050f..988166dc 100644 --- a/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java @@ -111,7 +111,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)) .reduceFn(outputStreamObserver); - for (int i = 1; i <= 10; i++) { + for (int i = 1; i <= 11; i++) { ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest.newBuilder() .setPayload(ReduceOuterClass.ReduceRequest.Payload .newBuilder() @@ -126,11 +126,13 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ String[] expectedKeys = new String[]{reduceKey + REDUCE_PROCESSED_KEY_SUFFIX}; // sum of first 10 numbers 1 to 10 -> 55 - ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55)); + ByteString expectedFirstResponse = ByteString.copyFromUtf8(String.valueOf(55)); + // after the sum reaches 55, the test reducer reset the sum, hence when EOF is sent from input stream, the sum is 11 and gets sent to output stream. + ByteString expectedSecondResponse = ByteString.copyFromUtf8(String.valueOf(11)); while (!outputStreamObserver.completed.get()) ; // Expect 2 responses, one containing the aggregated data and the other indicating EOF. - assertEquals(2, outputStreamObserver.resultDatum.get().size()); + assertEquals(3, outputStreamObserver.resultDatum.get().size()); assertEquals( expectedKeys, outputStreamObserver.resultDatum @@ -140,13 +142,28 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ .getKeysList() .toArray(new String[0])); assertEquals( - expectedValue, + expectedFirstResponse, outputStreamObserver.resultDatum .get() .get(0) .getResult() .getValue()); - assertTrue(outputStreamObserver.resultDatum.get().get(1).getEOF()); + assertEquals( + expectedKeys, + outputStreamObserver.resultDatum + .get() + .get(1) + .getResult() + .getKeysList() + .toArray(new String[0])); + assertEquals( + expectedSecondResponse, + outputStreamObserver.resultDatum + .get() + .get(1) + .getResult() + .getValue()); + assertTrue(outputStreamObserver.resultDatum.get().get(2).getEOF()); } @Test @@ -168,7 +185,7 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then // send messages with keyCount different keys for (int j = 0; j < keyCount; j++) { - for (int i = 1; i <= 10; i++) { + for (int i = 1; i <= 11; i++) { ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest .newBuilder() .setPayload(ReduceOuterClass.ReduceRequest.Payload.newBuilder() @@ -183,31 +200,36 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then inputStreamObserver.onCompleted(); // sum of first 10 numbers 1 to 10 -> 55 - ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55)); + ByteString expectedFirstResponse = ByteString.copyFromUtf8(String.valueOf(55)); + // after the sum reaches 55, the test reducer reset the sum, hence when EOF is sent from input stream, the sum is 11 and gets sent to output stream. + ByteString expectedSecondResponse = ByteString.copyFromUtf8(String.valueOf(11)); while (!outputStreamObserver.completed.get()) ; List result = outputStreamObserver.resultDatum.get(); - // the outputStreamObserver should have observed 2*keyCount responses, because for each key set, one response for the aggregated result, the other for EOF. - assertEquals(keyCount * 2, result.size()); + // the outputStreamObserver should have observed 3*keyCount responses, 2 with real output sum data, one as EOF. + assertEquals(keyCount * 3, result.size()); result.forEach(response -> { - assertTrue(response.getResult().getValue().equals(expectedValue) || response.getEOF()); + assertTrue(response.getResult().getValue().equals(expectedFirstResponse) || + response.getResult().getValue().equals(expectedSecondResponse) + || response.getEOF()); + }); } - public static class ReduceStreamerTestFactory extends ReduceStreamerFactory { + public static class ReduceStreamerTestFactory extends ReduceStreamerFactory { @Override - public ServerTest.ReduceStreamerTestFactory.TestReduceHandler createReduceStreamer() { - return new ServerTest.ReduceStreamerTestFactory.TestReduceHandler(); + public ServerTest.ReduceStreamerTestFactory.TestReduceStreamHandler createReduceStreamer() { + return new ServerTest.ReduceStreamerTestFactory.TestReduceStreamHandler(); } - public static class TestReduceHandler extends ReduceStreamer { + public static class TestReduceStreamHandler extends ReduceStreamer { private int sum = 0; @Override public void processMessage( String[] keys, Datum datum, - OutputStreamObserver outputStream, + OutputStreamObserver outputStreamObserver, io.numaproj.numaflow.reducestreamer.model.Metadata md) { sum += Integer.parseInt(new String(datum.getValue())); if (sum > 50) { @@ -216,9 +238,24 @@ public void processMessage( .map(c -> c + "-processed") .toArray(String[]::new); Message message = new Message(String.valueOf(sum).getBytes(), updatedKeys); - outputStream.send(message); + outputStreamObserver.send(message); + // reset sum + sum = 0; } } + + @Override + public void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + io.numaproj.numaflow.reducestreamer.model.Metadata md) { + String[] updatedKeys = Arrays + .stream(keys) + .map(c -> c + "-processed") + .toArray(String[]::new); + Message message = new Message(String.valueOf(sum).getBytes(), updatedKeys); + outputStreamObserver.send(message); + } } } } diff --git a/src/test/java/io/numaproj/numaflow/reducestreamer/ShutDownActorTest.java b/src/test/java/io/numaproj/numaflow/reducestreamer/ShutDownActorTest.java new file mode 100644 index 00000000..6056601c --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/reducestreamer/ShutDownActorTest.java @@ -0,0 +1,133 @@ +package io.numaproj.numaflow.reducestreamer; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.AllDeadLetters; +import akka.actor.DeadLetter; +import com.google.protobuf.ByteString; +import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; +import io.numaproj.numaflow.reducestreamer.model.Datum; +import io.numaproj.numaflow.reducestreamer.model.IntervalWindowImpl; +import io.numaproj.numaflow.reducestreamer.model.Message; +import io.numaproj.numaflow.reducestreamer.model.Metadata; +import io.numaproj.numaflow.reducestreamer.model.MetadataImpl; +import io.numaproj.numaflow.reducestreamer.user.OutputStreamObserver; +import io.numaproj.numaflow.reducestreamer.user.ReduceStreamer; +import io.numaproj.numaflow.reducestreamer.user.ReduceStreamerFactory; +import org.junit.Test; + +import java.time.Instant; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + + +public class ShutDownActorTest { + @Test + public void testFailure() { + final ActorSystem actorSystem = ActorSystem.create("test-system-1"); + CompletableFuture completableFuture = new CompletableFuture<>(); + + String reduceKey = "reduce-key"; + ReduceOuterClass.ReduceRequest.Payload.Builder payloadBuilder = ReduceOuterClass.ReduceRequest.Payload + .newBuilder() + .addKeys(reduceKey); + + ActorRef shutdownActor = actorSystem + .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor + .props(completableFuture)); + + Metadata md = new MetadataImpl( + new IntervalWindowImpl(Instant.now(), Instant.now())); + + ActorRef supervisor = actorSystem + .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor + .props( + new TestExceptionFactory(), + md, + shutdownActor, + new io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver())); + + io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest( + ReduceOuterClass.ReduceRequest.newBuilder() + .setPayload(payloadBuilder + .addKeys("reduce-test") + .setValue(ByteString.copyFromUtf8(String.valueOf(1))) + .build()) + .build()); + supervisor.tell(reduceRequest, ActorRef.noSender()); + + try { + completableFuture.get(); + fail("Expected the future to complete with exception"); + } catch (Exception e) { + assertEquals(e.getMessage(), "java.lang.RuntimeException: UDF Failure"); + } + } + + @Test + public void testDeadLetterHandling() { + final ActorSystem actorSystem = ActorSystem.create("test-system-2"); + CompletableFuture completableFuture = new CompletableFuture<>(); + + ActorRef shutdownActor = actorSystem + .actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor + .props(completableFuture)); + + actorSystem.eventStream().subscribe(shutdownActor, AllDeadLetters.class); + + Metadata md = new MetadataImpl( + new IntervalWindowImpl(Instant.now(), Instant.now())); + + ActorRef supervisor = actorSystem + .actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor + .props( + new TestExceptionFactory(), + md, + shutdownActor, + new ReduceOutputStreamObserver())); + + DeadLetter deadLetter = new DeadLetter("dead-letter", shutdownActor, supervisor); + supervisor.tell(deadLetter, ActorRef.noSender()); + + try { + completableFuture.get(); + fail("Expected the future to complete with exception"); + } catch (Exception e) { + assertEquals(e.getMessage(), "java.lang.Throwable: dead letters"); + } + } + + + public static class TestExceptionFactory extends ReduceStreamerFactory { + + @Override + public TestException createReduceStreamer() { + return new TestException(); + } + + public static class TestException extends ReduceStreamer { + + int count = 0; + + @Override + public void processMessage( + String[] keys, + Datum datum, + OutputStreamObserver outputStream, + Metadata md) { + count += 1; + throw new RuntimeException("UDF Failure"); + } + + @Override + public void handleEndOfStream( + String[] keys, + OutputStreamObserver outputStreamObserver, + Metadata md) { + outputStreamObserver.send(new Message(String.valueOf(count).getBytes())); + } + } + } +}