Skip to content

Commit

Permalink
not working
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Jan 9, 2024
1 parent 615d49b commit d63d88b
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 18 deletions.
3 changes: 0 additions & 3 deletions src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@

@AllArgsConstructor
class HandlerDatum implements Datum {

private byte[] value;
private Instant watermark;
private Instant eventTime;


@Override
public Instant getWatermark() {
return this.watermark;
Expand All @@ -27,5 +25,4 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

}
37 changes: 29 additions & 8 deletions src/main/java/io/numaproj/numaflow/reducer/ReduceActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -49,18 +50,38 @@ private void getResult(String eof) {

private ActorResponse buildDatumListResponse(MessageList messageList) {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
// for aligned window, if we start building the response, it means we already reached EOF.
responseBuilder.setEOF(true);

ReduceOuterClass.ReduceResponse.Result.Builder resultBuilder = ReduceOuterClass.ReduceResponse.Result.newBuilder();
messageList.getMessages().forEach(message -> {
responseBuilder.set
resultBuilder.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags() == null ? new ArrayList<>() : List.of(message.getTags()))
.build();
});


// set result

responseBuilder.setResult()


messageList.getMessages().forEach(message -> {
responseBuilder.addResults(ReduceOuterClass.ReduceResponse.Result.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(
message.getKeys()))
.addAllTags(message.getTags() == null ? new ArrayList<>() : List.of(
message.getTags()))
.build());



});
return new ActorResponse(this.keys, responseBuilder.build());
}

}

1 change: 0 additions & 1 deletion src/main/java/io/numaproj/numaflow/reducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ public void onError(Throwable throwable) {
public void onCompleted() {
// indicate the end of input to the supervisor
supervisorActor.tell(Constants.EOF, ActorRef.noSender());

}
};
}
Expand Down
48 changes: 42 additions & 6 deletions src/main/proto/reduce/v1/reduce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import "google/protobuf/empty.proto";
package reduce.v1;

service Reduce {
// ReduceFn applies a reduce function to a request stream.
// ReduceFn applies a reduce function to a stream of reduce requests and sends reduce response back in a streaming fashion.
rpc ReduceFn(stream ReduceRequest) returns (stream ReduceResponse);

// IsReady is the heartbeat endpoint for gRPC.
Expand All @@ -19,22 +19,58 @@ service Reduce {
* ReduceRequest represents a request element.
*/
message ReduceRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
// WindowOperation represents a window operation.
// For Aligned windows, OPEN, APPEND and CLOSE events are sent.
message WindowOperation {
enum Event {
OPEN = 0;
CLOSE = 1;
APPEND = 4;
}

Event event = 1;
repeated Window windows = 2;
}

// Payload represents a payload element.
message Payload {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
}

Payload payload = 1;
WindowOperation operation = 2;
}

// Window represents a window.
// Since the client doesn't track keys, window doesn't have a keys field.
message Window {
google.protobuf.Timestamp start = 1;
google.protobuf.Timestamp end = 2;
string slot = 3;
}

/**
* ReduceResponse represents a response element.
*/
message ReduceResponse {
// FIXME: put all fields(window, EOF) inside of Result. Reference: https://protobuf.dev/programming-guides/api/#dont-include-primitive-types
// Result represents a result element. It contains the result of the reduce function.
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
repeated Result results = 1;

Result result = 1;

// window represents a window to which the result belongs.
Window window = 2;

// EOF represents the end of the response for a window.
bool EOF = 3;
}

/**
Expand Down

0 comments on commit d63d88b

Please sign in to comment.