Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Producer]: handle TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced when reconnecting #1134

Merged
merged 9 commits into from
Dec 8, 2023
2 changes: 1 addition & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
}
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errTopicNotFount) {
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
pc.log.Warn("Topic Not Found.")
break
Expand Down
6 changes: 6 additions & 0 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ const (
TransactionNoFoundError
// ClientMemoryBufferIsFull client limit buffer is full
ClientMemoryBufferIsFull
// ProducerFenced When a producer asks and fail to get exclusive producer access,
// or loses the exclusive status after a reconnection, the broker will
// use this error to indicate that this producer is now permanently
// fenced. Applications are now supposed to close it and create a
// new producer
ProducerFenced
)

// Error implement error interface, composed of two parts: msg and result.
Expand Down
74 changes: 45 additions & 29 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ var (
sendRequestPool *sync.Pool
)

var errTopicNotFount = "TopicNotFound"
const (
errMsgTopicNotFound = "TopicNotFound"
errMsgTopicTerminated = "TopicTerminatedError"
errMsgProducerBlockedQuotaExceededException = "ProducerBlockedQuotaExceededException"
errMsgProducerFenced = "ProducerFenced"
)

func init() {
sendRequestPool = &sync.Pool{
Expand Down Expand Up @@ -441,30 +446,28 @@ func (p *partitionProducer) reconnectToBroker() {
}
p.log.WithError(err).Error("Failed to create producer at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errTopicNotFount) {
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic Not Found.")
p.log.Warn("Topic not found, stop reconnecting, close the producer")
p.doClose(newError(TopicNotFound, err.Error()))
break
}

if strings.Contains(errMsg, "TopicTerminatedError") {
p.log.Info("Topic was terminated, failing pending messages, will not reconnect")
pendingItems := p.pendingQueue.ReadableSlice()
for _, item := range pendingItems {
pi := item.(*pendingItem)
if pi != nil {
pi.Lock()
requests := pi.sendRequests
for _, req := range requests {
sr := req.(*sendRequest)
if sr != nil {
sr.done(nil, newError(TopicTerminated, err.Error()))
}
}
pi.Unlock()
}
}
p.setProducerState(producerClosing)
if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
p.doClose(newError(TopicTerminated, err.Error()))
break
}

if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error()))
break
}

if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
p.doClose(newError(ProducerFenced, err.Error()))
break
}

Expand All @@ -481,10 +484,18 @@ func (p *partitionProducer) reconnectToBroker() {
func (p *partitionProducer) runEventsLoop() {
for {
select {
case data := <-p.dataChan:
case data, ok := <-p.dataChan:
// when doClose() is call, p.dataChan will be closed, data will be nil
if !ok {
return
}
p.internalSend(data)
case i := <-p.cmdChan:
switch v := i.(type) {
case cmd, ok := <-p.cmdChan:
// when doClose() is call, p.dataChan will be closed, cmd will be nil
if !ok {
return
}
switch v := cmd.(type) {
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
Expand Down Expand Up @@ -1321,13 +1332,18 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)

func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)

p.doClose(errProducerClosed)
}

func (p *partitionProducer) doClose(reason error) {
if !p.casProducerState(producerReady, producerClosing) {
return
}

p.log.Info("Closing producer")
defer close(p.dataChan)
defer close(p.cmdChan)
p.log.Info("Closing producer")

id := p.client.rpcClient.NewRequestID()
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
Expand All @@ -1340,7 +1356,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
} else {
p.log.Info("Closed producer")
}
p.failPendingMessages()
p.failPendingMessages(reason)

if p.batchBuilder != nil {
if err = p.batchBuilder.Close(); err != nil {
Expand All @@ -1353,7 +1369,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.batchFlushTicker.Stop()
}

func (p *partitionProducer) failPendingMessages() {
func (p *partitionProducer) failPendingMessages(err error) {
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
Expand All @@ -1378,11 +1394,11 @@ func (p *partitionProducer) failPendingMessages() {

for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
sr.done(nil, errProducerClosed)
sr.done(nil, err)
}

// flag the sending has completed with error, flush make no effect
pi.done(errProducerClosed)
pi.done(err)
pi.Unlock()

// finally reached the last view item, current iteration ends
Expand Down
13 changes: 8 additions & 5 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ import (
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/crypto"
plog "github.com/apache/pulsar-client-go/pulsar/log"
log "github.com/sirupsen/logrus"
)

func TestInvalidURL(t *testing.T) {
Expand Down Expand Up @@ -1168,7 +1170,7 @@ func TestTopicTermination(t *testing.T) {
topicName := newTopicName()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "send_timeout_sub",
SubscriptionName: "topic_terminated_sub",
})
assert.Nil(t, err)
defer consumer.Close() // subscribe but do nothing
Expand All @@ -1189,7 +1191,7 @@ func TestTopicTermination(t *testing.T) {
})
if err != nil {
e := err.(*Error)
if e.result == TopicTerminated {
if e.result == TopicTerminated || err == errProducerClosed {
terminatedChan <- true
} else {
terminatedChan <- false
Expand All @@ -1210,6 +1212,7 @@ func TestTopicTermination(t *testing.T) {
return
case <-afterCh:
assert.Fail(t, "Time is up. Topic should have been terminated by now")
return
}
}
}
Expand Down