Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement reduce stream sdk #91

Merged
merged 11 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,23 @@
</to>
</configuration>
</execution>
<execution>
<id>reduce-stream-sum</id>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
<configuration>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducestreamer.sum.SumFactory
</mainClass>
</container>
<to>
<image>numaflow-java-examples/reduce-stream-sum</image>
</to>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.Server;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SumFactory extends ReduceStreamerFactory<SumFunction> {

public static void main(String[] args) throws Exception {
log.info("sum udf was invoked");
new Server(new SumFactory()).start();
}

@Override
public SumFunction createReduceStreamer() {
return new SumFunction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
import lombok.extern.slf4j.Slf4j;

/**
* SumFunction is a User Defined Reduce Stream Function example which sums up the values for the given keys
* and outputs the sum when the sum is greater than 100.
* When the input stream closes, the function outputs the sum no matter what value it holds.
*/
@Slf4j
public class SumFunction extends ReduceStreamer {

private int sum = 0;

@Override
public void processMessage(
String[] keys,
io.numaproj.numaflow.reducestreamer.model.Datum datum,
OutputStreamObserver outputStreamObserver,
io.numaproj.numaflow.reducestreamer.model.Metadata md) {
try {
sum += Integer.parseInt(new String(datum.getValue()));
} catch (NumberFormatException e) {
log.info("error while parsing integer - {}", e.getMessage());
}
if (sum >= 100) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
sum = 0;
}
}

@Override
public void handleEndOfStream(
String[] keys,
OutputStreamObserver outputStreamObserver,
Metadata md) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* ActorRequest is a wrapper of the gRpc input request.
* It is constructed by the service when service receives an input request and then sent to
* the supervisor actor, to be distributed to reduce streamer actors.
*/
@Getter
@AllArgsConstructor
class ActorRequest {
ReduceOuterClass.ReduceRequest request;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getRequest().getPayload().getKeysList().toArray(new String[0]));
}

public String[] getKeySet() {
return this.getRequest().getPayload().getKeysList().toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* ActorResponse is for child reduce streamer actors to report back to the supervisor actor about the status of the data processing.
* It serves two purposes:
* 1. Send to the supervisor an EOF response, which is to be sent to the gRPC output stream.
* 2. Send to the supervisor a signal, indicating that the actor has finished all its processing work,
* and it's ready to be cleaned up by the supervisor actor.
*/
@Getter
@AllArgsConstructor
class ActorResponse {
ReduceOuterClass.ReduceResponse response;
ActorResponseType type;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getActorUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getResponse().getResult().getKeysList().toArray(new String[0]));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.numaproj.numaflow.reducestreamer;

enum ActorResponseType {
// EOF_RESPONSE indicates that the actor response contains an EOF reduce response without real data.
EOF_RESPONSE,
// READY_FOR_CLEAN_UP_SIGNAL indicates that the actor has finished sending responses and now ready to be cleaned up.
READY_FOR_CLEAN_UP_SIGNAL,
}
13 changes: 13 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.numaproj.numaflow.reducestreamer;

class Constants {
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/reducestream.sock";

public static final String EOF = "EOF";

public static final String SUCCESS = "SUCCESS";

public static final String DELIMITER = ":";
}
26 changes: 26 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.info.ServerInfoAccessor;
import lombok.Builder;
import lombok.Getter;

/**
* GRPCConfig is used to provide configurations for gRPC server.
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
private String socketPath;
private int maxMessageSize;
private String infoFilePath;

/**
* Static method to create default GRPCConfig.
*/
static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reducestreamer.model.Datum;
import lombok.AllArgsConstructor;

import java.time.Instant;

@AllArgsConstructor
class HandlerDatum implements Datum {
private byte[] value;
private Instant watermark;
private Instant eventTime;

@Override
public Instant getWatermark() {
return this.watermark;
}

@Override
public byte[] getValue() {
return this.value;
}

@Override
public Instant getEventTime() {
return this.eventTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reducestreamer.model.IntervalWindow;
import lombok.AllArgsConstructor;

import java.time.Instant;

@AllArgsConstructor
class IntervalWindowImpl implements IntervalWindow {
private final Instant startTime;
private final Instant endTime;

@Override
public Instant getStartTime() {
return this.startTime;
}

@Override
public Instant getEndTime() {
return this.endTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reducestreamer.model.IntervalWindow;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import lombok.AllArgsConstructor;

@AllArgsConstructor
class MetadataImpl implements Metadata {
private final IntervalWindow intervalWindow;

@Override
public IntervalWindow getIntervalWindow() {
return intervalWindow;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.ActorRef;
import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
import lombok.AllArgsConstructor;

@AllArgsConstructor
class OutputStreamObserverImpl implements OutputStreamObserver {
private final ActorRef responseStreamActor;

@Override
public void send(Message message) {
this.responseStreamActor.tell(message, ActorRef.noSender());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.AbstractActor;
import akka.actor.AllDeadLetters;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
* Reduce shutdown actor, listens to exceptions and handles shutdown.
*/
@Slf4j
@AllArgsConstructor
class ReduceShutdownActor extends AbstractActor {
private final CompletableFuture<Void> failureFuture;

public static Props props(
CompletableFuture<Void> failureFuture) {
return Props.create(ReduceShutdownActor.class, failureFuture);
}

@Override
public void preRestart(Throwable reason, Optional<Object> message) {
failureFuture.completeExceptionally(reason);
}

@Override
public Receive createReceive() {
return ReceiveBuilder
.create()
.match(Throwable.class, this::shutdown)
.match(String.class, this::completedSuccessfully)
.match(AllDeadLetters.class, this::handleDeadLetters)
.build();
}

/*
complete the future with exception so that the exception will be thrown
indicate that same to response observer.
*/
private void shutdown(Throwable throwable) {
log.debug("got a shut down exception");
failureFuture.completeExceptionally(throwable);
}

// if there are no exceptions, complete the future without exception.
private void completedSuccessfully(String eof) {
log.debug("completed successfully of shutdown executed");
failureFuture.complete(null);
// if all the actors completed successfully, we can stop the shutdown actor.
getContext().getSystem().stop(getSelf());
}

// if we see dead letters, we need to stop the execution and exit
// to make sure no messages are lost
private void handleDeadLetters(AllDeadLetters deadLetter) {
log.debug("got a dead letter, stopping the execution");
failureFuture.completeExceptionally(new Throwable("dead letters"));
getContext().getSystem().stop(getSelf());
}
}
Loading
Loading