Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement sdk for session reduce #94

Merged
merged 26 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,23 @@
</to>
</configuration>
</execution>
<execution>
<id>session-reduce-count</id>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
<configuration>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducesession.counter.CountFactory
</mainClass>
</container>
<to>
<image>numaflow-java-examples/session-reduce-count</image>
</to>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.examples.reducesession.counter;

import io.numaproj.numaflow.sessionreducer.Server;
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
import lombok.extern.slf4j.Slf4j;

/**
* CountFactory extends SessionReducerFactory to support creating instances of SumFunction.
* It also provides a main function to start a server for handling the session reduce stream.
*/
@Slf4j
public class CountFactory extends SessionReducerFactory<CountFunction> {

public static void main(String[] args) throws Exception {
log.info("count udf was invoked");
new Server(new CountFactory()).start();
}

@Override
public CountFunction createSessionReducer() {
return new CountFunction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.numaproj.numaflow.examples.reducesession.counter;

import io.numaproj.numaflow.sessionreducer.model.Datum;
import io.numaproj.numaflow.sessionreducer.model.Message;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicInteger;

/**
* CountFunction is a simple session reducer which counts the number of events in a session.
*/
@Slf4j
public class CountFunction extends SessionReducer {

private final AtomicInteger count = new AtomicInteger(0);

@Override
public void processMessage(
String[] keys,
Datum datum,
io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver outputStreamObserver) {
this.count.incrementAndGet();
}

@Override
public void handleEndOfStream(
String[] keys,
io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver outputStreamObserver) {
outputStreamObserver.send(new Message(String.valueOf(this.count.get()).getBytes()));
}

@Override
public byte[] accumulator() {
return String.valueOf(this.count.get()).getBytes();
}

@Override
public void mergeAccumulator(byte[] accumulator) {
int value = 0;
try {
value = Integer.parseInt(new String(accumulator));
} catch (NumberFormatException e) {
log.info("error while parsing integer - {}", e.getMessage());
}
this.count.addAndGet(value);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.model.Datum;
import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
Expand All @@ -19,9 +20,9 @@ public class SumFunction extends ReduceStreamer {
@Override
public void processMessage(
String[] keys,
io.numaproj.numaflow.reducestreamer.model.Datum datum,
Datum datum,
OutputStreamObserver outputStreamObserver,
io.numaproj.numaflow.reducestreamer.model.Metadata md) {
Metadata md) {
try {
sum += Integer.parseInt(new String(datum.getValue()));
} catch (NumberFormatException e) {
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@
<exclude>io.numaproj.numaflow.mapstream.v1</exclude>
<exclude>io.numaproj.numaflow.map.v1</exclude>
<exclude>io.numaproj.numaflow.reduce.v1</exclude>
<exclude>io.numaproj.numaflow.sessionreduce.v1</exclude>
<exclude>io.numaproj.numaflow.sourcetransformer.v1</exclude>
<exclude>io.numaproj.numaflow.sink.v1</exclude>
<exclude>io.numaproj.numaflow.sideinput.v1</exclude>
Expand Down Expand Up @@ -337,6 +338,7 @@
<exclude>io/numaproj/numaflow/examples/*</exclude>
<exclude>io/numaproj/numaflow/mapstream/v1/*</exclude>
<exclude>io/numaproj/numaflow/reduce/v1/*</exclude>
<exclude>io/numaproj/numaflow/sessionreduce/v1/*</exclude>
<exclude>io/numaproj/numaflow/map/v1/*</exclude>
<exclude>io/numaproj/numaflow/sourcetransformer/v1/*</exclude>
<exclude>io/numaproj/numaflow/sink/v1/*</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/**
* Output actor is a wrapper around the gRPC output stream.
* It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream.
* ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream.
* ALL reduce responses are sent to the output actor before getting forwarded to the output gRPC stream.
* <p>
* More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
@AllArgsConstructor
class OutputStreamObserverImpl implements OutputStreamObserver {
private final Metadata md;
private final ActorRef responseStreamActor;
private final ActorRef outputActor;

@Override
public void send(Message message) {
this.responseStreamActor.tell(buildResponse(message), ActorRef.noSender());
this.outputActor.tell(buildResponse(message), ActorRef.noSender());
}

private ActorResponse buildResponse(Message message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ class ReduceStreamerActor extends AbstractActor {
private OutputStreamObserver outputStream;

public static Props props(
String[] keys, Metadata md, ReduceStreamer groupBy, ActorRef responseStreamActor) {
String[] keys, Metadata md, ReduceStreamer groupBy, ActorRef outputActor) {
return Props.create(
ReduceStreamerActor.class,
keys,
md,
groupBy,
new OutputStreamObserverImpl(md, responseStreamActor));
new OutputStreamObserverImpl(md, outputActor));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,31 @@ class SupervisorActor extends AbstractActor {
private final ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory;
private final Metadata md;
private final ActorRef shutdownActor;
private final ActorRef responseStreamActor;
private final ActorRef outputActor;
private final Map<String, ActorRef> actorsMap = new HashMap<>();

public SupervisorActor(
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
ActorRef responseStreamActor) {
ActorRef outputActor) {
this.reduceStreamerFactory = reduceStreamerFactory;
this.md = md;
this.shutdownActor = shutdownActor;
this.responseStreamActor = responseStreamActor;
this.outputActor = outputActor;
}

public static Props props(
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
ActorRef responseStreamActor) {
ActorRef outputActor) {
return Props.create(
SupervisorActor.class,
reduceStreamerFactory,
md,
shutdownActor,
responseStreamActor);
outputActor);
}

// if there is an uncaught exception stop in the supervisor actor, send a signal to shut down
Expand Down Expand Up @@ -101,7 +101,7 @@ private void invokeActor(ActorRequest actorRequest) {
keys,
this.md,
reduceStreamerHandler,
this.responseStreamActor));
this.outputActor));
actorsMap.put(uniqueId, actorRef);
}
HandlerDatum handlerDatum = constructHandlerDatum(actorRequest.getRequest().getPayload());
Expand All @@ -122,9 +122,9 @@ private void handleActorResponse(ActorResponse actorResponse) {
if (actorsMap.isEmpty()) {
// since the actors map is empty, this particular actor response is the last response to forward to output gRPC stream.
actorResponse.setLast(true);
this.responseStreamActor.tell(actorResponse, getSelf());
this.outputActor.tell(actorResponse, getSelf());
} else {
this.responseStreamActor.tell(actorResponse, getSelf());
this.outputActor.tell(actorResponse, getSelf());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.numaproj.numaflow.sessionreducer;

import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import lombok.Builder;
import lombok.Getter;

/**
* ActorRequest is used by the supervisor actor to distribute session reduce operations to
* individual session reducer actors. One actor request is sent to only one session reducer actor.
*/
@Getter
class ActorRequest {
private final ActorRequestType type;
// the window of the target session the actor request is sent to
private final Sessionreduce.KeyedWindow keyedWindow;
// the new keyed window the target session is to be expanded to
// it is specified only when the actor request is an EXPAND
private final Sessionreduce.KeyedWindow newKeyedWindow;
// the payload of the request
private final Sessionreduce.SessionReduceRequest.Payload payload;
// the id of the merge task this request belongs to, it's equal to the unique id of the merged window,
// it is specified only when the actor request is a GET_ACCUMULATOR
private final String mergeTaskId;

@Builder
private ActorRequest(
ActorRequestType type,
Sessionreduce.KeyedWindow keyedWindow,
Sessionreduce.KeyedWindow newKeyedWindow,
Sessionreduce.SessionReduceRequest.Payload payload,
String mergeTaskId
) {
this.type = type;
this.keyedWindow = keyedWindow;
this.newKeyedWindow = newKeyedWindow;
this.payload = payload;
this.mergeTaskId = mergeTaskId;
}

static class ActorRequestBuilder {
ActorRequest build() {
if (newKeyedWindow != null && type != ActorRequestType.EXPAND) {
throw new IllegalStateException(
"attribute newKeyedWindow can only be set when the request is an EXPAND.");
}
if (newKeyedWindow == null && type == ActorRequestType.EXPAND) {
throw new IllegalStateException(
"attribute newKeyedWindow must be set when the request is an EXPAND.");
}
if (mergeTaskId != null && type != ActorRequestType.GET_ACCUMULATOR) {
throw new IllegalStateException(
"attribute mergeTaskId can only be set when the request is a GET_ACCUMULATOR.");
}
if (mergeTaskId == null && type == ActorRequestType.GET_ACCUMULATOR) {
throw new IllegalStateException(
"attribute mergeTaskId must be set when the request is a GET_ACCUMULATOR.");
}
return new ActorRequest(type, keyedWindow, newKeyedWindow, payload, mergeTaskId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.numaproj.numaflow.sessionreducer;

/**
* ActorRequestType represents the purpose of an ActorRequest.
*/
public enum ActorRequestType {
// open a brand-new session window
OPEN,
// append a message to an existing session window
APPEND,
// close a session window
CLOSE,
// expand a session window
EXPAND,
// ask a to-be-merged session window for it's accumulator
GET_ACCUMULATOR,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.numaproj.numaflow.sessionreducer;

import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

/**
* The actor response holds the session reduce response from a particular session window.
*/
@Getter
@Setter
class ActorResponse {
Sessionreduce.SessionReduceResponse response;
// The isLast attribute indicates whether the response is globally the last one to be sent to
// the output gRPC stream, if set to true, it means the response is the very last response among
// all windows. When output actor receives an isLast response, it sends the response to and immediately
// closes the output stream.
boolean isLast;
// The accumulator attribute holds the accumulator of the session.
byte[] accumulator;
// The mergeTaskId attribute holds the id of the merged window that this session is to be merged into.
String mergeTaskId;

@Builder
private ActorResponse(
Sessionreduce.SessionReduceResponse response,
boolean isLast,
byte[] accumulator,
String mergeTaskId
) {
this.response = response;
this.isLast = isLast;
this.accumulator = accumulator;
this.mergeTaskId = mergeTaskId;
}

static class ActorResponseBuilder {
ActorResponse build() {
if ((accumulator != null && mergeTaskId == null) || (accumulator == null
&& mergeTaskId != null)) {
throw new IllegalStateException(
"attributes accumulator and mergeTaskId should be either both null or both non-null.");
}
return new ActorResponse(response, isLast, accumulator, mergeTaskId);
}
}

boolean isEOFResponse() {
return this.accumulator == null && this.mergeTaskId == null;
}
}
15 changes: 15 additions & 0 deletions src/main/java/io/numaproj/numaflow/sessionreducer/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.numaproj.numaflow.sessionreducer;

class Constants {
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sessionreduce.sock";

public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sessionreducer-server-info";

public static final String EOF = "EOF";

public static final String SUCCESS = "SUCCESS";

public static final String DELIMITER = ":";
}
Loading
Loading