From a2164623eb922d7f51111d07b1d422cb719c97bd Mon Sep 17 00:00:00 2001 From: Jianyun Zhao Date: Wed, 23 Jun 2021 00:05:13 +0800 Subject: [PATCH] fix lost first message --- .../pulsar/FlinkPulsarRowSource.java | 3 +- .../connectors/pulsar/FlinkPulsarSource.java | 12 ++++- .../pulsar/internal/PulsarFetcher.java | 8 +++- .../pulsar/internal/ReaderThread.java | 48 +++++-------------- .../pulsar/FlinkPulsarSourceTest.java | 4 +- 5 files changed, 33 insertions(+), 42 deletions(-) diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSource.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSource.java index b7c2331b..f66f4dff 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSource.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSource.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; /** * Emit Pulsar message as Row to Flink. @@ -72,7 +73,7 @@ public TypeInformation getProducedType() { protected PulsarFetcher createFetcher( SourceContext sourceContext, Map seedTopicsWithInitialOffsets, - SerializedValue> watermarksPeriodic, + Set excludeStartMessageIds, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index a5a5aa59..c4afaaed 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -56,6 +56,7 @@ import org.apache.pulsar.shade.com.google.common.collect.Maps; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -167,6 +168,8 @@ public class FlinkPulsarSource */ private transient volatile TreeMap restoredState; + private transient volatile Set excludeStartMessageIds; + /** Accessor for state in the operator state backend. */ private transient ListState> unionOffsetStates; @@ -361,6 +364,7 @@ public void open(Configuration parameters) throws Exception { this.metadataReader = createMetadataReader(); ownedTopicStarts = new HashMap<>(); + excludeStartMessageIds = new HashSet<>(); Set allTopics = metadataReader.discoverTopicChanges(); log.info("Discovered topics : {}", allTopics); @@ -373,7 +377,10 @@ public void open(Configuration parameters) throws Exception { restoredState.entrySet().stream() .filter(e -> SourceSinkUtils.belongsTo(e.getKey(), numParallelTasks, taskIndex)) - .forEach(e -> ownedTopicStarts.put(e.getKey(), e.getValue())); + .forEach(e -> { + ownedTopicStarts.put(e.getKey(), e.getValue()); + excludeStartMessageIds.add(e.getKey()); + }); Set goneTopics = Sets.difference(restoredState.keySet(), allTopics).stream() .filter(k -> SourceSinkUtils.belongsTo(k, numParallelTasks, taskIndex)) @@ -473,6 +480,7 @@ public void onException(Throwable cause) { this.pulsarFetcher = createFetcher( ctx, ownedTopicStarts, + excludeStartMessageIds, periodicWatermarkAssigner, punctuatedWatermarkAssigner, streamingRuntime.getProcessingTimeService(), @@ -494,6 +502,7 @@ public void onException(Throwable cause) { protected PulsarFetcher createFetcher( SourceContext sourceContext, Map seedTopicsWithInitialOffsets, + Set excludeStartMessageIds, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, @@ -506,6 +515,7 @@ protected PulsarFetcher createFetcher( return new PulsarFetcher( sourceContext, seedTopicsWithInitialOffsets, + excludeStartMessageIds, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java index a5a5f1be..9dad042d 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java @@ -31,6 +31,7 @@ import org.apache.pulsar.shade.com.google.common.collect.ImmutableList; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -60,6 +61,7 @@ public class PulsarFetcher { protected final SourceFunction.SourceContext sourceContext; protected final Map seedTopicsWithInitialOffsets; + protected final Set excludeStartMessageIds; /** The lock that guarantees that record emission and state updates are atomic, * from the view of taking a checkpoint. */ @@ -140,6 +142,7 @@ public PulsarFetcher( this( sourceContext, seedTopicsWithInitialOffsets, + Collections.emptySet(), watermarksPeriodic, watermarksPunctuated, processingTimeProvider, @@ -157,6 +160,7 @@ public PulsarFetcher( public PulsarFetcher( SourceContext sourceContext, Map seedTopicsWithInitialOffsets, + Set excludeStartMessageIds, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, @@ -172,6 +176,7 @@ public PulsarFetcher( this.sourceContext = sourceContext; this.seedTopicsWithInitialOffsets = seedTopicsWithInitialOffsets; + this.excludeStartMessageIds = excludeStartMessageIds; this.checkpointLock = sourceContext.getCheckpointLock(); this.userCodeClassLoader = userCodeClassLoader; this.runtimeContext = runtimeContext; @@ -575,7 +580,8 @@ protected ReaderThread createReaderThread(ExceptionProxy exceptionProxy, PulsarT pollTimeoutMs, exceptionProxy, failOnDataLoss, - useEarliestWhenDataLoss); + useEarliestWhenDataLoss, + excludeStartMessageIds.contains(state.getTopic())); } /** diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java index 330d1aa7..987b08ea 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java @@ -20,6 +20,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -46,6 +47,7 @@ public class ReaderThread extends Thread { protected final ExceptionProxy exceptionProxy; protected final String topic; protected final MessageId startMessageId; + protected boolean excludeStartMessageId = false; protected volatile boolean running = true; protected volatile boolean closed = false; @@ -87,10 +89,12 @@ public ReaderThread( int pollTimeoutMs, ExceptionProxy exceptionProxy, boolean failOnDataLoss, - boolean useEarliestWhenDataLoss) { + boolean useEarliestWhenDataLoss, + boolean excludeStartMessageId) { this(owner, state, clientConf, readerConf, deserializer, pollTimeoutMs, exceptionProxy); this.failOnDataLoss = failOnDataLoss; this.useEarliestWhenDataLoss = useEarliestWhenDataLoss; + this.excludeStartMessageId = excludeStartMessageId; } @Override @@ -130,14 +134,16 @@ protected void createActualReader() throws org.apache.pulsar.client.api.PulsarCl readerConf0.put(entry.getKey(), entry.getValue()); } }); - reader = CachedPulsarClient + final ReaderBuilder readerBuilder = CachedPulsarClient .getOrCreate(clientConf) .newReader() .topic(topic) .startMessageId(startMessageId) - .startMessageIdInclusive() - .loadConf(readerConf0) - .create(); + .loadConf(readerConf0); + if (!excludeStartMessageId) { + readerBuilder.startMessageIdInclusive(); + } + reader = readerBuilder.create(); log.info("Create a reader at topic {} starting from message {} (inclusive) : config = {}", topic, startMessageId, readerConf0); } @@ -170,38 +176,6 @@ protected void skipFirstMessageIfNeeded() throws org.apache.pulsar.client.api.Pu } } - - while (currentMessage == null && running) { - currentMessage = reader.readNext(pollTimeoutMs, TimeUnit.MILLISECONDS); - if (failOnDataLoss) { - break; - } - } - if (currentMessage == null) { - reportDataLoss(String.format("Cannot read data at offset %s from topic: %s", startMessageId, topic)); - } else { - currentId = currentMessage.getMessageId(); - if (!messageIdRoughEquals(currentId, startMessageId) && failOnDataLoss) { - reportDataLoss( - String.format( - "Potential Data Loss in reading %s: intended to start at %s, actually we get %s", - topic, startMessageId.toString(), currentId.toString())); - } - - if (startMessageId instanceof BatchMessageIdImpl && currentId instanceof BatchMessageIdImpl) { - // we seek using a batch message id, we can read next directly later - } else if (startMessageId instanceof MessageIdImpl && currentId instanceof BatchMessageIdImpl) { - // we seek using a message id, this is supposed to be read by previous task since it's - // inclusive for the checkpoint, so we skip this batch - BatchMessageIdImpl cbmid = (BatchMessageIdImpl) currentId; - - MessageIdImpl newStart = - new MessageIdImpl(cbmid.getLedgerId(), cbmid.getEntryId() + 1, cbmid.getPartitionIndex()); - reader.seek(newStart); - } else if (startMessageId instanceof MessageIdImpl && currentId instanceof MessageIdImpl) { - // current entry is a non-batch entry, we can read next directly later - } - } } } diff --git a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSourceTest.java b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSourceTest.java index ecac4467..328f8962 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSourceTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSourceTest.java @@ -597,7 +597,7 @@ public TestingFlinkPulsarSource(PulsarMetadataReader discoverer) { protected PulsarFetcher createFetcher( SourceContext sourceContext, Map seedTopicsWithInitialOffsets, - SerializedValue> watermarksPeriodic, + Set excludeStartMessageIds, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, @@ -646,7 +646,7 @@ public DummyFlinkPulsarSource() { protected PulsarFetcher createFetcher( SourceContext sourceContext, Map seedTopicsWithInitialOffsets, - SerializedValue> watermarksPeriodic, + Set excludeStartMessageIds, SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval,