diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 34fd9f17f6ea69..aae54162e5a08a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -195,6 +195,7 @@ public void startProducer() { prepareCreateProducer().thenCompose(ignore -> { ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; builderImpl.getConf().setNonPartitionedTopicExpected(true); + builderImpl.getConf().setReplProducer(true); return producerBuilder.createAsync().thenAccept(producer -> { setProducerAndTriggerReadEntries(producer); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index c39b722888f714..bfb588b54ee608 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -20,6 +20,8 @@ import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID; import static org.apache.pulsar.common.protocol.Commands.hasChecksum; import static org.apache.pulsar.common.protocol.Commands.readChecksum; import com.google.common.annotations.VisibleForTesting; @@ -40,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; @@ -271,7 +274,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, i boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(), - batchSize, isChunked, System.nanoTime(), isMarker, position); + batchSize, isChunked, System.nanoTime(), isMarker, position, cnx.isClientSupportsDedupReplV2()); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); @@ -283,7 +286,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc int batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker, position); + isChunked, System.nanoTime(), isMarker, position, cnx.isClientSupportsDedupReplV2()); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); @@ -378,6 +381,7 @@ private static final class MessagePublishContext implements PublishContext, Runn private int batchSize; private boolean chunked; private boolean isMarker; + private boolean supportsDedupReplV2; private long startTimeNs; @@ -463,6 +467,11 @@ public long getOriginalSequenceId() { return originalSequenceId; } + @Override + public boolean supportsDedupReplV2() { + return supportsDedupReplV2; + } + @Override public void setOriginalHighestSequenceId(long originalHighestSequenceId) { this.originalHighestSequenceId = originalHighestSequenceId; @@ -537,8 +546,11 @@ public void run() { // stats producer.stats.recordMsgIn(batchSize, msgSize); producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS); - producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId, - ledgerId, entryId); + if (producer.isRemote() && supportsDedupReplV2) { + sendSendReceiptResponseRepl(); + } else { + sendSendReceiptResponseNormal(); + } producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize); if (this.chunked) { producer.stats.recordChunkedMsgIn(); @@ -551,8 +563,37 @@ public void run() { recycle(); } + private void sendSendReceiptResponseRepl() { + String replSequenceLIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SEQUENCE_LID)); + String replSequenceEIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SEQUENCE_EID)); + if (!StringUtils.isNumeric(replSequenceLIdStr) || !StringUtils.isNumeric(replSequenceEIdStr)) { + log.error("[{}] Message can not determine whether the message is duplicated due to the acquired messages" + + " props were are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {}," + + " prop-{}: {}, prop-{}: {}", + producer.topic.getName(), producer.producerName, + supportsDedupReplV2(), getSequenceId(), + MSG_PROP_REPL_SEQUENCE_LID, replSequenceLIdStr, + MSG_PROP_REPL_SEQUENCE_EID, replSequenceEIdStr); + producer.cnx.getCommandSender().sendSendError(producer.producerId, + Math.max(highestSequenceId, sequenceId), + ServerError.PersistenceError, "Message can not determine whether the message is" + + " duplicated due to the acquired messages props were are invalid"); + return; + } + Long replSequenceLId = Long.valueOf(replSequenceLIdStr); + Long replSequenceEId = Long.valueOf(replSequenceEIdStr); + producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, replSequenceLId, + replSequenceEId, ledgerId, entryId); + } + + private void sendSendReceiptResponseNormal() { + producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId, + ledgerId, entryId); + } + static MessagePublishContext get(Producer producer, long sequenceId, int msgSize, int batchSize, - boolean chunked, long startTimeNs, boolean isMarker, Position position) { + boolean chunked, long startTimeNs, boolean isMarker, Position position, + boolean supportsDedupReplV2) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; @@ -563,6 +604,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.isMarker = isMarker; + callback.supportsDedupReplV2 = supportsDedupReplV2; callback.ledgerId = position == null ? -1 : position.getLedgerId(); callback.entryId = position == null ? -1 : position.getEntryId(); if (callback.propertyMap != null) { @@ -572,7 +614,8 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, int msgSize, - int batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { + int batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position, + boolean supportsDedupReplV2) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; @@ -584,6 +627,7 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long callback.startTimeNs = startTimeNs; callback.chunked = chunked; callback.isMarker = isMarker; + callback.supportsDedupReplV2 = supportsDedupReplV2; callback.ledgerId = position == null ? -1 : position.getLedgerId(); callback.entryId = position == null ? -1 : position.getEntryId(); if (callback.propertyMap != null) { @@ -801,7 +845,8 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon } MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null); + headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null, + cnx.isClientSupportsDedupReplV2()); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index f9e593345d85fd..7cd0b3440c9f2c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -65,6 +65,7 @@ import javax.net.ssl.SSLSession; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; +import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -237,6 +238,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private boolean encryptionRequireOnProducer; + @Getter private FeatureFlags features; private PulsarCommandSender commandSender; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index ec7889af6bbbea..abe30abe8fdec9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -126,6 +126,10 @@ default long getEntryTimestamp() { default void setEntryTimestamp(long entryTimestamp) { } + + default boolean supportsDedupReplV2() { + return false; + } } CompletableFuture initialize(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java index eb2b318b7ead17..0748d14e71ece9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.common.api.proto.FeatureFlags; public interface TransportCnx { @@ -103,4 +104,10 @@ public interface TransportCnx { * previously called {@link #incrementThrottleCount()}. */ void decrementThrottleCount(); + + FeatureFlags getFeatures(); + + default boolean isClientSupportsDedupReplV2() { + return getFeatures() != null && getFeatures().hasSupportsDedupReplV2() && getFeatures().isSupportsDedupReplV2(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index cd5b2ba721215e..fca79551ac3a39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID; import io.netty.buffer.ByteBuf; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -194,11 +196,22 @@ protected boolean replicateEntries(List entries) { msg.setSchemaInfoForReplicator(schemaFuture.get()); msg.getMessageBuilder().clearTxnidMostBits(); msg.getMessageBuilder().clearTxnidLeastBits(); + // Why not use a generated sequence ID that initialized with "-1" when the replicator is starting? + // Because that we should persist the props to the value of the current value of sequence id for + // each acknowledge after publishing for guarantees the sequence id can be recovered after a cursor + // reset that will happen when getting new schema or publish fails, which cost much more. + msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SEQUENCE_LID) + .setValue(Long.valueOf(entry.getLedgerId()).toString()); + msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SEQUENCE_EID) + .setValue(Long.valueOf(entry.getEntryId()).toString()); msgOut.recordEvent(headersAndPayload.readableBytes()); stats.incrementMsgOutCounter(); stats.incrementBytesOutCounter(headersAndPayload.readableBytes()); // Increment pending messages for messages produced locally PENDING_MESSAGES_UPDATER.incrementAndGet(this); + if (log.isDebugEnabled()) { + log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId()); + } producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg)); atLeastOneMessageSentForReplication = true; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index dfb8b9d2edb12b..1301359f9833ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID; +import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.util.Iterator; @@ -38,8 +40,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic.PublishContext; +import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; import org.slf4j.Logger; @@ -321,27 +325,145 @@ public boolean isEnabled() { * @return true if the message should be published or false if it was recognized as a duplicate */ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf headersAndPayload) { + setContextPropsIfRepl(publishContext, headersAndPayload); + if (!isEnabled() || publishContext.isMarkerMessage()) { return MessageDupStatus.NotDup; } + if (publishContext.getProducerName().startsWith(replicatorPrefix)) { + if (!publishContext.supportsDedupReplV2()){ + return isDuplicateReplV1(publishContext, headersAndPayload); + } else { + return isDuplicateReplV2(publishContext, headersAndPayload); + } + } + return isDuplicateNormal(publishContext, headersAndPayload, false); + } + + public MessageDupStatus isDuplicateReplV1(PublishContext publishContext, ByteBuf headersAndPayload) { + // Message is coming from replication, we need to use the original producer name and sequence id + // for the purpose of deduplication and not rely on the "replicator" name. + int readerIndex = headersAndPayload.readerIndex(); + MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.readerIndex(readerIndex); + + String producerName = md.getProducerName(); + long sequenceId = md.getSequenceId(); + long highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId); + publishContext.setOriginalProducerName(producerName); + publishContext.setOriginalSequenceId(sequenceId); + publishContext.setOriginalHighestSequenceId(highestSequenceId); + return isDuplicateNormal(publishContext, headersAndPayload, true); + } + + private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf headersAndPayload) { + if (publishContext.getProducerName().startsWith(replicatorPrefix)) { + // Message is coming from replication, we need to use the replication's producer name, ledger id and entry id + // for the purpose of deduplication. + int readerIndex = headersAndPayload.readerIndex(); + MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.readerIndex(readerIndex); + + Long replSequenceLId = null; + Long replSequenceEId = null; + List kvPairList = md.getPropertiesList(); + for (KeyValue kvPair : kvPairList) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_LID)) { + if (StringUtils.isNumeric(kvPair.getValue())) { + replSequenceLId = Long.valueOf(kvPair.getValue()); + publishContext.setProperty(MSG_PROP_REPL_SEQUENCE_LID, replSequenceLId); + } else { + break; + } + } + if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_EID)) { + if (StringUtils.isNumeric(kvPair.getValue())) { + replSequenceEId = Long.valueOf(kvPair.getValue()); + publishContext.setProperty(MSG_PROP_REPL_SEQUENCE_EID, replSequenceEId); + } else { + break; + } + } + if (replSequenceLId != null && replSequenceEId != null) { + break; + } + } + } + } + + public MessageDupStatus isDuplicateReplV2(PublishContext publishContext, ByteBuf headersAndPayload) { + Long replSequenceLId = (Long) publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_LID); + Long replSequenceEId = (Long) publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_EID); + if (replSequenceLId == null || replSequenceEId == null) { + log.error("[{}] Message can not determine whether the message is duplicated due to the acquired messages" + + " props were are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {}," + + " prop-{}: {}, prop-{}: {}", + topic.getName(), publishContext.getProducerName(), + publishContext.supportsDedupReplV2(), publishContext.getSequenceId(), + MSG_PROP_REPL_SEQUENCE_LID, replSequenceLId, + MSG_PROP_REPL_SEQUENCE_EID, replSequenceEId); + return MessageDupStatus.Unknown; + } + + String lastSequenceLIdKey = publishContext.getProducerName() + "_LID"; + String lastSequenceEIdKey = publishContext.getProducerName() + "_EID"; + synchronized (highestSequencedPushed) { + Long lastSequenceLIdPushed = highestSequencedPushed.get(lastSequenceLIdKey); + Long lastSequenceEIdPushed = highestSequencedPushed.get(lastSequenceEIdKey); + if (lastSequenceLIdPushed != null && lastSequenceEIdPushed != null && + (replSequenceLId.compareTo(lastSequenceLIdPushed) < 0 + || (replSequenceLId.compareTo(lastSequenceLIdPushed) == 0 + && replSequenceEId.compareTo(lastSequenceEIdPushed) <= 0))) { + if (log.isDebugEnabled()) { + log.debug("[{}] Message identified as duplicated producer={}. publishing {}:{}, latest publishing" + + " in-progress {}:{}", + topic.getName(), publishContext.getProducerName(), lastSequenceLIdPushed, + lastSequenceEIdPushed, lastSequenceLIdPushed, lastSequenceEIdPushed); + } + + // Also need to check sequence ids that has been persisted. + // If current message's seq id is smaller or equals to the + // "lastSequenceLIdPersisted:lastSequenceEIdPersisted" than its definitely a dup + // If current message's seq id is between "lastSequenceLIdPushed:lastSequenceEIdPushed" and + // "lastSequenceLIdPersisted:lastSequenceEIdPersisted", then we cannot be sure whether the message + // is a dup or not we should return an error to the producer for the latter case so that it can retry + // at a future time + Long lastSequenceLIdPersisted = highestSequencedPersisted.get(lastSequenceLIdKey); + Long lastSequenceEIdPersisted = highestSequencedPersisted.get(lastSequenceEIdKey); + if (log.isDebugEnabled()) { + log.debug("[{}] Message identified as duplicated producer={}. publishing {}:{}, latest" + + " persisted {}:{}", + topic.getName(), publishContext.getProducerName(), replSequenceLId, + replSequenceEId, lastSequenceLIdPersisted, lastSequenceEIdPersisted); + } + if (lastSequenceLIdPersisted != null && lastSequenceEIdPersisted != null && + (replSequenceLId.compareTo(lastSequenceLIdPersisted) < 0 + || (replSequenceLId.compareTo(lastSequenceLIdPersisted) == 0 + && replSequenceEId.compareTo(lastSequenceEIdPersisted) <= 0))) { + return MessageDupStatus.Dup; + } else { + return MessageDupStatus.Unknown; + } + } + highestSequencedPushed.put(lastSequenceLIdKey, replSequenceLId); + highestSequencedPushed.put(lastSequenceEIdKey, replSequenceEId); + } + if (log.isDebugEnabled()) { + log.debug("[{}] Message identified as non-duplicated producer={}. publishing {}:{}", + topic.getName(), publishContext.getProducerName(), replSequenceLId, replSequenceEId); + } + return MessageDupStatus.NotDup; + } + public MessageDupStatus isDuplicateNormal(PublishContext publishContext, ByteBuf headersAndPayload, + boolean useOriginalProducerName) { String producerName = publishContext.getProducerName(); + if (useOriginalProducerName) { + producerName = publishContext.getOriginalProducerName(); + } long sequenceId = publishContext.getSequenceId(); long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId); MessageMetadata md = null; - if (producerName.startsWith(replicatorPrefix)) { - // Message is coming from replication, we need to use the original producer name and sequence id - // for the purpose of deduplication and not rely on the "replicator" name. - int readerIndex = headersAndPayload.readerIndex(); - md = Commands.parseMessageMetadata(headersAndPayload); - producerName = md.getProducerName(); - sequenceId = md.getSequenceId(); - highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId); - publishContext.setOriginalProducerName(producerName); - publishContext.setOriginalSequenceId(sequenceId); - publishContext.setOriginalHighestSequenceId(highestSequenceId); - headersAndPayload.readerIndex(readerIndex); - } long chunkID = -1; long totalChunk = -1; if (publishContext.isChunked()) { @@ -399,7 +521,37 @@ public void recordMessagePersisted(PublishContext publishContext, Position posit if (!isEnabled() || publishContext.isMarkerMessage()) { return; } + if (publishContext.getProducerName().startsWith(replicatorPrefix) && publishContext.supportsDedupReplV2()) { + recordMessagePersistedRepl(publishContext, position); + } else { + recordMessagePersistedNormal(publishContext, position); + } + } + public void recordMessagePersistedRepl(PublishContext publishContext, Position position) { + String replSequenceLIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_LID)); + String replSequenceEIdStr = String.valueOf(publishContext.getProperty(MSG_PROP_REPL_SEQUENCE_EID)); + if (!StringUtils.isNumeric(replSequenceLIdStr) || !StringUtils.isNumeric(replSequenceEIdStr)) { + log.error("[{}] Can not persist highest sequence-id due to the acquired messages" + + " props are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {}," + + " prop-{}: {}, prop-{}: {}", + topic.getName(), publishContext.getProducerName(), + publishContext.supportsDedupReplV2(), publishContext.getSequenceId(), + MSG_PROP_REPL_SEQUENCE_LID, replSequenceLIdStr, + MSG_PROP_REPL_SEQUENCE_EID, replSequenceEIdStr); + recordMessagePersistedNormal(publishContext, position); + return; + } + Long replSequenceLId = Long.valueOf(replSequenceLIdStr); + Long replSequenceEId = Long.valueOf(replSequenceEIdStr); + String lastSequenceLIdKey = publishContext.getProducerName() + "_LID"; + String lastSequenceEIdKey = publishContext.getProducerName() + "_EID"; + highestSequencedPersisted.put(lastSequenceLIdKey, replSequenceLId); + highestSequencedPersisted.put(lastSequenceEIdKey, replSequenceEId); + increaseSnapshotCounterAndTakeSnapshotIfNeeded(position); + } + + public void recordMessagePersistedNormal(PublishContext publishContext, Position position) { String producerName = publishContext.getProducerName(); long sequenceId = publishContext.getSequenceId(); long highestSequenceId = publishContext.getHighestSequenceId(); @@ -413,9 +565,18 @@ public void recordMessagePersisted(PublishContext publishContext, Position posit if (isLastChunk == null || isLastChunk) { highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId)); } + increaseSnapshotCounterAndTakeSnapshotIfNeeded(position); + } + + private void increaseSnapshotCounterAndTakeSnapshotIfNeeded(Position position) { if (++snapshotCounter >= snapshotInterval) { snapshotCounter = 0; takeSnapshot(position); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Waiting for sequence-id snapshot {}/{}", topic.getName(), snapshotCounter, + snapshotInterval); + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java new file mode 100644 index 00000000000000..ce66e9f69dd569 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.MessageDeduplication; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.common.protocol.ByteBufPair; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class OneWayReplicatorDeduplicationTest extends OneWayReplicatorTestBase { + + static final ObjectMapper JACKSON = new ObjectMapper(); + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + super.setup(); + waitInternalClientCreated(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @Override + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, + LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { + super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk); + // For check whether deduplication snapshot has done. + config.setBrokerDeduplicationEntriesInterval(10); + config.setReplicationStartAt("earliest"); + // To cover more cases, write more than one ledger. + config.setManagedLedgerMaxEntriesPerLedger(100); + config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + config.setManagedLedgerMaxLedgerRolloverTimeMinutes(1); + } + + protected void waitReplicatorStopped(String topicName) { + Awaitility.await().untilAsserted(() -> { + Optional topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional2.isPresent()); + PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get(); + assertTrue(persistentTopic2.getProducers().isEmpty()); + Optional topicOptional1 = pulsar2.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topicOptional1.isPresent()); + PersistentTopic persistentTopic1 = (PersistentTopic) topicOptional2.get(); + assertTrue(persistentTopic1.getReplicators().isEmpty() + || !persistentTopic1.getReplicators().get(cluster2).isConnected()); + }); + } + + protected void waitInternalClientCreated() throws Exception { + // Wait for the internal client created. + final String topicNameTriggerInternalClientCreate = + BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate); + waitReplicatorStarted(topicNameTriggerInternalClientCreate); + cleanupTopics(() -> { + admin1.topics().delete(topicNameTriggerInternalClientCreate); + admin2.topics().delete(topicNameTriggerInternalClientCreate); + }); + } + + protected Runnable injectReplicatorClientCnx( + InjectedClientCnxClientBuilder.ClientCnxFactory clientCnxFactory) throws Exception { + String cluster2 = pulsar2.getConfig().getClusterName(); + BrokerService brokerService = pulsar1.getBrokerService(); + ClientBuilderImpl clientBuilder2 = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(url2.toString()); + + // Inject spy client. + final var replicationClients = brokerService.getReplicationClients(); + PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); + PulsarClientImpl injectedClient = InjectedClientCnxClientBuilder.create(clientBuilder2, clientCnxFactory); + assertTrue(replicationClients.remove(cluster2, internalClient)); + assertNull(replicationClients.putIfAbsent(cluster2, injectedClient)); + + // Return a cleanup injection task; + return () -> { + assertTrue(replicationClients.remove(cluster2, injectedClient)); + assertNull(replicationClients.putIfAbsent(cluster2, internalClient)); + injectedClient.closeAsync(); + }; + } + + @DataProvider(name = "deduplicationArgs") + public Object[][] deduplicationArgs() { + return new Object[][] { + {true/* inject repeated publishing*/, 1/* repeated messages window */, + true /* supportsDedupReplV2 */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 2/* repeated messages window */, + true /* supportsDedupReplV2 */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 3/* repeated messages window */, + true /* supportsDedupReplV2 */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 4/* repeated messages window */, + true /* supportsDedupReplV2 */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 5/* repeated messages window */, + true /* supportsDedupReplV2 */, false/* multi schemas */}, + {true/* inject repeated publishing*/, 10/* repeated messages window */, + true /* supportsDedupReplV2 */, false/* multi schemas */}, + // ===== multi schema + {true/* inject repeated publishing*/, 1/* repeated messages window */, + true /* supportsDedupReplV2 */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 2/* repeated messages window */, + true /* supportsDedupReplV2 */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 3/* repeated messages window */, + true /* supportsDedupReplV2 */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 4/* repeated messages window */, + true /* supportsDedupReplV2 */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 5/* repeated messages window */, + true /* supportsDedupReplV2 */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 10/* repeated messages window */, + true /* supportsDedupReplV2 */, true/* multi schemas */}, + // ===== Compatability "source-cluster: old, target-cluster: new". + {false/* inject repeated publishing*/, 0/* repeated messages window */, + false /* supportsDedupReplV2 */, false/* multi schemas */}, + {false/* inject repeated publishing*/, 0/* repeated messages window */, + false /* supportsDedupReplV2 */, true/* multi schemas */}, + {true/* inject repeated publishing*/, 3/* repeated messages window */, + false /* supportsDedupReplV2 */, true/* multi schemas */}, + }; + } + + // TODO add more tests + // - The old deduplication does not work when multi source producers use a same sequence-id. + // - Add more issue explain in the PR. + // - Review the code to confirm that multi source-brokers can work when the source topic switch. + // - Try to reproduce the issue mentioned in the PR. + + @Test(timeOut = 360 * 1000, dataProvider = "deduplicationArgs") + public void testDeduplication(final boolean injectRepeatedPublish, final int repeatedMessagesWindow, + final boolean supportsDedupReplV2, boolean multiSchemas) throws Exception { + // 0. Inject a mechanism that duplicate all Send-Command for the replicator. + final List duplicatedMsgs = new ArrayList<>(); + Runnable taskToClearInjection = injectReplicatorClientCnx( + (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { + + @Override + protected ByteBuf newConnectCommand() throws Exception { + if (supportsDedupReplV2) { + return super.newConnectCommand(); + } + authenticationDataProvider = authentication.getAuthData(remoteHostName); + AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); + BaseCommand cmd = Commands.newConnectWithoutSerialize(authentication.getAuthMethodName(), authData, + this.protocolVersion, clientVersion, proxyToTargetBrokerAddress, null, null, null, null); + cmd.getConnect().getFeatureFlags().setSupportsDedupReplV2(false); + return Commands.serializeWithSize(cmd); + } + + @Override + public boolean isBrokerSupportsDedupReplV2() { + return supportsDedupReplV2; + } + + @Override + public ChannelHandlerContext ctx() { + if (!injectRepeatedPublish) { + return super.ctx(); + } + final ChannelHandlerContext originalCtx = super.ctx; + ChannelHandlerContext spyContext = spy(originalCtx); + doAnswer(invocation -> { + // Do not repeat the messages re-sending, and clear the previous cached messages when + // calling re-sending, to avoid publishing outs of order. + for (StackTraceElement stackTraceElement : Thread.currentThread().getStackTrace()) { + if (stackTraceElement.toString().contains("recoverProcessOpSendMsgFrom") + || stackTraceElement.toString().contains("resendMessages")) { + duplicatedMsgs.clear(); + return invocation.callRealMethod(); + } + } + + Object data = invocation.getArguments()[0]; + if (true && !(data instanceof ByteBufPair)) { + return invocation.callRealMethod(); + } + // Repeatedly send every message. + ByteBufPair byteBufPair = (ByteBufPair) data; + ByteBuf buf1 = byteBufPair.getFirst(); + ByteBuf buf2 = byteBufPair.getSecond(); + int bufferIndex1 = buf1.readerIndex(); + int bufferIndex2 = buf2.readerIndex(); + // Skip totalSize. + buf1.readInt(); + int cmdSize = buf1.readInt(); + BaseCommand cmd = new BaseCommand(); + cmd.parseFrom(buf1, cmdSize); + buf1.readerIndex(bufferIndex1); + if (cmd.getType().equals(BaseCommand.Type.SEND)) { + synchronized (duplicatedMsgs) { + if (duplicatedMsgs.size() >= repeatedMessagesWindow) { + for (ByteBufPair bufferPair : duplicatedMsgs) { + originalCtx.channel().write(bufferPair, originalCtx.voidPromise()); + originalCtx.channel().flush(); + } + duplicatedMsgs.clear(); + } + } + ByteBuf newBuffer1 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf1.readableBytes()); + buf1.readBytes(newBuffer1); + buf1.readerIndex(bufferIndex1); + ByteBuf newBuffer2 = UnpooledByteBufAllocator.DEFAULT.heapBuffer( + buf2.readableBytes()); + buf2.readBytes(newBuffer2); + buf2.readerIndex(bufferIndex2); + synchronized (duplicatedMsgs) { + if (newBuffer2.readableBytes() > 0) { + duplicatedMsgs.add(ByteBufPair.get(newBuffer1, newBuffer2)); + } + } + return invocation.callRealMethod(); + } else { + return invocation.callRealMethod(); + } + }).when(spyContext).write(any(), any(ChannelPromise.class)); + return spyContext; + } + }); + + // 1. Create topics and enable deduplication. + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicName); + admin1.topics().createSubscription(topicName, "s1", MessageId.earliest); + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + // TODO fix the bug: the policy "admin1.topicPolicies().setDeduplicationSnapshotInterval(topicName, 10)" + // does not work. + PersistentTopic persistentTopic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin1.topicPolicies().setDeduplicationStatus(topicName, true); + admin1.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + admin2.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + MessageDeduplication messageDeduplication1 = persistentTopic1.getMessageDeduplication(); + if (messageDeduplication1 != null) { + int snapshotInterval1 = WhiteboxImpl.getInternalState(messageDeduplication1, "snapshotInterval"); + assertEquals(snapshotInterval1, 10); + } + MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication(); + if (messageDeduplication2 != null) { + int snapshotInterval2 = WhiteboxImpl.getInternalState(messageDeduplication2, "snapshotInterval"); + assertEquals(snapshotInterval2, 10); + } + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), Boolean.TRUE); + assertEquals(persistentTopic1.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), Boolean.TRUE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + // TODO fix the bug: after schema check failed, the replication will get a broken package error. + }); + PersistentTopic tp1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentTopic tp2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + + // 2, Publish messages. + List msgSent = new ArrayList<>(); + Producer p1 = client1.newProducer(Schema.INT32).topic(topicName).create(); + Producer p2 = client1.newProducer(Schema.INT32).topic(topicName).create(); + Producer p3 = client1.newProducer(Schema.STRING).topic(topicName).create(); + Producer p4 = client1.newProducer(Schema.BOOL).topic(topicName).create(); + for (int i = 0; i < 10; i++) { + p1.send(i); + msgSent.add(String.valueOf(i)); + } + for (int i = 10; i < 200; i++) { + int msg1 = i; + int msg2 = 1000 + i; + String msg3 = (2000 + i) + ""; + boolean msg4 = i % 2 == 0; + p1.send(msg1); + p2.send(msg2); + msgSent.add(String.valueOf(msg1)); + msgSent.add(String.valueOf(msg2)); + if (multiSchemas) { + p3.send(msg3); + p4.send(msg4); + msgSent.add(String.valueOf(msg3)); + msgSent.add(String.valueOf(msg4)); + } + } + p1.close(); + p2.close(); + p3.close(); + p4.close(); + + // 3. Enable replication and wait the task to be finished. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + waitReplicatorStarted(topicName); + Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + long replBacklog = cursor.getNumberOfEntriesInBacklog(true); + log.info("repl backlog: {}", replBacklog); + assertEquals(replBacklog, 0); + } + } + }); + + // Verify: all messages were copied correctly. + List msgReceived = new ArrayList<>(); + Consumer consumer = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) + .subscriptionName("s1").subscribe(); + while (true) { + Message msg = consumer.receive(10, TimeUnit.SECONDS); + if (msg == null) { + break; + } + MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); + log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), + messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + msgReceived.add(String.valueOf(msg.getValue())); + consumer.acknowledgeAsync(msg); + } + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getInternalStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getInternalStats(topicName))); + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getStats(topicName))); + assertEquals(msgReceived, msgSent); + consumer.close(); + + // Verify: the deduplication cursor has been acked. + // "topic-policy.DeduplicationSnapshotInterval" is "10". + Awaitility.await().untilAsserted(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.dedup")) { + assertTrue(cursor.getNumberOfEntriesInBacklog(true) < 10); + } + } + for (ManagedCursor cursor : tp2.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.dedup")) { + assertTrue(cursor.getNumberOfEntriesInBacklog(true) < 10); + } + } + }); + // Remove the injection. + taskToClearInjection.run(); + + log.info("====== Verify: all messages will be replicated after reopening replication ======"); + + // Verify: all messages will be replicated after reopening replication. + // Reopen replication: stop replication. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(topicName); + admin2.topics().unload(topicName); + admin2.topics().delete(topicName); + // Reopen replication: enable replication. + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + admin2.topicPolicies().setDeduplicationStatus(topicName, true); + admin2.topicPolicies().setSchemaCompatibilityStrategy(topicName, + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication(); + if (messageDeduplication2 != null) { + int snapshotInterval2 = WhiteboxImpl.getInternalState(messageDeduplication2, "snapshotInterval"); + assertEquals(snapshotInterval2, 10); + } + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getDeduplicationEnabled().get(), Boolean.TRUE); + assertEquals(persistentTopic2.getHierarchyTopicPolicies().getSchemaCompatibilityStrategy().get(), + SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + }); + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + for (ManagedCursor cursor : tp2.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.c2")) { + assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0); + } + } + }); + // Reopen replication: consumption. + List msgReceived2 = new ArrayList<>(); + Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) + .subscriptionName("s1").subscribe(); + while (true) { + Message msg = consumer2.receive(10, TimeUnit.SECONDS); + if (msg == null) { + break; + } + MessageIdAdv messageIdAdv = (MessageIdAdv) msg.getMessageId(); + log.info("received msg. source {}, target {}:{}", StringUtils.join(msg.getProperties().values(), ":"), + messageIdAdv.getLedgerId(), messageIdAdv.getEntryId()); + msgReceived2.add(String.valueOf(msg.getValue())); + consumer2.acknowledgeAsync(msg); + } + // Verify: all messages were copied correctly. + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getInternalStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getInternalStats(topicName))); + log.info("c1 topic stats-internal: " + + JACKSON.writeValueAsString(admin1.topics().getStats(topicName))); + log.info("c2 topic stats-internal: " + + JACKSON.writeValueAsString(admin2.topics().getStats(topicName))); + assertEquals(msgReceived2, msgSent); + consumer2.close(); + + // cleanup. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(topicName); + Awaitility.await().until(() -> { + for (ManagedCursor cursor : tp1.getManagedLedger().getCursors()) { + if (cursor.getName().equals("pulsar.repl.r2")) { + return false; + } + } + return true; + }); + admin1.topics().unload(topicName); // TODO fix the bug: topic can not be deleted successfully without an unload. + admin1.topics().delete(topicName); + admin2.topics().unload(topicName); + admin2.topics().delete(topicName); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 35c41455e89876..1ee815c82c3444 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -193,6 +193,8 @@ public class ClientCnx extends PulsarHandler { private boolean supportsTopicWatchers; @Getter private boolean supportsGetPartitionedMetadataWithoutAutoCreation; + @Getter + private boolean brokerSupportsDedupReplV2; /** Idle stat. **/ @Getter @@ -201,7 +203,7 @@ public class ClientCnx extends PulsarHandler { @Getter private long lastDisconnectedTimestamp; - private final String clientVersion; + protected final String clientVersion; protected enum State { None, SentConnectFrame, Ready, Failed, Connecting @@ -405,6 +407,8 @@ protected void handleConnected(CommandConnected connected) { supportsGetPartitionedMetadataWithoutAutoCreation = connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsGetPartitionedMetadataWithoutAutoCreation(); + brokerSupportsDedupReplV2 = + connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsDedupReplV2(); // set remote protocol version to the correct version before we complete the connection future setRemoteEndpointProtocolVersion(connected.getProtocolVersion()); @@ -474,18 +478,18 @@ protected void handleSendReceipt(CommandSendReceipt sendReceipt) { ledgerId = sendReceipt.getMessageId().getLedgerId(); entryId = sendReceipt.getMessageId().getEntryId(); } - + ProducerImpl producer = producers.get(producerId); if (ledgerId == -1 && entryId == -1) { - log.warn("{} Message with sequence-id {} published by producer {} has been dropped", ctx.channel(), - sequenceId, producerId); - } - - if (log.isDebugEnabled()) { - log.debug("{} Got receipt for producer: {} -- msg: {} -- id: {}:{}", ctx.channel(), producerId, sequenceId, - ledgerId, entryId); + log.warn("{} Message with sequence-id {}-{} published by producer [id:{}, name:{}] has been dropped", + ctx.channel(), sequenceId, highestSequenceId, producerId, producer.getProducerName()); + } else { + if (log.isDebugEnabled()) { + log.debug("{} Got receipt for producer: [id:{}, name:{}] -- sequence-id: {}-{} -- entry-id: {}:{}", + ctx.channel(), producerId, producer.getProducerName(), sequenceId, highestSequenceId, + ledgerId, entryId); + } } - ProducerImpl producer = producers.get(producerId); if (producer != null) { producer.ackReceived(this, sequenceId, highestSequenceId, ledgerId, entryId); } else { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java new file mode 100644 index 00000000000000..b19845736fedb9 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.util.ReferenceCountUtil; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.common.api.proto.KeyValue; + +@Slf4j +public class GeoReplicationProducerImpl extends ProducerImpl{ + + public static String MSG_PROP_REPL_SEQUENCE_LID = "__MSG_PROP_REPL_SEQUENCE_LID"; + public static String MSG_PROP_REPL_SEQUENCE_EID = "__MSG_PROP_REPL_SEQUENCE_EID"; + + public GeoReplicationProducerImpl(PulsarClientImpl client, String topic, + ProducerConfigurationData conf, + CompletableFuture producerCreatedFuture, int partitionIndex, + Schema schema, ProducerInterceptors interceptors, + Optional overrideProducerName) { + super(client, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors, overrideProducerName); + } + + @Override + protected void ackReceived(ClientCnx cnx, long lIdSent, long eIdSent, long ledgerId, long entryId) { + if (!cnx.isBrokerSupportsDedupReplV2()) { + super.ackReceived(cnx, lIdSent, eIdSent, ledgerId, entryId); + return; + } + + OpSendMsg op = null; + synchronized (this) { + op = pendingMessages.peek(); + if (op == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Got ack for timed out msg {}:{}", topic, producerName, lIdSent, eIdSent); + } + return; + } + Long lIdPendingRes = null; + Long eIdPendingRes = null; + List kvPairList = op.msg.getMessageBuilder().getPropertiesList(); + for (KeyValue kvPair : kvPairList) { + if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_LID)) { + if (StringUtils.isNumeric(kvPair.getValue())) { + lIdPendingRes = Long.valueOf(kvPair.getValue()); + } else { + break; + } + } + if (kvPair.getKey().equals(MSG_PROP_REPL_SEQUENCE_EID)) { + if (StringUtils.isNumeric(kvPair.getValue())) { + eIdPendingRes = Long.valueOf(kvPair.getValue()); + } else { + break; + } + } + if (lIdPendingRes != null && eIdPendingRes != null) { + break; + } + } + if (lIdPendingRes == null || eIdPendingRes == null) { + // Rollback to the original implementation. + log.error("[{}] [{}] can not found v2 sequence-id {}:{}, ackReceived: {}:{} {}:{} - queue-size: {}", + topic, producerName, lIdPendingRes, eIdPendingRes, lIdSent, + eIdSent, ledgerId, entryId, pendingMessages.messagesCount()); + cnx.channel().close(); + return; + } + + if (lIdSent == lIdPendingRes && eIdSent == eIdPendingRes) { + // Q: After a reconnect, maybe we have lost the response of Send-Receipt, then how can we remove + // pending messages from the queue? + // A: if both @param-ledgerId and @param-entry-id are "-1", it means the message has been sent + // successfully. + // PS: broker will respond "-1" only when it confirms the message has been persisted, broker will + // respond a "MessageDeduplication.MessageDupUnknownException" if the message is sending + // in-progress. + // Notice: if send messages outs of oder, may lost messages. + // Conclusion: So whether @param-ledgerId and @param-entry-id are "-1" or not, we can remove pending + // message. + if (log.isInfoEnabled()) { + log.info("Got receipt for producer: [{}] -- source-message: {}:{} -- target-msg: {}:{}", + getProducerName(), lIdSent, eIdSent, ledgerId, entryId); + } + pendingMessages.remove(); + releaseSemaphoreForSendOp(op); + // TODO LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp))); + op.setMessageId(ledgerId, entryId, partitionIndex); + try { + // Need to protect ourselves from any exception being thrown in the future handler from the + // application + op.sendComplete(null); + } catch (Throwable t) { + log.warn("[{}] [{}] Got exception while completing the callback for -- source-message: {}:{} --" + + " target-msg: {}:{}", + topic, producerName, lIdSent, eIdSent, ledgerId, entryId, t); + } + ReferenceCountUtil.safeRelease(op.cmd); + op.recycle(); + } else if (lIdSent < lIdPendingRes || (lIdSent == lIdPendingRes && eIdSent < eIdPendingRes)) { + // Ignoring the ack since it's referring to a message that has already timed out. + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Got ack for timed out msg. expecting less or equals {}:{}, but got: {}:{}", + topic, producerName, lIdPendingRes, eIdPendingRes, lIdSent, + eIdSent); + } + } else { + log.warn("[{}] [{}] Got ack for msg. expecting less or equals {}:{}, but got: {}:{} - queue-size: {}", + topic, producerName, lIdPendingRes, eIdPendingRes, lIdSent, + eIdSent, pendingMessages.messagesCount()); + // Force connection closing so that messages can be re-transmitted in a new connection + cnx.channel().close(); + } + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b686252b58ade0..9b1e127773171e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -112,7 +112,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne // Variable is updated in a synchronized block private volatile long msgIdGenerator; - private final OpSendMsgQueue pendingMessages; + protected final OpSendMsgQueue pendingMessages; private final Optional semaphore; private volatile Timeout sendTimeout = null; private final long lookupDeadline; @@ -129,12 +129,12 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private LastSendFutureWrapper lastSendFutureWrapper = LastSendFutureWrapper.create(lastSendFuture); // Globally unique producer name - private String producerName; + protected String producerName; private final boolean userProvidedProducerName; private String connectionId; private String connectedSince; - private final int partitionIndex; + protected final int partitionIndex; private final ProducerStatsRecorder stats; @@ -1212,7 +1212,7 @@ public void terminated(ClientCnx cnx) { } } - void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long ledgerId, long entryId) { + protected void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long ledgerId, long entryId) { OpSendMsg op = null; synchronized (this) { op = pendingMessages.peek(); @@ -1291,7 +1291,7 @@ private long getHighestSequenceId(OpSendMsg op) { return Math.max(op.highestSequenceId, op.sequenceId); } - private void releaseSemaphoreForSendOp(OpSendMsg op) { + protected void releaseSemaphoreForSendOp(OpSendMsg op) { semaphoreRelease(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 871666620b7b4a..b07c63762e4c49 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -506,6 +506,10 @@ protected ProducerImpl newProducerImpl(String topic, int partitionIndex, ProducerInterceptors interceptors, CompletableFuture> producerCreatedFuture, Optional overrideProducerName) { + if (conf.isReplProducer()) { + return new GeoReplicationProducerImpl(PulsarClientImpl.this, topic, conf, producerCreatedFuture, + partitionIndex, schema, interceptors, overrideProducerName); + } return new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, partitionIndex, schema, interceptors, overrideProducerName); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 6ec738bbf4c8d1..93261a3b7f68a6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -206,6 +206,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable { private boolean isNonPartitionedTopicExpected; + private boolean isReplProducer; + @ApiModelProperty( name = "initialSubscriptionName", value = "Use this configuration to automatically create an initial subscription when creating a topic." diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 19aa9907549d93..2259531d22baea 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -192,6 +192,7 @@ private static void setFeatureFlags(FeatureFlags flags) { flags.setSupportsBrokerEntryMetadata(true); flags.setSupportsPartialProducer(true); flags.setSupportsGetPartitionedMetadataWithoutAutoCreation(true); + flags.setSupportsDedupReplV2(true); } public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion, @@ -245,6 +246,15 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion, String libVersion, String targetBroker, String originalPrincipal, AuthData originalAuthData, String originalAuthMethod, String proxyVersion) { + BaseCommand cmd = newConnectWithoutSerialize(authMethodName, authData, protocolVersion, libVersion, + targetBroker, originalPrincipal, originalAuthData, originalAuthMethod, proxyVersion); + return serializeWithSize(cmd); + } + + public static BaseCommand newConnectWithoutSerialize(String authMethodName, AuthData authData, + int protocolVersion, String libVersion, + String targetBroker, String originalPrincipal, AuthData originalAuthData, + String originalAuthMethod, String proxyVersion) { BaseCommand cmd = localCmd(Type.CONNECT); CommandConnect connect = cmd.setConnect() .setClientVersion(libVersion != null ? libVersion : "Pulsar Client") @@ -277,7 +287,7 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p connect.setProtocolVersion(protocolVersion); setFeatureFlags(connect.setFeatureFlags()); - return serializeWithSize(cmd); + return cmd; } public static ByteBuf newConnected(int clientProtocoVersion, boolean supportsTopicWatchers) { @@ -303,6 +313,7 @@ public static BaseCommand newConnectedCommand(int clientProtocolVersion, int max connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers); connected.setFeatureFlags().setSupportsGetPartitionedMetadataWithoutAutoCreation(true); + connected.setFeatureFlags().setSupportsDedupReplV2(true); return cmd; } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 19658c5e57ff9a..2fb3d100c2b9f6 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -302,6 +302,7 @@ message FeatureFlags { optional bool supports_partial_producer = 3 [default = false]; optional bool supports_topic_watchers = 4 [default = false]; optional bool supports_get_partitioned_metadata_without_auto_creation = 5 [default = false]; + optional bool supports_dedup_repl_v2 = 6 [default = false]; } message CommandConnected {