Skip to content

Commit

Permalink
add handleEndOfStream method
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 17, 2024
1 parent 2174c73 commit c2937a9
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.user.OutputStreamObserver;
import io.numaproj.numaflow.reducestreamer.user.ReduceStreamer;
import lombok.extern.slf4j.Slf4j;

/**
* SumFunction is a User Defined Reduce Stream Function example which sums up the values for the given keys
* and outputs the sum when the sum is greater than 100.
* When the input stream closes, the function outputs the sum no matter what value it holds.
*/
@Slf4j
public class SumFunction extends ReduceStreamer {

Expand All @@ -14,16 +20,24 @@ public class SumFunction extends ReduceStreamer {
public void processMessage(
String[] keys,
io.numaproj.numaflow.reducestreamer.model.Datum datum,
OutputStreamObserver outputStream,
OutputStreamObserver outputStreamObserver,
io.numaproj.numaflow.reducestreamer.model.Metadata md) {
try {
sum += Integer.parseInt(new String(datum.getValue()));
} catch (NumberFormatException e) {
log.info("error while parsing integer - {}", e.getMessage());
}
if (sum >= 100) {
outputStream.send(new Message(String.valueOf(sum).getBytes()));
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
sum = 0;
}
}

@Override
public void handleEndOfStream(
String[] keys,
OutputStreamObserver outputStreamObserver,
Metadata md) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import lombok.Getter;

/**
* ActorResponse is to store the response from ReduceActors.
* TODO - not required. Remove
* ActorEOFResponse is to store the EOF signal from a ReduceStreamerActor.
* ReduceStreamerActor sends it back to the supervisor actor to indicate that
* the streamer actor itself has finished processing the data and is ready to be
* released.
*/
@Getter
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import lombok.Getter;

/**
* ActorRequest is to store the request sent to ReduceStreamActors.
* ActorRequest is a wrapper of the gRpc input request.
* It is constructed by the service when service receives an input request and then sent to
* the supervisor actor.
*/
@Getter
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
/**
* Shutdown actor, listens to exceptions and handles shutdown.
*/


@Slf4j
@AllArgsConstructor
class ReduceShutdownActor extends AbstractActor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import java.util.List;

/**
* Reduce stream actor invokes the reducer and returns the result.
* Reduce stream actor invokes user defined functions to handle reduce request.
* When receiving an input request, it invokes the processMessage to handle the datum.
* When receiving an EOF signal from the supervisor, it invokes the handleEndOfStream to execute
* the user-defined end of stream processing logics.
*/

@Slf4j
@AllArgsConstructor
public class ReduceStreamerActor extends AbstractActor {
Expand Down Expand Up @@ -53,6 +55,7 @@ private void invokeHandler(HandlerDatum handlerDatum) {
}

private void sendEOF(String EOF) {
this.groupBy.handleEndOfStream(keys, outputStream, md);
getSender().tell(buildEOFResponse(), getSelf());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ private void invokeActors(ActorRequest actorRequest) {
String uniqueId = actorRequest.getUniqueIdentifier();
if (!actorsMap.containsKey(uniqueId)) {
ReduceStreamer reduceStreamerHandler = reduceStreamerFactory.createReduceStreamer();
// FIXME - the responseObserver is NOT thread-safe but multiple actors are sharing it.
ActorRef actorRef = getContext()
.actorOf(ReduceStreamerActor.props(
keys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.numaproj.numaflow.reducestreamer.model.Message;

/**
* OutputObserver receives messages from the MapStreamer.
* OutputObserver receives messages from the ReduceStreamer.
*/
public interface OutputStreamObserver {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,36 @@
import io.numaproj.numaflow.reducestreamer.model.Metadata;

/**
* TODO - add descriptions
* ReduceStreamer exposes methods for performing reduce streaming operations.
*/
public abstract class ReduceStreamer {
/**
* processMessage is invoked for each reduce input message.
* It reads the input data from the datum and performs reduce operations for the given keys.
* An output stream is provided for sending back the result to the reduce output stream.
*
* @param keys message keys
* @param datum current message to be processed
* @param outputStream observer of the reduce result, which is used to send back reduce responses
* @param md metadata associated with the window
*/
public abstract void processMessage(
String[] keys,
Datum datum,
OutputStreamObserver outputStream,
Metadata md);

/**
* handleEndOfStream handles the closure of the reduce input stream.
* This method is invoked when the input reduce stream is closed.
* It provides the capability of constructing a final response based on the messages processed so far.
*
* @param keys message keys
* @param outputStreamObserver observer of the reduce result, which is used to send back reduce responses
* @param md metadata associated with the window
*/
public abstract void handleEndOfStream(
String[] keys,
OutputStreamObserver outputStreamObserver,
Metadata md);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@


/**
* ReducerFactory is the factory for Reducer.
* <p>
* TODO - do we need this?
* ReduceStreamerFactory is the factory for ReduceStreamer.
*/

public abstract class ReduceStreamerFactory<ReduceStreamerT extends ReduceStreamer> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.protobuf.ByteString;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducestreamer.model.Datum;
import io.numaproj.numaflow.reducestreamer.model.IntervalWindowImpl;
import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.MetadataImpl;
import io.numaproj.numaflow.reducestreamer.user.OutputStreamObserver;
import io.numaproj.numaflow.reducestreamer.user.ReduceStreamer;
import io.numaproj.numaflow.reducestreamer.user.ReduceStreamerFactory;
import org.junit.Test;

import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class ReduceSupervisorActorTest {
@Test
public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-1");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();

ActorRef shutdownActor = actorSystem
.actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor
.props(completableFuture));

Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

io.numaproj.numaflow.reducer.ReduceOutputStreamObserver outputStreamObserver = new io.numaproj.numaflow.reducer.ReduceOutputStreamObserver();

ActorRef supervisor = actorSystem
.actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor
.props(
new TestReduceStreamerFactory(),
md,
shutdownActor,
outputStreamObserver));

for (int i = 1; i <= 10; i++) {
io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest(
ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
// all reduce requests share same set of keys.
.addAllKeys(Arrays.asList("key-1", "key-2"))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build());
supervisor.tell(reduceRequest, ActorRef.noSender());
}
supervisor.tell(io.numaproj.numaflow.reducestreamer.Constants.EOF, ActorRef.noSender());

try {
completableFuture.get();
// the observer should receive 2 messages, one is the aggregated result, the other is the EOF response.
assertEquals(2, outputStreamObserver.resultDatum.get().size());
assertEquals("10", outputStreamObserver.resultDatum
.get()
.get(0)
.getResult()
.getValue()
.toStringUtf8());
assertEquals(true, outputStreamObserver.resultDatum
.get()
.get(1)
.getEOF());
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
}

@Test
public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-2");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();

ActorRef shutdownActor = actorSystem
.actorOf(io.numaproj.numaflow.reducestreamer.ReduceShutdownActor
.props(completableFuture));

Metadata md = new MetadataImpl(
new IntervalWindowImpl(Instant.now(), Instant.now()));

io.numaproj.numaflow.reducestreamer.ReduceOutputStreamObserver outputStreamObserver = new ReduceOutputStreamObserver();
ActorRef supervisor = actorSystem
.actorOf(io.numaproj.numaflow.reducestreamer.ReduceSupervisorActor
.props(
new TestReduceStreamerFactory(),
md,
shutdownActor,
outputStreamObserver)
);

for (int i = 1; i <= 10; i++) {
io.numaproj.numaflow.reducestreamer.ActorRequest reduceRequest = new io.numaproj.numaflow.reducestreamer.ActorRequest(
ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
// each request contain a unique set of keys.
.addAllKeys(Arrays.asList("shared-key", "unique-key-" + i))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build());
supervisor.tell(reduceRequest, ActorRef.noSender());
}

supervisor.tell(io.numaproj.numaflow.reducestreamer.Constants.EOF, ActorRef.noSender());
try {
completableFuture.get();
// each reduce request generates two reduce responses, one containing the data and the other one indicating EOF.
assertEquals(20, outputStreamObserver.resultDatum.get().size());
for (int i = 0; i < 20; i++) {
ReduceOuterClass.ReduceResponse response = outputStreamObserver.resultDatum
.get()
.get(i);
assertTrue(response.getResult().getValue().toStringUtf8().equals("1")
|| response.getEOF());
}
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
}

public static class TestReduceStreamerFactory extends ReduceStreamerFactory<TestReduceStreamerFactory.TestReduceStreamerHandler> {
@Override
public TestReduceStreamerHandler createReduceStreamer() {
return new TestReduceStreamerHandler();
}

public static class TestReduceStreamerHandler extends ReduceStreamer {
int count = 0;

@Override
public void processMessage(
String[] keys,
Datum datum,
OutputStreamObserver outputStream,
Metadata md) {
count += 1;
}

@Override
public void handleEndOfStream(
String[] keys,
OutputStreamObserver outputStreamObserver,
Metadata md) {
outputStreamObserver.send(new Message(String.valueOf(count).getBytes()));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,13 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
}
}

public static class ReduceStreamerErrTestFactory extends ReduceStreamerFactory<ServerErrTest.ReduceStreamerErrTestFactory.TestReduceHandler> {
public static class ReduceStreamerErrTestFactory extends ReduceStreamerFactory<ServerErrTest.ReduceStreamerErrTestFactory.TestReduceStreamHandler> {
@Override
public TestReduceHandler createReduceStreamer() {
return new ServerErrTest.ReduceStreamerErrTestFactory.TestReduceHandler();
public TestReduceStreamHandler createReduceStreamer() {
return new ServerErrTest.ReduceStreamerErrTestFactory.TestReduceStreamHandler();
}

public static class TestReduceHandler extends ReduceStreamer {
public static class TestReduceStreamHandler extends ReduceStreamer {
@Override
public void processMessage(
String[] keys,
Expand All @@ -166,6 +166,14 @@ public void processMessage(
io.numaproj.numaflow.reducestreamer.model.Metadata md) {
throw new RuntimeException("unknown exception");
}

@Override
public void handleEndOfStream(
String[] keys,
OutputStreamObserver outputStreamObserver,
io.numaproj.numaflow.reducestreamer.model.Metadata md) {

}
}
}
}
Loading

0 comments on commit c2937a9

Please sign in to comment.