Skip to content

Commit

Permalink
Fix ProducerBusy or ConsumerBusy error when configuring multiple brok…
Browse files Browse the repository at this point in the history
…ers per connection (#337)

### Motivation

This is a catch up for apache/pulsar#21144

When a producer or consumer reconnects, a random number will be generated as the key suffix in `ConnectionPool` to create or get the `ClientConnection` object from the pool.

https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75

If a new connection is created with the same producer or consumer name to the broker, the broker will respond with a  `ProducerBusy` or `ConsumerBusy` error so that the reconnection will never succeed.

### Modifications

- Add an overload of `ConnectionPool::getConnectionAsync` that accepts an integer parameter as the key suffix. If it's not specified, generate the random number as the suffix. In this method, choose the executor by `key suffix % size`.
- Generate the random number and save it when creating the `HandlerBase` object. When connecting the owner broker of its topic, pass that index so that the reconnection will always reuse the same `ClientConnection` object.

### Verifying this change

`ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protected the change.

(cherry picked from commit 6f115e7)
  • Loading branch information
BewareMyPower committed Nov 7, 2023
1 parent 272e1a1 commit f337eff
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 15 deletions.
6 changes: 3 additions & 3 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
}
}

Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic) {
Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
Promise<Result, ClientConnectionPtr> promise;

const auto topicNamePtr = TopicName::get(topic);
Expand All @@ -528,12 +528,12 @@ Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string&

auto self = shared_from_this();
lookupServicePtr_->getBroker(*topicNamePtr)
.addListener([this, self, promise](Result result, const LookupService::LookupResult& data) {
.addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
if (result != ResultOk) {
promise.setFailed(result);
return;
}
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress, key)
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
if (result == ResultOk) {
auto cnx = weakCnx.lock();
Expand Down
4 changes: 3 additions & 1 deletion lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {

void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);

Future<Result, ClientConnectionPtr> getConnection(const std::string& topic);
Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);

void closeAsync(CloseCallback callback);
void shutdown();
Expand Down Expand Up @@ -123,6 +123,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {

std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { return requestIdGenerator_; }

ConnectionPool& getConnectionPool() noexcept { return pool_; }

friend class PulsarFriend;

private:
Expand Down
9 changes: 5 additions & 4 deletions lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ bool ConnectionPool::close() {
return true;
}

Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
const std::string& logicalAddress, const std::string& physicalAddress) {
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
const std::string& physicalAddress,
size_t keySuffix) {
if (closed_) {
Promise<Result, ClientConnectionWeakPtr> promise;
promise.setFailed(ResultAlreadyClosed);
Expand All @@ -72,7 +73,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
std::unique_lock<std::recursive_mutex> lock(mutex_);

std::stringstream ss;
ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
ss << logicalAddress << '-' << keySuffix;
const std::string key = ss.str();

PoolMap::iterator cnxIt = pool_.find(key);
Expand All @@ -95,7 +96,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
// No valid or pending connection found in the pool, creating a new one
ClientConnectionPtr cnx;
try {
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix),
clientConfiguration_, authentication_, clientVersion_, *this));
} catch (const std::runtime_error& e) {
lock.unlock();
Expand Down
14 changes: 13 additions & 1 deletion lib/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,29 @@ class PULSAR_PUBLIC ConnectionPool {
* a proxy layer. Essentially, the pool is using the logical address as a way to
* decide whether to reuse a particular connection.
*
* There could be many connections to the same broker, so this pool uses an integer key as the suffix of
* the key that represents the connection.
*
* @param logicalAddress the address to use as the broker tag
* @param physicalAddress the real address where the TCP connection should be made
* @param keySuffix the key suffix to choose which connection on the same broker
* @return a future that will produce the ClientCnx object
*/
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
const std::string& physicalAddress);
const std::string& physicalAddress,
size_t keySuffix);

Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
const std::string& physicalAddress) {
return getConnectionAsync(logicalAddress, physicalAddress, generateRandomIndex());
}

Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& address) {
return getConnectionAsync(address, address);
}

size_t generateRandomIndex() { return randomDistribution_(randomEngine_); }

private:
ClientConfiguration clientConfiguration_;
ExecutorServiceProviderPtr executorProvider_;
Expand Down
4 changes: 2 additions & 2 deletions lib/ExecutorService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ void ExecutorService::postWork(std::function<void(void)> task) { io_service_.pos
ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
: executors_(nthreads), executorIdx_(0), mutex_() {}

ExecutorServicePtr ExecutorServiceProvider::get() {
ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
idx %= executors_.size();
Lock lock(mutex_);

int idx = executorIdx_++ % executors_.size();
if (!executors_[idx]) {
executors_[idx] = ExecutorService::create();
}
Expand Down
6 changes: 4 additions & 2 deletions lib/ExecutorService.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,17 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
public:
explicit ExecutorServiceProvider(int nthreads);

ExecutorServicePtr get();
ExecutorServicePtr get() { return get(executorIdx_++); }

ExecutorServicePtr get(size_t index);

// See TimeoutProcessor for the semantics of the parameter.
void close(long timeoutMs = 3000);

private:
typedef std::vector<ExecutorServicePtr> ExecutorList;
ExecutorList executors_;
int executorIdx_;
std::atomic_size_t executorIdx_;
std::mutex mutex_;
typedef std::unique_lock<std::mutex> Lock;
};
Expand Down
4 changes: 3 additions & 1 deletion lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace pulsar {
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
: topic_(std::make_shared<std::string>(topic)),
client_(client),
connectionKeySuffix_(client->getConnectionPool().generateRandomIndex()),
executor_(client->getIOExecutorProvider()->get()),
mutex_(),
creationTimestamp_(TimeUtils::now()),
Expand Down Expand Up @@ -88,7 +89,8 @@ void HandlerBase::grabCnx() {
return;
}
auto self = shared_from_this();
client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
auto cnxFuture = client->getConnection(topic(), connectionKeySuffix_);
cnxFuture.addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx).addListener([this, self](Result result, bool) {
Expand Down
1 change: 1 addition & 0 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {

protected:
ClientImplWeakPtr client_;
const size_t connectionKeySuffix_;
ExecutorServicePtr executor_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
Expand Down
2 changes: 1 addition & 1 deletion lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer
}

void ProducerImpl::disconnectProducer() {
LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
LOG_INFO("Broker notification of Closed producer: " << producerId_);
resetCnx();
scheduleReconnection();
}
Expand Down
15 changes: 15 additions & 0 deletions tests/ProducerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -618,4 +618,19 @@ TEST(ProducerTest, testNoDeadlockWhenClosingPartitionedProducerAfterPartitionsUp
client.close();
}

TEST(ProducerTest, testReconnectMultiConnectionsPerBroker) {
ClientConfiguration conf;
conf.setConnectionsPerBroker(10);

Client client(serviceUrl, conf);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer("producer-test-reconnect-twice", producer));

for (int i = 0; i < 5; i++) {
ASSERT_TRUE(PulsarFriend::reconnect(producer)) << "i: " << i;
}

client.close();
}

INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));
8 changes: 8 additions & 0 deletions tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <string>

#include "WaitUtils.h"
#include "lib/ClientConnection.h"
#include "lib/ClientImpl.h"
#include "lib/ConsumerConfigurationImpl.h"
Expand Down Expand Up @@ -197,6 +198,13 @@ class PulsarFriend {
lookupData->setPartitions(newPartitions);
partitionedProducer.handleGetPartitions(ResultOk, lookupData);
}

static bool reconnect(Producer producer) {
auto producerImpl = std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
producerImpl->disconnectProducer();
return waitUntil(std::chrono::seconds(3),
[producerImpl] { return !producerImpl->getCnx().expired(); });
}
};
} // namespace pulsar

Expand Down

0 comments on commit f337eff

Please sign in to comment.