Skip to content

Commit

Permalink
fix: require event time when dropping messages (#86)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Dec 4, 2023
1 parent a072f6d commit 06b90a0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
* This is a simple User Defined Function example which receives a message, applies the following
* data transformation, and returns the message.
* <p>
* If the message event time is before year 2022, drop the message. If it's within year 2022, update
* the tag to "within_year_2022" and update the message event time to Jan 1st 2022.
* If the message event time is before year 2022, drop the message with the original event time.
* If it's within year 2022, update the tag to "within_year_2022" and update the message event time to Jan 1st 2022.
* Otherwise, (exclusively after year 2022), update the tag to "after_year_2022" and update the
* message event time to Jan 1st 2023.
*/
Expand All @@ -31,7 +31,7 @@ public MessageList processMessage(String[] keys, Datum data) {
Instant eventTime = data.getEventTime();

if (eventTime.isBefore(januaryFirst2022)) {
return MessageList.newBuilder().addMessage(Message.toDrop()).build();
return MessageList.newBuilder().addMessage(Message.toDrop(eventTime)).build();
} else if (eventTime.isBefore(januaryFirst2023)) {
return MessageList
.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
@Getter
public class Message {
public static final String DROP = "U+005C__DROP__";
// Watermark are at millisecond granularity, hence we use epoch(0) - 1 to indicate watermark is not available.
// EventTimeForDrop is used to indicate that the message is dropped hence, excluded from watermark calculation
private static final Instant EventTimeForDrop = Instant.ofEpochMilli(-1);
private final String[] keys;
private final byte[] value;
private final Instant eventTime;
Expand Down Expand Up @@ -56,14 +53,18 @@ public Message(byte[] value, Instant eventTime, String[] keys) {
}

/**
* creates a Message which will be dropped
* creates a Message which will be dropped.
*
* @param eventTime message eventTime is required because even though a message is dropped,
* we consider it as being processed, hence it should be counted in the watermark calculation
* using the provided event time.
*
* @return returns the Message which will be dropped
*/
public static Message toDrop() {
public static Message toDrop(Instant eventTime) {
return new Message(
new byte[0],
EventTimeForDrop,
eventTime,
null,
new String[]{DROP});
}
Expand Down

0 comments on commit 06b90a0

Please sign in to comment.