Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support headers for message #98

Merged
merged 2 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapper/Datum.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -28,4 +29,11 @@ public interface Datum {
* @return returns the watermark
*/
public Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
7 changes: 7 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapper/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {

private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;


@Override
Expand All @@ -28,4 +30,9 @@ public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}

}
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void mapFn(
request.getWatermark().getNanos()),
Instant.ofEpochSecond(
request.getEventTime().getSeconds(),
request.getEventTime().getNanos())
request.getEventTime().getNanos()),
request.getHeadersMap()
);

// process request
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Datum.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.mapstreamer;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -27,4 +28,11 @@ public interface Datum {
* @return returns the watermark
*/
public Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {

private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;


@Override
Expand All @@ -28,4 +30,9 @@ public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}

}
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapstreamer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void mapStreamFn(
request.getWatermark().getNanos()),
Instant.ofEpochSecond(
request.getEventTime().getSeconds(),
request.getEventTime().getNanos())
request.getEventTime().getNanos()),
request.getHeadersMap()
);

// process Datum
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducer/Datum.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.reducer;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -27,4 +28,11 @@ public interface Datum {
* @return returns the watermark
*/
public Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
7 changes: 7 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducer/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {
private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;

@Override
public Instant getWatermark() {
Expand All @@ -25,4 +27,9 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payloa
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
payload.getEventTime().getNanos()),
payload.getHeadersMap()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {
private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;

@Override
public Instant getWatermark() {
Expand All @@ -25,4 +27,9 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ private HandlerDatum constructHandlerDatum(ReduceOuterClass.ReduceRequest.Payloa
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
payload.getEventTime().getNanos()),
payload.getHeadersMap()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.reducestreamer.model;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -26,4 +27,11 @@ public interface Datum {
* @return returns the watermark
*/
Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {
private byte[] value;
private Instant watermark;
private Instant eventTime;
private Map<String, String> headers;

@Override
public Instant getWatermark() {
Expand All @@ -25,4 +27,9 @@ public byte[] getValue() {
public Instant getEventTime() {
return this.eventTime;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ private HandlerDatum constructHandlerDatum(Sessionreduce.SessionReduceRequest.Pa
payload.getWatermark().getNanos()),
Instant.ofEpochSecond(
payload.getEventTime().getSeconds(),
payload.getEventTime().getNanos())
payload.getEventTime().getNanos()),
payload.getHeadersMap()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.sessionreducer.model;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand All @@ -26,4 +27,11 @@ public interface Datum {
* @return returns the watermark
*/
Instant getWatermark();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Datum.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.numaproj.numaflow.sinker;

import java.time.Instant;
import java.util.Map;

/**
* Datum contains methods to get the payload information.
Expand Down Expand Up @@ -40,4 +41,11 @@ public interface Datum {
* @return returns the ID
*/
String getId();

/**
* method to get the headers information of the payload
*
* @return returns the headers in the form of key value pair
*/
public Map<String, String> getHeaders();
}
9 changes: 8 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/HandlerDatum.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
import lombok.AllArgsConstructor;

import java.time.Instant;
import java.util.Map;

@AllArgsConstructor
class HandlerDatum implements Datum {

// EOF_DATUM is used to indicate the end of the stream.
static final HandlerDatum EOF_DATUM = new HandlerDatum(null, null, null, null, null);
static final HandlerDatum EOF_DATUM = new HandlerDatum(null, null, null, null, null, null);
private String[] keys;
private byte[] value;
private Instant watermark;
private Instant eventTime;
private String id;
private Map<String, String> headers;

@Override
public String[] getKeys() {
Expand All @@ -39,4 +41,9 @@ public Instant getEventTime() {
public String getId() {
return id;
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}
}
4 changes: 3 additions & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ private HandlerDatum constructHandlerDatum(SinkOuterClass.SinkRequest d) {
Instant.ofEpochSecond(
d.getEventTime().getSeconds(),
d.getEventTime().getNanos()),
d.getId());
d.getId(),
d.getHeadersMap()
);
}

public SinkOuterClass.SinkResponse buildResponseList(ResponseList responses) {
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/io/numaproj/numaflow/sourcer/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import lombok.Getter;

import java.time.Instant;
import java.util.Map;

/**
* Message is used to wrap the data returned by Sourcer.
Expand All @@ -15,6 +16,7 @@ public class Message {
private final byte[] value;
private final Offset offset;
private final Instant eventTime;
private final Map<String, String> headers;

/**
* used to create Message with value, offset and eventTime.
Expand All @@ -24,7 +26,7 @@ public class Message {
* @param eventTime message eventTime
*/
public Message(byte[] value, Offset offset, Instant eventTime) {
this(value, offset, eventTime, null);
this(value, offset, eventTime, null, null);
}

/**
Expand All @@ -36,9 +38,23 @@ public Message(byte[] value, Offset offset, Instant eventTime) {
* @param keys message keys
*/
public Message(byte[] value, Offset offset, Instant eventTime, String[] keys) {
this(value, offset, eventTime, keys, null);
}

/**
* used to create Message with value, offset, eventTime, keys and headers.
*
* @param value message value
* @param offset message offset
* @param eventTime message eventTime
* @param keys message keys
* @param headers message headers
*/
public Message(byte[] value, Offset offset, Instant eventTime, String[] keys, Map<String, String> headers) {
this.value = value;
this.offset = offset;
this.eventTime = eventTime;
this.keys = keys;
this.headers = headers;
}
}
Loading
Loading