Skip to content

Commit

Permalink
Implementing delayed message cancellation in pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
Denovo1998 committed Jan 29, 2025
1 parent 0f9f661 commit 7afe161
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetEmpty;
import static org.apache.pulsar.common.naming.Constants.DELAY_CANCELED_MESSAGE_POSITION;
import static org.apache.pulsar.common.naming.Constants.IS_MARK_DELETE_DELAY_MESSAGE;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.Gauge;
import java.util.ArrayList;
Expand All @@ -28,6 +30,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -46,10 +49,12 @@
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.compaction.Compactor;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -69,10 +74,13 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
private final LongAdder filterRejectedMsgs = new LongAdder();
private final LongAdder filterRescheduledMsgs = new LongAdder();

protected final ConcurrentLongLongPairHashMap delayedMessageMarkDeleteMap;

protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
super(subscription);
this.serviceConfig = serviceConfig;
this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
this.delayedMessageMarkDeleteMap = ConcurrentLongLongPairHashMap.newBuilder().autoShrink(true).build();
}


Expand Down Expand Up @@ -221,6 +229,47 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
entries.set(i, null);
entry.release();
continue;
} else if (delayedMessageMarkDeleteMap.containsKey(entry.getLedgerId(), entry.getEntryId())) {
// The delayed message is marked for delete.
ConcurrentLongLongPairHashMap.LongPair markMessageId = delayedMessageMarkDeleteMap
.get(entry.getLedgerId(), entry.getEntryId());
List<Position> deleteDelayedMessageList = new ArrayList<>();
deleteDelayedMessageList.add(entry.getPosition());
deleteDelayedMessageList.add(PositionFactory.create(markMessageId.first, markMessageId.second));

delayedMessageMarkDeleteMap.remove(entry.getLedgerId(), entry.getEntryId());
individualAcknowledgeMessageIfNeeded(deleteDelayedMessageList, Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
}

List<KeyValue> propertiesList = msgMetadata.getPropertiesList();
if (!propertiesList.isEmpty()) {
Map<String, String> propertiesMap = propertiesList.stream()
.filter(p -> p.getKey().equals(DELAY_CANCELED_MESSAGE_POSITION)
|| p.getKey().equals(IS_MARK_DELETE_DELAY_MESSAGE))
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
(oldValue, newValue) -> newValue));

if (propertiesMap.containsKey(IS_MARK_DELETE_DELAY_MESSAGE)) {
if (propertiesMap.containsKey(DELAY_CANCELED_MESSAGE_POSITION)) {
String[] data = propertiesMap.get(DELAY_CANCELED_MESSAGE_POSITION).split(":");
long ledgerId = Long.parseLong(data[0]);
long entryId = Long.parseLong(data[1]);
delayedMessageMarkDeleteMap.put(ledgerId, entryId,
entry.getLedgerId(), entry.getEntryId());
entries.set(i, null);
entry.release();
continue;
} else {
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
}
}
}

if (hasFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.common.naming.Constants.DELAY_CANCELED_MESSAGE_POSITION;
import static org.apache.pulsar.common.naming.Constants.IS_MARK_DELETE_DELAY_MESSAGE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -32,6 +34,8 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
Expand All @@ -43,6 +47,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -686,4 +691,65 @@ public void testDelayedDeliveryExceedsMaxDelay() throws Exception {
+ maxDeliveryDelayInMillis + " milliseconds");
}
}

@Test
public void testDelayedMessageCancel() throws Exception {
String topic = BrokerTestUtil.newUniqueName("testDelayedMessageCancel");
CountDownLatch latch = new CountDownLatch(9);
Set<String> receivedMessages = ConcurrentHashMap.newKeySet();

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("shared-sub")
.subscriptionType(SubscriptionType.Shared)
.messageListener((Consumer<String> c, Message<String> msg) -> {
receivedMessages.add(msg.getValue());
c.acknowledgeAsync(msg);
latch.countDown();
})
.subscribe();

final long tickTime = 1000L;

admin.topicPolicies().setDelayedDeliveryPolicy(topic,
DelayedDeliveryPolicies.builder()
.active(true)
.tickTime(tickTime)
.maxDeliveryDelayInMillis(10000)
.build());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

for (int i = 0; i < 10; i++) {
final int n = i;
final long currentTime = System.currentTimeMillis();
final long deliverAtTime = currentTime + 5000L;
producer.newMessage()
.key(String.valueOf(i))
.value("msg-" + i)
.deliverAt(deliverAtTime)
.sendAsync().whenComplete((id, ex) -> {
if (n == 0) {
MessageIdAdv messageIdAdv = (MessageIdAdv) id;
String deleteDelayedMessageId = messageIdAdv.getLedgerId() + ":" + messageIdAdv.getEntryId();
producer.newMessage()
.key(String.valueOf(n))
.value("msg-0-mark")
.deliverAt(deliverAtTime - 2 * tickTime)
.property(IS_MARK_DELETE_DELAY_MESSAGE, "true")
.property(DELAY_CANCELED_MESSAGE_POSITION, deleteDelayedMessageId)
.sendAsync();
}
});
}
producer.flush();

assertTrue(latch.await(15, TimeUnit.SECONDS), "Not all messages were received in time");
assertFalse(receivedMessages.contains("msg-0") || receivedMessages.contains("msg-0-mark"),
"msg-0 and msg-0-mark should have been cancelled but was received");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
public class Constants {

public static final String GLOBAL_CLUSTER = "global";
public static final String DELAY_CANCELED_MESSAGE_POSITION = "delayCanceledMsgPosition";
public static final String IS_MARK_DELETE_DELAY_MESSAGE = "isMarkDeleteDelayMessage";

private Constants() {}
}

0 comments on commit 7afe161

Please sign in to comment.