diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index cb11b8e0..1032b79e 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -516,7 +516,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
     }
 }
 
-Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic) {
+Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
     Promise<Result, ClientConnectionPtr> promise;
 
     const auto topicNamePtr = TopicName::get(topic);
@@ -528,12 +528,12 @@ Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string&
 
     auto self = shared_from_this();
     lookupServicePtr_->getBroker(*topicNamePtr)
-        .addListener([this, self, promise](Result result, const LookupService::LookupResult& data) {
+        .addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
             if (result != ResultOk) {
                 promise.setFailed(result);
                 return;
             }
-            pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
+            pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress, key)
                 .addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
                     if (result == ResultOk) {
                         auto cnx = weakCnx.lock();
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 0c6eeb40..f7b5a891 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -95,7 +95,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
 
     void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
 
-    Future<Result, ClientConnectionPtr> getConnection(const std::string& topic);
+    Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
 
     void closeAsync(CloseCallback callback);
     void shutdown();
@@ -123,6 +123,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
 
     std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { return requestIdGenerator_; }
 
+    ConnectionPool& getConnectionPool() noexcept { return pool_; }
+
     friend class PulsarFriend;
 
    private:
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index 32325122..01285e9a 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -61,8 +61,9 @@ bool ConnectionPool::close() {
     return true;
 }
 
-Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
-    const std::string& logicalAddress, const std::string& physicalAddress) {
+Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
+                                                                           const std::string& physicalAddress,
+                                                                           size_t keySuffix) {
     if (closed_) {
         Promise<Result, ClientConnectionWeakPtr> promise;
         promise.setFailed(ResultAlreadyClosed);
@@ -72,7 +73,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
     std::unique_lock<std::mutex> lock(mutex_);
 
     std::stringstream ss;
-    ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
+    ss << logicalAddress << '-' << keySuffix;
     const std::string key = ss.str();
 
     PoolMap::iterator cnxIt = pool_.find(key);
@@ -95,7 +96,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
     // No valid or pending connection found in the pool, creating a new one
     ClientConnectionPtr cnx;
     try {
-        cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
+        cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix),
                                        clientConfiguration_, authentication_, clientVersion_, *this));
     } catch (const std::runtime_error& e) {
         lock.unlock();
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index c582dc9e..a51205b6 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -65,17 +65,29 @@ class PULSAR_PUBLIC ConnectionPool {
      * a proxy layer. Essentially, the pool is using the logical address as a way to
      * decide whether to reuse a particular connection.
      *
+     * There could be many connections to the same broker, so this pool uses an integer key as the suffix of
+     * the key that represents the connection.
+     *
      * @param logicalAddress the address to use as the broker tag
      * @param physicalAddress the real address where the TCP connection should be made
+     * @param keySuffix the key suffix to choose which connection on the same broker
      * @return a future that will produce the ClientCnx object
      */
     Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
-                                                               const std::string& physicalAddress);
+                                                               const std::string& physicalAddress,
+                                                               size_t keySuffix);
+
+    Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
+                                                               const std::string& physicalAddress) {
+        return getConnectionAsync(logicalAddress, physicalAddress, generateRandomIndex());
+    }
 
     Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& address) {
         return getConnectionAsync(address, address);
     }
 
+    size_t generateRandomIndex() { return randomDistribution_(randomEngine_); }
+
    private:
     ClientConfiguration clientConfiguration_;
     ExecutorServiceProviderPtr executorProvider_;
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index a0dff0b6..53be8eaf 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -133,10 +133,10 @@ void ExecutorService::postWork(std::function<void(void)> task) { io_service_.pos
 ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
     : executors_(nthreads), executorIdx_(0), mutex_() {}
 
-ExecutorServicePtr ExecutorServiceProvider::get() {
+ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
+    idx %= executors_.size();
     Lock lock(mutex_);
 
-    int idx = executorIdx_++ % executors_.size();
     if (!executors_[idx]) {
         executors_[idx] = ExecutorService::create();
     }
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index 4717ccb5..a373c0af 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -88,7 +88,9 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
    public:
     explicit ExecutorServiceProvider(int nthreads);
 
-    ExecutorServicePtr get();
+    ExecutorServicePtr get() { return get(executorIdx_++); }
+
+    ExecutorServicePtr get(size_t index);
 
     // See TimeoutProcessor for the semantics of the parameter.
     void close(long timeoutMs = 3000);
@@ -96,7 +98,7 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
    private:
     typedef std::vector<ExecutorServicePtr> ExecutorList;
     ExecutorList executors_;
-    int executorIdx_;
+    std::atomic_size_t executorIdx_;
     std::mutex mutex_;
     typedef std::unique_lock<std::mutex> Lock;
 };
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 986063e0..b55f7510 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -32,6 +32,7 @@ namespace pulsar {
 HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
     : topic_(std::make_shared<std::string>(topic)),
       client_(client),
+      connectionKeySuffix_(client->getConnectionPool().generateRandomIndex()),
       executor_(client->getIOExecutorProvider()->get()),
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
@@ -88,7 +89,8 @@ void HandlerBase::grabCnx() {
         return;
     }
     auto self = shared_from_this();
-    client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
+    auto cnxFuture = client->getConnection(topic(), connectionKeySuffix_);
+    cnxFuture.addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
         if (result == ResultOk) {
             LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
             connectionOpened(cnx).addListener([this, self](Result result, bool) {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 937b308d..f62c4df0 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -99,6 +99,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
 
    protected:
     ClientImplWeakPtr client_;
+    const size_t connectionKeySuffix_;
     ExecutorServicePtr executor_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index a66fbfbd..80ee7358 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -966,7 +966,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer
 }
 
 void ProducerImpl::disconnectProducer() {
-    LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
+    LOG_INFO("Broker notification of Closed producer: " << producerId_);
     resetCnx();
     scheduleReconnection();
 }
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index eeda2b47..9685cc8d 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -618,4 +618,19 @@ TEST(ProducerTest, testNoDeadlockWhenClosingPartitionedProducerAfterPartitionsUp
     client.close();
 }
 
+TEST(ProducerTest, testReconnectMultiConnectionsPerBroker) {
+    ClientConfiguration conf;
+    conf.setConnectionsPerBroker(10);
+
+    Client client(serviceUrl, conf);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer("producer-test-reconnect-twice", producer));
+
+    for (int i = 0; i < 5; i++) {
+        ASSERT_TRUE(PulsarFriend::reconnect(producer)) << "i: " << i;
+    }
+
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index c25b1406..de82ce47 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -21,6 +21,7 @@
 
 #include
 
+#include "WaitUtils.h"
 #include "lib/ClientConnection.h"
 #include "lib/ClientImpl.h"
 #include "lib/ConsumerConfigurationImpl.h"
@@ -197,6 +198,13 @@ class PulsarFriend {
         lookupData->setPartitions(newPartitions);
         partitionedProducer.handleGetPartitions(ResultOk, lookupData);
     }
+
+    static bool reconnect(Producer producer) {
+        auto producerImpl = std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
+        producerImpl->disconnectProducer();
+        return waitUntil(std::chrono::seconds(3),
+                         [producerImpl] { return !producerImpl->getCnx().expired(); });
+    }
 };
 
 } // namespace pulsar
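
For context, the connection selection introduced by this patch reduces to two small computations: the pool key is logicalAddress plus '-' plus keySuffix (ConnectionPool::getConnectionAsync), and the I/O thread is keySuffix % executors_.size() (ExecutorServiceProvider::get). The standalone sketch below is not part of the patch; the broker URL and thread count are made up. It only illustrates why a handler that draws its suffix once (connectionKeySuffix_) keeps landing on the same pooled connection and executor across reconnects.

#include <cstddef>
#include <iostream>
#include <random>
#include <sstream>
#include <string>

// Stands in for ConnectionPool::generateRandomIndex(): drawn once per handler.
static std::size_t generateRandomIndex() {
    static std::mt19937_64 engine{std::random_device{}()};
    static std::uniform_int_distribution<std::size_t> dist;
    return dist(engine);
}

// Mirrors the key built in ConnectionPool::getConnectionAsync(): the same
// (logicalAddress, keySuffix) pair always maps to the same pool entry.
static std::string poolKey(const std::string& logicalAddress, std::size_t keySuffix) {
    std::stringstream ss;
    ss << logicalAddress << '-' << keySuffix;
    return ss.str();
}

int main() {
    const std::size_t numIoThreads = 4;                 // hypothetical ExecutorServiceProvider size
    const std::size_t suffix = generateRandomIndex();   // what a handler stores as connectionKeySuffix_
    // Two "reconnects" of the same handler: identical pool key and executor index both times.
    for (int attempt = 0; attempt < 2; attempt++) {
        std::cout << poolKey("pulsar://broker-1:6650", suffix) << " -> executor "
                  << suffix % numIoThreads << std::endl;
    }
    return 0;
}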