From d63d88bdf40bc481bc369856bd2570beae60b430 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Tue, 9 Jan 2024 17:28:41 -0500 Subject: [PATCH] not working Signed-off-by: Keran Yang --- .../numaflow/reducer/HandlerDatum.java | 3 -- .../numaflow/reducer/ReduceActor.java | 37 ++++++++++---- .../io/numaproj/numaflow/reducer/Service.java | 1 - src/main/proto/reduce/v1/reduce.proto | 48 ++++++++++++++++--- 4 files changed, 71 insertions(+), 18 deletions(-) diff --git a/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java index 542c8a8f..612160e0 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java @@ -7,12 +7,10 @@ @AllArgsConstructor class HandlerDatum implements Datum { - private byte[] value; private Instant watermark; private Instant eventTime; - @Override public Instant getWatermark() { return this.watermark; @@ -27,5 +25,4 @@ public byte[] getValue() { public Instant getEventTime() { return this.eventTime; } - } diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java index 1065c7b5..b84eebff 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java @@ -4,6 +4,7 @@ import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import io.numaproj.numaflow.reduce.v1.ReduceOuterClass; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -49,18 +50,38 @@ private void getResult(String eof) { private ActorResponse buildDatumListResponse(MessageList messageList) { ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder(); + responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder() + .setStart(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getStartTime().getNano())) + .setEnd(Timestamp.newBuilder() + .setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond()) + .setNanos(this.md.getIntervalWindow().getEndTime().getNano())) + .setSlot("slot-0").build()); + // for aligned window, if we start building the response, it means we already reached EOF. + responseBuilder.setEOF(true); + + ReduceOuterClass.ReduceResponse.Result.Builder resultBuilder = ReduceOuterClass.ReduceResponse.Result.newBuilder(); + messageList.getMessages().forEach(message -> { + responseBuilder.set + resultBuilder.setValue(ByteString.copyFrom(message.getValue())) + .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) + .addAllTags(message.getTags() == null ? new ArrayList<>() : List.of(message.getTags())) + .build(); + }); + + + // set result + + responseBuilder.setResult() + + messageList.getMessages().forEach(message -> { - responseBuilder.addResults(ReduceOuterClass.ReduceResponse.Result.newBuilder() - .setValue(ByteString.copyFrom(message.getValue())) - .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList( - message.getKeys())) - .addAllTags(message.getTags() == null ? new ArrayList<>() : List.of( - message.getTags())) - .build()); + + }); return new ActorResponse(this.keys, responseBuilder.build()); } - } diff --git a/src/main/java/io/numaproj/numaflow/reducer/Service.java b/src/main/java/io/numaproj/numaflow/reducer/Service.java index 37048679..7ee45107 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Service.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Service.java @@ -110,7 +110,6 @@ public void onError(Throwable throwable) { public void onCompleted() { // indicate the end of input to the supervisor supervisorActor.tell(Constants.EOF, ActorRef.noSender()); - } }; } diff --git a/src/main/proto/reduce/v1/reduce.proto b/src/main/proto/reduce/v1/reduce.proto index e1afc890..3d9bcd71 100644 --- a/src/main/proto/reduce/v1/reduce.proto +++ b/src/main/proto/reduce/v1/reduce.proto @@ -8,7 +8,7 @@ import "google/protobuf/empty.proto"; package reduce.v1; service Reduce { - // ReduceFn applies a reduce function to a request stream. + // ReduceFn applies a reduce function to a stream of reduce requests and sends reduce response back in a streaming fashion. rpc ReduceFn(stream ReduceRequest) returns (stream ReduceResponse); // IsReady is the heartbeat endpoint for gRPC. @@ -19,22 +19,58 @@ service Reduce { * ReduceRequest represents a request element. */ message ReduceRequest { - repeated string keys = 1; - bytes value = 2; - google.protobuf.Timestamp event_time = 3; - google.protobuf.Timestamp watermark = 4; + // WindowOperation represents a window operation. + // For Aligned windows, OPEN, APPEND and CLOSE events are sent. + message WindowOperation { + enum Event { + OPEN = 0; + CLOSE = 1; + APPEND = 4; + } + + Event event = 1; + repeated Window windows = 2; + } + + // Payload represents a payload element. + message Payload { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + } + + Payload payload = 1; + WindowOperation operation = 2; +} + +// Window represents a window. +// Since the client doesn't track keys, window doesn't have a keys field. +message Window { + google.protobuf.Timestamp start = 1; + google.protobuf.Timestamp end = 2; + string slot = 3; } /** * ReduceResponse represents a response element. */ message ReduceResponse { + // FIXME: put all fields(window, EOF) inside of Result. Reference: https://protobuf.dev/programming-guides/api/#dont-include-primitive-types + // Result represents a result element. It contains the result of the reduce function. message Result { repeated string keys = 1; bytes value = 2; repeated string tags = 3; } - repeated Result results = 1; + + Result result = 1; + + // window represents a window to which the result belongs. + Window window = 2; + + // EOF represents the end of the response for a window. + bool EOF = 3; } /**