Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ask #9

Draft
wants to merge 28 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a84e932
add user interfaces
KeranYang Jan 24, 2024
16c0571
copy reduce streamer
KeranYang Jan 24, 2024
12cd9b8
add proto - coverage to fix
KeranYang Jan 24, 2024
d908719
use new proto - coverage to fix
KeranYang Jan 24, 2024
25950d3
open-append-close
KeranYang Jan 25, 2024
518bab6
open-expand-close
KeranYang Jan 25, 2024
92bb0e9
fix unit tests
KeranYang Jan 25, 2024
95a03ae
implement merge operation and add unit tests
KeranYang Jan 26, 2024
717f699
update comments
KeranYang Jan 28, 2024
9e15827
add more error tests and make server test same as golang
KeranYang Jan 28, 2024
4a9c7eb
fix merge into an existing window
KeranYang Jan 28, 2024
7399ea3
fix merge and close race condition
KeranYang Jan 28, 2024
a481c53
properly handle EOF
KeranYang Jan 28, 2024
bf22f7f
add more tests
KeranYang Jan 28, 2024
18c4151
unify actor responses
KeranYang Jan 28, 2024
1e06d61
introduce builder pattern
KeranYang Jan 28, 2024
31c7e1f
clean up logs
KeranYang Jan 28, 2024
720c0db
clean up
KeranYang Jan 28, 2024
144a380
add an example
KeranYang Jan 31, 2024
7b56abe
fix: cover the case when EOF received but no active window
KeranYang Feb 15, 2024
7859b59
.
KeranYang Feb 15, 2024
ff17ebd
add 3 failed test cases - pending fix
KeranYang Feb 14, 2024
a85201e
Merge branch 'main' into session
KeranYang Feb 15, 2024
8cc3eff
fix merge conflict
KeranYang Feb 15, 2024
493be31
make merge a blocking call
KeranYang Mar 11, 2024
d917ee3
remove isMerging
KeranYang Mar 11, 2024
7d8f793
a bit of clean up
KeranYang Mar 11, 2024
0615cd9
clean up
KeranYang Mar 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,23 @@
</to>
</configuration>
</execution>
<execution>
<id>session-reduce-count</id>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
<configuration>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducesession.counter.CountFactory
</mainClass>
</container>
<to>
<image>numaflow-java-examples/session-reduce-count</image>
</to>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.examples.reducesession.counter;

import io.numaproj.numaflow.sessionreducer.Server;
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
import lombok.extern.slf4j.Slf4j;

/**
* CountFactory extends SessionReducerFactory to support creating instances of SumFunction.
* It also provides a main function to start a server for handling the session reduce stream.
*/
@Slf4j
public class CountFactory extends SessionReducerFactory<CountFunction> {

public static void main(String[] args) throws Exception {
log.info("count udf was invoked");
new Server(new CountFactory()).start();
}

@Override
public CountFunction createSessionReducer() {
return new CountFunction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.numaproj.numaflow.examples.reducesession.counter;

import io.numaproj.numaflow.sessionreducer.model.Datum;
import io.numaproj.numaflow.sessionreducer.model.Message;
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicInteger;

/**
* CountFunction is a simple session reducer which counts the number of events in a session.
*/
@Slf4j
public class CountFunction extends SessionReducer {

private final AtomicInteger count = new AtomicInteger(0);

@Override
public void processMessage(
String[] keys,
Datum datum,
io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver outputStreamObserver) {
this.count.incrementAndGet();
}

@Override
public void handleEndOfStream(
String[] keys,
io.numaproj.numaflow.sessionreducer.model.OutputStreamObserver outputStreamObserver) {
outputStreamObserver.send(new Message(String.valueOf(this.count.get()).getBytes()));
}

@Override
public byte[] accumulator() {
return String.valueOf(this.count.get()).getBytes();
}

@Override
public void mergeAccumulator(byte[] accumulator) {
int value = 0;
try {
value = Integer.parseInt(new String(accumulator));
} catch (NumberFormatException e) {
log.info("error while parsing integer - {}", e.getMessage());
}
this.count.addAndGet(value);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.model.Datum;
import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
Expand All @@ -19,9 +20,9 @@ public class SumFunction extends ReduceStreamer {
@Override
public void processMessage(
String[] keys,
io.numaproj.numaflow.reducestreamer.model.Datum datum,
Datum datum,
OutputStreamObserver outputStreamObserver,
io.numaproj.numaflow.reducestreamer.model.Metadata md) {
Metadata md) {
try {
sum += Integer.parseInt(new String(datum.getValue()));
} catch (NumberFormatException e) {
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@
<exclude>io.numaproj.numaflow.mapstream.v1</exclude>
<exclude>io.numaproj.numaflow.map.v1</exclude>
<exclude>io.numaproj.numaflow.reduce.v1</exclude>
<exclude>io.numaproj.numaflow.sessionreduce.v1</exclude>
<exclude>io.numaproj.numaflow.sourcetransformer.v1</exclude>
<exclude>io.numaproj.numaflow.sink.v1</exclude>
<exclude>io.numaproj.numaflow.sideinput.v1</exclude>
Expand Down Expand Up @@ -337,6 +338,7 @@
<exclude>io/numaproj/numaflow/examples/*</exclude>
<exclude>io/numaproj/numaflow/mapstream/v1/*</exclude>
<exclude>io/numaproj/numaflow/reduce/v1/*</exclude>
<exclude>io/numaproj/numaflow/sessionreduce/v1/*</exclude>
<exclude>io/numaproj/numaflow/map/v1/*</exclude>
<exclude>io/numaproj/numaflow/sourcetransformer/v1/*</exclude>
<exclude>io/numaproj/numaflow/sink/v1/*</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/**
* Output actor is a wrapper around the gRPC output stream.
* It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream.
* ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream.
* ALL reduce responses are sent to the output actor before getting forwarded to the output gRPC stream.
* <p>
* More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
@AllArgsConstructor
class OutputStreamObserverImpl implements OutputStreamObserver {
private final Metadata md;
private final ActorRef responseStreamActor;
private final ActorRef outputActor;

@Override
public void send(Message message) {
this.responseStreamActor.tell(buildResponse(message), ActorRef.noSender());
this.outputActor.tell(buildResponse(message), ActorRef.noSender());
}

private ActorResponse buildResponse(Message message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ class ReduceStreamerActor extends AbstractActor {
private OutputStreamObserver outputStream;

public static Props props(
String[] keys, Metadata md, ReduceStreamer groupBy, ActorRef responseStreamActor) {
String[] keys, Metadata md, ReduceStreamer groupBy, ActorRef outputActor) {
return Props.create(
ReduceStreamerActor.class,
keys,
md,
groupBy,
new OutputStreamObserverImpl(md, responseStreamActor));
new OutputStreamObserverImpl(md, outputActor));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,31 @@ class SupervisorActor extends AbstractActor {
private final ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory;
private final Metadata md;
private final ActorRef shutdownActor;
private final ActorRef responseStreamActor;
private final ActorRef outputActor;
private final Map<String, ActorRef> actorsMap = new HashMap<>();

public SupervisorActor(
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
ActorRef responseStreamActor) {
ActorRef outputActor) {
this.reduceStreamerFactory = reduceStreamerFactory;
this.md = md;
this.shutdownActor = shutdownActor;
this.responseStreamActor = responseStreamActor;
this.outputActor = outputActor;
}

public static Props props(
ReduceStreamerFactory<? extends ReduceStreamer> reduceStreamerFactory,
Metadata md,
ActorRef shutdownActor,
ActorRef responseStreamActor) {
ActorRef outputActor) {
return Props.create(
SupervisorActor.class,
reduceStreamerFactory,
md,
shutdownActor,
responseStreamActor);
outputActor);
}

// if there is an uncaught exception stop in the supervisor actor, send a signal to shut down
Expand Down Expand Up @@ -101,7 +101,7 @@ private void invokeActor(ActorRequest actorRequest) {
keys,
this.md,
reduceStreamerHandler,
this.responseStreamActor));
this.outputActor));
actorsMap.put(uniqueId, actorRef);
}
HandlerDatum handlerDatum = constructHandlerDatum(actorRequest.getRequest().getPayload());
Expand All @@ -122,9 +122,9 @@ private void handleActorResponse(ActorResponse actorResponse) {
if (actorsMap.isEmpty()) {
// since the actors map is empty, this particular actor response is the last response to forward to output gRPC stream.
actorResponse.setLast(true);
this.responseStreamActor.tell(actorResponse, getSelf());
this.outputActor.tell(actorResponse, getSelf());
} else {
this.responseStreamActor.tell(actorResponse, getSelf());
this.outputActor.tell(actorResponse, getSelf());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.numaproj.numaflow.sessionreducer;

import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import lombok.Builder;
import lombok.Getter;

/**
* ActorRequest is used by the supervisor actor to distribute session reduce operations to
* individual session reducer actors. One actor request is sent to only one session reducer actor.
*/
@Getter
class ActorRequest {
private final ActorRequestType type;
// the window of the target session the actor request is sent to
private final Sessionreduce.KeyedWindow keyedWindow;
// the new keyed window the target session is to be expanded to
// it is specified only when the actor request is an EXPAND
private final Sessionreduce.KeyedWindow newKeyedWindow;
// the payload of the request
private final Sessionreduce.SessionReduceRequest.Payload payload;
// the id of the merge task this request belongs to, it's equal to the unique id of the merged window,
// it is specified only when the actor request is a GET_ACCUMULATOR
private final String mergeTaskId;

@Builder
private ActorRequest(
ActorRequestType type,
Sessionreduce.KeyedWindow keyedWindow,
Sessionreduce.KeyedWindow newKeyedWindow,
Sessionreduce.SessionReduceRequest.Payload payload,
String mergeTaskId
) {
this.type = type;
this.keyedWindow = keyedWindow;
this.newKeyedWindow = newKeyedWindow;
this.payload = payload;
this.mergeTaskId = mergeTaskId;
}

static class ActorRequestBuilder {
ActorRequest build() {
if (newKeyedWindow != null && type != ActorRequestType.EXPAND) {
throw new IllegalStateException(
"attribute newKeyedWindow can only be set when the request is an EXPAND.");
}
if (newKeyedWindow == null && type == ActorRequestType.EXPAND) {
throw new IllegalStateException(
"attribute newKeyedWindow must be set when the request is an EXPAND.");
}
if (mergeTaskId != null && type != ActorRequestType.GET_ACCUMULATOR) {
throw new IllegalStateException(
"attribute mergeTaskId can only be set when the request is a GET_ACCUMULATOR.");
}
if (mergeTaskId == null && type == ActorRequestType.GET_ACCUMULATOR) {
throw new IllegalStateException(
"attribute mergeTaskId must be set when the request is a GET_ACCUMULATOR.");
}
return new ActorRequest(type, keyedWindow, newKeyedWindow, payload, mergeTaskId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.numaproj.numaflow.sessionreducer;

/**
* ActorRequestType represents the purpose of an ActorRequest.
*/
public enum ActorRequestType {
// open a brand-new session window
OPEN,
// append a message to an existing session window
APPEND,
// close a session window
CLOSE,
// expand a session window
EXPAND,
// ask a to-be-merged session window for it's accumulator
GET_ACCUMULATOR,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.numaproj.numaflow.sessionreducer;

import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

/**
* The actor response holds the session reduce response from a particular session window.
*/
@Getter
@Setter
class ActorResponse {
Sessionreduce.SessionReduceResponse response;
// The isLast attribute indicates whether the response is globally the last one to be sent to
// the output gRPC stream, if set to true, it means the response is the very last response among
// all windows. When output actor receives an isLast response, it sends the response to and immediately
// closes the output stream.
boolean isLast;
// The accumulator attribute holds the accumulator of the session.
byte[] accumulator;
// The mergeTaskId attribute holds the id of the merged window that this session is to be merged into.
String mergeTaskId;

@Builder
private ActorResponse(
Sessionreduce.SessionReduceResponse response,
boolean isLast,
byte[] accumulator,
String mergeTaskId
) {
this.response = response;
this.isLast = isLast;
this.accumulator = accumulator;
this.mergeTaskId = mergeTaskId;
}

static class ActorResponseBuilder {
ActorResponse build() {
if ((accumulator != null && mergeTaskId == null) || (accumulator == null
&& mergeTaskId != null)) {
throw new IllegalStateException(
"attributes accumulator and mergeTaskId should be either both null or both non-null.");
}
return new ActorResponse(response, isLast, accumulator, mergeTaskId);
}
}

boolean isEOFResponse() {
return this.accumulator == null && this.mergeTaskId == null;
}
}
15 changes: 15 additions & 0 deletions src/main/java/io/numaproj/numaflow/sessionreducer/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.numaproj.numaflow.sessionreducer;

class Constants {
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/sessionreduce.sock";

public static final String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sessionreducer-server-info";

public static final String EOF = "EOF";

public static final String SUCCESS = "SUCCESS";

public static final String DELIMITER = ":";
}
Loading
Loading