forked from numaproj/numaflow-java
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: implement reduce stream sdk (numaproj#91)
* Define the user interface as two methods - processMessage and handleEndOfStream.
* Continue using akka to handle concurrency. Introduce an OutputActor to make sure we are sending responses back to the gRPC output stream synchronously.

Signed-off-by: Keran Yang <[email protected]>
- Loading branch information
Showing
32 changed files
with
1,937 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
23 changes: 23 additions & 0 deletions
23
examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package io.numaproj.numaflow.examples.reducestreamer.sum; | ||
|
||
import io.numaproj.numaflow.reducestreamer.Server; | ||
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
* SumFactory extends ReduceStreamerFactory to support creating instances of SumFunction. | ||
* It also provides a main function to start a server for handling the reduce stream. | ||
*/ | ||
@Slf4j | ||
public class SumFactory extends ReduceStreamerFactory<SumFunction> { | ||
|
||
public static void main(String[] args) throws Exception { | ||
log.info("sum udf was invoked"); | ||
new Server(new SumFactory()).start(); | ||
} | ||
|
||
@Override | ||
public SumFunction createReduceStreamer() { | ||
return new SumFunction(); | ||
} | ||
} |
43 changes: 43 additions & 0 deletions
43
examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum/SumFunction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package io.numaproj.numaflow.examples.reducestreamer.sum; | ||
|
||
import io.numaproj.numaflow.reducestreamer.model.Message; | ||
import io.numaproj.numaflow.reducestreamer.model.Metadata; | ||
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver; | ||
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
* SumFunction is a User Defined Reduce Stream Function example which sums up the values for the given keys | ||
* and outputs the sum when the sum is greater than 100. | ||
* When the input stream closes, the function outputs the sum no matter what value it holds. | ||
*/ | ||
@Slf4j | ||
public class SumFunction extends ReduceStreamer { | ||
|
||
private int sum = 0; | ||
|
||
@Override | ||
public void processMessage( | ||
String[] keys, | ||
io.numaproj.numaflow.reducestreamer.model.Datum datum, | ||
OutputStreamObserver outputStreamObserver, | ||
io.numaproj.numaflow.reducestreamer.model.Metadata md) { | ||
try { | ||
sum += Integer.parseInt(new String(datum.getValue())); | ||
} catch (NumberFormatException e) { | ||
log.info("error while parsing integer - {}", e.getMessage()); | ||
} | ||
if (sum >= 100) { | ||
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes())); | ||
sum = 0; | ||
} | ||
} | ||
|
||
@Override | ||
public void handleEndOfStream( | ||
String[] keys, | ||
OutputStreamObserver outputStreamObserver, | ||
Metadata md) { | ||
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes())); | ||
} | ||
} |
30 changes: 30 additions & 0 deletions
30
src/main/java/io/numaproj/numaflow/reducestreamer/ActorRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package io.numaproj.numaflow.reducestreamer; | ||
|
||
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
|
||
/** | ||
* ActorRequest is a wrapper of the gRpc input request. | ||
* It is constructed by the service when service receives an input request and then sent to | ||
* the supervisor actor, to be distributed to reduce streamer actors. | ||
*/ | ||
@Getter | ||
@AllArgsConstructor | ||
class ActorRequest { | ||
ReduceOuterClass.ReduceRequest request; | ||
|
||
// TODO - do we need to include window information in the id? | ||
// for aligned reducer, there is always single window. | ||
// but at the same time, would like to be consistent with GO SDK implementation. | ||
// we will revisit this one later. | ||
public String getUniqueIdentifier() { | ||
return String.join( | ||
Constants.DELIMITER, | ||
this.getRequest().getPayload().getKeysList().toArray(new String[0])); | ||
} | ||
|
||
public String[] getKeySet() { | ||
return this.getRequest().getPayload().getKeysList().toArray(new String[0]); | ||
} | ||
} |
32 changes: 32 additions & 0 deletions
32
src/main/java/io/numaproj/numaflow/reducestreamer/ActorResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package io.numaproj.numaflow.reducestreamer; | ||
|
||
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; | ||
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.Setter; | ||
|
||
/** | ||
* The actor response holds the final EOF response for a particular key set. | ||
* <p> | ||
* The isLast attribute indicates whether the response is globally the last one to be sent to | ||
* the output gRPC stream, if set to true, it means the response is the very last response among | ||
* all key sets. When output stream actor receives an isLast response, it sends the response and immediately | ||
* closes the output stream. | ||
*/ | ||
@Getter | ||
@Setter | ||
@AllArgsConstructor | ||
class ActorResponse { | ||
ReduceOuterClass.ReduceResponse response; | ||
boolean isLast; | ||
|
||
// TODO - do we need to include window information in the id? | ||
// for aligned reducer, there is always single window. | ||
// but at the same time, would like to be consistent with GO SDK implementation. | ||
// we will revisit this one later. | ||
public String getActorUniqueIdentifier() { | ||
return String.join( | ||
Constants.DELIMITER, | ||
this.getResponse().getResult().getKeysList().toArray(new String[0])); | ||
} | ||
} |
13 changes: 13 additions & 0 deletions
13
src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package io.numaproj.numaflow.reducestreamer; | ||
|
||
/**
 * Constants shared by the reduce streamer classes of this package.
 * The class only holds static values and cannot be instantiated or extended.
 */
final class Constants {
    /** Default maximum gRPC message size in bytes (64 MiB). */
    public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

    /** Default path of the socket the reduce stream server uses. */
    public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/reducestream.sock";

    // NOTE(review): presumably the marker exchanged between actors at end of stream -
    // its usage is outside this file, confirm before relying on it.
    public static final String EOF = "EOF";

    // NOTE(review): presumably a success marker exchanged between actors -
    // its usage is outside this file, confirm before relying on it.
    public static final String SUCCESS = "SUCCESS";

    /** Separator placed between keys when a key set is joined into a unique identifier. */
    public static final String DELIMITER = ":";

    private Constants() {
        // constants holder, not meant to be instantiated.
    }
}
26 changes: 26 additions & 0 deletions
26
src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package io.numaproj.numaflow.reducestreamer; | ||
|
||
import io.numaproj.numaflow.info.ServerInfoAccessor; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
|
||
/** | ||
* GRPCConfig is used to provide configurations for gRPC server. | ||
*/ | ||
@Getter | ||
@Builder(builderMethodName = "newBuilder") | ||
public class GRPCConfig { | ||
private String socketPath; | ||
private int maxMessageSize; | ||
private String infoFilePath; | ||
|
||
/** | ||
* Static method to create default GRPCConfig. | ||
*/ | ||
static GRPCConfig defaultGrpcConfig() { | ||
return GRPCConfig.newBuilder() | ||
.infoFilePath(ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH) | ||
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) | ||
.socketPath(Constants.DEFAULT_SOCKET_PATH).build(); | ||
} | ||
} |
28 changes: 28 additions & 0 deletions
28
src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package io.numaproj.numaflow.reducestreamer; | ||
|
||
import io.numaproj.numaflow.reducestreamer.model.Datum; | ||
import lombok.AllArgsConstructor; | ||
|
||
import java.time.Instant; | ||
|
||
@AllArgsConstructor | ||
class HandlerDatum implements Datum { | ||
private byte[] value; | ||
private Instant watermark; | ||
private Instant eventTime; | ||
|
||
@Override | ||
public Instant getWatermark() { | ||
return this.watermark; | ||
} | ||
|
||
@Override | ||
public byte[] getValue() { | ||
return this.value; | ||
} | ||
|
||
@Override | ||
public Instant getEventTime() { | ||
return this.eventTime; | ||
} | ||
} |
22 changes: 22 additions & 0 deletions
22
src/main/java/io/numaproj/numaflow/reducestreamer/IntervalWindowImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package io.numaproj.numaflow.reducestreamer; | ||
|
||
import io.numaproj.numaflow.reducestreamer.model.IntervalWindow; | ||
import lombok.AllArgsConstructor; | ||
|
||
import java.time.Instant; | ||
|
||
@AllArgsConstructor | ||
class IntervalWindowImpl implements IntervalWindow { | ||
private final Instant startTime; | ||
private final Instant endTime; | ||
|
||
@Override | ||
public Instant getStartTime() { | ||
return this.startTime; | ||
} | ||
|
||
@Override | ||
public Instant getEndTime() { | ||
return this.endTime; | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
src/main/java/io/numaproj/numaflow/reducestreamer/MetadataImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package io.numaproj.numaflow.reducestreamer; | ||
|
||
import io.numaproj.numaflow.reducestreamer.model.IntervalWindow; | ||
import io.numaproj.numaflow.reducestreamer.model.Metadata; | ||
import lombok.AllArgsConstructor; | ||
|
||
@AllArgsConstructor | ||
class MetadataImpl implements Metadata { | ||
private final IntervalWindow intervalWindow; | ||
|
||
@Override | ||
public IntervalWindow getIntervalWindow() { | ||
return intervalWindow; | ||
} | ||
} |
48 changes: 48 additions & 0 deletions
48
src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package io.numaproj.numaflow.reducestreamer; | ||
|
||
import akka.actor.AbstractActor; | ||
import akka.actor.Props; | ||
import io.grpc.stub.StreamObserver; | ||
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; | ||
import lombok.AllArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
* Output actor is a wrapper around the gRPC output stream. | ||
* It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream. | ||
* ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream. | ||
* <p> | ||
* More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html | ||
*/ | ||
@Slf4j | ||
@AllArgsConstructor | ||
class OutputActor extends AbstractActor { | ||
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver; | ||
|
||
public static Props props( | ||
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) { | ||
return Props.create(OutputActor.class, responseObserver); | ||
} | ||
|
||
@Override | ||
public Receive createReceive() { | ||
return receiveBuilder() | ||
.match(ActorResponse.class, this::handleResponse) | ||
.build(); | ||
} | ||
|
||
private void handleResponse(ActorResponse actorResponse) { | ||
if (actorResponse.isLast()) { | ||
// send the very last response. | ||
responseObserver.onNext(actorResponse.getResponse()); | ||
// close the output stream. | ||
responseObserver.onCompleted(); | ||
// stop the AKKA system right after we close the output stream. | ||
// note: could make more sense if the supervisor actor stops the system, | ||
// but it requires an extra tell. | ||
getContext().getSystem().stop(getSender()); | ||
} else { | ||
responseObserver.onNext(actorResponse.getResponse()); | ||
} | ||
} | ||
} |
49 changes: 49 additions & 0 deletions
49
src/main/java/io/numaproj/numaflow/reducestreamer/OutputStreamObserverImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package io.numaproj.numaflow.reducestreamer; | ||
|
||
import akka.actor.ActorRef; | ||
import com.google.protobuf.ByteString; | ||
import com.google.protobuf.Timestamp; | ||
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; | ||
import io.numaproj.numaflow.reducestreamer.model.Message; | ||
import io.numaproj.numaflow.reducestreamer.model.Metadata; | ||
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver; | ||
import lombok.AllArgsConstructor; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
@AllArgsConstructor | ||
class OutputStreamObserverImpl implements OutputStreamObserver { | ||
private final Metadata md; | ||
private final ActorRef responseStreamActor; | ||
|
||
@Override | ||
public void send(Message message) { | ||
this.responseStreamActor.tell(buildResponse(message), ActorRef.noSender()); | ||
} | ||
|
||
private ActorResponse buildResponse(Message message) { | ||
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder(); | ||
// set the window using the actor metadata. | ||
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder() | ||
.setStart(Timestamp.newBuilder() | ||
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()) | ||
.setNanos(this.md.getIntervalWindow().getStartTime().getNano())) | ||
.setEnd(Timestamp.newBuilder() | ||
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()) | ||
.setNanos(this.md.getIntervalWindow().getEndTime().getNano())) | ||
.setSlot("slot-0").build()); | ||
responseBuilder.setEOF(false); | ||
// set the result. | ||
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result | ||
.newBuilder() | ||
.setValue(ByteString.copyFrom(message.getValue())) | ||
.addAllKeys(message.getKeys() | ||
== null ? new ArrayList<>():Arrays.asList(message.getKeys())) | ||
.addAllTags( | ||
message.getTags() == null ? new ArrayList<>():List.of(message.getTags())) | ||
.build()); | ||
return new ActorResponse(responseBuilder.build(), false); | ||
} | ||
} |
Oops, something went wrong.