Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Dec 17, 2024
1 parent 8d7d1fb commit 21908df
Show file tree
Hide file tree
Showing 15 changed files with 922 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void startProducer() {
prepareCreateProducer().thenCompose(ignore -> {
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
builderImpl.getConf().setNonPartitionedTopicExpected(true);
builderImpl.getConf().setReplProducer(true);
return producerBuilder.createAsync().thenAccept(producer -> {
setProducerAndTriggerReadEntries(producer);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -40,6 +42,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
Expand Down Expand Up @@ -271,7 +274,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, i
boolean isMarker, Position position) {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position);
batchSize, isChunked, System.nanoTime(), isMarker, position, cnx.isClientSupportsDedupReplV2());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand All @@ -283,7 +286,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc
int batchSize, boolean isChunked, boolean isMarker, Position position) {
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position);
isChunked, System.nanoTime(), isMarker, position, cnx.isClientSupportsDedupReplV2());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand Down Expand Up @@ -378,6 +381,7 @@ private static final class MessagePublishContext implements PublishContext, Runn
private int batchSize;
private boolean chunked;
private boolean isMarker;
private boolean supportsDedupReplV2;

private long startTimeNs;

Expand Down Expand Up @@ -463,6 +467,11 @@ public long getOriginalSequenceId() {
return originalSequenceId;
}

@Override
public boolean supportsDedupReplV2() {
return supportsDedupReplV2;
}

@Override
public void setOriginalHighestSequenceId(long originalHighestSequenceId) {
this.originalHighestSequenceId = originalHighestSequenceId;
Expand Down Expand Up @@ -537,8 +546,11 @@ public void run() {
// stats
producer.stats.recordMsgIn(batchSize, msgSize);
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
ledgerId, entryId);
if (producer.isRemote() && supportsDedupReplV2) {
sendSendReceiptResponseRepl();
} else {
sendSendReceiptResponseNormal();
}
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
if (this.chunked) {
producer.stats.recordChunkedMsgIn();
Expand All @@ -551,8 +563,37 @@ public void run() {
recycle();
}

private void sendSendReceiptResponseRepl() {
String replSequenceLIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SEQUENCE_LID));
String replSequenceEIdStr = String.valueOf(getProperty(MSG_PROP_REPL_SEQUENCE_EID));
if (!StringUtils.isNumeric(replSequenceLIdStr) || !StringUtils.isNumeric(replSequenceEIdStr)) {
log.error("[{}] Message can not determine whether the message is duplicated due to the acquired messages"
+ " props were are invalid. producer={}. supportsDedupReplV2: {}, sequence-id {},"
+ " prop-{}: {}, prop-{}: {}",
producer.topic.getName(), producer.producerName,
supportsDedupReplV2(), getSequenceId(),
MSG_PROP_REPL_SEQUENCE_LID, replSequenceLIdStr,
MSG_PROP_REPL_SEQUENCE_EID, replSequenceEIdStr);
producer.cnx.getCommandSender().sendSendError(producer.producerId,
Math.max(highestSequenceId, sequenceId),
ServerError.PersistenceError, "Message can not determine whether the message is"
+ " duplicated due to the acquired messages props were are invalid");
return;
}
Long replSequenceLId = Long.valueOf(replSequenceLIdStr);
Long replSequenceEId = Long.valueOf(replSequenceEIdStr);
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, replSequenceLId,
replSequenceEId, ledgerId, entryId);
}

private void sendSendReceiptResponseNormal() {
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
ledgerId, entryId);
}

static MessagePublishContext get(Producer producer, long sequenceId, int msgSize, int batchSize,
boolean chunked, long startTimeNs, boolean isMarker, Position position) {
boolean chunked, long startTimeNs, boolean isMarker, Position position,
boolean supportsDedupReplV2) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
Expand All @@ -563,6 +604,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.isMarker = isMarker;
callback.supportsDedupReplV2 = supportsDedupReplV2;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
if (callback.propertyMap != null) {
Expand All @@ -572,7 +614,8 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize
}

static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, int msgSize,
int batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
int batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position,
boolean supportsDedupReplV2) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
Expand All @@ -584,6 +627,7 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
callback.isMarker = isMarker;
callback.supportsDedupReplV2 = supportsDedupReplV2;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
if (callback.propertyMap != null) {
Expand Down Expand Up @@ -801,7 +845,8 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
}
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null,
cnx.isClientSupportsDedupReplV2());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import javax.net.ssl.SSLSession;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import lombok.Getter;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -237,6 +238,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {

private boolean encryptionRequireOnProducer;

@Getter
private FeatureFlags features;

private PulsarCommandSender commandSender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ default long getEntryTimestamp() {
default void setEntryTimestamp(long entryTimestamp) {

}

default boolean supportsDedupReplV2() {
return false;
}
}

CompletableFuture<Void> initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.api.proto.FeatureFlags;

public interface TransportCnx {

Expand Down Expand Up @@ -103,4 +104,10 @@ public interface TransportCnx {
* previously called {@link #incrementThrottleCount()}.
*/
void decrementThrottleCount();

FeatureFlags getFeatures();

default boolean isClientSupportsDedupReplV2() {
return getFeatures() != null && getFeatures().hasSupportsDedupReplV2() && getFeatures().isSupportsDedupReplV2();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_EID;
import static org.apache.pulsar.client.impl.GeoReplicationProducerImpl.MSG_PROP_REPL_SEQUENCE_LID;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -194,11 +196,22 @@ protected boolean replicateEntries(List<Entry> entries) {
msg.setSchemaInfoForReplicator(schemaFuture.get());
msg.getMessageBuilder().clearTxnidMostBits();
msg.getMessageBuilder().clearTxnidLeastBits();
// Why not use a generated sequence ID that initialized with "-1" when the replicator is starting?
// Because that we should persist the props to the value of the current value of sequence id for
// each acknowledge after publishing for guarantees the sequence id can be recovered after a cursor
// reset that will happen when getting new schema or publish fails, which cost much more.
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SEQUENCE_LID)
.setValue(Long.valueOf(entry.getLedgerId()).toString());
msg.getMessageBuilder().addProperty().setKey(MSG_PROP_REPL_SEQUENCE_EID)
.setValue(Long.valueOf(entry.getEntryId()).toString());
msgOut.recordEvent(headersAndPayload.readableBytes());
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId());
}
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
atLeastOneMessageSentForReplication = true;
}
Expand Down
Loading

0 comments on commit 21908df

Please sign in to comment.