diff --git a/src/main/java/io/numaproj/numaflow/mapper/Datum.java b/src/main/java/io/numaproj/numaflow/mapper/Datum.java index daa59780..028dd264 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Datum.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Datum.java @@ -2,6 +2,7 @@ import java.time.Instant; +import java.util.Map; /** * Datum contains methods to get the payload information. @@ -28,4 +29,11 @@ public interface Datum { * @return returns the watermark */ public Instant getWatermark(); + + /** + * method to get the headers information of the payload + * + * @return returns the headers in the form of key value pair + */ + public Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java index 5c7709d3..17945596 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java @@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; import java.time.Instant; +import java.util.Map; @AllArgsConstructor class HandlerDatum implements Datum { @@ -11,6 +12,7 @@ class HandlerDatum implements Datum { private byte[] value; private Instant watermark; private Instant eventTime; + private Map headers; @Override @@ -28,4 +30,9 @@ public Instant getEventTime() { return this.eventTime; } + @Override + public Map getHeaders() { + return this.headers; + } + } diff --git a/src/main/java/io/numaproj/numaflow/mapper/Service.java b/src/main/java/io/numaproj/numaflow/mapper/Service.java index 80e4c06d..c0260433 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Service.java @@ -42,7 +42,8 @@ public void mapFn( request.getWatermark().getNanos()), Instant.ofEpochSecond( request.getEventTime().getSeconds(), - request.getEventTime().getNanos()) + request.getEventTime().getNanos()), + request.getHeadersMap() ); // process request diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java index 020126b7..379ec499 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.mapstreamer; import java.time.Instant; +import java.util.Map; /** * Datum contains methods to get the payload information. @@ -27,4 +28,11 @@ public interface Datum { * @return returns the watermark */ public Instant getWatermark(); + + /** + * method to get the headers information of the payload + * + * @return returns the headers in the form of key value pair + */ + public Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/mapstreamer/HandlerDatum.java index d6d42099..65154f9b 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/HandlerDatum.java @@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; import java.time.Instant; +import java.util.Map; @AllArgsConstructor class HandlerDatum implements Datum { @@ -11,6 +12,7 @@ class HandlerDatum implements Datum { private byte[] value; private Instant watermark; private Instant eventTime; + private Map headers; @Override @@ -28,4 +30,9 @@ public Instant getEventTime() { return this.eventTime; } + @Override + public Map getHeaders() { + return this.headers; + } + } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java index 41078521..8ccd0913 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java @@ -40,7 +40,8 @@ public void mapStreamFn( request.getWatermark().getNanos()), Instant.ofEpochSecond( request.getEventTime().getSeconds(), - request.getEventTime().getNanos()) + request.getEventTime().getNanos()), + request.getHeadersMap() ); // process Datum diff --git a/src/main/java/io/numaproj/numaflow/reducer/Datum.java b/src/main/java/io/numaproj/numaflow/reducer/Datum.java index 5af27c55..3b7165fc 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Datum.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Datum.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.reducer; import java.time.Instant; +import java.util.Map; /** * Datum contains methods to get the payload information. @@ -27,4 +28,11 @@ public interface Datum { * @return returns the watermark */ public Instant getWatermark(); + + /** + * method to get the headers information of the payload + * + * @return returns the headers in the form of key value pair + */ + public Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java index 612160e0..95166624 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java @@ -4,12 +4,14 @@ import lombok.AllArgsConstructor; import java.time.Instant; +import java.util.Map; @AllArgsConstructor class HandlerDatum implements Datum { private byte[] value; private Instant watermark; private Instant eventTime; + private Map headers; @Override public Instant getWatermark() { @@ -25,4 +27,9 @@ public byte[] getValue() { public Instant getEventTime() { return this.eventTime; } + + @Override + public Map getHeaders() { + return this.headers; + } } diff --git a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java index ec3b8750..ca344967 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/reducer/ReduceSupervisorActor.java @@ -135,7 +135,8 @@ private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payloa payload.getWatermark().getNanos()), Instant.ofEpochSecond( payload.getEventTime().getSeconds(), - payload.getEventTime().getNanos()) + payload.getEventTime().getNanos()), + payload.getHeadersMap() ); } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java index a36df0ce..2ba53678 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/HandlerDatum.java @@ -4,12 +4,14 @@ import lombok.AllArgsConstructor; import java.time.Instant; +import java.util.Map; @AllArgsConstructor class HandlerDatum implements Datum { private byte[] value; private Instant watermark; private Instant eventTime; + private Map headers; @Override public Instant getWatermark() { @@ -25,4 +27,9 @@ public byte[] getValue() { public Instant getEventTime() { return this.eventTime; } + + @Override + public Map getHeaders() { + return this.headers; + } } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java b/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java index b5d5f4dd..42c15ab2 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/SupervisorActor.java @@ -136,7 +136,8 @@ private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payloa payload.getWatermark().getNanos()), Instant.ofEpochSecond( payload.getEventTime().getSeconds(), - payload.getEventTime().getNanos()) + payload.getEventTime().getNanos()), + payload.getHeadersMap() ); } diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java index a590a1ce..c0d28a08 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/model/Datum.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.reducestreamer.model; import java.time.Instant; +import java.util.Map; /** * Datum contains methods to get the payload information. @@ -26,4 +27,11 @@ public interface Datum { * @return returns the watermark */ Instant getWatermark(); + + /** + * method to get the headers information of the payload + * + * @return returns the headers in the form of key value pair + */ + public Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/sessionreducer/HandlerDatum.java index 26080d9a..bc7b60ba 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/HandlerDatum.java @@ -4,12 +4,14 @@ import lombok.AllArgsConstructor; import java.time.Instant; +import java.util.Map; @AllArgsConstructor class HandlerDatum implements Datum { private byte[] value; private Instant watermark; private Instant eventTime; + private Map headers; @Override public Instant getWatermark() { @@ -25,4 +27,9 @@ public byte[] getValue() { public Instant getEventTime() { return this.eventTime; } + + @Override + public Map getHeaders() { + return this.headers; + } } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/SupervisorActor.java b/src/main/java/io/numaproj/numaflow/sessionreducer/SupervisorActor.java index 0c7b7eb2..499cebe3 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/SupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/SupervisorActor.java @@ -357,7 +357,8 @@ private HandlerDatum constructHandlerDatum(Sessionreduce.SessionReduceRequest.Pa payload.getWatermark().getNanos()), Instant.ofEpochSecond( payload.getEventTime().getSeconds(), - payload.getEventTime().getNanos()) + payload.getEventTime().getNanos()), + payload.getHeadersMap() ); } diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/model/Datum.java b/src/main/java/io/numaproj/numaflow/sessionreducer/model/Datum.java index 99fddfad..c6d370e3 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/model/Datum.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/model/Datum.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.sessionreducer.model; import java.time.Instant; +import java.util.Map; /** * Datum contains methods to get the payload information. @@ -26,4 +27,11 @@ public interface Datum { * @return returns the watermark */ Instant getWatermark(); + + /** + * method to get the headers information of the payload + * + * @return returns the headers in the form of key value pair + */ + public Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Datum.java b/src/main/java/io/numaproj/numaflow/sinker/Datum.java index ae845e55..bceb2f62 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Datum.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Datum.java @@ -1,6 +1,7 @@ package io.numaproj.numaflow.sinker; import java.time.Instant; +import java.util.Map; /** * Datum contains methods to get the payload information. @@ -40,4 +41,11 @@ public interface Datum { * @return returns the ID */ String getId(); + + /** + * method to get the headers information of the payload + * + * @return returns the headers in the form of key value pair + */ + public Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java index be9359ca..60e9a2b2 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java @@ -3,17 +3,19 @@ import lombok.AllArgsConstructor; import java.time.Instant; +import java.util.Map; @AllArgsConstructor class HandlerDatum implements Datum { // EOF_DATUM is used to indicate the end of the stream. - static final HandlerDatum EOF_DATUM = new HandlerDatum(null, null, null, null, null); + static final HandlerDatum EOF_DATUM = new HandlerDatum(null, null, null, null, null, null); private String[] keys; private byte[] value; private Instant watermark; private Instant eventTime; private String id; + private Map headers; @Override public String[] getKeys() { @@ -39,4 +41,9 @@ public Instant getEventTime() { public String getId() { return id; } + + @Override + public Map getHeaders() { + return this.headers; + } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 5cc84e2a..243b393f 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -112,7 +112,9 @@ private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) { Instant.ofEpochSecond( d.getEventTime().getSeconds(), d.getEventTime().getNanos()), - d.getId()); + d.getId(), + d.getHeadersMap() + ); } public SinkOuterClass.SinkResponse buildResponseList(ResponseList responses) { diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Message.java b/src/main/java/io/numaproj/numaflow/sourcer/Message.java index 7423370c..104f9606 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Message.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Message.java @@ -3,6 +3,7 @@ import lombok.Getter; import java.time.Instant; +import java.util.Map; /** * Message is used to wrap the data returned by Sourcer. @@ -15,6 +16,7 @@ public class Message { private final byte[] value; private final Offset offset; private final Instant eventTime; + private final Map headers; /** * used to create Message with value, offset and eventTime. @@ -24,7 +26,7 @@ public class Message { * @param eventTime message eventTime */ public Message(byte[] value, Offset offset, Instant eventTime) { - this(value, offset, eventTime, null); + this(value, offset, eventTime, null, null); } /** @@ -36,9 +38,35 @@ public Message(byte[] value, Offset offset, Instant eventTime) { * @param keys message keys */ public Message(byte[] value, Offset offset, Instant eventTime, String[] keys) { + this(value, offset, eventTime, keys, null); + } + + /** + * used to create Message with value, offset, eventTime and headers. + * + * @param value message value + * @param offset message offset + * @param eventTime message eventTime + * @param headers message headers + */ + public Message(byte[] value, Offset offset, Instant eventTime, Map headers) { + this(value, offset, eventTime, null, headers); + } + + /** + * used to create Message with value, offset, eventTime, keys and headers. + * + * @param value message value + * @param offset message offset + * @param eventTime message eventTime + * @param keys message keys + * @param headers message headers + */ + public Message(byte[] value, Offset offset, Instant eventTime, String[] keys, Map headers) { this.value = value; this.offset = offset; this.eventTime = eventTime; this.keys = keys; + this.headers = headers; } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java b/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java index fcadf65c..fb06c4ea 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/OutputObserverImpl.java @@ -7,6 +7,7 @@ import lombok.AllArgsConstructor; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; /** @@ -43,6 +44,7 @@ private SourceOuterClass.ReadResponse buildResponse(Message message) { .getOffset() .getValue())) .setPartitionId(message.getOffset().getPartitionId())) + .putAllHeaders(message.getHeaders() != null ? message.getHeaders() : new HashMap<>()) .build()); return builder.build(); diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java index 05155ae2..91f87917 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Datum.java @@ -2,6 +2,7 @@ import java.time.Instant; +import java.util.Map; /** * Datum contains methods to get the payload information. @@ -28,4 +29,11 @@ public interface Datum { * @return returns the watermark */ public Instant getWatermark(); + + /** + * method to get the headers information of the payload + * + * @return returns the headers in the form of key value pair + */ + public Map getHeaders(); } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/HandlerDatum.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/HandlerDatum.java index ddb9be56..e3774ed8 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/HandlerDatum.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/HandlerDatum.java @@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; import java.time.Instant; +import java.util.Map; @AllArgsConstructor class HandlerDatum implements Datum { @@ -11,6 +12,7 @@ class HandlerDatum implements Datum { private byte[] value; private Instant watermark; private Instant eventTime; + private Map headers; @Override @@ -28,4 +30,8 @@ public Instant getEventTime() { return this.eventTime; } + @Override + public Map getHeaders() { + return this.headers; + } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java index 2b3a1356..ff622fed 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java @@ -44,7 +44,8 @@ public void sourceTransformFn( request.getWatermark().getNanos()), Instant.ofEpochSecond( request.getEventTime().getSeconds(), - request.getEventTime().getNanos()) + request.getEventTime().getNanos()), + request.getHeadersMap() ); // process request diff --git a/src/main/proto/map/v1/map.proto b/src/main/proto/map/v1/map.proto index 7e12f069..09842985 100644 --- a/src/main/proto/map/v1/map.proto +++ b/src/main/proto/map/v1/map.proto @@ -23,6 +23,7 @@ message MapRequest { bytes value = 2; google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; + map headers = 5; } /** diff --git a/src/main/proto/mapstream/v1/mapstream.proto b/src/main/proto/mapstream/v1/mapstream.proto index b700b27b..ab1c51c8 100644 --- a/src/main/proto/mapstream/v1/mapstream.proto +++ b/src/main/proto/mapstream/v1/mapstream.proto @@ -23,6 +23,7 @@ message MapStreamRequest { bytes value = 2; google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; + map headers = 5; } /** diff --git a/src/main/proto/reduce/v1/reduce.proto b/src/main/proto/reduce/v1/reduce.proto index 93a257b0..895cfccb 100644 --- a/src/main/proto/reduce/v1/reduce.proto +++ b/src/main/proto/reduce/v1/reduce.proto @@ -38,6 +38,7 @@ message ReduceRequest { bytes value = 2; google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; + map headers = 5; } Payload payload = 1; diff --git a/src/main/proto/sessionreduce/v1/sessionreduce.proto b/src/main/proto/sessionreduce/v1/sessionreduce.proto index 4bac0df4..02da0f73 100644 --- a/src/main/proto/sessionreduce/v1/sessionreduce.proto +++ b/src/main/proto/sessionreduce/v1/sessionreduce.proto @@ -49,6 +49,7 @@ message SessionReduceRequest { bytes value = 2; google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; + map headers = 5; } Payload payload = 1; diff --git a/src/main/proto/sink/v1/sink.proto b/src/main/proto/sink/v1/sink.proto index 780a57f7..d378bcba 100644 --- a/src/main/proto/sink/v1/sink.proto +++ b/src/main/proto/sink/v1/sink.proto @@ -24,6 +24,7 @@ message SinkRequest { google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; string id = 5; + map headers = 6; } /** diff --git a/src/main/proto/source/v1/source.proto b/src/main/proto/source/v1/source.proto index b757fc1e..48057b35 100644 --- a/src/main/proto/source/v1/source.proto +++ b/src/main/proto/source/v1/source.proto @@ -62,6 +62,10 @@ message ReadResponse { // We add this optional field to support the use case where the user defined source can provide keys for the datum. // e.g. Kafka and Redis Stream message usually include information about the keys. repeated string keys = 4; + // Optional list of headers associated with the datum. + // Headers are the metadata associated with the datum. e.g. Kafka message headers. + // headers are in the form of (key, value) pairs. + map headers = 5; } // Required field holding the result. Result result = 1; diff --git a/src/main/proto/sourcetransform/v1/sourcetransformer.proto b/src/main/proto/sourcetransform/v1/sourcetransformer.proto index f17b7928..f961c774 100644 --- a/src/main/proto/sourcetransform/v1/sourcetransformer.proto +++ b/src/main/proto/sourcetransform/v1/sourcetransformer.proto @@ -25,6 +25,7 @@ message SourceTransformRequest { bytes value = 2; google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; + map headers = 5; } /** diff --git a/src/test/java/io/numaproj/numaflow/sinker/DatumStreamImplTest.java b/src/test/java/io/numaproj/numaflow/sinker/DatumStreamImplTest.java index 0399d8f3..9e941b3d 100644 --- a/src/test/java/io/numaproj/numaflow/sinker/DatumStreamImplTest.java +++ b/src/test/java/io/numaproj/numaflow/sinker/DatumStreamImplTest.java @@ -4,6 +4,7 @@ import org.junit.Test; import java.time.Instant; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -57,6 +58,11 @@ public Instant getWatermark() { public String getId() { return null; } + + @Override + public Map getHeaders() { + return null; + } } }