diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java index 8ae57982..bfc93392 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java @@ -32,6 +32,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -193,7 +194,7 @@ private void readMoreEntries() { if (log.isDebugEnabled()) { log.debug("{} Schedule read of {} messages.", name, messagesToRead); } - cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, null, null); + cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, null, PositionImpl.LATEST); } else { if (log.isDebugEnabled()) { log.debug("{} Not schedule read due to pending read. Messages to read {}.", diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeMessageRouter.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeMessageRouter.java index 6ff9c4fd..63644795 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeMessageRouter.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeMessageRouter.java @@ -46,6 +46,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageId; @@ -161,7 +162,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { HAVE_PENDING_READ_UPDATER.set(ExchangeMessageRouter.this, FALSE); log.error("Failed to read entries from exchange {}", exchange.getName(), exception); } - }, null, null); + }, null, PositionImpl.LATEST); } else { log.warn("{} Not schedule read due to pending read. Messages to read {}.", exchange.getName(), availablePermits);