diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h index a05a8fb6..071b2446 100644 --- a/include/pulsar/MessageId.h +++ b/include/pulsar/MessageId.h @@ -107,6 +107,7 @@ class PULSAR_PUBLIC MessageId { friend class MultiTopicsConsumerImpl; friend class UnAckedMessageTrackerEnabled; friend class BatchAcknowledgementTracker; + friend struct OpSendMsg; friend class PulsarWrapper; friend class PulsarFriend; friend class NegativeAcksTracker; diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc index ae0425e2..d8c5de1a 100644 --- a/lib/BatchMessageContainer.cc +++ b/lib/BatchMessageContainer.cc @@ -21,13 +21,14 @@ #include #include "LogUtils.h" +#include "OpSendMsg.h" DECLARE_LOG_OBJECT() namespace pulsar { BatchMessageContainer::BatchMessageContainer(const ProducerImpl& producer) - : BatchMessageContainerBase(producer) {} + : BatchMessageContainerBase(producer), batch_(producerConfig_.getBatchingMaxMessages()) {} BatchMessageContainer::~BatchMessageContainer() { LOG_DEBUG(*this << " destructed"); @@ -35,9 +36,9 @@ BatchMessageContainer::~BatchMessageContainer() { << "] [averageBatchSize_ = " << averageBatchSize_ << "]"); } -bool BatchMessageContainer::add(const Message& msg, const SendCallback& callback) { +bool BatchMessageContainer::add(const Message& msg, SendCallback&& callback) { LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]"); - batch_.add(msg, callback); + batch_.add(msg, std::move(callback)); updateStats(msg); LOG_DEBUG("After add: " << *this); return isFull(); @@ -52,14 +53,13 @@ void BatchMessageContainer::clear() { LOG_DEBUG(*this << " clear() called"); } -Result BatchMessageContainer::createOpSendMsg(OpSendMsg& opSendMsg, - const FlushCallback& flushCallback) const { - return createOpSendMsgHelper(opSendMsg, flushCallback, batch_); -} - -std::vector BatchMessageContainer::createOpSendMsgs(std::vector& opSendMsgs, - const FlushCallback& flushCallback) const { - throw std::runtime_error("createOpSendMsgs is not supported for BatchMessageContainer"); +std::unique_ptr BatchMessageContainer::createOpSendMsg(const FlushCallback& flushCallback) { + auto op = createOpSendMsgHelper(batch_); + if (flushCallback) { + op->addTrackerCallback(flushCallback); + } + clear(); + return op; } void BatchMessageContainer::serialize(std::ostream& os) const { diff --git a/lib/BatchMessageContainer.h b/lib/BatchMessageContainer.h index cd8a62cb..4c55a9fb 100644 --- a/lib/BatchMessageContainer.h +++ b/lib/BatchMessageContainer.h @@ -39,25 +39,22 @@ class BatchMessageContainer : public BatchMessageContainerBase { ~BatchMessageContainer(); - size_t getNumBatches() const override { return 1; } + bool hasMultiOpSendMsgs() const override { return false; } bool isFirstMessageToAdd(const Message& msg) const override { return batch_.empty(); } - bool add(const Message& msg, const SendCallback& callback) override; - - void clear() override; - - Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback) const override; - - std::vector createOpSendMsgs(std::vector& opSendMsgs, - const FlushCallback& flushCallback) const override; + bool add(const Message& msg, SendCallback&& callback) override; void serialize(std::ostream& os) const override; + std::unique_ptr createOpSendMsg(const FlushCallback& flushCallback) override; + private: MessageAndCallbackBatch batch_; size_t numberOfBatchesSent_ = 0; double averageBatchSize_ = 0; + + void clear() override; }; } // namespace pulsar diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc index 807a2615..c1b17270 100644 --- a/lib/BatchMessageContainerBase.cc +++ b/lib/BatchMessageContainerBase.cc @@ -18,14 +18,10 @@ */ #include "BatchMessageContainerBase.h" -#include "ClientConnection.h" -#include "CompressionCodec.h" #include "MessageAndCallbackBatch.h" #include "MessageCrypto.h" -#include "MessageImpl.h" #include "OpSendMsg.h" #include "ProducerImpl.h" -#include "PulsarApi.pb.h" #include "SharedBuffer.h" namespace pulsar { @@ -37,78 +33,12 @@ BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& produce producerId_(producer.producerId_), msgCryptoWeakPtr_(producer.msgCrypto_) {} -Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg, - const FlushCallback& flushCallback, - const MessageAndCallbackBatch& batch) const { - opSendMsg.sendCallback_ = batch.createSendCallback(); - opSendMsg.messagesCount_ = batch.messagesCount(); - opSendMsg.messagesSize_ = batch.messagesSize(); +BatchMessageContainerBase::~BatchMessageContainerBase() {} - if (flushCallback) { - auto sendCallback = opSendMsg.sendCallback_; - opSendMsg.sendCallback_ = [sendCallback, flushCallback](Result result, const MessageId& id) { - sendCallback(result, id); - flushCallback(result); - }; - } - - if (batch.empty()) { - return ResultOperationNotSupported; - } - - MessageImplPtr impl = batch.msgImpl(); - impl->metadata.set_num_messages_in_batch(batch.size()); - auto compressionType = producerConfig_.getCompressionType(); - if (compressionType != CompressionNone) { - impl->metadata.set_compression(static_cast(compressionType)); - impl->metadata.set_uncompressed_size(impl->payload.readableBytes()); - } - impl->payload = CompressionCodecProvider::getCodec(compressionType).encode(impl->payload); - - auto msgCrypto = msgCryptoWeakPtr_.lock(); - if (msgCrypto && producerConfig_.isEncryptionEnabled()) { - SharedBuffer encryptedPayload; - if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(), producerConfig_.getCryptoKeyReader(), - impl->metadata, impl->payload, encryptedPayload)) { - return ResultCryptoError; - } - impl->payload = encryptedPayload; - } - - if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize()) { - return ResultMessageTooBig; - } - - opSendMsg.metadata_ = impl->metadata; - opSendMsg.payload_ = impl->payload; - opSendMsg.sequenceId_ = impl->metadata.sequence_id(); - opSendMsg.producerId_ = producerId_; - opSendMsg.timeout_ = TimeUtils::now() + milliseconds(producerConfig_.getSendTimeout()); - - return ResultOk; -} - -void BatchMessageContainerBase::processAndClear( - std::function opSendMsgCallback, FlushCallback flushCallback) { - if (isEmpty()) { - if (flushCallback) { - // do nothing, flushCallback complete until the lastOpSend complete - } - } else { - const auto numBatches = getNumBatches(); - if (numBatches == 1) { - OpSendMsg opSendMsg; - Result result = createOpSendMsg(opSendMsg, flushCallback); - opSendMsgCallback(result, opSendMsg); - } else if (numBatches > 1) { - std::vector opSendMsgs; - std::vector results = createOpSendMsgs(opSendMsgs, flushCallback); - for (size_t i = 0; i < results.size(); i++) { - opSendMsgCallback(results[i], opSendMsgs[i]); - } - } // else numBatches is 0, do nothing - } - clear(); +std::unique_ptr BatchMessageContainerBase::createOpSendMsgHelper( + MessageAndCallbackBatch& batch) const { + auto crypto = msgCryptoWeakPtr_.lock(); + return batch.createOpSendMsg(producerId_, producerConfig_, crypto.get()); } } // namespace pulsar diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h index fe4e5df7..b372f69d 100644 --- a/lib/BatchMessageContainerBase.h +++ b/lib/BatchMessageContainerBase.h @@ -26,6 +26,7 @@ #include #include +#include #include namespace pulsar { @@ -44,14 +45,9 @@ class BatchMessageContainerBase : public boost::noncopyable { public: BatchMessageContainerBase(const ProducerImpl& producer); - virtual ~BatchMessageContainerBase() {} + virtual ~BatchMessageContainerBase(); - /** - * Get number of batches in the batch message container - * - * @return number of batches - */ - virtual size_t getNumBatches() const = 0; + virtual bool hasMultiOpSendMsgs() const = 0; /** * Check the message will be the 1st message to be added to the batch @@ -71,34 +67,16 @@ class BatchMessageContainerBase : public boost::noncopyable { * @param callback message send callback * @return true if the batch is full, otherwise false */ - virtual bool add(const Message& msg, const SendCallback& callback) = 0; + virtual bool add(const Message& msg, SendCallback&& callback) = 0; - /** - * Clear the batch message container - */ - virtual void clear() = 0; - - /** - * Create a OpSendMsg object to send - * - * @param opSendMsg the OpSendMsg object to create - * @param flushCallback the callback to trigger after the OpSendMsg was completed - * @return ResultOk if create successfully - * @note OpSendMsg's sendCallback_ must be set even if it failed - */ - virtual Result createOpSendMsg(OpSendMsg& opSendMsg, - const FlushCallback& flushCallback = nullptr) const = 0; + virtual std::unique_ptr createOpSendMsg(const FlushCallback& flushCallback = nullptr) { + throw std::runtime_error("createOpSendMsg is not supported"); + } - /** - * Create a OpSendMsg list to send - * - * @param opSendMsgList the OpSendMsg list to create - * @param flushCallback the callback to trigger after the OpSendMsg was completed - * @return all create results of `opSendMsgs`, ResultOk means create successfully - * @note OpSendMsg's sendCallback_ must be set even if it failed - */ - virtual std::vector createOpSendMsgs(std::vector& opSendMsgs, - const FlushCallback& flushCallback = nullptr) const = 0; + virtual std::vector> createOpSendMsgs( + const FlushCallback& flushCallback = nullptr) { + throw std::runtime_error("createOpSendMsgs is not supported"); + } /** * Serialize into a std::ostream for logging @@ -110,9 +88,6 @@ class BatchMessageContainerBase : public boost::noncopyable { bool hasEnoughSpace(const Message& msg) const noexcept; bool isEmpty() const noexcept; - void processAndClear(std::function opSendMsgCallback, - FlushCallback flushCallback); - protected: // references to ProducerImpl's fields const std::shared_ptr topicName_; @@ -134,8 +109,9 @@ class BatchMessageContainerBase : public boost::noncopyable { void updateStats(const Message& msg); void resetStats(); - Result createOpSendMsgHelper(OpSendMsg& opSendMsg, const FlushCallback& flushCallback, - const MessageAndCallbackBatch& batch) const; + std::unique_ptr createOpSendMsgHelper(MessageAndCallbackBatch& flushCallback) const; + + virtual void clear() = 0; }; inline bool BatchMessageContainerBase::hasEnoughSpace(const Message& msg) const noexcept { diff --git a/lib/BatchMessageKeyBasedContainer.cc b/lib/BatchMessageKeyBasedContainer.cc index 05baf342..aeb90675 100644 --- a/lib/BatchMessageKeyBasedContainer.cc +++ b/lib/BatchMessageKeyBasedContainer.cc @@ -55,9 +55,15 @@ bool BatchMessageKeyBasedContainer::isFirstMessageToAdd(const Message& msg) cons } } -bool BatchMessageKeyBasedContainer::add(const Message& msg, const SendCallback& callback) { +bool BatchMessageKeyBasedContainer::add(const Message& msg, SendCallback&& callback) { LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]"); - batches_[getKey(msg)].add(msg, callback); + const auto key = getKey(msg); + auto it = batches_.find(getKey(msg)); + if (it == batches_.end()) { + // Do not preallocate for key based batching in case there are many keys + it = batches_.emplace(key, 1).first; + } + it->second.add(msg, std::move(callback)); updateStats(msg); LOG_DEBUG("After add: " << *this); return isFull(); @@ -72,38 +78,24 @@ void BatchMessageKeyBasedContainer::clear() { LOG_DEBUG(*this << " clear() called"); } -Result BatchMessageKeyBasedContainer::createOpSendMsg(OpSendMsg& opSendMsg, - const FlushCallback& flushCallback) const { - if (batches_.size() < 1) { - return ResultOperationNotSupported; - } - return createOpSendMsgHelper(opSendMsg, flushCallback, batches_.begin()->second); -} - -std::vector BatchMessageKeyBasedContainer::createOpSendMsgs( - std::vector& opSendMsgs, const FlushCallback& flushCallback) const { - // Sorted the batches by sequence id - std::vector sortedBatches; - for (const auto& kv : batches_) { - sortedBatches.emplace_back(&kv.second); - } - std::sort(sortedBatches.begin(), sortedBatches.end(), - [](const MessageAndCallbackBatch* lhs, const MessageAndCallbackBatch* rhs) { - return lhs->sequenceId() < rhs->sequenceId(); - }); - - size_t numBatches = sortedBatches.size(); - opSendMsgs.resize(numBatches); - - std::vector results(numBatches); - for (size_t i = 0; i + 1 < numBatches; i++) { - results[i] = createOpSendMsgHelper(opSendMsgs[i], nullptr, *sortedBatches[i]); +std::vector> BatchMessageKeyBasedContainer::createOpSendMsgs( + const FlushCallback& flushCallback) { + // Store raw pointers to use std::sort + std::vector rawOpSendMsgs; + for (auto& kv : batches_) { + rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release()); } - if (numBatches > 0) { - // Add flush callback to the last batch - results.back() = createOpSendMsgHelper(opSendMsgs.back(), flushCallback, *sortedBatches.back()); + std::sort(rawOpSendMsgs.begin(), rawOpSendMsgs.end(), [](const OpSendMsg* lhs, const OpSendMsg* rhs) { + return lhs->sendArgs->sequenceId < rhs->sendArgs->sequenceId; + }); + rawOpSendMsgs.back()->addTrackerCallback(flushCallback); + + std::vector> opSendMsgs{rawOpSendMsgs.size()}; + for (size_t i = 0; i < opSendMsgs.size(); i++) { + opSendMsgs[i].reset(rawOpSendMsgs[i]); } - return results; + clear(); + return opSendMsgs; } void BatchMessageKeyBasedContainer::serialize(std::ostream& os) const { diff --git a/lib/BatchMessageKeyBasedContainer.h b/lib/BatchMessageKeyBasedContainer.h index f580a053..7fa6b136 100644 --- a/lib/BatchMessageKeyBasedContainer.h +++ b/lib/BatchMessageKeyBasedContainer.h @@ -32,18 +32,13 @@ class BatchMessageKeyBasedContainer : public BatchMessageContainerBase { ~BatchMessageKeyBasedContainer(); - size_t getNumBatches() const override { return batches_.size(); } + bool hasMultiOpSendMsgs() const override { return true; } bool isFirstMessageToAdd(const Message& msg) const override; - bool add(const Message& msg, const SendCallback& callback) override; + bool add(const Message& msg, SendCallback&& callback) override; - void clear() override; - - Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback) const override; - - std::vector createOpSendMsgs(std::vector& opSendMsgs, - const FlushCallback& flushCallback) const override; + std::vector> createOpSendMsgs(const FlushCallback& flushCallback) override; void serialize(std::ostream& os) const override; @@ -53,8 +48,7 @@ class BatchMessageKeyBasedContainer : public BatchMessageContainerBase { size_t numberOfBatchesSent_ = 0; double averageBatchSize_ = 0; - Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback, - MessageAndCallbackBatch& batch) const; + void clear() override; }; } // namespace pulsar diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 2a63c7ab..4e1a667c 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1025,37 +1025,30 @@ void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) { std::bind(&ClientConnection::handleSend, shared_from_this(), std::placeholders::_1, cmd))); } -void ClientConnection::sendMessage(const OpSendMsg& opSend) { +void ClientConnection::sendMessage(const std::shared_ptr& args) { Lock lock(mutex_); - - if (pendingWriteOperations_++ == 0) { - // Write immediately to socket - if (tlsSocket_) { + if (pendingWriteOperations_++ > 0) { + pendingWriteBuffers_.emplace_back(args); + return; + } + auto self = shared_from_this(); + auto sendMessageInternal = [this, self, args] { + BaseCommand outgoingCmd; + auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); + asyncWrite(buffer, customAllocReadHandler(std::bind(&ClientConnection::handleSendPair, + shared_from_this(), std::placeholders::_1))); + }; + if (tlsSocket_) { #if BOOST_VERSION >= 106600 - boost::asio::post(strand_, - std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend)); + boost::asio::post(strand_, sendMessageInternal); #else - strand_.post(std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend)); + strand_.post(sendMessageInternal); #endif - } else { - sendMessageInternal(opSend); - } } else { - // Queue to send later - pendingWriteBuffers_.push_back(opSend); + sendMessageInternal(); } } -void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) { - BaseCommand outgoingCmd; - PairSharedBuffer buffer = - Commands::newSend(outgoingBuffer_, outgoingCmd, opSend.producerId_, opSend.sequenceId_, - getChecksumType(), opSend.metadata_, opSend.payload_); - - asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair, - shared_from_this(), std::placeholders::_1))); -} - void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) { if (err) { LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message()); @@ -1088,13 +1081,12 @@ void ClientConnection::sendPendingCommands() { customAllocWriteHandler(std::bind(&ClientConnection::handleSend, shared_from_this(), std::placeholders::_1, buffer))); } else { - assert(any.type() == typeid(OpSendMsg)); + assert(any.type() == typeid(std::shared_ptr)); - const OpSendMsg& op = boost::any_cast(any); + auto args = boost::any_cast>(any); BaseCommand outgoingCmd; PairSharedBuffer buffer = - Commands::newSend(outgoingBuffer_, outgoingCmd, op.producerId_, op.sequenceId_, - getChecksumType(), op.metadata_, op.payload_); + Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair, shared_from_this(), std::placeholders::_1))); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 38b814ce..9abff9d4 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -69,8 +69,7 @@ typedef std::weak_ptr ConsumerImplWeakPtr; class LookupDataResult; class BrokerConsumerStatsImpl; class PeriodicTask; - -struct OpSendMsg; +struct SendArguments; namespace proto { class BaseCommand; @@ -153,8 +152,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this& args); void registerProducer(int producerId, ProducerImplPtr producer); void registerConsumer(int consumerId, ConsumerImplPtr consumer); diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 762d2b83..ff149a03 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -694,6 +694,7 @@ void ClientImpl::shutdown() { LOG_DEBUG(producers.size() << " producers and " << consumers.size() << " consumers have been shutdown."); } + sendCallbackPool_.stop(); if (!pool_.close()) { // pool_ has already been closed. It means shutdown() has been called before. return; diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 9ee70951..16720d96 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -22,6 +22,8 @@ #include #include +#include +#include #include #include "ConnectionPool.h" @@ -126,6 +128,11 @@ class ClientImpl : public std::enable_shared_from_this { friend class PulsarFriend; + template + void dispatch(Function&& f) { + boost::asio::dispatch(sendCallbackPool_, std::move(f)); + } + private: void handleCreateProducer(const Result result, const LookupDataResultPtr partitionMetadata, TopicNamePtr topicName, ProducerConfiguration conf, @@ -176,6 +183,8 @@ class ClientImpl : public std::enable_shared_from_this { ExecutorServiceProviderPtr ioExecutorProvider_; ExecutorServiceProviderPtr listenerExecutorProvider_; ExecutorServiceProviderPtr partitionListenerExecutorProvider_; + // TODO: make it configurable + boost::asio::thread_pool sendCallbackPool_{1}; LookupServicePtr lookupServicePtr_; ConnectionPool pool_; diff --git a/lib/Commands.cc b/lib/Commands.cc index d9251a0a..b3362610 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -32,6 +32,7 @@ #include "ChunkMessageIdImpl.h" #include "LogUtils.h" #include "MessageImpl.h" +#include "OpSendMsg.h" #include "PulsarApi.pb.h" #include "Url.h" #include "checksum/ChecksumProvider.h" @@ -193,13 +194,13 @@ SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) return buffer; } -PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint64_t producerId, - uint64_t sequenceId, ChecksumType checksumType, - const proto::MessageMetadata& metadata, const SharedBuffer& payload) { +PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, ChecksumType checksumType, + const SendArguments& args) { cmd.set_type(BaseCommand::SEND); CommandSend* send = cmd.mutable_send(); - send->set_producer_id(producerId); - send->set_sequence_id(sequenceId); + send->set_producer_id(args.producerId); + send->set_sequence_id(args.sequenceId); + const auto& metadata = args.metadata; if (metadata.has_num_messages_in_batch()) { send->set_num_messages(metadata.num_messages_in_batch()); } @@ -210,8 +211,9 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint // / Wire format // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] - int cmdSize = cmd.ByteSize(); - int msgMetadataSize = metadata.ByteSize(); + int cmdSize = cmd.ByteSizeLong(); + int msgMetadataSize = metadata.ByteSizeLong(); + const auto& payload = args.payload; int payloadSize = payload.readableBytes(); int magicAndChecksumLength = (Crc32c == (checksumType)) ? (2 + 4 /* magic + checksumLength*/) : 0; @@ -837,7 +839,8 @@ void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::Messa uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad, unsigned long maxMessageSizeInBytes) { const auto& msgMetadata = msg.impl_->metadata; - SingleMessageMetadata metadata; + thread_local SingleMessageMetadata metadata; + metadata.Clear(); if (msgMetadata.has_partition_key()) { metadata.set_partition_key(msgMetadata.partition_key()); } @@ -866,7 +869,7 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, int payloadSize = msg.impl_->payload.readableBytes(); metadata.set_payload_size(payloadSize); - int msgMetadataSize = metadata.ByteSize(); + auto msgMetadataSize = metadata.ByteSizeLong(); unsigned long requiredSpace = sizeof(uint32_t) + msgMetadataSize + payloadSize; if (batchPayLoad.writableBytes() <= sizeof(uint32_t) + msgMetadataSize + payloadSize) { diff --git a/lib/Commands.h b/lib/Commands.h index c21adb4f..22a4b7bd 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -40,6 +40,7 @@ using BatchMessageAckerPtr = std::shared_ptr; class MessageIdImpl; using MessageIdImplPtr = std::shared_ptr; class BitSet; +struct SendArguments; namespace proto { class BaseCommand; @@ -98,9 +99,8 @@ class Commands { static SharedBuffer newGetSchema(const std::string& topic, const std::string& version, uint64_t requestId); - static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId, - uint64_t sequenceId, ChecksumType checksumType, - const proto::MessageMetadata& metadata, const SharedBuffer& payload); + static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, ChecksumType checksumType, + const SendArguments& args); static SharedBuffer newSubscribe( const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc index 56725389..c4b644df 100644 --- a/lib/MessageAndCallbackBatch.cc +++ b/lib/MessageAndCallbackBatch.cc @@ -22,52 +22,84 @@ #include "ClientConnection.h" #include "Commands.h" -#include "LogUtils.h" -#include "MessageImpl.h" - -DECLARE_LOG_OBJECT() +#include "CompressionCodec.h" +#include "MessageCrypto.h" +#include "OpSendMsg.h" +#include "PulsarApi.pb.h" namespace pulsar { -void MessageAndCallbackBatch::add(const Message& msg, const SendCallback& callback) { - if (empty()) { - msgImpl_.reset(new MessageImpl); - Commands::initBatchMessageMetadata(msg, msgImpl_->metadata); - } - LOG_DEBUG(" Before serialization payload size in bytes = " << msgImpl_->payload.readableBytes()); - sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg, msgImpl_->payload, - ClientConnection::getMaxMessageSize()); - LOG_DEBUG(" After serialization payload size in bytes = " << msgImpl_->payload.readableBytes()); - callbacks_.emplace_back(callback); +MessageAndCallbackBatch::MessageAndCallbackBatch(size_t capacity) { + messages_.reserve(capacity); + callbacks_.reserve(capacity); +} + +MessageAndCallbackBatch::~MessageAndCallbackBatch() {} - ++messagesCount_; +void MessageAndCallbackBatch::add(const Message& msg, SendCallback&& callback) { + if (callbacks_.empty()) { + metadata_.reset(new proto::MessageMetadata); + Commands::initBatchMessageMetadata(msg, *metadata_); + sequenceId_ = metadata_->sequence_id(); + } + messages_.emplace_back(msg); + callbacks_.emplace_back(std::move(callback)); messagesSize_ += msg.getLength(); } -void MessageAndCallbackBatch::clear() { - msgImpl_.reset(); - callbacks_.clear(); - messagesCount_ = 0; - messagesSize_ = 0; -} +std::unique_ptr MessageAndCallbackBatch::createOpSendMsg( + uint64_t producerId, const ProducerConfiguration& producerConfig, MessageCrypto* crypto) { + if (empty()) { + return OpSendMsg::create(ResultOperationNotSupported, std::move(callbacks_)); + } -static void completeSendCallbacks(const std::vector& callbacks, Result result, - const MessageId& id) { - int32_t numOfMessages = static_cast(callbacks.size()); - LOG_DEBUG("Batch complete [Result = " << result << "] [numOfMessages = " << numOfMessages << "]"); - for (int32_t i = 0; i < numOfMessages; i++) { - callbacks[i](result, MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build()); + // The magic number 64 is just an estimated size increment after setting some fields of the + // SingleMessageMetadata. It does not have to be accurate because it's only used to reduce the + // reallocation of the payload buffer. + static const size_t kEstimatedHeaderSize = + sizeof(uint32_t) + proto::MessageMetadata{}.ByteSizeLong() + 64; + const auto maxMessageSize = ClientConnection::getMaxMessageSize(); + // Estimate the buffer size just to avoid resizing the buffer + size_t maxBufferSize = kEstimatedHeaderSize * messages_.size(); + for (const auto& msg : messages_) { + maxBufferSize += msg.getLength(); + } + auto payload = SharedBuffer::allocate(maxBufferSize); + for (const auto& msg : messages_) { + sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg, payload, maxMessageSize); + } + metadata_->set_sequence_id(sequenceId_); + metadata_->set_num_messages_in_batch(messages_.size()); + auto compressionType = producerConfig.getCompressionType(); + if (compressionType != CompressionNone) { + metadata_->set_compression(static_cast(compressionType)); + metadata_->set_uncompressed_size(payload.readableBytes()); + } + payload = CompressionCodecProvider::getCodec(compressionType).encode(payload); + + if (producerConfig.isEncryptionEnabled() && crypto) { + SharedBuffer encryptedPayload; + if (!crypto->encrypt(producerConfig.getEncryptionKeys(), producerConfig.getCryptoKeyReader(), + *metadata_, payload, encryptedPayload)) { + return OpSendMsg::create(ResultCryptoError, std::move(callbacks_)); + } + payload = encryptedPayload; } -} -void MessageAndCallbackBatch::complete(Result result, const MessageId& id) const { - completeSendCallbacks(callbacks_, result, id); + if (payload.readableBytes() > ClientConnection::getMaxMessageSize()) { + return OpSendMsg::create(ResultMessageTooBig, std::move(callbacks_)); + } + + auto op = OpSendMsg::create(*metadata_, callbacks_.size(), messagesSize_, producerConfig.getSendTimeout(), + std::move(callbacks_), nullptr, producerId, payload); + clear(); + return op; } -SendCallback MessageAndCallbackBatch::createSendCallback() const { - const auto& callbacks = callbacks_; - return [callbacks] // save a copy of `callbacks_` - (Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); }; +void MessageAndCallbackBatch::clear() { + messages_.clear(); + callbacks_.clear(); + messagesSize_ = 0; } } // namespace pulsar diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h index 3d107c63..d1fd3d5a 100644 --- a/lib/MessageAndCallbackBatch.h +++ b/lib/MessageAndCallbackBatch.h @@ -24,15 +24,26 @@ #include #include +#include #include namespace pulsar { -class MessageImpl; -using MessageImplPtr = std::shared_ptr; +struct OpSendMsg; +class MessageCrypto; +using FlushCallback = std::function; -class MessageAndCallbackBatch : public boost::noncopyable { +namespace proto { +class MessageMetadata; +} + +class MessageAndCallbackBatch final : public boost::noncopyable { public: + // This default constructor is added just to make this class able to be stored in a map + MessageAndCallbackBatch() = default; + MessageAndCallbackBatch(size_t capacity); + ~MessageAndCallbackBatch(); + // Wrapper methods of STL container bool empty() const noexcept { return callbacks_.empty(); } size_t size() const noexcept { return callbacks_.size(); } @@ -43,42 +54,21 @@ class MessageAndCallbackBatch : public boost::noncopyable { * @param message * @callback the associated send callback */ - void add(const Message& msg, const SendCallback& callback); + void add(const Message& msg, SendCallback&& callback); - /** - * Clear the internal stats - */ - void clear(); + std::unique_ptr createOpSendMsg(uint64_t producerId, + const ProducerConfiguration& producerConfig, + MessageCrypto* crypto); - /** - * Complete all the callbacks with given parameters - * - * @param result this batch's send result - * @param id this batch's message id - */ - void complete(Result result, const MessageId& id) const; - - /** - * Create a single callback to trigger all the internal callbacks in order - * It's used when you want to clear and add new messages and callbacks but current callbacks need to be - * triggered later. - * - * @return the merged send callback - */ - SendCallback createSendCallback() const; - - const MessageImplPtr& msgImpl() const { return msgImpl_; } uint64_t sequenceId() const noexcept { return sequenceId_; } - uint32_t messagesCount() const { return messagesCount_; } - uint64_t messagesSize() const { return messagesSize_; } + void clear(); private: - MessageImplPtr msgImpl_; + std::unique_ptr metadata_; + std::vector messages_; std::vector callbacks_; std::atomic sequenceId_{static_cast(-1L)}; - - uint32_t messagesCount_{0}; uint64_t messagesSize_{0ull}; }; diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h index d365b906..421f3af3 100644 --- a/lib/OpSendMsg.h +++ b/lib/OpSendMsg.h @@ -21,56 +21,112 @@ #include #include +#include #include #include "ChunkMessageIdImpl.h" +#include "MessageIdImpl.h" #include "PulsarApi.pb.h" #include "SharedBuffer.h" #include "TimeUtils.h" namespace pulsar { -struct OpSendMsg { - proto::MessageMetadata metadata_; - SharedBuffer payload_; - SendCallback sendCallback_; - uint64_t producerId_; - uint64_t sequenceId_; - boost::posix_time::ptime timeout_; - uint32_t messagesCount_; - uint64_t messagesSize_; - std::vector> trackerCallbacks_; - ChunkMessageIdImplPtr chunkedMessageId_; +struct SendArguments { + const uint64_t producerId; + const uint64_t sequenceId; + const proto::MessageMetadata metadata; + SharedBuffer payload; - OpSendMsg() = default; + SendArguments(uint64_t producerId, uint64_t sequenceId, const proto::MessageMetadata& metadata, + const SharedBuffer& payload) + : producerId(producerId), sequenceId(sequenceId), metadata(metadata), payload(payload) {} + SendArguments(const SendArguments&) = delete; + SendArguments& operator=(const SendArguments&) = delete; +}; - OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& payload, - const SendCallback& sendCallback, uint64_t producerId, uint64_t sequenceId, int sendTimeoutMs, - uint32_t messagesCount, uint64_t messagesSize, ChunkMessageIdImplPtr chunkedMessageId = nullptr) - : metadata_(metadata), // the copy happens here because OpSendMsg of chunks are constructed with - // a shared metadata object - payload_(payload), - sendCallback_(sendCallback), - producerId_(producerId), - sequenceId_(sequenceId), - timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)), - messagesCount_(messagesCount), - messagesSize_(messagesSize), - chunkedMessageId_(chunkedMessageId) {} +struct OpSendMsg { + const Result result; + const int32_t chunkId; + const int32_t numChunks; + const uint32_t messagesCount; + const uint64_t messagesSize; + const boost::posix_time::ptime timeout; + SendCallback nonBatchSendCallback; + std::vector batchSendCallbacks; + std::vector> trackerCallbacks; + ChunkMessageIdImplPtr chunkedMessageId; + // Use shared_ptr here because producer might resend the message with the same arguments + const std::shared_ptr sendArgs; + + template + static std::unique_ptr create(Args&&... args) { + return std::unique_ptr(new OpSendMsg(std::forward(args)...)); + } void complete(Result result, const MessageId& messageId) const { - if (sendCallback_) { - sendCallback_(result, messageId); + for (size_t i = 0; i < batchSendCallbacks.size(); i++) { + auto& callback = batchSendCallbacks[i]; + if (callback) { + if (result == ResultOk) { + auto msgIdImpl = std::make_shared( + messageId.partition(), messageId.ledgerId(), messageId.entryId(), i); + callback(result, MessageId{msgIdImpl}); + } else { + callback(result, {}); + } + } } - for (const auto& trackerCallback : trackerCallbacks_) { + if (nonBatchSendCallback) { + nonBatchSendCallback(result, messageId); + } + for (const auto& trackerCallback : trackerCallbacks) { trackerCallback(result); } } void addTrackerCallback(std::function trackerCallback) { - trackerCallbacks_.emplace_back(trackerCallback); + if (trackerCallback) { + trackerCallbacks.emplace_back(trackerCallback); + } } + + private: + OpSendMsg(Result result, std::vector&& callbacks) + : result(result), + chunkId(-1), + numChunks(-1), + messagesCount(0), + messagesSize(0), + batchSendCallbacks(std::move(callbacks)), + sendArgs(nullptr) {} + + OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize, + int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdImplPtr chunkedMessageId, + uint64_t producerId, SharedBuffer payload) + : result(ResultOk), + chunkId(metadata.chunk_id()), + numChunks(metadata.num_chunks_from_msg()), + messagesCount(messagesCount), + messagesSize(messagesSize), + timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)), + nonBatchSendCallback(std::move(callback)), + chunkedMessageId(chunkedMessageId), + sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {} + + OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize, + int sendTimeoutMs, std::vector&& callbacks, + ChunkMessageIdImplPtr chunkedMessageId, uint64_t producerId, SharedBuffer payload) + : result(ResultOk), + chunkId(metadata.chunk_id()), + numChunks(metadata.num_chunks_from_msg()), + messagesCount(messagesCount), + messagesSize(messagesSize), + timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)), + batchSendCallbacks(std::move(callbacks)), + chunkedMessageId(chunkedMessageId), + sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {} }; } // namespace pulsar diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 71559ffa..3aea15e7 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -39,22 +39,11 @@ #include "Semaphore.h" #include "TimeUtils.h" #include "TopicName.h" -#include "stats/ProducerStatsDisabled.h" #include "stats/ProducerStatsImpl.h" namespace pulsar { DECLARE_LOG_OBJECT() -struct ProducerImpl::PendingCallbacks { - std::vector opSendMsgs; - - void complete(Result result) { - for (const auto& opSendMsg : opSendMsgs) { - opSendMsg.complete(result, {}); - } - } -}; - ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors, int32_t partition) @@ -64,7 +53,6 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, milliseconds(std::max(100, conf.getSendTimeout() - 100)))), conf_(conf), semaphore_(), - pendingMessagesQueue_(), partition_(partition), producerName_(conf_.getProducerName()), userProvidedProducerName_(false), @@ -93,13 +81,11 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, } unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds(); - if (statsIntervalInSeconds) { + if (statsIntervalInSeconds > 0) { producerStatsBasePtr_ = std::make_shared(producerStr_, executor_, statsIntervalInSeconds); - } else { - producerStatsBasePtr_ = std::make_shared(); + producerStatsBasePtr_->start(); } - producerStatsBasePtr_->start(); if (conf_.isEncryptionEnabled()) { std::ostringstream logCtxStream; @@ -297,43 +283,46 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r } } -std::shared_ptr ProducerImpl::getPendingCallbacksWhenFailed() { - auto callbacks = std::make_shared(); - callbacks->opSendMsgs.reserve(pendingMessagesQueue_.size()); +auto ProducerImpl::getPendingCallbacksWhenFailed() -> decltype(pendingMessagesQueue_) { + decltype(pendingMessagesQueue_) pendingMessages; LOG_DEBUG(getName() << "# messages in pending queue : " << pendingMessagesQueue_.size()); - // Iterate over a copy of the pending messages queue, to trigger the future completion - // without holding producer mutex. - for (auto& op : pendingMessagesQueue_) { - callbacks->opSendMsgs.push_back(op); - releaseSemaphoreForSendOp(op); + pendingMessages.swap(pendingMessagesQueue_); + for (const auto& op : pendingMessages) { + releaseSemaphoreForSendOp(*op); } - if (batchMessageContainer_) { - batchMessageContainer_->processAndClear( - [this, &callbacks](Result result, const OpSendMsg& opSendMsg) { - if (result == ResultOk) { - callbacks->opSendMsgs.emplace_back(opSendMsg); - } - releaseSemaphoreForSendOp(opSendMsg); - }, - nullptr); + if (!batchMessageContainer_ || batchMessageContainer_->isEmpty()) { + return pendingMessages; } - pendingMessagesQueue_.clear(); - return callbacks; + auto handleOp = [this, &pendingMessages](std::unique_ptr&& op) { + releaseSemaphoreForSendOp(*op); + if (op->result == ResultOk) { + pendingMessages.emplace_back(std::move(op)); + } + }; + + if (batchMessageContainer_->hasMultiOpSendMsgs()) { + auto opSendMsgs = batchMessageContainer_->createOpSendMsgs(); + for (auto&& op : opSendMsgs) { + handleOp(std::move(op)); + } + } else { + handleOp(batchMessageContainer_->createOpSendMsg()); + } + return pendingMessages; } -std::shared_ptr ProducerImpl::getPendingCallbacksWhenFailedWithLock() { +auto ProducerImpl::getPendingCallbacksWhenFailedWithLock() -> decltype(pendingMessagesQueue_) { Lock lock(mutex_); return getPendingCallbacksWhenFailed(); } void ProducerImpl::failPendingMessages(Result result, bool withLock) { - if (withLock) { - getPendingCallbacksWhenFailedWithLock()->complete(result); - } else { - getPendingCallbacksWhenFailed()->complete(result); + auto opSendMsgs = withLock ? getPendingCallbacksWhenFailedWithLock() : getPendingCallbacksWhenFailed(); + for (const auto& op : opSendMsgs) { + op->complete(result, {}); } } @@ -345,24 +334,8 @@ void ProducerImpl::resendMessages(ClientConnectionPtr cnx) { LOG_DEBUG(getName() << "Re-Sending " << pendingMessagesQueue_.size() << " messages to server"); for (const auto& op : pendingMessagesQueue_) { - LOG_DEBUG(getName() << "Re-Sending " << op.sequenceId_); - cnx->sendMessage(op); - } -} - -void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequenceId, - const uint32_t& uncompressedSize) { - // Call this function after acquiring the mutex_ - proto::MessageMetadata& msgMetadata = msg.impl_->metadata; - msgMetadata.set_producer_name(producerName_); - msgMetadata.set_publish_time(TimeUtils::currentTimeMillis()); - msgMetadata.set_sequence_id(sequenceId); - if (conf_.getCompressionType() != CompressionNone) { - msgMetadata.set_compression(static_cast(conf_.getCompressionType())); - msgMetadata.set_uncompressed_size(uncompressedSize); - } - if (!this->getSchemaVersion().empty()) { - msgMetadata.set_schema_version(this->getSchemaVersion()); + LOG_DEBUG(getName() << "Re-Sending " << op->sendArgs->sequenceId); + cnx->sendMessage(op->sendArgs); } } @@ -378,7 +351,7 @@ void ProducerImpl::flushAsync(FlushCallback callback) { auto& opSendMsg = pendingMessagesQueue_.back(); lock.unlock(); failures.complete(); - opSendMsg.addTrackerCallback(callback); + opSendMsg->addTrackerCallback(callback); } else { lock.unlock(); failures.complete(); @@ -389,7 +362,7 @@ void ProducerImpl::flushAsync(FlushCallback callback) { if (!pendingMessagesQueue_.empty()) { auto& opSendMsg = pendingMessagesQueue_.back(); lock.unlock(); - opSendMsg.addTrackerCallback(callback); + opSendMsg->addTrackerCallback(callback); } else { lock.unlock(); callback(ResultOk); @@ -443,18 +416,24 @@ static SharedBuffer applyCompression(const SharedBuffer& uncompressedPayload, } void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { - producerStatsBasePtr_->messageSent(msg); + if (!producerStatsBasePtr_ && !interceptors_) { + return sendAsyncWithStatsUpdate(msg, std::move(callback)); + } + auto self = shared_from_this(); + if (producerStatsBasePtr_) { + producerStatsBasePtr_->messageSent(msg); + } - Producer producer = Producer(shared_from_this()); - auto interceptorMessage = interceptors_->beforeSend(producer, msg); + const auto interceptorMessage = interceptors_ ? interceptors_->beforeSend(Producer{self}, msg) : msg; + const auto now = producerStatsBasePtr_ ? TimeUtils::now() : boost::posix_time::ptime{}; - const auto now = boost::posix_time::microsec_clock::universal_time(); - auto self = shared_from_this(); - sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, producer, interceptorMessage]( + sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, interceptorMessage]( Result result, const MessageId& messageId) { - producerStatsBasePtr_->messageReceived(result, now); + if (producerStatsBasePtr_) { + producerStatsBasePtr_->messageReceived(result, now); + } - interceptors_->onSendAcknowledgement(producer, result, interceptorMessage, messageId); + interceptors_->onSendAcknowledgement(Producer{self}, result, interceptorMessage, messageId); if (callback) { callback(result, messageId); @@ -462,7 +441,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { }); } -void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback) { +void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback) { if (!isValidProducerState(callback)) { return; } @@ -509,10 +488,22 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba uint64_t sequenceId; if (!msgMetadata.has_sequence_id()) { sequenceId = msgSequenceGenerator_++; + msgMetadata.set_sequence_id(sequenceId); } else { sequenceId = msgMetadata.sequence_id(); } - setMessageMetadata(msg, sequenceId, uncompressedSize); + + if (compressed || batchMessageContainer_->isFirstMessageToAdd(msg)) { + msgMetadata.set_producer_name(producerName_); + msgMetadata.set_publish_time(TimeUtils::currentTimeMillis()); + if (conf_.getCompressionType() != CompressionNone) { + msgMetadata.set_compression(static_cast(conf_.getCompressionType())); + msgMetadata.set_uncompressed_size(uncompressedSize); + } + if (!schemaVersion_.empty()) { + msgMetadata.set_schema_version(schemaVersion_); + } + } // else: The 2nd or later message in the batch only needs to set the sequence id auto payloadChunkSize = maxMessageSize; int totalChunks; @@ -539,13 +530,13 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba } } - if (canAddToBatch(msg)) { + if (!compressed) { // Batching is enabled and the message is not delayed if (!batchMessageContainer_->hasEnoughSpace(msg)) { batchMessageAndSend().complete(); } bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg); - bool isFull = batchMessageContainer_->add(msg, callback); + bool isFull = batchMessageContainer_->add(msg, std::move(callback)); if (isFirstMessage) { batchTimer_->expires_from_now( boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs())); @@ -601,17 +592,18 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba handleFailedResult(ResultCryptoError); return; } - OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr, - producerId_, sequenceId, conf_.getSendTimeout(), - 1, uncompressedSize, chunkMessageId}; + + auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(), + (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageId, + producerId_, encryptedPayload); if (!chunkingEnabled_) { - const uint32_t msgMetadataSize = op.metadata_.ByteSize(); - const uint32_t payloadSize = op.payload_.readableBytes(); + const uint32_t msgMetadataSize = op->sendArgs->metadata.ByteSizeLong(); + const uint32_t payloadSize = op->sendArgs->payload.readableBytes(); const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize; if (msgHeadersAndPayloadSize > maxMessageSize) { lock.unlock(); - releaseSemaphoreForSendOp(op); + releaseSemaphoreForSendOp(*op); LOG_WARN(getName() << " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed " << maxMessageSize << " bytes unless chunking is enabled"); @@ -620,7 +612,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba } } - sendMessage(op); + sendMessage(std::move(op)); } } } @@ -667,10 +659,10 @@ void ProducerImpl::releaseSemaphore(uint32_t payloadSize) { void ProducerImpl::releaseSemaphoreForSendOp(const OpSendMsg& op) { if (semaphore_) { - semaphore_->release(op.messagesCount_); + semaphore_->release(op.messagesCount); } - memoryLimitController_.releaseMemory(op.messagesSize_); + memoryLimitController_.releaseMemory(op.messagesSize); } // It must be called while `mutex_` is acquired @@ -678,37 +670,50 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall PendingFailures failures; LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_); batchTimer_->cancel(); + if (batchMessageContainer_->isEmpty()) { + return failures; + } - batchMessageContainer_->processAndClear( - [this, &failures](Result result, const OpSendMsg& opSendMsg) { - if (result == ResultOk) { - sendMessage(opSendMsg); - } else { - // A spot has been reserved for this batch, but the batch failed to be pushed to the queue, so - // we need to release the spot manually - LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " << result); - releaseSemaphoreForSendOp(opSendMsg); - failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); }); - } - }, - flushCallback); + auto handleOp = [this, &failures](std::unique_ptr&& op) { + if (op->result == ResultOk) { + sendMessage(std::move(op)); + } else { + LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " << op->result); + releaseSemaphoreForSendOp(*op); + auto rawOpPtr = op.release(); + failures.add([rawOpPtr] { + std::unique_ptr op{rawOpPtr}; + op->complete(op->result, {}); + }); + } + }; + + if (batchMessageContainer_->hasMultiOpSendMsgs()) { + auto opSendMsgs = batchMessageContainer_->createOpSendMsgs(flushCallback); + for (auto&& op : opSendMsgs) { + handleOp(std::move(op)); + } + } else { + handleOp(batchMessageContainer_->createOpSendMsg(flushCallback)); + } return failures; } // Precondition - // a. we have a reserved spot on the queue // b. call this function after acquiring the ProducerImpl mutex_ -void ProducerImpl::sendMessage(const OpSendMsg& op) { - const auto sequenceId = op.metadata_.sequence_id(); +void ProducerImpl::sendMessage(std::unique_ptr opSendMsg) { + const auto sequenceId = opSendMsg->sendArgs->sequenceId; LOG_DEBUG("Inserting data to pendingMessagesQueue_"); - pendingMessagesQueue_.push_back(op); + auto args = opSendMsg->sendArgs; + pendingMessagesQueue_.emplace_back(std::move(opSendMsg)); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { // If we do have a connection, the message is sent immediately, otherwise // we'll try again once a new connection is established LOG_DEBUG(getName() << "Sending msg immediately - seq: " << sequenceId); - cnx->sendMessage(op); + cnx->sendMessage(args); } else { LOG_DEBUG(getName() << "Connection is not ready - seq: " << sequenceId); } @@ -808,7 +813,7 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { return; } - std::shared_ptr pendingCallbacks; + decltype(pendingMessagesQueue_) pendingMessages; if (pendingMessagesQueue_.empty()) { // If there are no pending messages, reset the timeout to the configured value. LOG_DEBUG(getName() << "Producer timeout triggered on empty pending message queue"); @@ -816,11 +821,11 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { } else { // If there is at least one message, calculate the diff between the message timeout and // the current time. - time_duration diff = pendingMessagesQueue_.front().timeout_ - TimeUtils::now(); + time_duration diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now(); if (diff.total_milliseconds() <= 0) { // The diff is less than or equal to zero, meaning that the message has been expired. LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks."); - pendingCallbacks = getPendingCallbacksWhenFailed(); + pendingMessages = getPendingCallbacksWhenFailed(); // Since the pending queue is cleared now, set timer to expire after configured value. asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout())); } else { @@ -831,8 +836,8 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { } lock.unlock(); - if (pendingCallbacks) { - pendingCallbacks->complete(ResultTimeout); + for (const auto& op : pendingMessages) { + op->complete(ResultTimeout, {}); } } @@ -844,8 +849,8 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) { return true; } - OpSendMsg op = pendingMessagesQueue_.front(); - uint64_t expectedSequenceId = op.sequenceId_; + std::unique_ptr op{std::move(pendingMessagesQueue_.front().release())}; + uint64_t expectedSequenceId = op->sendArgs->sequenceId; if (sequenceId > expectedSequenceId) { LOG_WARN(getName() << "Got ack failure for msg " << sequenceId // << " expecting: " << expectedSequenceId << " queue size=" // @@ -860,11 +865,11 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) { lock.unlock(); try { // to protect from client callback exception - op.complete(ResultChecksumError, {}); + op->complete(ResultChecksumError, {}); } catch (const std::exception& e) { LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); } - releaseSemaphoreForSendOp(op); + releaseSemaphoreForSendOp(*op); return true; } } @@ -880,8 +885,14 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { return true; } - OpSendMsg op = pendingMessagesQueue_.front(); - uint64_t expectedSequenceId = op.sequenceId_; + const auto& op = *pendingMessagesQueue_.front(); + if (op.result != ResultOk) { + LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and " + << rawMessageId); + return false; + } + + uint64_t expectedSequenceId = op.sendArgs->sequenceId; if (sequenceId > expectedSequenceId) { LOG_WARN(getName() << "Got ack for msg " << sequenceId // << " expecting: " << expectedSequenceId << " queue size=" // @@ -898,26 +909,40 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { // Message was persisted correctly LOG_DEBUG(getName() << "Received ack for msg " << sequenceId); - if (op.chunkedMessageId_) { + if (op.chunkedMessageId) { // Handling the chunk message id. - if (op.metadata_.chunk_id() == 0) { - op.chunkedMessageId_->setFirstChunkMessageId(messageId); - } else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) { - op.chunkedMessageId_->setLastChunkMessageId(messageId); - messageId = op.chunkedMessageId_->build(); + if (op.chunkId == 0) { + op.chunkedMessageId->setFirstChunkMessageId(messageId); + } else if (op.chunkId == op.numChunks - 1) { + op.chunkedMessageId->setLastChunkMessageId(messageId); + messageId = op.chunkedMessageId->build(); } } releaseSemaphoreForSendOp(op); - lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1; + lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1; + std::unique_ptr opSendMsg{pendingMessagesQueue_.front().release()}; pendingMessagesQueue_.pop_front(); lock.unlock(); - try { - op.complete(ResultOk, messageId); - } catch (const std::exception& e) { - LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); + auto client = client_.lock(); + if (client) { + auto rawOpPtr{opSendMsg.release()}; + client->dispatch([rawOpPtr, messageId] { + std::unique_ptr opSendMsg{rawOpPtr}; + try { + opSendMsg->complete(ResultOk, messageId); + } catch (const std::exception& e) { + LOG_ERROR("Exception thrown from callback " << e.what()); + } + }); + } else { + try { + opSendMsg->complete(ResultOk, messageId); + } catch (const std::exception& e) { + LOG_ERROR(getName() << "Exception thrown from callback " << e.what()); + } } return true; } diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index afc6346b..0226d060 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -104,11 +104,7 @@ class ProducerImpl : public HandlerBase, protected: ProducerStatsBasePtr producerStatsBasePtr_; - typedef std::deque MessageQueue; - - void setMessageMetadata(const Message& msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize); - - void sendMessage(const OpSendMsg& opSendMsg); + void sendMessage(std::unique_ptr opSendMsg); void startSendTimeoutTimer(); @@ -138,7 +134,7 @@ class ProducerImpl : public HandlerBase, bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload, SharedBuffer& encryptedPayload); - void sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback); + void sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback); /** * Reserve a spot in the messages queue before acquiring the ProducerImpl mutex. When the queue is full, @@ -163,7 +159,7 @@ class ProducerImpl : public HandlerBase, ProducerConfiguration conf_; std::unique_ptr semaphore_; - MessageQueue pendingMessagesQueue_; + std::list> pendingMessagesQueue_; const int32_t partition_; // -1 if topic is non-partitioned std::string producerName_; @@ -187,8 +183,8 @@ class ProducerImpl : public HandlerBase, Promise producerCreatedPromise_; struct PendingCallbacks; - std::shared_ptr getPendingCallbacksWhenFailed(); - std::shared_ptr getPendingCallbacksWhenFailedWithLock(); + decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailed(); + decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailedWithLock(); void failPendingMessages(Result result, bool withLock); diff --git a/lib/ProducerInterceptors.h b/lib/ProducerInterceptors.h index f83394f1..d9c8c10b 100644 --- a/lib/ProducerInterceptors.h +++ b/lib/ProducerInterceptors.h @@ -41,6 +41,8 @@ class ProducerInterceptors { void close(); + operator bool() const noexcept { return interceptors_.empty(); } + private: enum State { diff --git a/lib/stats/ProducerStatsDisabled.h b/lib/stats/ProducerStatsDisabled.h deleted file mode 100644 index df1df0f8..00000000 --- a/lib/stats/ProducerStatsDisabled.h +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef PULSAR_PRODUCER_STATS_DISABLED_HEADER -#define PULSAR_PRODUCER_STATS_DISABLED_HEADER -#include "ProducerStatsBase.h" - -namespace pulsar { -class ProducerStatsDisabled : public ProducerStatsBase { - public: - virtual void messageSent(const Message& msg){}; - virtual void messageReceived(Result, const boost::posix_time::ptime&){}; -}; -} // namespace pulsar -#endif // PULSAR_PRODUCER_STATS_DISABLED_HEADER