This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit a216462

fix lost first message
jianyun8023 committed Jun 22, 2021
1 parent 40fb8af commit a216462
Showing 5 changed files with 33 additions and 42 deletions.
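Taken together, the hunks below thread a per-topic exclude set (excludeStartMessageIds) from FlinkPulsarSource through PulsarFetcher into ReaderThread. A topic whose start position was restored from checkpoint state gets an exclusive start, since the checkpointed MessageId was already emitted before the restart, while a topic that starts fresh keeps the inclusive start so that its very first message is not dropped. The sketch below illustrates that conditional with the Pulsar ReaderBuilder API; it is a simplified illustration rather than the connector's exact code, and openReader is a hypothetical helper name.

    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Reader;
    import org.apache.pulsar.client.api.ReaderBuilder;

    public class ExclusiveStartSketch {

        // Hypothetical helper mirroring the patched createActualReader():
        // read the topic starting at startMessageId, and include that message
        // only when the topic was NOT restored from checkpoint state.
        static Reader<byte[]> openReader(PulsarClient client,
                                         String topic,
                                         MessageId startMessageId,
                                         boolean excludeStartMessageId) throws Exception {
            ReaderBuilder<byte[]> builder = client.newReader()
                    .topic(topic)
                    .startMessageId(startMessageId);
            if (!excludeStartMessageId) {
                // Fresh topic: the message at startMessageId was never emitted,
                // so the reader must return it.
                builder = builder.startMessageIdInclusive();
            }
            // Restored topic: the default (exclusive) start skips the already
            // checkpointed message and resumes at the next one.
            return builder.create();
        }
    }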
File 1 of 5
@@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+ import java.util.Set;

/**
* Emit Pulsar message as Row to Flink.
@@ -72,7 +73,7 @@ public TypeInformation<Row> getProducedType() {
protected PulsarFetcher<Row> createFetcher(
SourceContext sourceContext,
Map<String, MessageId> seedTopicsWithInitialOffsets,
- SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic,
+ Set<String> excludeStartMessageIds, SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
File 2 of 5
@@ -56,6 +56,7 @@
import org.apache.pulsar.shade.com.google.common.collect.Maps;

import java.util.HashMap;
+ import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -167,6 +168,8 @@ public class FlinkPulsarSource<T>
*/
private transient volatile TreeMap<String, MessageId> restoredState;

+ private transient volatile Set<String> excludeStartMessageIds;

/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<String, MessageId>> unionOffsetStates;

@@ -361,6 +364,7 @@ public void open(Configuration parameters) throws Exception {
this.metadataReader = createMetadataReader();

ownedTopicStarts = new HashMap<>();
+ excludeStartMessageIds = new HashSet<>();
Set<String> allTopics = metadataReader.discoverTopicChanges();

log.info("Discovered topics : {}", allTopics);
@@ -373,7 +377,10 @@

restoredState.entrySet().stream()
.filter(e -> SourceSinkUtils.belongsTo(e.getKey(), numParallelTasks, taskIndex))
- .forEach(e -> ownedTopicStarts.put(e.getKey(), e.getValue()));
+ .forEach(e -> {
+     ownedTopicStarts.put(e.getKey(), e.getValue());
+     excludeStartMessageIds.add(e.getKey());
+ });

Set<String> goneTopics = Sets.difference(restoredState.keySet(), allTopics).stream()
.filter(k -> SourceSinkUtils.belongsTo(k, numParallelTasks, taskIndex))
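For context on the restore path above: open() now records, for each restored (topic, MessageId) entry owned by this subtask, both the start position and the fact that this start message must be excluded. A simplified sketch of that bookkeeping follows; belongsTo is a stand-in for SourceSinkUtils.belongsTo, which assigns topics to parallel subtasks.

    import org.apache.pulsar.client.api.MessageId;

    import java.util.Map;
    import java.util.Set;

    final class RestoreBookkeepingSketch {

        // Simplified version of the restore loop in open(); not the connector's exact code.
        static void seedFromRestoredState(Map<String, MessageId> restoredState,
                                          int numParallelTasks,
                                          int taskIndex,
                                          Map<String, MessageId> ownedTopicStarts,
                                          Set<String> excludeStartMessageIds) {
            restoredState.forEach((topic, lastCheckpointedId) -> {
                if (belongsTo(topic, numParallelTasks, taskIndex)) {
                    ownedTopicStarts.put(topic, lastCheckpointedId);
                    // The checkpointed MessageId was already emitted before the restart,
                    // so the reader for this topic must start strictly after it.
                    excludeStartMessageIds.add(topic);
                }
            });
        }

        // Placeholder for SourceSinkUtils.belongsTo(topic, numParallelTasks, taskIndex).
        private static boolean belongsTo(String topic, int numParallelTasks, int taskIndex) {
            return Math.abs(topic.hashCode() % numParallelTasks) == taskIndex;
        }
    }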
@@ -473,6 +480,7 @@ public void onException(Throwable cause) {
this.pulsarFetcher = createFetcher(
ctx,
ownedTopicStarts,
+ excludeStartMessageIds,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
streamingRuntime.getProcessingTimeService(),
@@ -494,6 +502,7 @@
protected PulsarFetcher<T> createFetcher(
SourceContext sourceContext,
Map<String, MessageId> seedTopicsWithInitialOffsets,
+ Set<String> excludeStartMessageIds,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
@@ -506,6 +515,7 @@ protected PulsarFetcher<T> createFetcher(
return new PulsarFetcher(
sourceContext,
seedTopicsWithInitialOffsets,
+ excludeStartMessageIds,
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
File 3 of 5
@@ -31,6 +31,7 @@
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;

import java.io.IOException;
+ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,6 +61,7 @@ public class PulsarFetcher<T> {
protected final SourceFunction.SourceContext<T> sourceContext;

protected final Map<String, MessageId> seedTopicsWithInitialOffsets;
+ protected final Set<String> excludeStartMessageIds;

/** The lock that guarantees that record emission and state updates are atomic,
* from the view of taking a checkpoint. */
@@ -140,6 +142,7 @@ public PulsarFetcher(
this(
sourceContext,
seedTopicsWithInitialOffsets,
+ Collections.emptySet(),
watermarksPeriodic,
watermarksPunctuated,
processingTimeProvider,
@@ -157,6 +160,7 @@ public PulsarFetcher(
public PulsarFetcher(
SourceContext<T> sourceContext,
Map<String, MessageId> seedTopicsWithInitialOffsets,
+ Set<String> excludeStartMessageIds,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
@@ -172,6 +176,7 @@ public PulsarFetcher(

this.sourceContext = sourceContext;
this.seedTopicsWithInitialOffsets = seedTopicsWithInitialOffsets;
+ this.excludeStartMessageIds = excludeStartMessageIds;
this.checkpointLock = sourceContext.getCheckpointLock();
this.userCodeClassLoader = userCodeClassLoader;
this.runtimeContext = runtimeContext;
@@ -575,7 +580,8 @@ protected ReaderThread createReaderThread(ExceptionProxy exceptionProxy, PulsarT
pollTimeoutMs,
exceptionProxy,
failOnDataLoss,
- useEarliestWhenDataLoss);
+ useEarliestWhenDataLoss,
+ excludeStartMessageIds.contains(state.getTopic()));
}

/**
File 4 of 5
@@ -20,6 +20,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
+ import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -46,6 +47,7 @@ public class ReaderThread<T> extends Thread {
protected final ExceptionProxy exceptionProxy;
protected final String topic;
protected final MessageId startMessageId;
+ protected boolean excludeStartMessageId = false;

protected volatile boolean running = true;
protected volatile boolean closed = false;
@@ -87,10 +89,12 @@ public ReaderThread(
int pollTimeoutMs,
ExceptionProxy exceptionProxy,
boolean failOnDataLoss,
- boolean useEarliestWhenDataLoss) {
+ boolean useEarliestWhenDataLoss,
+ boolean excludeStartMessageId) {
this(owner, state, clientConf, readerConf, deserializer, pollTimeoutMs, exceptionProxy);
this.failOnDataLoss = failOnDataLoss;
this.useEarliestWhenDataLoss = useEarliestWhenDataLoss;
+ this.excludeStartMessageId = excludeStartMessageId;
}

@Override
@@ -130,14 +134,16 @@ protected void createActualReader() throws org.apache.pulsar.client.api.PulsarCl
readerConf0.put(entry.getKey(), entry.getValue());
}
});
- reader = CachedPulsarClient
+ final ReaderBuilder<byte[]> readerBuilder = CachedPulsarClient
.getOrCreate(clientConf)
.newReader()
.topic(topic)
.startMessageId(startMessageId)
-     .startMessageIdInclusive()
-     .loadConf(readerConf0)
-     .create();
+     .loadConf(readerConf0);
+ if (!excludeStartMessageId) {
+     readerBuilder.startMessageIdInclusive();
+ }
+ reader = readerBuilder.create();
log.info("Create a reader at topic {} starting from message {} (inclusive) : config = {}",
topic, startMessageId, readerConf0);
}
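The reader construction above relies on the Pulsar client's default start semantics: without startMessageIdInclusive(), a Reader positioned at a MessageId resumes with the message after it. A small stand-alone illustration of that resume behaviour follows; the broker URL and topic name are assumptions for the example and are not part of this repository.

    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Reader;

    import java.util.concurrent.TimeUnit;

    public class ExclusiveResumeDemo {

        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")      // assumed local broker
                    .build();
            // Placeholder start position; in the connector this would be the MessageId
            // restored from checkpoint state.
            MessageId lastCheckpointed = MessageId.earliest;
            try (Reader<byte[]> reader = client.newReader()
                    .topic("persistent://public/default/demo")  // assumed topic
                    .startMessageId(lastCheckpointed)           // exclusive by default
                    .create()) {
                Message<byte[]> next = reader.readNext(5, TimeUnit.SECONDS);
                // With a real restored MessageId, next is the first message after it
                // (or null on timeout), which is the resume semantics restored topics need.
                System.out.println(next == null ? "no new messages" : next.getMessageId());
            }
            client.close();
        }
    }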
@@ -170,38 +176,6 @@ protected void skipFirstMessageIfNeeded() throws org.apache.pulsar.client.api.Pu
}

}

-         while (currentMessage == null && running) {
-             currentMessage = reader.readNext(pollTimeoutMs, TimeUnit.MILLISECONDS);
-             if (failOnDataLoss) {
-                 break;
-             }
-         }
-         if (currentMessage == null) {
-             reportDataLoss(String.format("Cannot read data at offset %s from topic: %s", startMessageId, topic));
-         } else {
-             currentId = currentMessage.getMessageId();
-             if (!messageIdRoughEquals(currentId, startMessageId) && failOnDataLoss) {
-                 reportDataLoss(
-                     String.format(
-                         "Potential Data Loss in reading %s: intended to start at %s, actually we get %s",
-                         topic, startMessageId.toString(), currentId.toString()));
-             }
-
-             if (startMessageId instanceof BatchMessageIdImpl && currentId instanceof BatchMessageIdImpl) {
-                 // we seek using a batch message id, we can read next directly later
-             } else if (startMessageId instanceof MessageIdImpl && currentId instanceof BatchMessageIdImpl) {
-                 // we seek using a message id, this is supposed to be read by previous task since it's
-                 // inclusive for the checkpoint, so we skip this batch
-                 BatchMessageIdImpl cbmid = (BatchMessageIdImpl) currentId;
-
-                 MessageIdImpl newStart =
-                     new MessageIdImpl(cbmid.getLedgerId(), cbmid.getEntryId() + 1, cbmid.getPartitionIndex());
-                 reader.seek(newStart);
-             } else if (startMessageId instanceof MessageIdImpl && currentId instanceof MessageIdImpl) {
-                 // current entry is a non-batch entry, we can read next directly later
-             }
-         }
}
}
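With the start MessageId excluded at reader construction for restored topics, the block deleted above appears to be redundant: it read the first message back, compared it against the checkpointed startMessageId, and, when a plain MessageIdImpl start met a BatchMessageIdImpl entry, seeked to entryId + 1 to skip the already-emitted batch. The old logic read that first message only to decide whether to skip it, and in some start scenarios it was never emitted downstream, which is presumably the lost first message the commit title refers to; with an exclusive start the reader simply resumes after the checkpointed message, so no manual skipping is needed.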

File 5 of 5
@@ -597,7 +597,7 @@ public TestingFlinkPulsarSource(PulsarMetadataReader discoverer) {
protected PulsarFetcher<T> createFetcher(
SourceContext sourceContext,
Map<String, MessageId> seedTopicsWithInitialOffsets,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ Set<String> excludeStartMessageIds, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
@@ -646,7 +646,7 @@ public DummyFlinkPulsarSource() {
protected PulsarFetcher<T> createFetcher(
SourceContext sourceContext,
Map<String, MessageId> seedTopicsWithInitialOffsets,
- SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+ Set<String> excludeStartMessageIds, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
ProcessingTimeService processingTimeProvider,
long autoWatermarkInterval,
