Skip to content

Commit

Permalink
[fix] [client] call redeliver 1 msg but did 2 msgs (#23943)
Browse files Browse the repository at this point in the history
(cherry picked from commit 7a79c78)
  • Loading branch information
poorbarcode committed Feb 11, 2025
1 parent 449da0a commit 64598b4
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -137,6 +138,67 @@ public void testBatchMessageAck() {
});
}

@DataProvider
public Object[][] enabledBatchSend() {
return new Object[][] {
{false},
{true}
};
}

@Test(dataProvider = "enabledBatchSend")
@SneakyThrows
public void testBatchMessageNAck(boolean enabledBatchSend) {
final String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
final String subscriptionName = "s1";
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName)
.receiverQueueSize(21)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batchingMaxMessages(20)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.enableBatching(enabledBatchSend)
.create();
final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
final PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) topic.getSubscription(subscriptionName).getDispatcher();

// Send messages: 20 * 2.
for (int i = 0; i < 40; i++) {
byte[] message = ("batch-message-" + i).getBytes();
if (i == 19 || i == 39) {
producer.newMessage().value(message).send();
} else {
producer.newMessage().value(message).sendAsync();
}
}
Awaitility.await().untilAsserted(() -> {
if (enabledBatchSend) {
assertEquals(consumer.numMessagesInQueue(), 40);
} else {
assertEquals(consumer.numMessagesInQueue(), 21);
}
});

// Negative ack and verify result/
Message<byte[]> receive1 = consumer.receive();
consumer.pause();
consumer.negativeAcknowledge(receive1);
Awaitility.await().untilAsserted(() -> {
assertEquals(consumer.numMessagesInQueue(), 20);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
});

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(topicName);
}

@Test
public void testBatchMessageMultiNegtiveAck() throws Exception{
final String topicName = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2812,27 +2812,18 @@ private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgM

private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
if (peek != null) {
MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId());
if (!messageIds.contains(messageId)) {
// first message is not expired, then no message is expired in queue.
return 0;
}

// try not to remove elements that are added while we remove
Message<T> message = incomingMessages.poll();
while (message != null) {
decreaseIncomingMessageSize(message);
messagesFromQueue++;
MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId());
if (!messageIds.contains(id)) {
messageIds.add(id);
break;
}
message.release();
message = incomingMessages.poll();
Message<T> message;
while (true) {
message = incomingMessages.pollIf(msg -> {
MessageId idPolled = MessageIdAdvUtils.discardBatch(msg.getMessageId());
return messageIds.contains(idPolled);
});
if (message == null) {
break;
}
decreaseIncomingMessageSize(message);
messagesFromQueue++;
message.release();
}
return messagesFromQueue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -83,10 +84,17 @@ public T remove() {

@Override
public T poll() {
return pollIf(v -> true);
}

public T pollIf(Predicate<T> predicate) {
headLock.lock();
try {
if (SIZE_UPDATER.get(this) > 0) {
T item = data[headIndex.value];
if (!predicate.test(item)) {
return null;
}
data[headIndex.value] = null;
headIndex.value = (headIndex.value + 1) & (data.length - 1);
SIZE_UPDATER.decrementAndGet(this);
Expand Down

0 comments on commit 64598b4

Please sign in to comment.