From 8fcbeebc1d06c18cf01bc2e818f2a38f3adaae96 Mon Sep 17 00:00:00 2001 From: gunli Date: Mon, 25 Dec 2023 19:01:37 +0800 Subject: [PATCH] rename ErrProducerBlocked to ErrProducerBlockedQuotaExceeded --- pulsar/producer_partition.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 3fdda58b7a..1b79053e38 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -51,21 +51,21 @@ const ( ) var ( - ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed") - ErrSendTimeout = newError(TimeoutError, "message send timeout") - ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full") - ErrContextExpired = newError(TimeoutError, "message send context expired") - ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize") - ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize") - ErrProducerClosed = newError(ProducerClosed, "producer already been closed") - ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full") - ErrSchema = newError(SchemaFailure, "schema error") - ErrTransaction = errors.New("transaction error") - ErrInvalidMessage = newError(InvalidMessage, "invalid message") - ErrTopicNotfound = newError(TopicNotFound, "topic not found") - ErrTopicTerminated = newError(TopicTerminated, "topic terminated") - ErrProducerBlocked = newError(ProducerBlockedQuotaExceededException, "producer blocked") - ErrProducerFenced = newError(ProducerFenced, "producer fenced") + ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed") + ErrSendTimeout = newError(TimeoutError, "message send timeout") + ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full") + ErrContextExpired = newError(TimeoutError, "message send context expired") + ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize") + ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize") + ErrProducerClosed = newError(ProducerClosed, "producer already been closed") + ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full") + ErrSchema = newError(SchemaFailure, "schema error") + ErrTransaction = errors.New("transaction error") + ErrInvalidMessage = newError(InvalidMessage, "invalid message") + ErrTopicNotfound = newError(TopicNotFound, "topic not found") + ErrTopicTerminated = newError(TopicTerminated, "topic terminated") + ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked") + ErrProducerFenced = newError(ProducerFenced, "producer fenced") buffersPool sync.Pool sendRequestPool *sync.Pool @@ -468,7 +468,7 @@ func (p *partitionProducer) reconnectToBroker() { if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) { p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting") - p.failPendingMessages(joinErrors(ErrProducerBlocked, err)) + p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err)) break }