From ac465915599c96034eb0e829e17aeaed9a710458 Mon Sep 17 00:00:00 2001 From: Jianyun Zhao Date: Tue, 15 Jun 2021 23:09:58 +0800 Subject: [PATCH] Fix the problem that the cursor of start cannot be consumed when the cursor is larger than the latest cursor. --- .../connectors/pulsar/internal/ReaderThread.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java index 2c8f9cdf..2bacedaf 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java @@ -150,13 +150,14 @@ protected void skipFirstMessageIfNeeded() throws org.apache.pulsar.client.api.Pu if (!messageIdRoughEquals(startMessageId, lastMessageId) && !reader.hasMessageAvailable()) { MessageIdImpl startMsgIdImpl = (MessageIdImpl) startMessageId; // startMessageId is bigger than lastMessageId - if (!metaDataReader.checkCursorAvailable(reader.getTopic(), startMsgIdImpl)) { + if (startMsgIdImpl.compareTo(lastMessageId) > 0) { if (failOnDataLoss) { log.error("the start message id is beyond the last commit message id, with topic:{}", reader.getTopic()); throw new RuntimeException("start message id beyond the last commit"); } else { - log.info("reset message to valid offset {}", startMessageId); - metaDataReader.resetCursor(reader.getTopic(), startMessageId); + log.info("reset message to valid offset {}", lastMessageId); + reader.seek(lastMessageId); + metaDataReader.resetCursor(reader.getTopic(), lastMessageId); } } @@ -169,9 +170,7 @@ protected void skipFirstMessageIfNeeded() throws org.apache.pulsar.client.api.Pu } } if (currentMessage == null) { - reportDataLoss(String.format("Cannot read data at offset %s from topic: %s", - startMessageId.toString(), - topic)); + reportDataLoss(String.format("Cannot read data at offset %s from topic: %s", startMessageId, topic)); } else { currentId = currentMessage.getMessageId(); if (!messageIdRoughEquals(currentId, startMessageId) && failOnDataLoss) {