diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 2ba842744c..fd6441c1cb 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1659,7 +1659,7 @@ func (pc *partitionConsumer) reconnectToBroker() { } pc.log.WithError(err).Error("Failed to create consumer at reconnect") errMsg := err.Error() - if strings.Contains(errMsg, errTopicNotFount) { + if strings.Contains(errMsg, errMsgTopicNotFound) { // when topic is deleted, we should give up reconnection. pc.log.Warn("Topic Not Found.") break diff --git a/pulsar/error.go b/pulsar/error.go index 73a0b6067b..25498cfba4 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -114,6 +114,12 @@ const ( TransactionNoFoundError // ClientMemoryBufferIsFull client limit buffer is full ClientMemoryBufferIsFull + // ProducerFenced When a producer asks for and fails to get exclusive producer access, + // or loses the exclusive status after a reconnection, the broker will + // use this error to indicate that this producer is now permanently + // fenced. Applications are now supposed to close it and create a + // new producer + ProducerFenced ) // Error implement error interface, composed of two parts: msg and result. 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 65eef5b66b..46167d0cf1 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -64,7 +64,12 @@ var ( sendRequestPool *sync.Pool ) -var errTopicNotFount = "TopicNotFound" +const ( + errMsgTopicNotFound = "TopicNotFound" + errMsgTopicTerminated = "TopicTerminatedError" + errMsgProducerBlockedQuotaExceededException = "ProducerBlockedQuotaExceededException" + errMsgProducerFenced = "ProducerFenced" +) func init() { sendRequestPool = &sync.Pool{ @@ -441,30 +446,28 @@ func (p *partitionProducer) reconnectToBroker() { } p.log.WithError(err).Error("Failed to create producer at reconnect") errMsg := err.Error() - if strings.Contains(errMsg, errTopicNotFount) { + if strings.Contains(errMsg, errMsgTopicNotFound) { // when topic is deleted, we should give up reconnection. - p.log.Warn("Topic Not Found.") + p.log.Warn("Topic not found, stop reconnecting, close the producer") + p.doClose(newError(TopicNotFound, err.Error())) break } - if strings.Contains(errMsg, "TopicTerminatedError") { - p.log.Info("Topic was terminated, failing pending messages, will not reconnect") - pendingItems := p.pendingQueue.ReadableSlice() - for _, item := range pendingItems { - pi := item.(*pendingItem) - if pi != nil { - pi.Lock() - requests := pi.sendRequests - for _, req := range requests { - sr := req.(*sendRequest) - if sr != nil { - sr.done(nil, newError(TopicTerminated, err.Error())) - } - } - pi.Unlock() - } - } - p.setProducerState(producerClosing) + if strings.Contains(errMsg, errMsgTopicTerminated) { + p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer") + p.doClose(newError(TopicTerminated, err.Error())) + break + } + + if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) { + p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting") + 
p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error())) + break + } + + if strings.Contains(errMsg, errMsgProducerFenced) { + p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting") + p.doClose(newError(ProducerFenced, err.Error())) break } @@ -481,10 +484,18 @@ func (p *partitionProducer) reconnectToBroker() { func (p *partitionProducer) runEventsLoop() { for { select { - case data := <-p.dataChan: + case data, ok := <-p.dataChan: + // when doClose() is called, p.dataChan will be closed, data will be nil + if !ok { + return + } p.internalSend(data) - case i := <-p.cmdChan: - switch v := i.(type) { + case cmd, ok := <-p.cmdChan: + // when doClose() is called, p.cmdChan will be closed, cmd will be nil + if !ok { + return + } + switch v := cmd.(type) { case *flushRequest: p.internalFlush(v) case *closeProducer: @@ -1321,13 +1332,18 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) func (p *partitionProducer) internalClose(req *closeProducer) { defer close(req.doneCh) + + p.doClose(errProducerClosed) +} + +func (p *partitionProducer) doClose(reason error) { if !p.casProducerState(producerReady, producerClosing) { return } + p.log.Info("Closing producer") defer close(p.dataChan) defer close(p.cmdChan) - p.log.Info("Closing producer") id := p.client.rpcClient.NewRequestID() _, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{ @@ -1340,7 +1356,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) { } else { p.log.Info("Closed producer") } - p.failPendingMessages() + p.failPendingMessages(reason) if p.batchBuilder != nil { if err = p.batchBuilder.Close(); err != nil { @@ -1353,7 +1369,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) { p.batchFlushTicker.Stop() } -func (p *partitionProducer) failPendingMessages() { +func (p *partitionProducer) failPendingMessages(err error) { curViewItems := 
p.pendingQueue.ReadableSlice() viewSize := len(curViewItems) if viewSize <= 0 { @@ -1378,11 +1394,11 @@ func (p *partitionProducer) failPendingMessages() { for _, i := range pi.sendRequests { sr := i.(*sendRequest) - sr.done(nil, errProducerClosed) + sr.done(nil, err) } // flag the sending has completed with error, flush make no effect - pi.done(errProducerClosed) + pi.done(err) pi.Unlock() // finally reached the last view item, current iteration ends diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index f30ae65fcf..0f89069243 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -29,14 +29,16 @@ import ( "testing" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" - pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" + "github.com/apache/pulsar-client-go/pulsar/internal" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + + log "github.com/sirupsen/logrus" + "github.com/apache/pulsar-client-go/pulsar/crypto" plog "github.com/apache/pulsar-client-go/pulsar/log" - log "github.com/sirupsen/logrus" ) func TestInvalidURL(t *testing.T) { @@ -1168,7 +1170,7 @@ func TestTopicTermination(t *testing.T) { topicName := newTopicName() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, - SubscriptionName: "send_timeout_sub", + SubscriptionName: "topic_terminated_sub", }) assert.Nil(t, err) defer consumer.Close() // subscribe but do nothing @@ -1189,7 +1191,7 @@ func TestTopicTermination(t *testing.T) { }) if err != nil { e := err.(*Error) - if e.result == TopicTerminated { + if e.result == TopicTerminated || err == errProducerClosed { terminatedChan <- true } else { terminatedChan <- false @@ -1210,6 +1212,7 @@ func TestTopicTermination(t *testing.T) { return case <-afterCh: assert.Fail(t, "Time is up. Topic should have been terminated by now") + return } } }