Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
Fix the problem that the cursor of start cannot be consumed when the …
Browse files Browse the repository at this point in the history
…cursor is larger than the latest cursor.
  • Loading branch information
jianyun8023 committed Jun 15, 2021
1 parent 85e472a commit ac46591
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,14 @@ protected void skipFirstMessageIfNeeded() throws org.apache.pulsar.client.api.Pu
if (!messageIdRoughEquals(startMessageId, lastMessageId) && !reader.hasMessageAvailable()) {
MessageIdImpl startMsgIdImpl = (MessageIdImpl) startMessageId;
// startMessageId is bigger than lastMessageId
if (!metaDataReader.checkCursorAvailable(reader.getTopic(), startMsgIdImpl)) {
if (startMsgIdImpl.compareTo(lastMessageId) > 0) {
if (failOnDataLoss) {
log.error("the start message id is beyond the last commit message id, with topic:{}", reader.getTopic());
throw new RuntimeException("start message id beyond the last commit");
} else {
log.info("reset message to valid offset {}", startMessageId);
metaDataReader.resetCursor(reader.getTopic(), startMessageId);
log.info("reset message to valid offset {}", lastMessageId);
reader.seek(lastMessageId);
metaDataReader.resetCursor(reader.getTopic(), lastMessageId);
}
}

Expand All @@ -169,9 +170,7 @@ protected void skipFirstMessageIfNeeded() throws org.apache.pulsar.client.api.Pu
}
}
if (currentMessage == null) {
reportDataLoss(String.format("Cannot read data at offset %s from topic: %s",
startMessageId.toString(),
topic));
reportDataLoss(String.format("Cannot read data at offset %s from topic: %s", startMessageId, topic));
} else {
currentId = currentMessage.getMessageId();
if (!messageIdRoughEquals(currentId, startMessageId) && failOnDataLoss) {
Expand Down

0 comments on commit ac46591

Please sign in to comment.