From 64598b40db3c60ae0d548c5742ed28df2edec28d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 10 Feb 2025 09:44:07 +0800 Subject: [PATCH] [fix] [client] call redeliver 1 msg but did 2 msgs (#23943) (cherry picked from commit 7a79c78f8e6f4b52f13be1c6441f4b007d9a00fe) --- .../BatchMessageWithBatchIndexLevelTest.java | 62 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 31 ++++------ .../GrowableArrayBlockingQueue.java | 8 +++ 3 files changed, 81 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index 8e902d5d1e700..52147f74f4a6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -59,6 +59,7 @@ import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -137,6 +138,67 @@ public void testBatchMessageAck() { }); } + @DataProvider + public Object[][] enabledBatchSend() { + return new Object[][] { + {false}, + {true} + }; + } + + @Test(dataProvider = "enabledBatchSend") + @SneakyThrows + public void testBatchMessageNAck(boolean enabledBatchSend) { + final String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + final String subscriptionName = "s1"; + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer().topic(topicName) + .subscriptionName(subscriptionName) + .receiverQueueSize(21) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) + .subscribe(); + Producer producer = pulsarClient.newProducer().topic(topicName) + .batchingMaxMessages(20) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .enableBatching(enabledBatchSend) + .create(); + final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + final PersistentDispatcherMultipleConsumers dispatcher = + (PersistentDispatcherMultipleConsumers) topic.getSubscription(subscriptionName).getDispatcher(); + + // Send messages: 20 * 2. + for (int i = 0; i < 40; i++) { + byte[] message = ("batch-message-" + i).getBytes(); + if (i == 19 || i == 39) { + producer.newMessage().value(message).send(); + } else { + producer.newMessage().value(message).sendAsync(); + } + } + Awaitility.await().untilAsserted(() -> { + if (enabledBatchSend) { + assertEquals(consumer.numMessagesInQueue(), 40); + } else { + assertEquals(consumer.numMessagesInQueue(), 21); + } + }); + + // Negative ack and verify result/ + Message receive1 = consumer.receive(); + consumer.pause(); + consumer.negativeAcknowledge(receive1); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer.numMessagesInQueue(), 20); + assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20); + }); + + // cleanup. + producer.close(); + consumer.close(); + admin.topics().delete(topicName); + } + @Test public void testBatchMessageMultiNegtiveAck() throws Exception{ final String topicName = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index d5bf2619b7fd4..7186bfd3fb16a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2812,27 +2812,18 @@ private Optional createEncryptionContext(MessageMetadata msgM private int removeExpiredMessagesFromQueue(Set messageIds) { int messagesFromQueue = 0; - Message peek = incomingMessages.peek(); - if (peek != null) { - MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId()); - if (!messageIds.contains(messageId)) { - // first message is not expired, then no message is expired in queue. - return 0; - } - - // try not to remove elements that are added while we remove - Message message = incomingMessages.poll(); - while (message != null) { - decreaseIncomingMessageSize(message); - messagesFromQueue++; - MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId()); - if (!messageIds.contains(id)) { - messageIds.add(id); - break; - } - message.release(); - message = incomingMessages.poll(); + Message message; + while (true) { + message = incomingMessages.pollIf(msg -> { + MessageId idPolled = MessageIdAdvUtils.discardBatch(msg.getMessageId()); + return messageIds.contains(idPolled); + }); + if (message == null) { + break; } + decreaseIncomingMessageSize(message); + messagesFromQueue++; + message.release(); } return messagesFromQueue; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java index 467a455ed8b3d..94bfad1fbd29b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.StampedLock; import java.util.function.Consumer; +import java.util.function.Predicate; import javax.annotation.Nullable; /** @@ -83,10 +84,17 @@ public T remove() { @Override public T poll() { + return pollIf(v -> true); + } + + public T pollIf(Predicate predicate) { headLock.lock(); try { if (SIZE_UPDATER.get(this) > 0) { T item = data[headIndex.value]; + if (!predicate.test(item)) { + return null; + } data[headIndex.value] = null; headIndex.value = (headIndex.value + 1) & (data.length - 1); SIZE_UPDATER.decrementAndGet(this);