Skip to content

Commit

Permalink
[config] Add config can set brpc connection type and brpc failed need…
Browse files Browse the repository at this point in the history
… reconnect the server
  • Loading branch information
HappenLee committed Oct 17, 2024
1 parent 02c671b commit 3e99534
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 14 deletions.
7 changes: 4 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,9 @@ DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");

DEFINE_Int32(query_cache_size, "512");

DEFINE_String(brpc_connection_type, "single");

DEFINE_mBool(enable_brpc_failed_reconnected, "true");
// clang-format off
#ifdef BE_TEST
// test s3
Expand Down Expand Up @@ -1688,9 +1691,7 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t

if (config::is_cloud_mode()) {
auto st = config::set_config("enable_file_cache", "true", true, true);
LOG(INFO) << "set config enable_file_cache "
<< "true"
<< " " << st;
LOG(INFO) << "set config enable_file_cache " << "true" << " " << st;
}

return true;
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,11 @@ DECLARE_mBool(enable_pipeline_task_leakage_detect);
// MB
DECLARE_Int32(query_cache_size);

// use which connection type to connect brpc service
DECLARE_String(brpc_connection_type);

DECLARE_mBool(enable_brpc_failed_reconnected);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
_failed(id, err);
if (config::enable_brpc_failed_reconnected) {
auto remote_side = send_callback->cntl_->remote_side();
_state->exec_env()->brpc_internal_client_cache()->refresh_client(
remote_side, request.channel->_brpc_stub);
}
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
Expand Down
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <parallel_hashmap/phmap.h>
#include <stdint.h>

#include <atomic>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -53,7 +53,7 @@ class ExchangeSinkLocalState;
namespace vectorized {
class PipChannel;

// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
// We use BroadcastPBlockHolder to hold a broadcast PBlock. For broadcast shuffle, one PBlock
// will be shared between different channel, so we have to use a ref count to mark if this
// PBlock is available for next serialization.
class BroadcastPBlockHolderMemLimiter;
Expand Down Expand Up @@ -82,8 +82,6 @@ class BroadcastPBlockHolderMemLimiter
ENABLE_FACTORY_CREATOR(BroadcastPBlockHolderMemLimiter);

public:
BroadcastPBlockHolderMemLimiter() = delete;

BroadcastPBlockHolderMemLimiter(std::shared_ptr<pipeline::Dependency>& broadcast_dependency) {
_broadcast_dependency = broadcast_dependency;
}
Expand Down
11 changes: 6 additions & 5 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
static_cast<size_t>(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu);
_load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
_new_load_stream_mgr = NewLoadStreamMgr::create_shared();
_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_streaming_client_cache =
new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", "streaming");
_function_client_cache =
new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
_internal_client_cache =
new BrpcClientCache<PBackendService_Stub>("baidu_std", config::brpc_connection_type);
_streaming_client_cache = new BrpcClientCache<PBackendService_Stub>(
"baidu_std", config::brpc_connection_type, "streaming");
_function_client_cache = new BrpcClientCache<PFunctionService_Stub>(
config::function_service_protocol, config::brpc_connection_type);
if (config::is_cloud_mode()) {
_stream_load_executor = std::make_shared<CloudStreamLoadExecutor>(this);
} else {
Expand Down
46 changes: 44 additions & 2 deletions be/src/util/brpc_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,49 @@ class BrpcClientCache {
return get_client(butil::endpoint2str(endpoint).c_str());
}

void refresh_client(const butil::EndPoint& endpoint, std::shared_ptr<T> origin_stub) {
refresh_client(butil::endpoint2str(endpoint).c_str(), origin_stub);
}

void refresh_client(const std::string& host_port, std::shared_ptr<T> origin_stub) {
int pos = host_port.rfind(':');
std::string host = host_port.substr(0, pos);
int port = 0;
try {
port = stoi(host_port.substr(pos + 1));
} catch (const std::exception& err) {
LOG(WARNING) << "failed to parse port from " << host_port << ": " << err.what();
}
refresh_client(host, port, origin_stub);
}

void refresh_client(const std::string& host, int port, std::shared_ptr<T> origin_stub) {
std::string realhost = host;
auto dns_cache = ExecEnv::GetInstance()->dns_cache();
if (dns_cache == nullptr) {
LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
} else if (!is_valid_ip(host)) {
Status status = dns_cache->get(host, &realhost);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host:" << status.to_string();
}
}
std::string host_port = get_host_port(realhost, port);

// new one stub and insert into map
auto stub = get_new_client_no_cache(host_port);
if (stub != nullptr) {
_stub_map.try_emplace_l(
host_port,
[&stub, &origin_stub](auto& v) {
if (v.second.get() == origin_stub.get()) {
v.second = stub;
}
},
stub);
}
}

#ifdef BE_TEST
virtual std::shared_ptr<T> get_client(const TNetworkAddress& taddr) {
std::string host_port = fmt::format("{}:{}", taddr.hostname, taddr.port);
Expand Down Expand Up @@ -105,8 +148,7 @@ class BrpcClientCache {
// new one stub and insert into map
auto stub = get_new_client_no_cache(host_port);
if (stub != nullptr) {
_stub_map.try_emplace_l(
host_port, [&stub](const auto& v) { stub = v.second; }, stub);
_stub_map.try_emplace_l(host_port, [&stub](const auto& v) { stub = v.second; }, stub);
}
return stub;
}
Expand Down

0 comments on commit 3e99534

Please sign in to comment.