Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
feat #332: sink support delay message (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Jun 5, 2021
1 parent 36d40d6 commit 5f0763a
Showing 1 changed file with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class PulsarSerializationSchemaWrapper<T> implements PulsarSerializationS
private final SchemaMode schemaMode;
private final SerializableFunction<T, String> topicExtractor;
private final SerializableFunction<T, byte[]> keyExtractor;
private final SerializableFunction<T, Optional<Long>> deliverAtExtractor;

private PulsarSerializationSchemaWrapper(SerializationSchema<T> serializationSchema,
RecordSchemaType recordSchemaType,
Expand All @@ -53,7 +54,8 @@ private PulsarSerializationSchemaWrapper(SerializationSchema<T> serializationSch
DataType dataType,
SchemaMode schemaMode,
SerializableFunction<T, String> topicExtractor,
SerializableFunction<T, byte[]> keyExtractor) {
SerializableFunction<T, byte[]> keyExtractor,
SerializableFunction<T, Optional<Long>> deliverAtExtractor) {
this.serializationSchema = serializationSchema;
this.recordSchemaType = recordSchemaType;
this.schema = schema;
Expand All @@ -62,6 +64,7 @@ private PulsarSerializationSchemaWrapper(SerializationSchema<T> serializationSch
this.schemaMode = checkNotNull(schemaMode);
this.topicExtractor = topicExtractor;
this.keyExtractor = keyExtractor;
this.deliverAtExtractor = deliverAtExtractor;
}

@Override
Expand All @@ -79,6 +82,9 @@ public void serialize(T element, TypedMessageBuilder<T> messageBuilder) {
if (keyExtractor != null) {
messageBuilder.keyBytes(keyExtractor.apply(element));
}
if (deliverAtExtractor != null) {
deliverAtExtractor.apply(element).ifPresent(deliverAt -> messageBuilder.deliverAt(deliverAt));
}
messageBuilder.value(element);
}

Expand Down Expand Up @@ -147,6 +153,7 @@ public static class Builder<T> {
private SchemaMode mode;
private SerializableFunction<T, String> topicExtractor = (T) -> null;
private SerializableFunction<T, byte[]> keyExtractor;
private SerializableFunction<T, Optional<Long>> deliverAtExtractor;

public Builder(SerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
Expand Down Expand Up @@ -199,6 +206,12 @@ public PulsarSerializationSchemaWrapper.Builder<T> setKeyExtractor(
return this;
}

public PulsarSerializationSchemaWrapper.Builder<T> setDeliverAtExtractor(
SerializableFunction<T, Optional<Long>> deliverAtExtractor) {
this.deliverAtExtractor = deliverAtExtractor;
return this;
}

public PulsarSerializationSchemaWrapper<T> build() {
checkNotNull(mode, "Must set mode " +
"use useSpecialMode or useAtomicMode or usePojoMode or useRowMode");
Expand All @@ -210,7 +223,8 @@ public PulsarSerializationSchemaWrapper<T> build() {
dataType,
mode,
topicExtractor,
keyExtractor);
keyExtractor,
deliverAtExtractor);
}
}
}

0 comments on commit 5f0763a

Please sign in to comment.