diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index 87f02ea4..beb65881 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -30,20 +30,21 @@ DECLARE_LOG_OBJECT() namespace pulsar { auto BinaryProtoLookupService::getBroker(const TopicName& topicName) -> LookupResultFuture { - return findBroker(serviceNameResolver_.resolveHost(), false, topicName.toString(), 0); + Promise promise; + findBroker(serviceNameResolver_.resolveHost(), false, topicName.toString(), 0, promise); + return promise.getFuture(); } -auto BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative, - const std::string& topic, size_t redirectCount) - -> LookupResultFuture { +void BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative, + const std::string& topic, size_t redirectCount, + Promise promise) { LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic << ", redirect count: " << redirectCount); - auto promise = std::make_shared>(); if (maxLookupRedirects_ > 0 && redirectCount > maxLookupRedirects_) { LOG_ERROR("Too many lookup request redirects on topic " << topic << ", configured limit is " << maxLookupRedirects_); - promise->setFailed(ResultTooManyLookupRequestException); - return promise->getFuture(); + promise.setFailed(ResultTooManyLookupRequestException); + return; } // NOTE: we can use move capture for topic since C++14 @@ -51,13 +52,13 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho redirectCount](Result result, const ClientConnectionWeakPtr& weakCnx) { if (result != ResultOk) { - promise->setFailed(result); + promise.setFailed(result); return; } auto cnx = weakCnx.lock(); if (!cnx) { LOG_ERROR("Connection to " << address << " is expired before lookup"); - promise->setFailed(ResultNotConnected); + promise.setFailed(ResultNotConnected); return; } auto lookupPromise = std::make_shared(); @@ -66,35 +67,26 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho Result result, const LookupDataResultPtr& data) { if (result != ResultOk || !data) { LOG_ERROR("Lookup failed for " << topic << ", result " << result); - promise->setFailed(result); + promise.setFailed(result); return; } - const auto responseBrokerAddress = (serviceNameResolver_.useTls() ? data->getBrokerUrlTls() : data->getBrokerUrl()); if (data->isRedirect()) { LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress); - findBroker(responseBrokerAddress, data->isAuthoritative(), topic, redirectCount + 1) - .addListener([promise](Result result, const LookupResult& value) { - if (result == ResultOk) { - promise->setValue(value); - } else { - promise->setFailed(result); - } - }); + findBroker(responseBrokerAddress, data->isAuthoritative(), topic, redirectCount + 1, promise); } else { - LOG_INFO("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl() - << ", from " << cnx->cnxString()); + LOG_DEBUG("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl() + << ", from " << cnx->cnxString()); if (data->shouldProxyThroughServiceUrl()) { // logicalAddress is the proxy's address, we should still connect through proxy - promise->setValue({responseBrokerAddress, address}); + promise.setValue({responseBrokerAddress, address}); } else { - promise->setValue({responseBrokerAddress, responseBrokerAddress}); + promise.setValue({responseBrokerAddress, responseBrokerAddress}); } } }); }); - return promise->getFuture(); } /* diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index a3c059e4..6689eb4b 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -55,9 +55,10 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; protected: + using LookupResultPromise = Promise; // Mark findBroker as protected to make it accessible from test. - LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic, - size_t redirectCount); + void findBroker(const std::string& address, bool authoritative, const std::string& topic, + size_t redirectCount, LookupResultPromise promise); private: std::mutex mutex_; diff --git a/lib/CurlWrapper.h b/lib/CurlWrapper.h index 89b7919d..749a79c3 100644 --- a/lib/CurlWrapper.h +++ b/lib/CurlWrapper.h @@ -172,7 +172,10 @@ inline CurlWrapper::Result CurlWrapper::get(const std::string& url, const std::s if (responseCode == 307 || responseCode == 302 || responseCode == 301) { char* url; curl_easy_getinfo(handle_, CURLINFO_REDIRECT_URL, &url); - result.redirectUrl = url; + // `url` is null when the host of the redirect URL cannot be resolved + if (url) { + result.redirectUrl = url; + } } return result; } diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 4ec72c1e..0959af2a 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -191,98 +191,77 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string & Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData, long &responseCode) { - uint16_t reqCount = 0; - Result retResult = ResultOk; - while (++reqCount <= maxLookupRedirects_) { - // Authorization data - AuthenticationDataPtr authDataContent; - Result authResult = authenticationPtr_->getAuthData(authDataContent); - if (authResult != ResultOk) { - LOG_ERROR("Failed to getAuthData: " << authResult); - return authResult; - } + // Authorization data + AuthenticationDataPtr authDataContent; + Result authResult = authenticationPtr_->getAuthData(authDataContent); + if (authResult != ResultOk) { + LOG_ERROR("Failed to getAuthData: " << authResult); + return authResult; + } - CurlWrapper curl; - if (!curl.init()) { - LOG_ERROR("Unable to curl_easy_init for url " << completeUrl); - return ResultLookupError; - } + CurlWrapper curl; + if (!curl.init()) { + LOG_ERROR("Unable to curl_easy_init for url " << completeUrl); + return ResultLookupError; + } - std::unique_ptr tlsContext; - if (isUseTls_) { - tlsContext.reset(new CurlWrapper::TlsContext); - tlsContext->trustCertsFilePath = tlsTrustCertsFilePath_; - tlsContext->validateHostname = tlsValidateHostname_; - tlsContext->allowInsecure = tlsAllowInsecure_; - if (authDataContent->hasDataForTls()) { - tlsContext->certPath = authDataContent->getTlsCertificates(); - tlsContext->keyPath = authDataContent->getTlsPrivateKey(); - } else { - tlsContext->certPath = tlsCertificateFilePath_; - tlsContext->keyPath = tlsPrivateFilePath_; - } + std::unique_ptr tlsContext; + if (isUseTls_) { + tlsContext.reset(new CurlWrapper::TlsContext); + tlsContext->trustCertsFilePath = tlsTrustCertsFilePath_; + tlsContext->validateHostname = tlsValidateHostname_; + tlsContext->allowInsecure = tlsAllowInsecure_; + if (authDataContent->hasDataForTls()) { + tlsContext->certPath = authDataContent->getTlsCertificates(); + tlsContext->keyPath = authDataContent->getTlsPrivateKey(); + } else { + tlsContext->certPath = tlsCertificateFilePath_; + tlsContext->keyPath = tlsPrivateFilePath_; } + } - LOG_INFO("Curl [" << reqCount << "] Lookup Request sent for " << completeUrl); - CurlWrapper::Options options; - options.timeoutInSeconds = lookupTimeoutInSeconds_; - options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR; - options.maxLookupRedirects = 1; // redirection is implemented by the outer loop - auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(), options, tlsContext.get()); - const auto &error = result.error; - if (!error.empty()) { - LOG_ERROR(completeUrl << " failed: " << error); - return ResultConnectError; - } + LOG_INFO("Curl Lookup Request sent for " << completeUrl); + CurlWrapper::Options options; + options.timeoutInSeconds = lookupTimeoutInSeconds_; + options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR; + options.maxLookupRedirects = maxLookupRedirects_; + auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(), options, tlsContext.get()); + const auto &error = result.error; + if (!error.empty()) { + LOG_ERROR(completeUrl << " failed: " << error); + return ResultConnectError; + } - responseData = result.responseData; - responseCode = result.responseCode; - auto res = result.code; - LOG_INFO("Response received for url " << completeUrl << " responseCode " << responseCode - << " curl res " << res); - - const auto &redirectUrl = result.redirectUrl; - switch (res) { - case CURLE_OK: - if (responseCode == 200) { - retResult = ResultOk; - } else if (!redirectUrl.empty()) { - LOG_INFO("Response from url " << completeUrl << " to new url " << redirectUrl); - completeUrl = redirectUrl; - retResult = ResultLookupError; - } else { - retResult = ResultLookupError; - } - break; - case CURLE_COULDNT_CONNECT: - LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - retResult = ResultRetryable; - break; - case CURLE_COULDNT_RESOLVE_PROXY: - case CURLE_COULDNT_RESOLVE_HOST: - case CURLE_HTTP_RETURNED_ERROR: - LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - retResult = ResultConnectError; - break; - case CURLE_READ_ERROR: - LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - retResult = ResultReadError; - break; - case CURLE_OPERATION_TIMEDOUT: - LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - retResult = ResultTimeout; - break; - default: - LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); - retResult = ResultLookupError; - break; - } - if (redirectUrl.empty()) { - break; - } + responseData = result.responseData; + responseCode = result.responseCode; + auto res = result.code; + if (res == CURLE_OK) { + LOG_INFO("Response received for url " << completeUrl << " responseCode " << responseCode); + } else if (res == CURLE_TOO_MANY_REDIRECTS) { + LOG_ERROR("Response received for url " << completeUrl << ": " << curl_easy_strerror(res) + << ", curl error: " << result.serverError + << ", redirect URL: " << result.redirectUrl); + } else { + LOG_ERROR("Response failed for url " << completeUrl << ": " << curl_easy_strerror(res) + << ", curl error: " << result.serverError); } - return retResult; + switch (res) { + case CURLE_OK: + return ResultOk; + case CURLE_COULDNT_CONNECT: + return ResultRetryable; + case CURLE_COULDNT_RESOLVE_PROXY: + case CURLE_COULDNT_RESOLVE_HOST: + case CURLE_HTTP_RETURNED_ERROR: + return ResultConnectError; + case CURLE_READ_ERROR: + return ResultReadError; + case CURLE_OPERATION_TIMEDOUT: + return ResultTimeout; + default: + return ResultLookupError; + } } LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) { diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 3457bd4d..bf0c0a34 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -441,7 +441,9 @@ class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupServi LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic, size_t redirectCount) { - return BinaryProtoLookupService::findBroker(address, authoritative, topic, redirectCount); + Promise promise; + BinaryProtoLookupService::findBroker(address, authoritative, topic, redirectCount, promise); + return promise.getFuture(); } }; // class BinaryProtoLookupServiceRedirectTestHelper