From 14691ed1ecb5554bd1abd8848018f0259d8a452f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 19 Dec 2024 22:27:51 +0800 Subject: [PATCH] fix bug --- .../impl/GeoReplicationProducerImpl.java | 63 +++++++++---------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java index 30996c64063f22..289941a761a46d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java @@ -72,7 +72,7 @@ protected void ackReceived(ClientCnx cnx, long seq, long highSeq, long ledgerId, return; } // Replicator send markers also, use sequenceId to check the marker send-receipt. - if (isReplicationMarker(entryId)) { + if (isReplicationMarker(highSeq)) { ackReceivedReplMarker(cnx, op, seq, highSeq, ledgerId, entryId); return; } @@ -85,8 +85,8 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI if (sourceLId < lastPersistedSourceLedgerId || (sourceLId == lastPersistedSourceLedgerId && sourceEId < lastPersistedSourceEntryId)) { if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Dropped a repl marker SendReceipt. Got entry {}:{}, last persisted source entry:" - + " {}:{}", + log.debug("[{}] [{}] Received an msg send receipt[repeated]: source entry {}:{}, latest persisted:" + + " {}:{}", topic, producerName, sourceLId, sourceEId, lastPersistedSourceLedgerId, lastPersistedSourceEntryId); } @@ -130,6 +130,12 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI // Notice: if send messages outs of oder, may lost messages. // Conclusion: So whether @param-ledgerId and @param-entry-id are "-1" or not, we can remove pending // message. + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received an msg send receipt[expected]: source entry {}:{}, target entry:" + + " {}:{}", + topic, producerName, sourceLId, sourceEId, + targetLId, targetEid); + } lastPersistedSourceLedgerId = sourceLId; lastPersistedSourceEntryId = sourceEId; removeAndApplyCallback(op, sourceLId, sourceEId, targetLId, targetEid, false); @@ -137,33 +143,24 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI } // Case-3: got null source cluster's entry position, which is unexpected. - if (pendingLId == null || pendingEId == null) { - log.error("[{}] [{}] can not found v2 sequence-id {}:{}, ackReceived: {}:{} {}:{} - queue-size: {}", - topic, producerName, pendingLId, pendingEId, sourceLId, - sourceEId, targetLId, targetEid, pendingMessages.messagesCount()); - cnx.channel().close(); - return; - } - // Case-4: unknown error, which is unexpected. - log.warn("[{}] [{}] Got ack for msg. expecting {}:{}, but got: {}:{} - queue-size: {}", - topic, producerName, pendingLId, pendingEId, sourceLId, - sourceEId, pendingMessages.messagesCount()); - // Force connection closing so that messages can be re-transmitted in a new connection + log.error("[{}] [{}] Received an msg send receipt[error]: source entry {}:{}, target entry: {}:{}," + + " pending send: {}:{}, queue-size: {}", + topic, producerName, sourceLId, sourceEId, targetLId, targetEid, pendingLId, pendingEId, + pendingMessages.messagesCount()); cnx.channel().close(); } - protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long req, long highReq, + protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long seq, long isSourceMarker, long ledgerId, long entryId) { // Case-1: repeatedly publish repl marker. long lastSeqReceivedAck = LAST_SEQ_ID_PUBLISHED_UPDATER.get(this); - if (req <= lastSeqReceivedAck) { + if (seq <= lastSeqReceivedAck) { // Ignoring the ack since it's referring to a message that has already timed out. if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Dropped a repl marker SendReceipt. sequenceId: {}," - + " sequenceIdPersisted: {}," - + " highReq: {}, position: {}:{}", - topic, producerName, req, lastSeqReceivedAck, highReq, ledgerId, entryId); + log.debug("[{}] [{}] Received an repl marker send receipt[repeated]. seq: {}, seqPersisted: {}," + + " isSourceMarker: {}, target entry: {}:{}", + topic, producerName, seq, lastSeqReceivedAck, isSourceMarker, ledgerId, entryId); } return; } @@ -171,10 +168,16 @@ protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long req, long // Case-2, which is expected: // 1. Broker responds SendReceipt who is a repl marker. // 2. The current pending msg is also a marker. - if (isReplicationMarker(op) && req == op.sequenceId) { + boolean pendingMsgIsReplMarker = isReplicationMarker(op); + if (pendingMsgIsReplMarker && seq == op.sequenceId) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Received an repl marker send receipt[expected]. seq: {}, seqPersisted: {}," + + " isReplMarker: {}, target entry: {}:{}", + topic, producerName, seq, lastSeqReceivedAck, isSourceMarker, ledgerId, entryId); + } long calculatedSeq = getHighestSequenceId(op); LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, calculatedSeq)); - removeAndApplyCallback(op, req, highReq, ledgerId, entryId, true); + removeAndApplyCallback(op, seq, isSourceMarker, ledgerId, entryId, true); return; } @@ -182,14 +185,14 @@ protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long req, long // Case-3-1: expected a SendError if "seq <= lastInProgressSend". // Case-3-2: something went wrong. long lastInProgressSend = LAST_SEQ_ID_PUSHED_UPDATER.get(this); - String logText = String.format("[%s] [%s] Got ack for a repl marker msg. expecting %s, but got: %s." + String logText = String.format("[%s] [%s] Received an repl marker send receipt[error]. seq: %s, seqPending: %s." + " sequenceIdPersisted: %s, lastInProgressSend: %s," - + " HighReq: %s, position: %s:%s, queue-size: %s", - topic, producerName, lastSeqReceivedAck - 1, req, + + " isSourceMarker: %s, target entry: %s:%s, queue-size: %s", + topic, producerName, seq, pendingMsgIsReplMarker ? op.sequenceId : "unknown", lastSeqReceivedAck, lastInProgressSend, - highReq, ledgerId, entryId, pendingMessages.messagesCount() + isSourceMarker, ledgerId, entryId, pendingMessages.messagesCount() ); - if (req < lastInProgressSend) { + if (seq < lastInProgressSend) { log.warn(logText); } else { log.error(logText); @@ -200,10 +203,6 @@ protected void ackReceivedReplMarker(ClientCnx cnx, OpSendMsg op, long req, long private void removeAndApplyCallback(OpSendMsg op, long lIdSent, long eIdSent, long ledgerId, long entryId, boolean isMarker) { - if (log.isDebugEnabled()) { - log.debug("Got receipt for producer: [{}] -- source-message: {}:{} -- target-msg: {}:{} -- isMarker: {}", - getProducerName(), lIdSent, eIdSent, ledgerId, entryId, isMarker); - } pendingMessages.remove(); releaseSemaphoreForSendOp(op); // Since Geo-Replicator will not send batched message, skip to update the field