From 066a5b1a2e82deb8fc9fb3984c11c2228e02cc7e Mon Sep 17 00:00:00 2001
From: gunli
Date: Fri, 17 Nov 2023 11:35:08 +0800
Subject: [PATCH 1/9] fix: handle TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced when reconnecting

---
 pulsar/consumer_partition.go |  2 +-
 pulsar/error.go              |  6 ++++
 pulsar/producer_partition.go | 58 ++++++++++++++++++++----------------
 3 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 2ba842744c..fd6441c1cb 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1659,7 +1659,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
 		}
 		pc.log.WithError(err).Error("Failed to create consumer at reconnect")
 		errMsg := err.Error()
-		if strings.Contains(errMsg, errTopicNotFount) {
+		if strings.Contains(errMsg, errMsgTopicNotFound) {
 			// when topic is deleted, we should give up reconnection.
 			pc.log.Warn("Topic Not Found.")
 			break
diff --git a/pulsar/error.go b/pulsar/error.go
index 73a0b6067b..25498cfba4 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -114,6 +114,12 @@ const (
 	TransactionNoFoundError
 	// ClientMemoryBufferIsFull client limit buffer is full
 	ClientMemoryBufferIsFull
+	// ProducerFenced When a producer asks for and fails to get exclusive producer access,
+	// or loses the exclusive status after a reconnection, the broker will
+	// use this error to indicate that this producer is now permanently
+	// fenced. Applications are now supposed to close it and create a
+	// new producer.
+	ProducerFenced
 )
 
 // Error implement error interface, composed of two parts: msg and result.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 65eef5b66b..5dfe380e23 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -64,7 +64,12 @@ var (
 	sendRequestPool *sync.Pool
 )
 
-var errTopicNotFount = "TopicNotFound"
+const (
+	errMsgTopicNotFound                         = "TopicNotFound"
+	errMsgTopicTerminated                       = "TopicTerminatedError"
+	errMsgProducerBlockedQuotaExceededException = "ProducerBlockedQuotaExceededException"
+	errMsgProducerFenced                        = "ProducerFenced"
+)
 
 func init() {
 	sendRequestPool = &sync.Pool{
@@ -441,30 +446,33 @@ func (p *partitionProducer) reconnectToBroker() {
 		}
 		p.log.WithError(err).Error("Failed to create producer at reconnect")
 		errMsg := err.Error()
-		if strings.Contains(errMsg, errTopicNotFount) {
+		if strings.Contains(errMsg, errMsgTopicNotFound) {
 			// when topic is deleted, we should give up reconnection.
-			p.log.Warn("Topic Not Found.")
+			p.log.Warn("Topic not found, stop reconnecting")
 			break
 		}
 
-		if strings.Contains(errMsg, "TopicTerminatedError") {
-			p.log.Info("Topic was terminated, failing pending messages, will not reconnect")
-			pendingItems := p.pendingQueue.ReadableSlice()
-			for _, item := range pendingItems {
-				pi := item.(*pendingItem)
-				if pi != nil {
-					pi.Lock()
-					requests := pi.sendRequests
-					for _, req := range requests {
-						sr := req.(*sendRequest)
-						if sr != nil {
-							sr.done(nil, newError(TopicTerminated, err.Error()))
-						}
-					}
-					pi.Unlock()
-				}
-			}
-			p.setProducerState(producerClosing)
+		if strings.Contains(errMsg, errMsgTopicTerminated) {
+			p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting")
+			p.failPendingMessages(newError(TopicTerminated, err.Error()))
+			// can not set to producerClosing , or it will fail when we call internalClose()
+			// there is a Terminated state in JAVA client, maybe we should add a producerTerminated state ?
+ // p.setProducerState(producerClosing) + break + } + + if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) { + p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting") + p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error())) + break + } + + if strings.Contains(errMsg, errMsgProducerFenced) { + p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting") + p.failPendingMessages(newError(ProducerFenced, err.Error())) + // can not set to producerClosing , or it will fail when we call internalClose() + // there is a ProducerFenced state in JAVA client, maybe we should add a producerFenced state ? + // p.setProducerState(producerClosing) break } @@ -1340,7 +1348,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) { } else { p.log.Info("Closed producer") } - p.failPendingMessages() + p.failPendingMessages(errProducerClosed) if p.batchBuilder != nil { if err = p.batchBuilder.Close(); err != nil { @@ -1353,7 +1361,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) { p.batchFlushTicker.Stop() } -func (p *partitionProducer) failPendingMessages() { +func (p *partitionProducer) failPendingMessages(err error) { curViewItems := p.pendingQueue.ReadableSlice() viewSize := len(curViewItems) if viewSize <= 0 { @@ -1378,11 +1386,11 @@ func (p *partitionProducer) failPendingMessages() { for _, i := range pi.sendRequests { sr := i.(*sendRequest) - sr.done(nil, errProducerClosed) + sr.done(nil, err) } // flag the sending has completed with error, flush make no effect - pi.done(errProducerClosed) + pi.done(err) pi.Unlock() // finally reached the last view item, current iteration ends From 51f7a78bd257e8af5271daf349f3552fa87a4eff Mon Sep 17 00:00:00 2001 From: gunli Date: Mon, 20 Nov 2023 11:28:58 +0800 Subject: [PATCH 2/9] close producer when TopicNotFound/TopicTerminated/ProducerFenced --- pulsar/producer_partition.go | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 5dfe380e23..1824cd99e8 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -448,16 +448,20 @@ func (p *partitionProducer) reconnectToBroker() { errMsg := err.Error() if strings.Contains(errMsg, errMsgTopicNotFound) { // when topic is deleted, we should give up reconnection. - p.log.Warn("Topic not found, stop reconnecting") + p.log.Warn("Topic not found, stop reconnecting, close the producer") + // in JAVA client, when topic not found, closeAsync() will be called, see + // nolint: lll + // https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1792-L1799 + p.doClose(newError(TopicNotFound, err.Error())) break } if strings.Contains(errMsg, errMsgTopicTerminated) { - p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting") - p.failPendingMessages(newError(TopicTerminated, err.Error())) - // can not set to producerClosing , or it will fail when we call internalClose() - // there is a Terminated state in JAVA client, maybe we should add a producerTerminated state ? 
- // p.setProducerState(producerClosing) + p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer") + // in JAVA client, producer will be set to `Terminated` state and close, see + // nolint: lll + // https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1822-L1828 + p.doClose(newError(TopicTerminated, err.Error())) break } @@ -469,10 +473,10 @@ func (p *partitionProducer) reconnectToBroker() { if strings.Contains(errMsg, errMsgProducerFenced) { p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting") - p.failPendingMessages(newError(ProducerFenced, err.Error())) - // can not set to producerClosing , or it will fail when we call internalClose() - // there is a ProducerFenced state in JAVA client, maybe we should add a producerFenced state ? - // p.setProducerState(producerClosing) + // in JAVA client, producer will be set to `Fenced` state and close, see + // nolint: lll + // https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1830-L1836 + p.doClose(newError(ProducerFenced, err.Error())) break } @@ -1333,6 +1337,10 @@ func (p *partitionProducer) internalClose(req *closeProducer) { return } + p.doClose(errProducerClosed) +} + +func (p *partitionProducer) doClose(reason error) { defer close(p.dataChan) defer close(p.cmdChan) p.log.Info("Closing producer") @@ -1348,7 +1356,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) { } else { p.log.Info("Closed producer") } - p.failPendingMessages(errProducerClosed) + p.failPendingMessages(reason) if p.batchBuilder != nil { if err = p.batchBuilder.Close(); err != nil { From 1712eed50ee8e4671cf89ed842e2d73df12e712f Mon Sep 17 00:00:00 2001 From: gunli Date: Tue, 21 Nov 2023 11:12:49 +0800 Subject: [PATCH 3/9] add TopicNotFound test case --- pulsar/producer_test.go | 124 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 120 insertions(+), 4 deletions(-) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index f30ae65fcf..829c444ddd 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -29,14 +29,16 @@ import ( "testing" "time" - "github.com/apache/pulsar-client-go/pulsar/internal" - pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" + "github.com/apache/pulsar-client-go/pulsar/internal" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + + log "github.com/sirupsen/logrus" + "github.com/apache/pulsar-client-go/pulsar/crypto" plog "github.com/apache/pulsar-client-go/pulsar/log" - log "github.com/sirupsen/logrus" ) func TestInvalidURL(t *testing.T) { @@ -1168,7 +1170,7 @@ func TestTopicTermination(t *testing.T) { topicName := newTopicName() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, - SubscriptionName: "send_timeout_sub", + SubscriptionName: "topic_terminated_sub", }) assert.Nil(t, err) defer consumer.Close() // subscribe but do nothing @@ -1214,6 +1216,120 @@ func TestTopicTermination(t *testing.T) { } } +func TestTopicNotFound(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "topic_not_found_sub", + }) + assert.Nil(t, err) + defer consumer.Close() // subscribe but do 
nothing + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + SendTimeout: 2 * time.Second, + }) + assert.Nil(t, err) + defer producer.Close() + + afterCh := time.After(5 * time.Second) + topicNotFoundChan := make(chan bool) + go func() { + for { + _, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 1024), + }) + if err != nil { + e := err.(*Error) + if e.result == TopicNotFound { + topicNotFoundChan <- true + } else { + topicNotFoundChan <- false + } + } + time.Sleep(1 * time.Millisecond) + } + }() + + deleteURL := adminURL + "/admin/v2/persistent/public/default/" + topicName + "/deleteTopic" + log.Info(deleteURL) + makeHTTPCall(t, http.MethodDelete, deleteURL, "") + + for { + select { + case d := <-topicNotFoundChan: + assert.Equal(t, d, true) + return + case <-afterCh: + assert.Fail(t, "Time is up. Topic should have been deleted by now") + } + } +} + +/* +func TestProducerFenced(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "producer_fenced_sub", + }) + assert.Nil(t, err) + defer consumer.Close() // subscribe but do nothing + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + SendTimeout: 2 * time.Second, + }) + assert.Nil(t, err) + defer producer.Close() + + afterCh := time.After(5 * time.Second) + producerFencedChan := make(chan bool) + go func() { + for { + _, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 1024), + }) + if err != nil { + e := err.(*Error) + if e.result == ProducerFenced { + producerFencedChan <- true + } else { + producerFencedChan <- false + } + } + time.Sleep(1 * time.Millisecond) + } + }() + + fenceURL := adminURL + "/admin/v2/persistent/public/default/" + topicName + "/terminate" + log.Info(fenceURL) + makeHTTPCall(t, http.MethodPost, fenceURL, "") + + for { + select { + case d := <-producerFencedChan: + assert.Equal(t, d, true) + return + case <-afterCh: + assert.Fail(t, "Time is up. 
Producer should have been fenced by now") + } + } +} +*/ + func TestSendTimeout(t *testing.T) { quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota" quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}` From cc377a28f2eae28968a058d8bee77c72006dce1c Mon Sep 17 00:00:00 2001 From: gunli Date: Wed, 22 Nov 2023 17:07:14 +0800 Subject: [PATCH 4/9] fix: handle channel close --- pulsar/producer_partition.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1824cd99e8..2ce57cc253 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -493,10 +493,18 @@ func (p *partitionProducer) reconnectToBroker() { func (p *partitionProducer) runEventsLoop() { for { select { - case data := <-p.dataChan: + case data, ok := <-p.dataChan: + // when doClose() is call, p.dataChan will be closed, data will nil, we need to ignore it, or it will panic + if !ok { + continue + } p.internalSend(data) - case i := <-p.cmdChan: - switch v := i.(type) { + case cmd, ok := <-p.cmdChan: + // when doClose() is call, p.dataChan will be closed, cmd will nil, we need to ignore it, or it will panic + if !ok { + continue + } + switch v := cmd.(type) { case *flushRequest: p.internalFlush(v) case *closeProducer: From 6a3dc45ca0c9a315200b3c17d4fe4c8b8f9560c1 Mon Sep 17 00:00:00 2001 From: gunli Date: Wed, 22 Nov 2023 17:08:07 +0800 Subject: [PATCH 5/9] comment TestTopicNotFound/TestProducerFenced --- pulsar/producer_test.go | 101 ++++++++++++++++++++++++++++------------ 1 file changed, 71 insertions(+), 30 deletions(-) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 829c444ddd..ae62d41429 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1191,7 +1191,7 @@ func TestTopicTermination(t *testing.T) { }) if err != nil { e := err.(*Error) - if e.result == TopicTerminated { + if e.result == TopicTerminated || err == errProducerClosed { terminatedChan <- true } else { terminatedChan <- false @@ -1212,10 +1212,12 @@ func TestTopicTermination(t *testing.T) { return case <-afterCh: assert.Fail(t, "Time is up. Topic should have been terminated by now") + return } } } +/* func TestTopicNotFound(t *testing.T) { client, err := NewClient(ClientOptions{ URL: serviceURL, @@ -1224,13 +1226,6 @@ func TestTopicNotFound(t *testing.T) { defer client.Close() topicName := newTopicName() - consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topicName, - SubscriptionName: "topic_not_found_sub", - }) - assert.Nil(t, err) - defer consumer.Close() // subscribe but do nothing - producer, err := client.CreateProducer(ProducerOptions{ Topic: topicName, SendTimeout: 2 * time.Second, @@ -1247,7 +1242,7 @@ func TestTopicNotFound(t *testing.T) { }) if err != nil { e := err.(*Error) - if e.result == TopicNotFound { + if e.result == TopicNotFound || err == errProducerClosed { topicNotFoundChan <- true } else { topicNotFoundChan <- false @@ -1257,7 +1252,7 @@ func TestTopicNotFound(t *testing.T) { } }() - deleteURL := adminURL + "/admin/v2/persistent/public/default/" + topicName + "/deleteTopic" + deleteURL := adminURL + "/admin/v2/persistent/public/default/" + topicName log.Info(deleteURL) makeHTTPCall(t, http.MethodDelete, deleteURL, "") @@ -1268,11 +1263,11 @@ func TestTopicNotFound(t *testing.T) { return case <-afterCh: assert.Fail(t, "Time is up. 
Topic should have been deleted by now") + return } } } -/* func TestProducerFenced(t *testing.T) { client, err := NewClient(ClientOptions{ URL: serviceURL, @@ -1288,43 +1283,89 @@ func TestProducerFenced(t *testing.T) { assert.Nil(t, err) defer consumer.Close() // subscribe but do nothing - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topicName, - SendTimeout: 2 * time.Second, + // create the first producer exclusively + producer1, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + SendTimeout: 2 * time.Second, + ProducerAccessMode: ProducerAccessModeWaitForExclusive, + BatchingMaxMessages: 2, + BatchingMaxSize: 200, + BatchingMaxPublishDelay: 1 * time.Second, }) assert.Nil(t, err) - defer producer.Close() + defer producer1.Close() - afterCh := time.After(5 * time.Second) + go func() { + // create the second producer wait for exclusive + fmt.Println("create the second producer wait for exclusive...") + producer2, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + SendTimeout: 2 * time.Second, + ProducerAccessMode: ProducerAccessModeWaitForExclusive, + }) + assert.Nil(t, err) + defer producer2.Close() + fmt.Println("the second producer is ready") + // keep producer2 alive + time.Sleep(30 * time.Second) + }() + + time.Sleep(3 * time.Second) + afterCh := time.After(10 * time.Second) producerFencedChan := make(chan bool) go func() { for { - _, err := producer.Send(context.Background(), &ProducerMessage{ - Payload: make([]byte, 1024), - }) - if err != nil { - e := err.(*Error) - if e.result == ProducerFenced { - producerFencedChan <- true - } else { - producerFencedChan <- false - } - } + producer1.SendAsync(context.Background(), + &ProducerMessage{Payload: make([]byte, 100)}, + func(id MessageID, producerMessage *ProducerMessage, err error) { + if err != nil { + fmt.Println(err) + e := err.(*Error) + if e.result == ProducerFenced || err == errProducerClosed { + producerFencedChan <- true + } else { + producerFencedChan <- false + } + } + }, + ) + time.Sleep(1 * time.Millisecond) } }() - fenceURL := adminURL + "/admin/v2/persistent/public/default/" + topicName + "/terminate" - log.Info(fenceURL) - makeHTTPCall(t, http.MethodPost, fenceURL, "") + // trigger reconnecting + doneChan := make(chan bool) + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-doneChan: + return + case <-ticker.C: + fmt.Println("close connections...") + producers := producer1.(*producer).producers + for i := 0; i < len(producers); i++ { + partitionProducerImp := producers[i].(*partitionProducer) + partitionProducerImp.ConnectionClosed() + } + default: + + } + } + }() for { select { case d := <-producerFencedChan: assert.Equal(t, d, true) + doneChan <- true return case <-afterCh: assert.Fail(t, "Time is up. 
Producer should have been fenced by now") + doneChan <- true + return } } } From c1751f43e389a61b42bf3afdd89a786580569536 Mon Sep 17 00:00:00 2001 From: gunli Date: Fri, 24 Nov 2023 09:17:51 +0800 Subject: [PATCH 6/9] delete useless test cases --- pulsar/producer_test.go | 154 ---------------------------------------- 1 file changed, 154 deletions(-) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index ae62d41429..0f89069243 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1217,160 +1217,6 @@ func TestTopicTermination(t *testing.T) { } } -/* -func TestTopicNotFound(t *testing.T) { - client, err := NewClient(ClientOptions{ - URL: serviceURL, - }) - assert.NoError(t, err) - defer client.Close() - - topicName := newTopicName() - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topicName, - SendTimeout: 2 * time.Second, - }) - assert.Nil(t, err) - defer producer.Close() - - afterCh := time.After(5 * time.Second) - topicNotFoundChan := make(chan bool) - go func() { - for { - _, err := producer.Send(context.Background(), &ProducerMessage{ - Payload: make([]byte, 1024), - }) - if err != nil { - e := err.(*Error) - if e.result == TopicNotFound || err == errProducerClosed { - topicNotFoundChan <- true - } else { - topicNotFoundChan <- false - } - } - time.Sleep(1 * time.Millisecond) - } - }() - - deleteURL := adminURL + "/admin/v2/persistent/public/default/" + topicName - log.Info(deleteURL) - makeHTTPCall(t, http.MethodDelete, deleteURL, "") - - for { - select { - case d := <-topicNotFoundChan: - assert.Equal(t, d, true) - return - case <-afterCh: - assert.Fail(t, "Time is up. Topic should have been deleted by now") - return - } - } -} - -func TestProducerFenced(t *testing.T) { - client, err := NewClient(ClientOptions{ - URL: serviceURL, - }) - assert.NoError(t, err) - defer client.Close() - - topicName := newTopicName() - consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topicName, - SubscriptionName: "producer_fenced_sub", - }) - assert.Nil(t, err) - defer consumer.Close() // subscribe but do nothing - - // create the first producer exclusively - producer1, err := client.CreateProducer(ProducerOptions{ - Topic: topicName, - SendTimeout: 2 * time.Second, - ProducerAccessMode: ProducerAccessModeWaitForExclusive, - BatchingMaxMessages: 2, - BatchingMaxSize: 200, - BatchingMaxPublishDelay: 1 * time.Second, - }) - assert.Nil(t, err) - defer producer1.Close() - - go func() { - // create the second producer wait for exclusive - fmt.Println("create the second producer wait for exclusive...") - producer2, err := client.CreateProducer(ProducerOptions{ - Topic: topicName, - SendTimeout: 2 * time.Second, - ProducerAccessMode: ProducerAccessModeWaitForExclusive, - }) - assert.Nil(t, err) - defer producer2.Close() - fmt.Println("the second producer is ready") - // keep producer2 alive - time.Sleep(30 * time.Second) - }() - - time.Sleep(3 * time.Second) - afterCh := time.After(10 * time.Second) - producerFencedChan := make(chan bool) - go func() { - for { - producer1.SendAsync(context.Background(), - &ProducerMessage{Payload: make([]byte, 100)}, - func(id MessageID, producerMessage *ProducerMessage, err error) { - if err != nil { - fmt.Println(err) - e := err.(*Error) - if e.result == ProducerFenced || err == errProducerClosed { - producerFencedChan <- true - } else { - producerFencedChan <- false - } - } - }, - ) - - time.Sleep(1 * time.Millisecond) - } - }() - - // trigger reconnecting - doneChan := make(chan bool) - go func() { - ticker := 
time.NewTicker(1 * time.Second)
-		defer ticker.Stop()
-		for {
-			select {
-			case <-doneChan:
-				return
-			case <-ticker.C:
-				fmt.Println("close connections...")
-				producers := producer1.(*producer).producers
-				for i := 0; i < len(producers); i++ {
-					partitionProducerImp := producers[i].(*partitionProducer)
-					partitionProducerImp.ConnectionClosed()
-				}
-			default:
-
-			}
-		}
-	}()
-
-	for {
-		select {
-		case d := <-producerFencedChan:
-			assert.Equal(t, d, true)
-			doneChan <- true
-			return
-		case <-afterCh:
-			assert.Fail(t, "Time is up. Producer should have been fenced by now")
-			doneChan <- true
-			return
-		}
-	}
-}
-*/
-
 func TestSendTimeout(t *testing.T) {
 	quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota"
 	quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`

From 0ac083fc83e0ded8d007a0af97c049c11a8878de Mon Sep 17 00:00:00 2001
From: gunli
Date: Fri, 24 Nov 2023 09:20:13 +0800
Subject: [PATCH 7/9] delete comments and exit event loop when dataChan/cmdChan is closed

---
 pulsar/producer_partition.go | 19 ++++++-------------
 1 file changed, 6 insertions(+), 13 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 2ce57cc253..3b66c0002e 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -449,18 +449,12 @@ func (p *partitionProducer) reconnectToBroker() {
 		if strings.Contains(errMsg, errMsgTopicNotFound) {
 			// when topic is deleted, we should give up reconnection.
 			p.log.Warn("Topic not found, stop reconnecting, close the producer")
-			// in JAVA client, when topic not found, closeAsync() will be called, see
-			// nolint: lll
-			// https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1792-L1799
 			p.doClose(newError(TopicNotFound, err.Error()))
 			break
 		}
 
 		if strings.Contains(errMsg, errMsgTopicTerminated) {
 			p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
-			// in JAVA client, producer will be set to `Terminated` state and close, see
-			// nolint: lll
-			// https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1822-L1828
 			p.doClose(newError(TopicTerminated, err.Error()))
 			break
 		}
@@ -473,9 +467,6 @@
 
 		if strings.Contains(errMsg, errMsgProducerFenced) {
 			p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
-			// in JAVA client, producer will be set to `Fenced` state and close, see
-			// nolint: lll
-			// https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1830-L1836
 			p.doClose(newError(ProducerFenced, err.Error()))
 			break
 		}
@@ -494,15 +485,17 @@ func (p *partitionProducer) runEventsLoop() {
 	for {
 		select {
 		case data, ok := <-p.dataChan:
-			// when doClose() is call, p.dataChan will be closed, data will nil, we need to ignore it, or it will panic
+			// when doClose() is called, p.dataChan will be closed and data will be nil
 			if !ok {
-				continue
+				p.batchFlushTicker.Stop()
+				return
 			}
 			p.internalSend(data)
 		case cmd, ok := <-p.cmdChan:
-			// when doClose() is call, p.dataChan will be closed, cmd will nil, we need to ignore it, or it will panic
+			// when doClose() is called, p.cmdChan will be closed and cmd will be nil
 			if !ok {
-				continue
+				p.batchFlushTicker.Stop()
+				return
 			}
 			switch v := cmd.(type) {
 			case *flushRequest:
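Note (illustrative sketch, not part of the patch series): PATCH 7/9 makes runEventsLoop() exit by using Go's two-value channel receive — once doClose() has closed p.dataChan/p.cmdChan, a receive returns the zero value with ok == false, and the loop returns instead of handling a nil event. A minimal standalone sketch of that pattern; the worker, eventLoop and channel names are hypothetical stand-ins for partitionProducer and its loop:

```go
package main

import "fmt"

// worker stands in for partitionProducer: it owns the channels that the
// event loop drains and that a close path would close on shutdown.
type worker struct {
	dataCh chan string
	cmdCh  chan string
}

// eventLoop mirrors the comma-ok pattern used in runEventsLoop: receiving
// from a closed channel yields the zero value with ok == false, so the
// loop returns instead of processing a zero-value event.
func (w *worker) eventLoop(done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case d, ok := <-w.dataCh:
			if !ok { // dataCh was closed: stop the loop
				return
			}
			fmt.Println("data:", d)
		case c, ok := <-w.cmdCh:
			if !ok { // cmdCh was closed: stop the loop
				return
			}
			fmt.Println("cmd:", c)
		}
	}
}

func main() {
	w := &worker{dataCh: make(chan string, 1), cmdCh: make(chan string, 1)}
	done := make(chan struct{})
	go w.eventLoop(done)

	w.dataCh <- "payload"
	w.cmdCh <- "flush"

	// Closing both channels eventually makes a receive report ok == false,
	// which terminates the loop; without the ok check the loop would keep
	// receiving zero values forever after close.
	close(w.dataCh)
	close(w.cmdCh)
	<-done
}
```

Returning from the loop, rather than continuing, is what lets the goroutine exit once the producer is closed; PATCH 8/9 below only reverts the extra batchFlushTicker.Stop() call around that return.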
From 3b278a9b0c27632e0828e08581d484b630be8906 Mon Sep 17 00:00:00 2001
From: gunli
Date: Fri, 24 Nov 2023 16:49:41 +0800
Subject: [PATCH 8/9] revert p.batchFlushTicker.Stop()

---
 pulsar/producer_partition.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3b66c0002e..7aec15e649 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -487,14 +487,12 @@ func (p *partitionProducer) runEventsLoop() {
 		case data, ok := <-p.dataChan:
 			// when doClose() is called, p.dataChan will be closed and data will be nil
 			if !ok {
-				p.batchFlushTicker.Stop()
 				return
 			}
 			p.internalSend(data)
 		case cmd, ok := <-p.cmdChan:
 			// when doClose() is called, p.cmdChan will be closed and cmd will be nil
 			if !ok {
-				p.batchFlushTicker.Stop()
 				return
 			}
 			switch v := cmd.(type) {

From e99da7fbdf9fa67b47249c5cb3c16cfddde815c9 Mon Sep 17 00:00:00 2001
From: gunli
Date: Thu, 7 Dec 2023 10:08:53 +0800
Subject: [PATCH 9/9] fix data race

---
 pulsar/producer_partition.go | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 7aec15e649..46167d0cf1 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1332,17 +1332,18 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 func (p *partitionProducer) internalClose(req *closeProducer) {
 	defer close(req.doneCh)
-	if !p.casProducerState(producerReady, producerClosing) {
-		return
-	}
 	p.doClose(errProducerClosed)
 }
 
 func (p *partitionProducer) doClose(reason error) {
+	if !p.casProducerState(producerReady, producerClosing) {
+		return
+	}
+
+	p.log.Info("Closing producer")
 	defer close(p.dataChan)
 	defer close(p.cmdChan)
-	p.log.Info("Closing producer")
 
 	id := p.client.rpcClient.NewRequestID()
 	_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
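Note (illustrative sketch, not part of the patch series): PATCH 9/9 fixes the race by moving the producerReady -> producerClosing compare-and-swap into doClose(), so whichever caller reaches it first — internalClose() or the reconnect loop — performs the teardown exactly once. A standalone sketch of that idea, assuming a plain int32 state; the closer type and state constants are hypothetical stand-ins for partitionProducer and its casProducerState helper:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// Hypothetical states, standing in for producerReady/producerClosing/producerClosed.
const (
	stateReady int32 = iota
	stateClosing
	stateClosed
)

// closer stands in for partitionProducer: doClose can be reached from two
// goroutines (a user calling Close and the reconnect loop giving up), so the
// ready -> closing transition must be a single atomic compare-and-swap.
type closer struct {
	state int32
}

// doClose runs the teardown exactly once: only the caller that wins the CAS
// proceeds; every other caller returns immediately.
func (c *closer) doClose(reason error) {
	if !atomic.CompareAndSwapInt32(&c.state, stateReady, stateClosing) {
		return // another goroutine is already closing (or has closed) it
	}
	fmt.Println("closing:", reason)
	// ... close channels, fail pending messages, release resources ...
	atomic.StoreInt32(&c.state, stateClosed)
}

func main() {
	c := &closer{state: stateReady}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Both goroutines race to close; exactly one performs the teardown.
			c.doClose(errors.New("producer closed"))
		}()
	}
	wg.Wait()
}
```

Every later caller loses the CAS and returns early, which is why internalClose() no longer needs its own state check in the patch above.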