From 34ecc2f742d90f36298af644690123da35ac52e7 Mon Sep 17 00:00:00 2001 From: "Alexandre DUVAL - @kannarfr" Date: Thu, 2 Mar 2023 20:17:11 +0100 Subject: [PATCH 1/2] config --- .../java/org/apache/pulsar/client/api/ConsumerBuilder.java | 7 +++++++ .../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java | 6 ++++++ .../pulsar/client/impl/conf/ConsumerConfigurationData.java | 6 ++++++ 3 files changed, 19 insertions(+) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 14a94cb8286dc..e78b0ca4a0155 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -272,6 +272,13 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder subscriptionMode(SubscriptionMode subscriptionMode); + /** + * Set the consumer to read from start message to previous + * @param readReverse the read reverse boolean + * @return the consumer builder instance + */ + ConsumerBuilder readReverse(Boolean readReverse); + /** * Sets a {@link MessageListener} for the consumer. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index f644c6a18398f..9cb62d0bfa361 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -415,6 +415,12 @@ public ConsumerBuilder subscriptionInitialPosition(@NonNull SubscriptionIniti return this; } + @Override + public ConsumerBuilder readReverse(Boolean readReverse) { + conf.setReadReverse(readReverse); + return this; + } + @Override public ConsumerBuilder subscriptionTopicsMode(@NonNull RegexSubscriptionMode mode) { conf.setRegexSubscriptionMode(mode); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 373d4e66c0ecf..f4d55e6f47ddb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -75,6 +75,12 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { ) private String subscriptionName; + @ApiModelProperty( + name = "readReverse", + value = "Read reverse" + ) + private Boolean readReverse = false; + @ApiModelProperty( name = "subscriptionType", value = "Subscription type.\n" From de11c7914dfe1e68409145e5fcadf52a8354c79c Mon Sep 17 00:00:00 2001 From: "Alexandre DUVAL - @kannarfr" Date: Fri, 3 Mar 2023 16:49:19 +0100 Subject: [PATCH 2/2] wip --- .../bookkeeper/mledger/ManagedCursor.java | 6 + .../bookkeeper/mledger/ManagedLedger.java | 49 ++++++- .../apache/bookkeeper/mledger/Position.java | 7 + .../mledger/impl/ManagedCursorImpl.java | 132 +++++++++++++----- .../mledger/impl/ManagedLedgerImpl.java | 61 +++++--- .../mledger/impl/NonDurableCursorImpl.java | 2 +- .../bookkeeper/mledger/impl/OpFindNewest.java | 23 ++- .../bookkeeper/mledger/impl/OpScan.java | 7 + .../bookkeeper/mledger/impl/PositionImpl.java | 9 ++ .../src/main/proto/MLDataFormats.proto | 3 + .../impl/ManagedCursorContainerTest.java | 13 ++ .../impl/ManagedCursorPropertiesTest.java | 7 +- .../mledger/impl/ManagedCursorTest.java | 23 ++- .../mledger/impl/ManagedLedgerTest.java | 38 ++++- pom.xml | 9 +- .../admin/impl/PersistentTopicsBase.java | 4 +- .../pulsar/broker/service/Producer.java | 5 + .../pulsar/broker/service/ServerCnx.java | 4 +- .../broker/service/SubscriptionOption.java | 1 + .../apache/pulsar/broker/service/Topic.java | 4 +- .../nonpersistent/NonPersistentTopic.java | 4 +- .../service/persistent/PersistentTopic.java | 36 ++--- .../ReplicatedSubscriptionsController.java | 2 +- .../StreamingEntryReader.java | 23 ++- .../impl/MLPendingAckStoreProvider.java | 3 +- .../broker/delayed/MockManagedCursor.java | 12 ++ ...sistentDispatcherFailoverConsumerTest.java | 2 +- .../broker/service/PersistentTopicTest.java | 7 +- .../pulsar/broker/service/ServerCnxTest.java | 2 +- .../broker/service/SubscriptionSeekTest.java | 54 +++++++ .../broker/transaction/TransactionTest.java | 2 +- .../impl/MLPendingAckStoreTest.java | 4 +- .../pulsar/client/api/ConsumerBuilder.java | 2 +- .../pulsar/client/impl/ConsumerImpl.java | 4 +- .../impl/conf/ConsumerConfigurationData.java | 2 +- .../pulsar/common/protocol/Commands.java | 15 +- pulsar-common/src/main/proto/PulsarApi.proto | 3 + .../impl/MLTransactionLogImpl.java | 3 +- 38 files changed, 467 insertions(+), 120 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 7802ed07781ba..1458dd2ecfbdb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -839,4 +839,10 @@ Set asyncReplayEntries( * @return whether this cursor is closed. */ boolean isClosed(); + + /** + * Checks if the cursor is read reverse mode. + * @return: whether this cursor is reading in a reverse way + */ + boolean isReadReverse(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 4ca56508891a1..5b4fe511e0d94 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -225,8 +225,8 @@ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, Ad * @return the ManagedCursor * @throws ManagedLedgerException */ - ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, - ManagedLedgerException; + ManagedCursor openCursor(String name, InitialPosition initialPosition, boolean readReverse) + throws InterruptedException, ManagedLedgerException; /** * Open a ManagedCursor in this ManagedLedger. @@ -245,8 +245,8 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition) throws In * @return the ManagedCursor * @throws ManagedLedgerException */ - ManagedCursor openCursor(String name, InitialPosition initialPosition, Map properties, - Map cursorProperties) + ManagedCursor openCursor(String name, InitialPosition initialPosition, boolean readReverse, + Map properties, Map cursorProperties) throws InterruptedException, ManagedLedgerException; /** @@ -267,7 +267,7 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition, Map properties, Map cursorProperties, OpenCursorCallback callback, Object ctx); + /** + * Open a ManagedCursor asynchronously. + * + * @see #openCursor(String) + * @param name + * the name associated with the ManagedCursor + * @param initialPosition + * if null, the cursor will be set at latest position when first created + * @param readReverse + * if true, the cursor will read ledger from current to previous messages + * @param cursorProperties + * the properties for the Cursor + * @param callback + * callback object + * @param ctx + * opaque context + */ + void asyncOpenCursor(String name, InitialPosition initialPosition, boolean readReverse, + Map properties, Map cursorProperties, + OpenCursorCallback callback, Object ctx); + /** * Get a list of all the cursors reading from this ManagedLedger. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java index ac5810bbf01e7..5d0e033323ef6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java @@ -35,6 +35,13 @@ public interface Position { */ Position getNext(); + /** + * Get the position of the entry previous to this one. The returned position might point to a non-existing entry + * + * @return the position of the previous logical entry + */ + Position getPrevious(); + long getLedgerId(); long getEntryId(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 5851395b08566..21351a5b1f202 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import static org.apache.bookkeeper.mledger.impl.PositionImpl.LATEST; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import com.google.common.annotations.VisibleForTesting; @@ -130,6 +131,8 @@ public class ManagedCursorImpl implements ManagedCursor { protected volatile PositionImpl markDeletePosition; + private boolean readReverse = false; + // this position is have persistent mark delete position protected volatile PositionImpl persistentMarkDeletePosition; protected static final AtomicReferenceFieldUpdater @@ -507,7 +510,8 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) { } } - recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null); + recoveredCursor(recoveredPosition, info.getReadReverse(), recoveredProperties, + recoveredCursorProperties, null); callback.operationComplete(); } else { // Need to proceed and read the last entry in the specified ledger to find out the last position @@ -537,7 +541,8 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc)); // Rewind to oldest entry available - initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback); + initialize(getRollbackPosition(info), info.getReadReverse(), Collections.emptyMap(), + Collections.emptyMap(), callback); return; } else if (rc != BKException.Code.OK) { log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, @@ -553,7 +558,8 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger", ledger.getName(), ledgerId, name); // Rewind to last cursor snapshot available - initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); + initialize(getRollbackPosition(info), info.getReadReverse(), Collections.emptyMap(), cursorProperties, + callback); return; } @@ -565,7 +571,8 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc1)); // Rewind to oldest entry available - initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); + initialize(getRollbackPosition(info), info.getReadReverse(), Collections.emptyMap(), + cursorProperties, callback); return; } else if (rc1 != BKException.Code.OK) { log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(), @@ -603,7 +610,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); } - recoveredCursor(position, recoveredProperties, cursorProperties, lh); + recoveredCursor(position, info.getReadReverse(), recoveredProperties, cursorProperties, lh); callback.operationComplete(); }, null); }; @@ -673,25 +680,43 @@ private void recoverBatchDeletedIndexes ( } } - private void recoveredCursor(PositionImpl position, Map properties, + private void recoveredCursor(PositionImpl position, boolean readReverse, Map properties, Map cursorProperties, LedgerHandle recoveredFromCursorLedger) { + this.readReverse = readReverse; // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty), // we need to move to the next existing ledger if (!ledger.ledgerExists(position.getLedgerId())) { - Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId()); - if (nextExistingLedger == null) { - log.info("[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name, - position); + if (readReverse) { + Long previousExistingLedger = ledger.getPreviousValidLedger(position.getLedgerId()); + if (previousExistingLedger == null) { + log.info("[{}] [{}] Couldn't find previous valid ledger for recovery {}, readReverse {}", + ledger.getName(), name, position, readReverse); + } + position = previousExistingLedger != null ? PositionImpl.get(previousExistingLedger, LATEST.entryId) : + position; + } else { + Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId()); + if (nextExistingLedger == null) { + log.info("[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name, + position); + } + position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1) : position; } - position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1) : position; } - if (position.compareTo(ledger.getLastPosition()) > 0) { - log.warn("[{}] [{}] Current position {} is ahead of last position {}", ledger.getName(), name, position, - ledger.getLastPosition()); + if (position.compareTo(ledger.getLastPosition()) > 0 && !readReverse) { + log.warn("[{}] [{}] Current position {} is ahead of last position {}, readReverse {}", ledger.getName(), + name, position, ledger.getLastPosition(), readReverse); position = ledger.getLastPosition(); } - log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position); + if (position.compareTo(ledger.getFirstPosition()) < 0 && readReverse) { + log.warn("[{}] [{}] Current position {} is ahead of first position {}, readReverse {}", ledger.getName(), + name, position, ledger.getLastPosition(), readReverse); + position = ledger.getLastPosition(); + } + + log.info("[{}] Cursor {} recovered to position {}, readReverse {}", ledger.getName(), name, position, + readReverse); this.cursorProperties = cursorProperties; messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition())); markDeletePosition = position; @@ -706,9 +731,9 @@ private void recoveredCursor(PositionImpl position, Map properties STATE_UPDATER.set(this, State.NoLedger); } - void initialize(PositionImpl position, Map properties, Map cursorProperties, - final VoidCallback callback) { - recoveredCursor(position, properties, cursorProperties, null); + void initialize(PositionImpl position, boolean readReverse, Map properties, + Map cursorProperties, final VoidCallback callback) { + recoveredCursor(position, readReverse, properties, cursorProperties, null); if (log.isDebugEnabled()) { log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}", ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition); @@ -752,7 +777,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { counter.countDown(); } - }, null, PositionImpl.LATEST); + }, null, LATEST); counter.await(); @@ -789,7 +814,8 @@ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeByte PENDING_READ_OPS_UPDATER.incrementAndGet(this); OpReadEntry op = - OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition); + OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, + skipCondition); ledger.asyncReadEntries(op); } @@ -891,7 +917,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { counter.countDown(); } - }, null, PositionImpl.LATEST); + }, null, LATEST); counter.await(); @@ -1022,6 +1048,11 @@ public boolean isClosed() { return state == State.Closed || state == State.Closing; } + @Override + public boolean isReadReverse() { + return readReverse; + } + @Override public boolean cancelPendingReadRequest() { if (log.isDebugEnabled()) { @@ -1059,14 +1090,26 @@ public boolean hasMoreEntries() { @Override public long getNumberOfEntries() { - if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read", - ledger.getName(), name, readPosition, ledger.getLastPosition()); + if (readReverse) { + if (readPosition.compareTo(ledger.getFirstPosition().getPrevious()) > 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Read position {} is behind of last position {}. There are no entries to read", + ledger.getName(), name, readPosition, ledger.getLastPosition()); + } + return 0; + } else { + return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getPrevious())); } - return 0; } else { - return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getNext())); + if (readPosition.compareTo(ledger.getLastPosition().getNext()) > 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Read position {} is ahead of last position {}. There are no entries to read", + ledger.getName(), name, readPosition, ledger.getLastPosition()); + } + return 0; + } else { + return getNumberOfEntries(Range.closedOpen(readPosition, ledger.getLastPosition().getNext())); + } } } @@ -1104,20 +1147,32 @@ public long getNumberOfEntriesInBacklog(boolean isPrecise) { messagesConsumedCounter, markDeletePosition, readPosition); } if (isPrecise) { - return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + if (readReverse) { + return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getFirstPosition())); + } else { + return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + } } long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter; if (backlog < 0) { // In some case the counters get incorrect values, fall back to the precise backlog count - backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + if (readReverse) { + backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getFirstPosition())); + } else { + backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + } } return backlog; } public long getNumberOfEntriesInStorage() { - return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + if (readReverse) { + return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getFirstPosition())); + } else { + return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + } } @Override @@ -1247,7 +1302,7 @@ protected void internalResetCursor(PositionImpl proposedReadPosition, final PositionImpl newReadPosition; if (proposedReadPosition.equals(PositionImpl.EARLIEST)) { newReadPosition = ledger.getFirstPosition(); - } else if (proposedReadPosition.equals(PositionImpl.LATEST)) { + } else if (proposedReadPosition.equals(LATEST)) { newReadPosition = ledger.getLastPosition().getNext(); } else { newReadPosition = proposedReadPosition; @@ -1364,14 +1419,14 @@ public void asyncResetCursor(Position newPos, boolean forceReset, AsyncCallbacks if (!ledger.isValidPosition(actualPosition) && !actualPosition.equals(PositionImpl.EARLIEST) - && !actualPosition.equals(PositionImpl.LATEST) + && !actualPosition.equals(LATEST) && !forceReset) { actualPosition = ledger.getNextValidPosition(actualPosition); if (actualPosition == null) { // next valid position would only return null when newPos // is larger than all available positions, then it's latest in effect. - actualPosition = PositionImpl.LATEST; + actualPosition = LATEST; } } @@ -1793,8 +1848,15 @@ long getNumIndividualDeletedEntriesToSkip(long numEntries) { boolean hasMoreEntries(PositionImpl position) { PositionImpl lastPositionInLedger = ledger.getLastPosition(); - if (position.compareTo(lastPositionInLedger) <= 0) { - return getNumberOfEntries(Range.closed(position, lastPositionInLedger)) > 0; + PositionImpl firstPositionInLedger = ledger.getFirstPosition(); + if (readReverse) { + if (position.compareTo(firstPositionInLedger) >= 0) { + return getNumberOfEntries(Range.closed(firstPositionInLedger, position)) > 0; + } + } else { + if (position.compareTo(lastPositionInLedger) <= 0) { + return getNumberOfEntries(Range.closed(position, lastPositionInLedger)) > 0; + } } return false; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9c05fb7c1047e..0583429d20133 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -892,19 +892,18 @@ public void readyToCreateNewLedger() { @Override public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException { - return openCursor(cursorName, InitialPosition.Latest); + return openCursor(cursorName, InitialPosition.Latest, false); } - @Override - public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition) + public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, boolean readReverse) throws InterruptedException, ManagedLedgerException { - return openCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap()); + return openCursor(cursorName, initialPosition, readReverse, Collections.emptyMap(), Collections.emptyMap()); } @Override - public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map properties, - Map cursorProperties) + public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, boolean readReverse, + Map properties, Map cursorProperties) throws InterruptedException, ManagedLedgerException { final CountDownLatch counter = new CountDownLatch(1); class Result { @@ -913,7 +912,8 @@ class Result { } final Result result = new Result(); - asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, new OpenCursorCallback() { + asyncOpenCursor(cursorName, initialPosition, readReverse, properties, cursorProperties, + new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { result.cursor = cursor; @@ -942,19 +942,34 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { @Override public void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx) { - this.asyncOpenCursor(cursorName, InitialPosition.Latest, callback, ctx); + this.asyncOpenCursor(cursorName, InitialPosition.Latest, false, callback, ctx); } @Override public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, - final OpenCursorCallback callback, final Object ctx) { + final OpenCursorCallback callback, final Object ctx) { this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap(), callback, ctx); } + @Override + public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, boolean readReverse, + final OpenCursorCallback callback, final Object ctx) { + this.asyncOpenCursor(cursorName, initialPosition, readReverse, Collections.emptyMap(), Collections.emptyMap(), + callback, ctx); + } + @Override public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, - Map properties, Map cursorProperties, + Map properties, Map cursorProperties, + final OpenCursorCallback callback, final Object ctx) { + this.asyncOpenCursor(cursorName, initialPosition, false, Collections.emptyMap(), Collections.emptyMap(), + callback, ctx); + } + + @Override + public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, + final boolean readReverse, Map properties, Map cursorProperties, final OpenCursorCallback callback, final Object ctx) { try { checkManagedLedgerIsOpen(); @@ -989,7 +1004,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP CompletableFuture cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition(); - cursor.initialize(position, properties, cursorProperties, new VoidCallback() { + cursor.initialize(position, readReverse, properties, cursorProperties, new VoidCallback() { @Override public void operationComplete() { log.info("[{}] Opened new cursor: {}", name, cursor); @@ -1098,12 +1113,13 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws Ma @Override public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException { - return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false); + return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false, false); } @Override public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, - InitialPosition initialPosition, boolean isReadCompacted) + InitialPosition initialPosition, boolean isReadReverse, + boolean isReadCompacted) throws ManagedLedgerException { Objects.requireNonNull(cursorName, "cursor name can't be null"); checkManagedLedgerIsOpen(); @@ -1118,7 +1134,7 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu } NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName, - (PositionImpl) startCursorPosition, initialPosition, isReadCompacted); + (PositionImpl) startCursorPosition, initialPosition, isReadReverse, isReadCompacted); cursor.setActive(); log.info("[{}] Opened new cursor: {}", name, cursor); @@ -2275,11 +2291,18 @@ public ManagedLedgerMXBean getStats() { return mbean; } - public boolean hasMoreEntries(PositionImpl position) { + public boolean hasMoreEntries(PositionImpl position, boolean readReverse) { PositionImpl lastPos = lastConfirmedEntry; - boolean result = position.compareTo(lastPos) <= 0; + PositionImpl firstPos = getFirstPosition(); + boolean result; + if (readReverse) { + result = position.compareTo(firstPos) >= 0; + } else { + result = position.compareTo(lastPos) <= 0; + } if (log.isDebugEnabled()) { - log.debug("[{}] hasMoreEntries: pos={} lastPos={} res={}", name, position, lastPos, result); + log.debug("[{}] hasMoreEntries: pos={} lastPos={} readReverse={} res={}", name, position, lastPos, + readReverse, result); } return result; } @@ -3573,6 +3596,10 @@ public Long getNextValidLedger(long ledgerId) { return ledgers.ceilingKey(ledgerId + 1); } + public Long getPreviousValidLedger(long ledgerId) { + return ledgers.ceilingKey(ledgerId - 1); + } + public PositionImpl getNextValidPosition(final PositionImpl position) { return getValidPositionAfterSkippedEntries(position, 1); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 9d2829b1707f4..79954d6de8b5a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -37,7 +37,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName, PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition, - boolean isReadCompacted) { + boolean isReadReverse, boolean isReadCompacted) { super(bookkeeper, config, ledger, cursorName); this.readCompacted = isReadCompacted; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 900af9322c791..e365703abdd5e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -42,6 +42,7 @@ enum State { } PositionImpl searchPosition; + boolean readReverse; long min; long max; Position lastMatchedPosition = null; @@ -60,6 +61,24 @@ public OpFindNewest(ManagedCursorImpl cursor, PositionImpl startPosition, Predic this.max = numberOfEntries; this.searchPosition = startPosition; + this.readReverse = false; + this.state = State.checkFirst; + } + + public OpFindNewest(ManagedCursorImpl cursor, PositionImpl startPosition, boolean readReverse, + Predicate condition, long numberOfEntries, FindEntryCallback callback, Object ctx) { + this.cursor = cursor; + this.ledger = cursor.ledger; + this.startPosition = startPosition; + this.callback = callback; + this.condition = condition; + this.ctx = ctx; + + this.min = 0; + this.max = numberOfEntries; + + this.searchPosition = startPosition; + this.readReverse = readReverse; this.state = State.checkFirst; } @@ -76,6 +95,7 @@ public OpFindNewest(ManagedLedgerImpl ledger, PositionImpl startPosition, Predic this.max = numberOfEntries; this.searchPosition = startPosition; + this.readReverse = false; this.state = State.checkFirst; } @@ -142,7 +162,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { } public void find() { - if (cursor != null ? cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition)) { + if (cursor != null ? cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition, + cursor.isReadReverse())) { ledger.asyncReadEntry(searchPosition, this, null); } else { callback.findEntryComplete(lastMatchedPosition, OpFindNewest.this.ctx); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java index 6d68b042a7ad6..037f0e7a19d59 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java @@ -46,10 +46,16 @@ class OpScan implements ReadEntriesCallback { PositionImpl searchPosition; Position lastSeenPosition = null; + boolean readReverse; public OpScan(ManagedCursorImpl cursor, int batchSize, PositionImpl startPosition, Predicate condition, ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) { + this(cursor, batchSize, startPosition, false, condition, callback, ctx, maxEntries, timeOutMs); + } + + public OpScan(ManagedCursorImpl cursor, int batchSize, PositionImpl startPosition, boolean readReverse, + Predicate condition, ScanCallback callback, Object ctx, long maxEntries, long timeOutMs) { this.batchSize = batchSize; if (batchSize <= 0) { throw new IllegalArgumentException("batchSize " + batchSize); @@ -60,6 +66,7 @@ public OpScan(ManagedCursorImpl cursor, int batchSize, this.condition = condition; this.ctx = ctx; this.searchPosition = startPosition; + this.readReverse = readReverse; this.remainingEntries.set(maxEntries); this.timeOutMs = timeOutMs; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java index ee179b5d059c8..6e0f112869792 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java @@ -94,6 +94,15 @@ public PositionImpl getNext() { } } + @Override + public PositionImpl getPrevious() { + if (entryId > 0) { + return PositionImpl.get(ledgerId, entryId - 1); + } else { + return PositionImpl.get(ledgerId, 0); + } + } + /** * Position after moving entryNum messages, * if entryNum < 1, then return the current position. diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index c4e502819fa9e..8d1da609f8cd3 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -132,6 +132,9 @@ message ManagedCursorInfo { // Additional custom properties associated with // the cursor repeated StringProperty cursorProperties = 8; + + // Is cursor readReverse + optional bool readReverse = 9; } enum CompressionType { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 2c01b778caf6b..40d352b11bf65 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -58,10 +58,18 @@ private static class MockManagedCursor implements ManagedCursor { Position position; String name; + boolean readReverse; + public MockManagedCursor(ManagedCursorContainer container, String name, Position position) { + this(container, name, position, false); + } + + public MockManagedCursor(ManagedCursorContainer container, String name, Position position, + boolean readReverse) { this.container = container; this.name = name; this.position = position; + this.readReverse = readReverse; } @Override @@ -413,6 +421,11 @@ public boolean checkAndUpdateReadPositionChanged() { public boolean isClosed() { return false; } + + @Override + public boolean isReadReverse() { + return readReverse; + } } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java index 500de5dd13879..f8cdf24fd2f96 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java @@ -91,7 +91,8 @@ void testPropertiesRecoveryAfterCrash() throws Exception { cursorProperties.put("custom1", "one"); cursorProperties.put("custom2", "two"); - ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, Collections.emptyMap(), cursorProperties); + ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, false, + Collections.emptyMap(), cursorProperties); assertEquals(c1.getProperties(), Collections.emptyMap()); assertEquals(c1.getCursorProperties(), cursorProperties); @@ -170,7 +171,7 @@ void testPropertiesAtCreation() throws Exception { cursorProperties.put("custom1", "one"); cursorProperties.put("custom2", "two"); - ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties); + ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, false, properties, cursorProperties); assertEquals(c1.getProperties(), properties); assertEquals(c1.getCursorProperties(), cursorProperties); @@ -197,7 +198,7 @@ void testUpdateCursorProperties() throws Exception { cursorProperties.put("custom1", "one"); cursorProperties.put("custom2", "two"); - ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties); + ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, false, properties, cursorProperties); assertEquals(c1.getProperties(), properties); assertEquals(c1.getCursorProperties(), cursorProperties); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 8dc726c249efc..3d6899c0b97a1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -170,7 +170,24 @@ public void testOpenCursorWithNullInitialPosition() throws Exception { ledger.addEntry(new byte[]{4}); ledger.addEntry(new byte[]{5}); - ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c_testOpenCursorWithNullInitialPosition", null); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c_testOpenCursorWithNullInitialPosition", + null, false); + assertEquals(cursor.getMarkDeletedPosition(), ledger.getLastConfirmedEntry()); + } + + @Test + public void testOpenCursorWithReadReverse() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + ManagedLedger ledger = factory.open("testOpenCursorWithReadReverse", config); + // Write some data. + ledger.addEntry(new byte[]{1}); + ledger.addEntry(new byte[]{2}); + ledger.addEntry(new byte[]{3}); + ledger.addEntry(new byte[]{4}); + ledger.addEntry(new byte[]{5}); + + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c_testOpenCursorWithReadReverse", + CommandSubscribe.InitialPosition.Latest, true); assertEquals(cursor.getMarkDeletedPosition(), ledger.getLastConfirmedEntry()); } @@ -3313,7 +3330,7 @@ public void operationFailed(ManagedLedgerException exception) { @Test(timeOut = 20000) public void testRecoverCursorAfterResetToLatestForNewEntry() throws Exception { ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForNewEntry"); - ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest); + ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest, false); // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here. assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); @@ -3367,7 +3384,7 @@ public void operationFailed(ManagedLedgerException exception) { @Test(timeOut = 20000) public void testRecoverCursorAfterResetToLatestForMultipleEntries() throws Exception { ManagedLedger ml = factory.open("testRecoverCursorAfterResetToLatestForMultipleEntries"); - ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest); + ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest, false); // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here. assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a4d8b75d00c96..1a3dd2f0df1f6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3024,8 +3024,40 @@ public void testConsumerSubscriptionInitializePosition() throws Exception{ ledger.addEntry(content.getBytes()); } // Open Cursor also adds cursor into activeCursor-container - ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest); - ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest); + ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest, false); + ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest, false); + + // Since getReadPosition returns the next position, we decrease the entryId by 1 + PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition(); + PositionImpl p2 = (PositionImpl) earliestCursor.getReadPosition(); + + Pair latestPositionAndCounter = ledger.getLastPositionAndCounter(); + Pair earliestPositionAndCounter = ledger.getFirstPositionAndCounter(); + + assertEquals(latestPositionAndCounter.getLeft().getNext(), p1); + assertEquals(earliestPositionAndCounter.getLeft().getNext(), p2); + + assertEquals(latestPositionAndCounter.getRight().longValue(), totalInsertedEntries); + assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog(false)); + + ledger.close(); + + } + + @Test + public void testConsumerSubscriptionReadReverse() throws Exception{ + final int MAX_ENTRY_PER_LEDGER = 2; + ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(MAX_ENTRY_PER_LEDGER); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("lastest_earliest_ledger", config); + + final int totalInsertedEntries = 20; + for (int i = 0; i < totalInsertedEntries; i++) { + String content = "entry" + i; // 5 bytes + ledger.addEntry(content.getBytes()); + } + // Open Cursor also adds cursor into activeCursor-container + ManagedCursor latestCursor = ledger.openCursor("c1", InitialPosition.Latest, false); + ManagedCursor earliestCursor = ledger.openCursor("c2", InitialPosition.Earliest, false); // Since getReadPosition returns the next position, we decrease the entryId by 1 PositionImpl p1 = (PositionImpl) latestCursor.getReadPosition(); @@ -3714,7 +3746,7 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception { }); // Verify the ReadHandle can be reopened. - ManagedCursor cursor3 = ledger.openCursor("test-cursor3", InitialPosition.Earliest); + ManagedCursor cursor3 = ledger.openCursor("test-cursor3", InitialPosition.Earliest, false); entryList = cursor3.readEntries(3); assertEquals(entryList.size(), 3); assertEquals(ledger.ledgerCache.size(), 2); diff --git a/pom.xml b/pom.xml index 5b001f2d3f610..6332f3b282c2a 100644 --- a/pom.xml +++ b/pom.xml @@ -126,7 +126,7 @@ flexible messaging model and an intuitive client API. 1.21 - 4.15.3 + 4.15.4-SNAPSHOT 3.8.1 1.5.0 1.10.0 @@ -2437,5 +2437,12 @@ flexible messaging model and an intuitive client API. false + + local-maven-repo + file:///home/kannar/.m2/repository + + true + + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 633c4747ee068..0176b7c94f198 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2428,7 +2428,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic( throw new RestException(Status.CONFLICT, "Subscription already exists for topic"); } - return topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated, properties); + return topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated, false, properties); }).thenCompose(subscription -> { // Mark the cursor as "inactive" as it was created without a real consumer connected ((PersistentSubscription) subscription).deactivateCursor(); @@ -4441,7 +4441,7 @@ private Subscription getSubscriptionReference(String subName, PersistentTopic to Subscription sub = topic.getSubscription(subName); if (sub == null) { sub = topic.createSubscription(subName, - InitialPosition.Earliest, false, null).get(); + InitialPosition.Earliest, false, false, null).get(); } return checkNotNull(sub); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 5b62e3261e64f..63ca3447f1f4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -397,6 +397,11 @@ public Position getNext() { return null; } + @Override + public Position getPrevious() { + return null; + } + @Override public long getLedgerId() { return ledgerId; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 5b9c7d39ff12f..33b177be9785f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1083,6 +1083,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final boolean readCompacted = subscribe.hasReadCompacted() && subscribe.isReadCompacted(); final Map metadata = CommandUtils.metadataFromCommand(subscribe); final InitialPosition initialPosition = subscribe.getInitialPosition(); + final boolean readReverse = subscribe.isReadReverse(); final long startMessageRollbackDurationSec = subscribe.hasStartMessageRollbackDurationSec() ? subscribe.getStartMessageRollbackDurationSec() : -1; @@ -1191,6 +1192,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { .consumerName(consumerName).isDurable(isDurable) .startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted) .initialPosition(initialPosition) + .readReverse(readReverse) .startMessageRollbackDurationSec(startMessageRollbackDurationSec) .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta) .subscriptionProperties(subscriptionProperties) @@ -1440,7 +1442,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { } createInitSubFuture = topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest, - false, null); + false, false, null); } else { createInitSubFuture = CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index d375c539e550e..c8cac3856e54c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -44,6 +44,7 @@ public class SubscriptionOption { private Map metadata; private boolean readCompacted; private CommandSubscribe.InitialPosition initialPosition; + private boolean readReverse; private long startMessageRollbackDurationSec; private boolean replicatedSubscriptionStateArg; private KeySharedMeta keySharedMeta; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index e6a29368dbb85..25750649d3436 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -168,7 +168,7 @@ CompletableFuture subscribe(TransportCnx cnx, String subscriptionName, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map metadata, boolean readCompacted, - InitialPosition initialPosition, + InitialPosition initialPosition, boolean readReverse, long startMessageRollbackDurationSec, boolean replicateSubscriptionState, KeySharedMeta keySharedMeta); @@ -180,7 +180,7 @@ CompletableFuture subscribe(TransportCnx cnx, String subscriptionName, CompletableFuture subscribe(SubscriptionOption option); CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition, - boolean replicateSubscriptionState, Map properties); + boolean readReverse, boolean replicateSubscriptionState, Map properties); CompletableFuture unsubscribe(String subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 3b046570d732e..c1b19a289d126 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -263,7 +263,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map metadata, boolean readCompacted, - InitialPosition initialPosition, + InitialPosition initialPosition, boolean readReverse, long resetStartMessageBackInSec, boolean replicateSubscriptionState, KeySharedMeta keySharedMeta) { return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, @@ -373,7 +373,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St @Override public CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition, - boolean replicateSubscriptionState, Map properties) { + boolean readReverse, boolean replicateSubscriptionState, Map properties) { return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true, properties)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4277edb074c5e..91546b4234b77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -725,7 +725,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(), option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(), option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), - option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), + option.getInitialPosition(), option.isReadReverse(), option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()), option.getConsumerEpoch()); } @@ -736,6 +736,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St MessageId startMessageId, Map metadata, boolean readCompacted, InitialPosition initialPosition, + boolean readReverse, long startMessageRollbackDurationSec, boolean replicatedSubscriptionStateArg, KeySharedMeta keySharedMeta, @@ -820,9 +821,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } CompletableFuture subscriptionFuture = isDurable ? // - getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionState, subscriptionProperties) - : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, + getDurableSubscription(subscriptionName, initialPosition, readReverse, + startMessageRollbackDurationSec, replicatedSubscriptionState, subscriptionProperties) + : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, readReverse, startMessageRollbackDurationSec, readCompacted, subscriptionProperties); CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { @@ -901,18 +902,19 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map metadata, boolean readCompacted, - InitialPosition initialPosition, + InitialPosition initialPosition, boolean readReverse, long startMessageRollbackDurationSec, boolean replicatedSubscriptionStateArg, KeySharedMeta keySharedMeta) { return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, - isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH); + isDurable, startMessageId, metadata, readCompacted, initialPosition, readReverse, + startMessageRollbackDurationSec, replicatedSubscriptionStateArg, keySharedMeta, null, + DEFAULT_CONSUMER_EPOCH); } private CompletableFuture getDurableSubscription(String subscriptionName, - InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated, - Map subscriptionProperties) { + InitialPosition initialPosition, boolean readReverse, long startMessageRollbackDurationSec, boolean replicated, + Map subscriptionProperties) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) { subscriptionFuture.completeExceptionally(new NotAllowedException( @@ -922,7 +924,8 @@ private CompletableFuture getDurableSubscription(String subscripti Map properties = PersistentSubscription.getBaseCursorProperties(replicated); - ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties, + ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, readReverse, properties, + subscriptionProperties, new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { @@ -971,8 +974,8 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { } private CompletableFuture getNonDurableSubscription(String subscriptionName, - MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean isReadCompacted, Map subscriptionProperties) { + MessageId startMessageId, InitialPosition initialPosition, boolean readReverse, + long startMessageRollbackDurationSec, boolean isReadCompacted, Map subscriptionProperties) { log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}", topic, subscriptionName, startMessageId, subscriptionProperties); @@ -1006,7 +1009,7 @@ private CompletableFuture getNonDurableSubscription(Stri Position startPosition = new PositionImpl(ledgerId, entryId); ManagedCursor cursor = null; try { - cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition, + cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition, readReverse, isReadCompacted); } catch (ManagedLedgerException e) { return FutureUtil.failedFuture(e); @@ -1049,9 +1052,9 @@ private void resetSubscriptionCursor(Subscription subscription, CompletableFutur @Override public CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition, - boolean replicateSubscriptionState, + boolean readReverse, boolean replicateSubscriptionState, Map subscriptionProperties) { - return getDurableSubscription(subscriptionName, initialPosition, + return getDurableSubscription(subscriptionName, initialPosition, readReverse, 0 /*avoid reseting cursor*/, replicateSubscriptionState, subscriptionProperties); } @@ -1618,7 +1621,8 @@ public CompletableFuture preCreateSubscriptionForCompactionIfNeeded() { return isCompactionEnabled() // If a topic has a compaction policy setup, we must make sure that the compaction cursor // is pre-created, in order to ensure all the data will be seen by the compactor. - ? createSubscription(COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false, null) + ? createSubscription(COMPACTION_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false, + false, null) .thenCompose(__ -> CompletableFuture.completedFuture(null)) : CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 335f2cf8eec08..51bd2583133fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -195,7 +195,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription", topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos); topic.createSubscription(update.getSubscriptionName(), - InitialPosition.Latest, true /* replicateSubscriptionState */, null); + InitialPosition.Latest, true, /* replicateSubscriptionState */ false, null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java index 0f75538ee88f0..fd11d7a5fea9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java @@ -112,7 +112,7 @@ public synchronized void asyncReadEntries(int numEntriesToRead, long maxReadSize if (!managedLedger.isValidPosition(nextReadPosition)) { nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition); } - boolean hasEntriesToRead = managedLedger.hasMoreEntries(nextReadPosition); + boolean hasEntriesToRead = managedLedger.hasMoreEntries(nextReadPosition, this.cursor.isReadReverse()); currentReadSizeByte.set(0); STATE_UPDATER.set(this, State.Issued); this.maxReadSizeByte = maxReadSizeByte; @@ -120,12 +120,16 @@ public synchronized void asyncReadEntries(int numEntriesToRead, long maxReadSize PendingReadEntryRequest pendingReadEntryRequest = PendingReadEntryRequest.create(ctx, nextReadPosition); // Make sure once we start putting request into pending requests queue, we won't put any following request // to issued requests queue in order to guarantee the order. - if (hasEntriesToRead && managedLedger.hasMoreEntries(nextReadPosition)) { + if (hasEntriesToRead && managedLedger.hasMoreEntries(nextReadPosition, this.cursor.isReadReverse())) { issuedReads.offer(pendingReadEntryRequest); } else { pendingReads.offer(pendingReadEntryRequest); } - nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition); + if (this.cursor.isReadReverse()) { + nextReadPosition = managedLedger.getPreviousPosition(nextReadPosition); + } else { + nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition); + } } // Issue requests. @@ -140,7 +144,7 @@ public synchronized void asyncReadEntries(int numEntriesToRead, long maxReadSize } // If new entries are available after we put request into pending queue, fire read. // Else register callback with managed ledger to get notify when new entries are available. - if (managedLedger.hasMoreEntries(pendingReads.peek().position)) { + if (managedLedger.hasMoreEntries(pendingReads.peek().position, this.cursor.isReadReverse())) { entriesAvailable(); } else if (managedLedger.isTerminated()) { dispatcher.notifyConsumersEndOfTopic(); @@ -307,14 +311,19 @@ private synchronized void internalEntriesAvailable() { if (!managedLedger.isValidPosition(pendingReads.peek().position)) { pendingReads.peek().position = managedLedger.getNextValidPosition(pendingReads.peek().position); } - while (!pendingReads.isEmpty() && managedLedger.hasMoreEntries(pendingReads.peek().position)) { + while (!pendingReads.isEmpty() && managedLedger.hasMoreEntries(pendingReads.peek().position, + this.cursor.isReadReverse())) { PendingReadEntryRequest next = pendingReads.poll(); issuedReads.offer(next); newlyIssuedRequests.add(next); // Need to update the position because when the PendingReadEntryRequest is created, the position could // be all set to managed ledger's last confirmed position. if (!pendingReads.isEmpty()) { - pendingReads.peek().position = managedLedger.getNextValidPosition(next.position); + if (this.cursor.isReadReverse()) { + pendingReads.peek().position = managedLedger.getPreviousPosition(next.position); + } else { + pendingReads.peek().position = managedLedger.getNextValidPosition(next.position); + } } } @@ -327,7 +336,7 @@ private synchronized void internalEntriesAvailable() { log.debug("[{}} Streaming entry reader has {} pending read requests waiting on new entry." , cursor.getName(), pendingReads.size()); } - if (managedLedger.hasMoreEntries(pendingReads.peek().position)) { + if (managedLedger.hasMoreEntries(pendingReads.peek().position, this.cursor.isReadReverse())) { entriesAvailable(); } else { managedLedger.addWaitingEntryCallBack(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java index ecc6599ce52b5..18699a6a6d091 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java @@ -122,7 +122,8 @@ public CompletableFuture newPendingAckStore(PersistentSubscript public void openLedgerComplete(ManagedLedger ledger, Object ctx) { ledger.asyncOpenCursor( MLPendingAckStore.getTransactionPendingAckStoreCursorName(), - InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() { + InitialPosition.Earliest, false, + new AsyncCallbacks.OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { pendingAckStoreFuture.complete(new MLPendingAckStore(ledger, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java index 499262c1e60b9..dd80e5fbc7f3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java @@ -37,11 +37,18 @@ public class MockManagedCursor implements ManagedCursor { private final String name; + private final boolean readReverse; + private final Map cursorProperties; public MockManagedCursor(String name) { + this(name, false); + } + + public MockManagedCursor(String name, boolean readReverse) { this.name = name; this.cursorProperties = new ConcurrentHashMap<>(); + this.readReverse = readReverse; } @Override @@ -409,4 +416,9 @@ public boolean checkAndUpdateReadPositionChanged() { public boolean isClosed() { return false; } + + @Override + public boolean isReadReverse() { + return readReverse; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 0dafe4bca3e2c..d151ec1fe97c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -211,7 +211,7 @@ void setupMLAsyncCallbackMocks() { ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null); return null; }).when(ledgerMock) - .asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), + .asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), false, any(OpenCursorCallback.class), any()); // call deleteLedgerComplete on ledger asyncDelete diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 45ef58bb7038c..84e174195d678 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1453,13 +1453,13 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null); return null; }).when(ledgerMock) - .asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), + .asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), false, any(OpenCursorCallback.class), any()); doAnswer(invocationOnMock -> { ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null); return null; - }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), + }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), false, any(Map.class), any(Map.class), any(OpenCursorCallback.class), any()); doAnswer(invocationOnMock -> { @@ -2156,7 +2156,7 @@ public void testGetDurableSubscription() throws Exception { doAnswer((Answer) invocationOnMock -> { ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(mockCursor, null); return null; - }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any()); + }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any(), any()); PersistentTopic topic = new PersistentTopic(successTopicName, mockLedger, brokerService); CommandSubscribe cmd = new CommandSubscribe() @@ -2167,6 +2167,7 @@ public void testGetDurableSubscription() throws Exception { .setSubscription(successSubName) .setConsumerName("consumer-name") .setReadCompacted(false) + .setReadReverse(false) .setRequestId(1) .setSubType(SubType.Exclusive); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 4580f028de2b0..325392cdc9ea9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -2504,7 +2504,7 @@ private void setupMLAsyncCallbackMocks() { ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null); return null; }).when(ledgerMock) - .asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), any(Map.class), + .asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), true, any(Map.class), any(Map.class), any(OpenCursorCallback.class), any()); doAnswer((Answer) invocationOnMock -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 93f2a42bcda35..ddb11b8410701 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -128,6 +128,60 @@ public void testSeek() throws Exception { assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); } + @Test + public void testSeekReadReverse() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/testSeek"; + + Producer producer = pulsarClient.newProducer().topic(topicName).create(); + + // Disable pre-fetch in consumer to track the messages received + org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer().topic(topicName) + .readReverse(true).subscriptionName("my-subscription").receiverQueueSize(0).subscribe(); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + assertNotNull(topicRef); + assertEquals(topicRef.getProducers().size(), 1); + assertEquals(topicRef.getSubscriptions().size(), 1); + + List messageIds = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + MessageId msgId = producer.send(message.getBytes()); + messageIds.add(msgId); + } + + PersistentSubscription sub = topicRef.getSubscription("my-subscription"); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); + + consumer.seek(MessageId.latest); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 10); + + // Wait for consumer to reconnect + Awaitility.await().until(consumer::isConnected); + consumer.seek(MessageId.earliest); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); + + Awaitility.await().until(consumer::isConnected); + consumer.seek(messageIds.get(5)); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 5); + + MessageIdImpl messageId = (MessageIdImpl) messageIds.get(5); + MessageIdImpl beforeEarliest = new MessageIdImpl( + messageId.getLedgerId() - 1, messageId.getEntryId(), messageId.getPartitionIndex()); + MessageIdImpl afterLatest = new MessageIdImpl( + messageId.getLedgerId() + 1, messageId.getEntryId(), messageId.getPartitionIndex()); + + log.info("MessageId {}: beforeEarliest: {}, afterLatest: {}", messageId, beforeEarliest, afterLatest); + + Awaitility.await().until(consumer::isConnected); + consumer.seek(beforeEarliest); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 0); + + Awaitility.await().until(consumer::isConnected); + consumer.seek(afterLatest); + assertEquals(sub.getNumberOfEntriesInBacklog(false), 10); + } + @Test public void testSeekForBatch() throws Exception { final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index a568db3d9f143..b2e47a79216e9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -758,7 +758,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{ persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true); PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic .createSubscription("test", - CommandSubscribe.InitialPosition.Earliest, false, null).get(); + CommandSubscribe.InitialPosition.Earliest, false, false,null).get(); ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class); doReturn(true).when(managedCursor).hasMoreEntries(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java index 19d6cc85c9ff6..13853d3c95cf2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java @@ -79,8 +79,8 @@ public void beforeMethod() throws Exception { PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() .getTopic(topic, false).get().get(); getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(pendingAckLogIndexMinLag); - CompletableFuture subscriptionFuture = persistentTopic .createSubscription("test", - CommandSubscribe.InitialPosition.Earliest, false, null); + CompletableFuture subscriptionFuture = persistentTopic.createSubscription("test", + CommandSubscribe.InitialPosition.Earliest, false, false,null); PersistentSubscription subscription = (PersistentSubscription) subscriptionFuture.get(); ManagedCursor managedCursor = subscription.getCursor(); this.managedCursorMock = spy(managedCursor); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index e78b0ca4a0155..39e3c8eda81f6 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -273,7 +273,7 @@ public interface ConsumerBuilder extends Cloneable { ConsumerBuilder subscriptionMode(SubscriptionMode subscriptionMode); /** - * Set the consumer to read from start message to previous + * Set the consumer to read from start message to previous. * @param readReverse the read reverse boolean * @return the consumer builder instance */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 08a6bb15807c9..d245644852cc1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -169,6 +169,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final Map metadata; private final boolean readCompacted; + private final boolean readReverse; private final boolean resetIncludeHead; private final SubscriptionInitialPosition subscriptionInitialPosition; @@ -273,6 +274,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.parentConsumerHasListener = parentConsumerHasListener; this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel(); this.readCompacted = conf.isReadCompacted(); + this.readReverse = conf.isReadReverse(); this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition(); this.negativeAcksTracker = new NegativeAcksTracker(this, conf); this.resetIncludeHead = conf.isResetIncludeHead(); @@ -821,7 +823,7 @@ public void connectionOpened(final ClientCnx cnx) { synchronized (this) { setClientCnx(cnx); ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), - priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, + priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, readReverse, conf.isReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index f4d55e6f47ddb..915384d5465dc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -79,7 +79,7 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { name = "readReverse", value = "Read reverse" ) - private Boolean readReverse = false; + private boolean readReverse = false; @ApiModelProperty( name = "subscriptionType", diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 081dfe4275b24..4ec7d66a4bd9b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -561,24 +561,24 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu SubType subType, int priorityLevel, String consumerName, long resetStartMessageBackInSeconds) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, - false /* isReplicated */, InitialPosition.Earliest, resetStartMessageBackInSeconds, null, - true /* createTopicIfDoesNotExist */); + /* readReverse */ false, false /* isReplicated */, InitialPosition.Earliest, + resetStartMessageBackInSeconds, null, true /* createTopicIfDoesNotExist */); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, + Map metadata, boolean readCompacted, boolean readReverse, boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, - isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition, - startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null, - Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH); + isDurable, startMessageId, metadata, readCompacted, readReverse, isReplicated, + subscriptionInitialPosition, startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, + null, Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, + Map metadata, boolean readCompacted, boolean readReverse, boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, Map subscriptionProperties, long consumerEpoch) { @@ -590,6 +590,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu .setConsumerId(consumerId) .setConsumerName(consumerName) .setRequestId(requestId) + .setReadReverse(readReverse) .setPriorityLevel(priorityLevel) .setDurable(isDurable) .setReadCompacted(readCompacted) diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index acf75eab85826..bcc484a01d171 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -395,6 +395,9 @@ message CommandSubscribe { // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch optional uint64 consumer_epoch = 19; + + // The consumer is read from startMessageId to previous messages + optional bool read_reverse = 20 [default = false]; } message CommandPartitionedTopicMetadata { diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index f2e1f60663d28..14a68beb7a860 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -127,7 +127,8 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { bufferedWriterMetrics); managedLedger.asyncOpenCursor(TRANSACTION_SUBSCRIPTION_NAME, - CommandSubscribe.InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() { + CommandSubscribe.InitialPosition.Earliest, false, + new AsyncCallbacks.OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { MLTransactionLogImpl.this.cursor = cursor;