Skip to content

Commit

Permalink
rename ErrProducerBlocked to ErrProducerBlockedQuotaExceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Dec 25, 2023
1 parent 9dc894c commit 8fcbeeb
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,21 @@ const (
)

var (
ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
ErrSendTimeout = newError(TimeoutError, "message send timeout")
ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
ErrContextExpired = newError(TimeoutError, "message send context expired")
ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
ErrProducerClosed = newError(ProducerClosed, "producer already been closed")
ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
ErrSchema = newError(SchemaFailure, "schema error")
ErrTransaction = errors.New("transaction error")
ErrInvalidMessage = newError(InvalidMessage, "invalid message")
ErrTopicNotfound = newError(TopicNotFound, "topic not found")
ErrTopicTerminated = newError(TopicTerminated, "topic terminated")
ErrProducerBlocked = newError(ProducerBlockedQuotaExceededException, "producer blocked")
ErrProducerFenced = newError(ProducerFenced, "producer fenced")
ErrFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
ErrSendTimeout = newError(TimeoutError, "message send timeout")
ErrSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
ErrContextExpired = newError(TimeoutError, "message send context expired")
ErrMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")
ErrMetaTooLarge = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
ErrProducerClosed = newError(ProducerClosed, "producer already been closed")
ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
ErrSchema = newError(SchemaFailure, "schema error")
ErrTransaction = errors.New("transaction error")
ErrInvalidMessage = newError(InvalidMessage, "invalid message")
ErrTopicNotfound = newError(TopicNotFound, "topic not found")
ErrTopicTerminated = newError(TopicTerminated, "topic terminated")
ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked")
ErrProducerFenced = newError(ProducerFenced, "producer fenced")

buffersPool sync.Pool
sendRequestPool *sync.Pool
Expand Down Expand Up @@ -468,7 +468,7 @@ func (p *partitionProducer) reconnectToBroker() {

if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
p.failPendingMessages(joinErrors(ErrProducerBlocked, err))
p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
break
}

Expand Down

0 comments on commit 8fcbeeb

Please sign in to comment.