Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Producer]: handle TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced when reconnecting #1134

Merged
merged 9 commits into from
Dec 8, 2023

Conversation

gunli
Copy link
Contributor

@gunli gunli commented Nov 17, 2023

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #1128

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #1128

Motivation

In Java client, when we get TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced, we should failPendingMessages, and close producer. But in Go client, we forget to handle ProducerBlockedQuotaExceededException/ProducerFenced, and in #1128, we just call sr.done(), actually we should call failPendingMessages().

https://github.com/apache/pulsar-client-go/pull/1128/files
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1663

Modifications

  1. rename errMsgTopicNotFount to errMsgTopicNotFound
  2. handle TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced, call failPendingMessages();
                if strings.Contains(errMsg, errMsgTopicNotFound) {
			// when topic is deleted, we should give up reconnection.
			p.log.Warn("Topic not found, stop reconnecting")
			break
		}

		if strings.Contains(errMsg, errMsgTopicTerminated) {
			p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting")
			p.failPendingMessages(newError(TopicTerminated, err.Error()))
			// can not set to producerClosing , or it will fail when we call internalClose()
			// there is a Terminated state in JAVA client, maybe we should add a producerTerminated state ?
			// p.setProducerState(producerClosing)
			break
		}

		if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
			p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
			p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error()))
			break
		}

		if strings.Contains(errMsg, errMsgProducerFenced) {
			p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
			p.failPendingMessages(newError(ProducerFenced, err.Error()))
			// can not set to producerClosing , or it will fail when we call internalClose()
			// there is a ProducerFenced state in JAVA client, maybe we should add a producerFenced state ?
			// p.setProducerState(producerClosing)
			break
		}

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@gunli
Copy link
Contributor Author

gunli commented Nov 17, 2023

Copy link
Member

@RobertIndie RobertIndie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to handle the errors when creating the producer here:

if err != nil {
p.batchFlushTicker.Stop()
logger.WithError(err).Error("Failed to create producer at newPartitionProducer")
return nil, err
}

But this could be considered as a separate issue and fixed in a separate PR.

pulsar/producer_partition.go Outdated Show resolved Hide resolved
pulsar/producer_partition.go Outdated Show resolved Hide resolved
@pkumar-singh
Copy link
Member

LGTM

@gunli
Copy link
Contributor Author

gunli commented Nov 20, 2023

We also need to handle the errors when creating the producer here:

if err != nil {
p.batchFlushTicker.Stop()
logger.WithError(err).Error("Failed to create producer at newPartitionProducer")
return nil, err
}

But this could be considered as a separate issue and fixed in a separate PR.

Hmm, in newPartitionProducer() we just call p.grabCnx(), when p.grabCnx() return an error, we just stop creating, I think it is OK now, but the consumer should close itself when we get TopicNotFound in reconnectToBroker(), see https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L915-L917, I am not familiar with consumer, I can summit an Issue to keep track of it.

@RobertIndie
Copy link
Member

Hmm, in newPartitionProducer() we just call p.grabCnx(), when p.grabCnx() return an error, we just stop creating, I think it is OK now,

If we don't introduce new states, then I think it's OK. I'm OK with not importing new producer states. TopicTerminated/ProducerFenced could be considered an error.

but the consumer should close the itself when we get TopicNotFound in reconnectToBroker(), see https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L915-L917, I am not familiar with consumer, I can summit an Issue to keep track of it.

We need to investigate it further. Please submit an issue for it. Thanks!

Copy link
Member

@RobertIndie RobertIndie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Could you add some tests?

@gunli
Copy link
Contributor Author

gunli commented Nov 21, 2023

Overall LGTM. Could you add some tests?

Sure, could you please tell me how to trigger a ProducerFenced error?

@RobertIndie
Copy link
Member

Sure, could you please tell me how to trigger a ProducerFenced error?

You could create two producers both with ProducerAccessModeExclusive. The creation of the second producer should fails with ProducerFenced.
You can see this test: https://github.com/apache/pulsar/blob/3fdbc9fca6b6206bcbeef8e48937ccb3cb2d273f/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java#L83

@gunli
Copy link
Contributor Author

gunli commented Nov 21, 2023

Sure, could you please tell me how to trigger a ProducerFenced error?

You could create two producers both with ProducerAccessModeExclusive. The creation of the second producer should fails with ProducerFenced. You can see this test: https://github.com/apache/pulsar/blob/3fdbc9fca6b6206bcbeef8e48937ccb3cb2d273f/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java#L83

@RobertIndie Hmm, this PR is about reconnecting, not producer creation, I think it is difficult to simulate when a producer's connection is closed while another producer connect to the same topic exclusively at the same time, and then the closed producer reconneting, it receive a ProducerFenced error.

@RobertIndie
Copy link
Member

RobertIndie commented Nov 22, 2023

@RobertIndie Hmm, this PR is about reconnecting, not producer creation, I think it is difficult to simulate when a producer's connection is closed while another producer connect to the same topic exclusively at the same time, and then the closed producer reconneting, it receive a ProducerFenced error.

@gunli
You could invoke the ConnectionClose to simulate the connection loss on a producer:

func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the produce goroutine
p.log.WithField("cnx", p._getConn().ID()).Warn("Connection was closed")
p.connectClosedCh <- connectionClosed{}
}

You could refer to this way:

partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer)
to get the internal partition producer.

@gunli
Copy link
Contributor Author

gunli commented Nov 22, 2023

@RobertIndie Hmm, this PR is about reconnecting, not producer creation, I think it is difficult to simulate when a producer's connection is closed while another producer connect to the same topic exclusively at the same time, and then the closed producer reconneting, it receive a ProducerFenced error.

@gunli You could invoke the ConnectionClose to simulate the connection loss on a producer:

func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the produce goroutine
p.log.WithField("cnx", p._getConn().ID()).Warn("Connection was closed")
p.connectClosedCh <- connectionClosed{}
}

You could refer to this way:

partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer)

to get the internal partition producer.

I know that, but the timing is difficult, when we call ConnectionClosed(), it will start to reconnect, but before reconnection, we must use another producer to occupy the same topic, the time window size is very small

@RobertIndie
Copy link
Member

I know that, but the timing is difficult, when we call ConnectionClosed(), it will start to reconnect, but before reconnection, we must use another producer to occupy the same topic, the time window size is very small

Could you try using ProducerAccessModeWaitForExclusive for the second producer?

ProducerAccessModeWaitForExclusive

@gunli
Copy link
Contributor Author

gunli commented Nov 22, 2023

ProducerAccessModeWaitForExclusive

I see, I will try that later, thank you.

@gunli
Copy link
Contributor Author

gunli commented Nov 22, 2023

I know that, but the timing is difficult, when we call ConnectionClosed(), it will start to reconnect, but before reconnection, we must use another producer to occupy the same topic, the time window size is very small

Could you try using ProducerAccessModeWaitForExclusive for the second producer?

ProducerAccessModeWaitForExclusive

@RobertIndie I have tried that but failed, reconnecting is too fast, the second producer has no chance to get connected. And I also failed in simulating TopicNotFound, 'cause when there is an active producer, deleting a topic is denied by the server. I have pushed but commented the test cases, you can check them out.

func TestTopicNotFound(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: serviceURL,
	})
	assert.NoError(t, err)
	defer client.Close()

	topicName := newTopicName()
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:       topicName,
		SendTimeout: 2 * time.Second,
	})
	assert.Nil(t, err)
	defer producer.Close()

	afterCh := time.After(5 * time.Second)
	topicNotFoundChan := make(chan bool)
	go func() {
		for {
			_, err := producer.Send(context.Background(), &ProducerMessage{
				Payload: make([]byte, 1024),
			})
			if err != nil {
				e := err.(*Error)
				if e.result == TopicNotFound || err == errProducerClosed {
					topicNotFoundChan <- true
				} else {
					topicNotFoundChan <- false
				}
			}
			time.Sleep(1 * time.Millisecond)
		}
	}()

	deleteURL := adminURL + "/admin/v2/persistent/public/default/" + topicName
	log.Info(deleteURL)
	makeHTTPCall(t, http.MethodDelete, deleteURL, "")

	for {
		select {
		case d := <-topicNotFoundChan:
			assert.Equal(t, d, true)
			return
		case <-afterCh:
			assert.Fail(t, "Time is up. Topic should have been deleted by now")
			return
		}
	}
}

func TestProducerFenced(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: serviceURL,
	})
	assert.NoError(t, err)
	defer client.Close()

	topicName := newTopicName()
	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:            topicName,
		SubscriptionName: "producer_fenced_sub",
	})
	assert.Nil(t, err)
	defer consumer.Close() // subscribe but do nothing

	// create the first producer exclusively
	producer1, err := client.CreateProducer(ProducerOptions{
		Topic:                   topicName,
		SendTimeout:             2 * time.Second,
		ProducerAccessMode:      ProducerAccessModeWaitForExclusive,
		BatchingMaxMessages:     2,
		BatchingMaxSize:         200,
		BatchingMaxPublishDelay: 1 * time.Second,
	})
	assert.Nil(t, err)
	defer producer1.Close()

	go func() {
		// create the second producer wait for exclusive
		fmt.Println("create the second producer wait for exclusive...")
		producer2, err := client.CreateProducer(ProducerOptions{
			Topic:              topicName,
			SendTimeout:        2 * time.Second,
			ProducerAccessMode: ProducerAccessModeWaitForExclusive,
		})
		assert.Nil(t, err)
		defer producer2.Close()
		fmt.Println("the second producer is ready")
		// keep producer2 alive
		time.Sleep(30 * time.Second)
	}()

	time.Sleep(3 * time.Second)
	afterCh := time.After(10 * time.Second)
	producerFencedChan := make(chan bool)
	go func() {
		for {
			producer1.SendAsync(context.Background(),
				&ProducerMessage{Payload: make([]byte, 100)},
				func(id MessageID, producerMessage *ProducerMessage, err error) {
					if err != nil {
						fmt.Println(err)
						e := err.(*Error)
						if e.result == ProducerFenced || err == errProducerClosed {
							producerFencedChan <- true
						} else {
							producerFencedChan <- false
						}
					}
				},
			)

			time.Sleep(1 * time.Millisecond)
		}
	}()

	// trigger reconnecting
	doneChan := make(chan bool)
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-doneChan:
				return
			case <-ticker.C:
				fmt.Println("close connections...")
				producers := producer1.(*producer).producers
				for i := 0; i < len(producers); i++ {
					partitionProducerImp := producers[i].(*partitionProducer)
					partitionProducerImp.ConnectionClosed()
				}
			default:

			}
		}
	}()

	for {
		select {
		case d := <-producerFencedChan:
			assert.Equal(t, d, true)
			doneChan <- true
			return
		case <-afterCh:
			assert.Fail(t, "Time is up. Producer should have been fenced by now")
			doneChan <- true
			return
		}
	}
}

pulsar/producer_partition.go Outdated Show resolved Hide resolved
pulsar/producer_partition.go Outdated Show resolved Hide resolved
pulsar/producer_test.go Outdated Show resolved Hide resolved
@gunli gunli force-pushed the fix-failPendingMessages branch from 551f98d to 9662c60 Compare November 24, 2023 02:15
@gunli gunli force-pushed the fix-failPendingMessages branch from 9662c60 to 3b278a9 Compare November 24, 2023 08:50
@gunli
Copy link
Contributor Author

gunli commented Nov 30, 2023

@tisonkun

@gunli
Copy link
Contributor Author

gunli commented Dec 1, 2023

@RobertIndie The CI is failed, but I can find out the root cause from the logs, would you please check it out?

@RobertIndie
Copy link
Member

There is a data race issue in the CI: https://github.com/apache/pulsar-client-go/actions/runs/6978661724/job/19165650715?pr=1134#step:5:9630

@gunli Could you take a look?

@gunli
Copy link
Contributor Author

gunli commented Dec 7, 2023

There is a data race issue in the CI: https://github.com/apache/pulsar-client-go/actions/runs/6978661724/job/19165650715?pr=1134#step:5:9630

@gunli Could you take a look?

@RobertIndie I have pushed a commit to fix it, PTAL and run the CI.

@gunli gunli mentioned this pull request Dec 7, 2023
1 task
@RobertIndie RobertIndie merged commit bd11581 into apache:master Dec 8, 2023
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants