Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid stack overflow when many lookup redirection happens #25

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 16 additions & 24 deletions lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,35 @@ DECLARE_LOG_OBJECT()
namespace pulsar {

auto BinaryProtoLookupService::getBroker(const TopicName& topicName) -> LookupResultFuture {
return findBroker(serviceNameResolver_.resolveHost(), false, topicName.toString(), 0);
Promise<Result, LookupResult> promise;
findBroker(serviceNameResolver_.resolveHost(), false, topicName.toString(), 0, promise);
return promise.getFuture();
}

auto BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative,
const std::string& topic, size_t redirectCount)
-> LookupResultFuture {
void BinaryProtoLookupService::findBroker(const std::string& address, bool authoritative,
const std::string& topic, size_t redirectCount,
Promise<Result, LookupResult> promise) {
LOG_DEBUG("find broker from " << address << ", authoritative: " << authoritative << ", topic: " << topic
<< ", redirect count: " << redirectCount);
auto promise = std::make_shared<Promise<Result, LookupResult>>();
if (maxLookupRedirects_ > 0 && redirectCount > maxLookupRedirects_) {
LOG_ERROR("Too many lookup request redirects on topic " << topic << ", configured limit is "
<< maxLookupRedirects_);
promise->setFailed(ResultTooManyLookupRequestException);
return promise->getFuture();
promise.setFailed(ResultTooManyLookupRequestException);
return;
}

// NOTE: we can use move capture for topic since C++14
cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, address, authoritative,
redirectCount](Result result,
const ClientConnectionWeakPtr& weakCnx) {
if (result != ResultOk) {
promise->setFailed(result);
promise.setFailed(result);
return;
}
auto cnx = weakCnx.lock();
if (!cnx) {
LOG_ERROR("Connection to " << address << " is expired before lookup");
promise->setFailed(ResultNotConnected);
promise.setFailed(ResultNotConnected);
return;
}
auto lookupPromise = std::make_shared<LookupDataResultPromise>();
Expand All @@ -66,35 +67,26 @@ auto BinaryProtoLookupService::findBroker(const std::string& address, bool autho
Result result, const LookupDataResultPtr& data) {
if (result != ResultOk || !data) {
LOG_ERROR("Lookup failed for " << topic << ", result " << result);
promise->setFailed(result);
promise.setFailed(result);
return;
}

const auto responseBrokerAddress =
(serviceNameResolver_.useTls() ? data->getBrokerUrlTls() : data->getBrokerUrl());
if (data->isRedirect()) {
LOG_DEBUG("Lookup request is for " << topic << " redirected to " << responseBrokerAddress);
findBroker(responseBrokerAddress, data->isAuthoritative(), topic, redirectCount + 1)
.addListener([promise](Result result, const LookupResult& value) {
if (result == ResultOk) {
promise->setValue(value);
} else {
promise->setFailed(result);
}
});
findBroker(responseBrokerAddress, data->isAuthoritative(), topic, redirectCount + 1, promise);
} else {
LOG_INFO("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl()
<< ", from " << cnx->cnxString());
LOG_DEBUG("Lookup response for " << topic << ", lookup-broker-url " << data->getBrokerUrl()
<< ", from " << cnx->cnxString());
if (data->shouldProxyThroughServiceUrl()) {
// logicalAddress is the proxy's address, we should still connect through proxy
promise->setValue({responseBrokerAddress, address});
promise.setValue({responseBrokerAddress, address});
} else {
promise->setValue({responseBrokerAddress, responseBrokerAddress});
promise.setValue({responseBrokerAddress, responseBrokerAddress});
}
}
});
});
return promise->getFuture();
}

/*
Expand Down
5 changes: 3 additions & 2 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;

protected:
using LookupResultPromise = Promise<Result, LookupResult>;
// Mark findBroker as protected to make it accessible from test.
LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
size_t redirectCount);
void findBroker(const std::string& address, bool authoritative, const std::string& topic,
size_t redirectCount, LookupResultPromise promise);

private:
std::mutex mutex_;
Expand Down
5 changes: 4 additions & 1 deletion lib/CurlWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ inline CurlWrapper::Result CurlWrapper::get(const std::string& url, const std::s
if (responseCode == 307 || responseCode == 302 || responseCode == 301) {
char* url;
curl_easy_getinfo(handle_, CURLINFO_REDIRECT_URL, &url);
result.redirectUrl = url;
// `url` is null when the host of the redirect URL cannot be resolved
if (url) {
result.redirectUrl = url;
}
}
return result;
}
Expand Down
149 changes: 64 additions & 85 deletions lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,98 +191,77 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &

Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData,
long &responseCode) {
uint16_t reqCount = 0;
Result retResult = ResultOk;
while (++reqCount <= maxLookupRedirects_) {
// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("Failed to getAuthData: " << authResult);
return authResult;
}
// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("Failed to getAuthData: " << authResult);
return authResult;
}

CurlWrapper curl;
if (!curl.init()) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
return ResultLookupError;
}
CurlWrapper curl;
if (!curl.init()) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
return ResultLookupError;
}

std::unique_ptr<CurlWrapper::TlsContext> tlsContext;
if (isUseTls_) {
tlsContext.reset(new CurlWrapper::TlsContext);
tlsContext->trustCertsFilePath = tlsTrustCertsFilePath_;
tlsContext->validateHostname = tlsValidateHostname_;
tlsContext->allowInsecure = tlsAllowInsecure_;
if (authDataContent->hasDataForTls()) {
tlsContext->certPath = authDataContent->getTlsCertificates();
tlsContext->keyPath = authDataContent->getTlsPrivateKey();
} else {
tlsContext->certPath = tlsCertificateFilePath_;
tlsContext->keyPath = tlsPrivateFilePath_;
}
std::unique_ptr<CurlWrapper::TlsContext> tlsContext;
if (isUseTls_) {
tlsContext.reset(new CurlWrapper::TlsContext);
tlsContext->trustCertsFilePath = tlsTrustCertsFilePath_;
tlsContext->validateHostname = tlsValidateHostname_;
tlsContext->allowInsecure = tlsAllowInsecure_;
if (authDataContent->hasDataForTls()) {
tlsContext->certPath = authDataContent->getTlsCertificates();
tlsContext->keyPath = authDataContent->getTlsPrivateKey();
} else {
tlsContext->certPath = tlsCertificateFilePath_;
tlsContext->keyPath = tlsPrivateFilePath_;
}
}

LOG_INFO("Curl [" << reqCount << "] Lookup Request sent for " << completeUrl);
CurlWrapper::Options options;
options.timeoutInSeconds = lookupTimeoutInSeconds_;
options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR;
options.maxLookupRedirects = 1; // redirection is implemented by the outer loop
auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(), options, tlsContext.get());
const auto &error = result.error;
if (!error.empty()) {
LOG_ERROR(completeUrl << " failed: " << error);
return ResultConnectError;
}
LOG_INFO("Curl Lookup Request sent for " << completeUrl);
CurlWrapper::Options options;
options.timeoutInSeconds = lookupTimeoutInSeconds_;
options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR;
options.maxLookupRedirects = maxLookupRedirects_;
auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(), options, tlsContext.get());
const auto &error = result.error;
if (!error.empty()) {
LOG_ERROR(completeUrl << " failed: " << error);
return ResultConnectError;
}

responseData = result.responseData;
responseCode = result.responseCode;
auto res = result.code;
LOG_INFO("Response received for url " << completeUrl << " responseCode " << responseCode
<< " curl res " << res);

const auto &redirectUrl = result.redirectUrl;
switch (res) {
case CURLE_OK:
if (responseCode == 200) {
retResult = ResultOk;
} else if (!redirectUrl.empty()) {
LOG_INFO("Response from url " << completeUrl << " to new url " << redirectUrl);
completeUrl = redirectUrl;
retResult = ResultLookupError;
} else {
retResult = ResultLookupError;
}
break;
case CURLE_COULDNT_CONNECT:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultRetryable;
break;
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultConnectError;
break;
case CURLE_READ_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultReadError;
break;
case CURLE_OPERATION_TIMEDOUT:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultTimeout;
break;
default:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultLookupError;
break;
}
if (redirectUrl.empty()) {
break;
}
responseData = result.responseData;
responseCode = result.responseCode;
auto res = result.code;
if (res == CURLE_OK) {
LOG_INFO("Response received for url " << completeUrl << " responseCode " << responseCode);
} else if (res == CURLE_TOO_MANY_REDIRECTS) {
LOG_ERROR("Response received for url " << completeUrl << ": " << curl_easy_strerror(res)
<< ", curl error: " << result.serverError
<< ", redirect URL: " << result.redirectUrl);
} else {
LOG_ERROR("Response failed for url " << completeUrl << ": " << curl_easy_strerror(res)
<< ", curl error: " << result.serverError);
}

return retResult;
switch (res) {
case CURLE_OK:
return ResultOk;
case CURLE_COULDNT_CONNECT:
return ResultRetryable;
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
return ResultConnectError;
case CURLE_READ_ERROR:
return ResultReadError;
case CURLE_OPERATION_TIMEDOUT:
return ResultTimeout;
default:
return ResultLookupError;
}
}

LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) {
Expand Down
4 changes: 3 additions & 1 deletion tests/LookupServiceTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,9 @@ class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupServi

LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
size_t redirectCount) {
return BinaryProtoLookupService::findBroker(address, authoritative, topic, redirectCount);
Promise<Result, LookupResult> promise;
BinaryProtoLookupService::findBroker(address, authoritative, topic, redirectCount, promise);
return promise.getFuture();
}
}; // class BinaryProtoLookupServiceRedirectTestHelper

Expand Down