Issue with SendTimeout After Broker Recovery in Pulsar Go Client #1312

Closed
sanketwaghmode-tibco opened this issue Nov 26, 2024 · 1 comment
sanketwaghmode-tibco commented Nov 26, 2024

Hello,

I am using the latest version of the Pulsar Go client and am encountering an issue with the SendTimeout option under the ProducerOptions configuration. Below is my use case and how I'm encountering the issue.

Use Case:

  1. I am creating a producer once and configuring the SendTimeout under the ProducerOptions.
    
  2. After creating the producer, I send messages at specific intervals.
    
  3. When the broker is up and running, the messages are successfully published.
    
  4. If the broker goes down, the library tries to reconnect multiple times, and after a while I receive the SendTimeout error.
    
  5. Once the broker is back up and running, I expect the message sending to succeed, but I continue to get the SendTimeout error even after the broker is back online.
    

The Issue:
Even after the broker is back up, I continue to receive the SendTimeout error, indicating that the producer is not able to send messages anymore.

Expected Behavior:
Once the broker is back online, the message should be successfully sent without further SendTimeout errors.

Observed Behavior:

  • Despite the broker coming back online, the producer continues to hit the SendTimeout error.
  • The connection logs show that the client can reconnect to the broker after the failure.
  • However, the producer does not resume sending messages successfully.

Example Code

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Create a Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // Pulsar broker URL
	})
	if err != nil {
		log.Fatal("Error creating Pulsar client: ", err)
	}

	// Define Producer configuration
	sendTimeout := 5 * time.Second  // SendTimeout duration
	maxReconnectAttempts := uint(1) // Max reconnection attempts to the broker

	// Create a Producer
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                "my-topic",            // Pulsar topic name
		MaxReconnectToBroker: &maxReconnectAttempts, // Set maximum reconnect attempts
		SendTimeout:          sendTimeout,           // Set send timeout
	})
	if err != nil {
		log.Fatal("Error creating producer: ", err)
	}
	defer producer.Close() // Ensure the producer is closed when the function exits

	// Send messages with intervals
	for i := 1; i <= 12; i++ {
		// Create message payload
		message := &pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("Hello, Pulsar - %d!", i)),
		}

		// Send message and handle potential errors
		_, err = producer.Send(context.Background(), message)
		if err != nil {
			// Output the error if message sending fails
			log.Printf("Failed to publish message %d: %v", i, err)
		} else {
			// Log successful message publication
			log.Printf("Published message %d: Hello, Pulsar - %d!", i, i)
		}

		// Wait for 30 seconds before sending the next message
		time.Sleep(30 * time.Second)
	}

	// Simulate waiting for some time before closing the client
	log.Println("Waiting for some time...")
	time.Sleep(600 * time.Second)

	// Close the Pulsar client after the operation is completed
	client.Close()
}

Additional Information:
Library Version: v0.14.0
Pulsar Version: (latest)
Go Version: go1.20.4 linux/amd64

Notes:

  • The issue occurs when the broker goes down, and the producer is unable to send messages after the broker comes back up, even though the SendTimeout has been set.
  • The expected behavior is that once the broker comes back up, the producer should be able to send messages again without hitting the timeout error.

Please let me know if any further details are needed to investigate this issue further.
Also, are there any other configuration options or best practices I should be aware of when using the Pulsar Go client, especially regarding handling broker disconnections and reconnections?

@shibd shibd self-assigned this Jan 17, 2025
Member

shibd commented Jan 17, 2025

Hi, this happens because you set maxReconnectAttempts := uint(1).

With that setting, when the first reconnect attempt fails (the broker was not up yet at that time), the producer stops trying to reconnect to the broker. So even if your broker comes up later, the producer still can't send messages.

BTW: MaxReconnectToBroker (maxReconnectAttempts in your code) is not a good option to configure; you shouldn't set it. The client will handle backoff to control the retry intervals.
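
A minimal sketch of the producer setup with that option left unset might look like the following. It reuses the URL, topic, and 5-second SendTimeout from the report; the `run` helper and the shortened loop are assumptions for illustration, and it has not been run against a broker outage.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// run is a hypothetical helper so that deferred Close calls always execute.
func run() error {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // broker URL from the original report
	})
	if err != nil {
		return fmt.Errorf("creating Pulsar client: %w", err)
	}
	defer client.Close()

	// MaxReconnectToBroker is deliberately NOT set, so the producer is not
	// limited to a fixed number of reconnect attempts.
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:       "my-topic",
		SendTimeout: 5 * time.Second,
	})
	if err != nil {
		return fmt.Errorf("creating producer: %w", err)
	}
	defer producer.Close()

	for i := 1; i <= 12; i++ {
		_, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("Hello, Pulsar - %d!", i)),
		})
		if err != nil {
			// A send can still time out while the broker is down; the
			// application decides whether to retry or drop the message.
			log.Printf("Failed to publish message %d: %v", i, err)
		} else {
			log.Printf("Published message %d", i)
		}
		time.Sleep(30 * time.Second)
	}
	return nil
}

func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}
```

Sends issued during the outage may still fail with a SendTimeout error, so the application has to decide how to handle those messages. The difference is that the producer is not stopped by a reconnect cap.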

@shibd shibd closed this as completed Jan 17, 2025