From ea85403392b78146b325a3a19251527b33041c61 Mon Sep 17 00:00:00 2001 From: 00k Date: Fri, 5 Aug 2016 17:06:54 +0800 Subject: [PATCH] async nameserver rpc service --- .gitignore | 1 + src/flags.cc | 1 + src/nameserver/nameserver_impl.cc | 82 ++++++++++++++++++++++--------- src/nameserver/nameserver_impl.h | 6 +++ 4 files changed, 66 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index 0ff19e07..57b5c7ce 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ thirdsrc thirdparty output .build +src/version.cc diff --git a/src/flags.cc b/src/flags.cc index f27ef7b9..14c05f09 100644 --- a/src/flags.cc +++ b/src/flags.cc @@ -27,6 +27,7 @@ DEFINE_int32(recover_timeout, 180, "Recover timeout for one chunkserver"); DEFINE_bool(clean_redundancy, false, "Clean redundant replica"); DEFINE_int32(nameserver_report_thread_num, 20, "Threads to handle block report"); DEFINE_int32(nameserver_work_thread_num, 20, "Work threads num"); +DEFINE_int32(nameserver_heartbeat_thread_num, 5, "Heartbeat handle threads num"); DEFINE_bool(select_chunkserver_by_zone, false, "Select chunkserver by zone"); DEFINE_double(select_chunkserver_local_factor, 0.1, "Weighting factors of locality"); DEFINE_int32(blockmapping_bucket_num, 19, "Partation num of blockmapping"); diff --git a/src/nameserver/nameserver_impl.cc b/src/nameserver/nameserver_impl.cc index 26838798..13a14733 100644 --- a/src/nameserver/nameserver_impl.cc +++ b/src/nameserver/nameserver_impl.cc @@ -31,6 +31,7 @@ DECLARE_int32(nameserver_safemode_time); DECLARE_int32(chunkserver_max_pending_buffers); DECLARE_int32(nameserver_report_thread_num); DECLARE_int32(nameserver_work_thread_num); +DECLARE_int32(nameserver_heartbeat_thread_num); DECLARE_int32(blockmapping_bucket_num); namespace baidu { @@ -50,6 +51,7 @@ NameServerImpl::NameServerImpl(Sync* sync) : safe_mode_(FLAGS_nameserver_safemod block_mapping_manager_ = new BlockMappingManager(FLAGS_blockmapping_bucket_num); report_thread_pool_ = new common::ThreadPool(FLAGS_nameserver_report_thread_num); work_thread_pool_ = new common::ThreadPool(FLAGS_nameserver_work_thread_num); + heartbeat_thread_pool_ = new common::ThreadPool(FLAGS_nameserver_heartbeat_thread_num); chunkserver_manager_ = new ChunkServerManager(work_thread_pool_, block_mapping_manager_); namespace_ = new NameSpace(false); if (sync_) { @@ -173,14 +175,8 @@ void NameServerImpl::BlockReceived(::google::protobuf::RpcController* controller done->Run(); return; } - if (!response->has_sequence_id()) { - response->set_sequence_id(request->sequence_id()); - boost::function task = - boost::bind(&NameServerImpl::BlockReceived, this, controller, request, response, done); - work_thread_pool_->AddTask(task); - return; - } g_block_report.Inc(); + response->set_sequence_id(request->sequence_id()); int32_t cs_id = request->chunkserver_id(); LOG(INFO, "BlockReceived from C%d, %s, %d blocks", cs_id, request->chunkserver_addr().c_str(), request->blocks_size()); @@ -227,14 +223,6 @@ void NameServerImpl::BlockReport(::google::protobuf::RpcController* controller, return; } g_block_report.Inc(); - if (!response->has_sequence_id()) { - int64_t receive_report_time = common::timer::get_micros(); - response->set_sequence_id(receive_report_time); - boost::function task = - boost::bind(&NameServerImpl::BlockReport, this, controller, request, response, done); - report_thread_pool_->AddTask(task); - return; - } int32_t cs_id = request->chunkserver_id(); LOG(INFO, "Report from C%d %s %d blocks\n", cs_id, request->chunkserver_addr().c_str(), request->blocks_size()); @@ -295,9 +283,9 @@ void NameServerImpl::BlockReport(::google::protobuf::RpcController* controller, block_mapping_manager_->GetCloseBlocks(cs_id, response->mutable_close_blocks()); int64_t end_report = common::timer::get_micros(); if (end_report - start_report > 100 * 1000) { - LOG(WARNING, "C%d report use %d micors, update use %d micors, add block use %d micors, wait %d micros", + LOG(WARNING, "C%d report use %d micors, update use %d micors, add block use %d micors", cs_id, end_report - start_report, - before_add_block - before_update, after_add_block - before_add_block, start_report - response->sequence_id()); + before_add_block - before_update, after_add_block - before_add_block); } response->set_status(kOK); done->Run(); @@ -579,13 +567,6 @@ void NameServerImpl::ListDirectory(::google::protobuf::RpcController* controller done->Run(); return; } - if (!response->has_sequence_id()) { - response->set_sequence_id(request->sequence_id()); - boost::function task = - boost::bind(&NameServerImpl::ListDirectory, this, controller, request, response, done); - work_thread_pool_->AddTask(task); - return; - } g_list_dir.Inc(); response->set_sequence_id(request->sequence_id()); std::string path = NameSpace::NormalizePath(request->path()); @@ -1111,6 +1092,59 @@ bool NameServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request, return true; } +static void CallMethodHelper(NameServerImpl* impl, + const ::google::protobuf::MethodDescriptor* method, + ::google::protobuf::RpcController* controller, + const ::google::protobuf::Message* request, + ::google::protobuf::Message* response, + ::google::protobuf::Closure* done) { + impl->NameServer::CallMethod(method, controller, request, response, done); +} + +void NameServerImpl::CallMethod(const ::google::protobuf::MethodDescriptor* method, + ::google::protobuf::RpcController* controller, + const ::google::protobuf::Message* request, + ::google::protobuf::Message* response, + ::google::protobuf::Closure* done) { + // the sequence of following list must correspond to the sequence of rpc in + // 'service NameServer { ... }' at nameserver.proto file + static std::pair ThreadPoolOfMethod[] = { + std::make_pair("CreateFile", work_thread_pool_), + std::make_pair("AddBlock", work_thread_pool_), + std::make_pair("GetFileLocation", work_thread_pool_), + std::make_pair("ListDirectory", work_thread_pool_), + std::make_pair("Stat", work_thread_pool_), + std::make_pair("Rename", work_thread_pool_), + std::make_pair("FinishBlock", work_thread_pool_), + std::make_pair("Unlink", work_thread_pool_), + std::make_pair("DeleteDirectory", work_thread_pool_), + std::make_pair("ChangeReplicaNum", work_thread_pool_), + std::make_pair("ShutdownChunkServer", work_thread_pool_), + std::make_pair("ShutdownChunkServerStat", work_thread_pool_), + std::make_pair("DiskUsage", work_thread_pool_), + std::make_pair("Register", work_thread_pool_), + std::make_pair("HeartBeat", heartbeat_thread_pool_), + std::make_pair("BlockReport", report_thread_pool_), + std::make_pair("BlockReceived", work_thread_pool_), + std::make_pair("PushBlockReport", work_thread_pool_), + std::make_pair("SysStat", work_thread_pool_) + }; + static int method_num = sizeof(ThreadPoolOfMethod) / + sizeof(std::pair); + int id = method->index(); + assert(id < method_num); + assert(method->name() == ThreadPoolOfMethod[id].first); + + ThreadPool* thread_pool = ThreadPoolOfMethod[id].second; + if (thread_pool != NULL) { + boost::function task = + boost::bind(&CallMethodHelper, this, method, controller, request, response, done); + thread_pool->AddTask(task); + } else { + NameServer::CallMethod(method, controller, request, response, done); + } +} + } // namespace bfs } // namespace baidu diff --git a/src/nameserver/nameserver_impl.h b/src/nameserver/nameserver_impl.h index a875e493..ecfcfb56 100644 --- a/src/nameserver/nameserver_impl.h +++ b/src/nameserver/nameserver_impl.h @@ -127,10 +127,16 @@ class NameServerImpl : public NameServer { void TransToString(const std::map >& chk_set, std::string* output); void TransToString(const std::set& block_set, std::string* output); + void CallMethod(const ::google::protobuf::MethodDescriptor* method, + ::google::protobuf::RpcController* controller, + const ::google::protobuf::Message* request, + ::google::protobuf::Message* response, + ::google::protobuf::Closure* done); private: /// Global thread pool ThreadPool* work_thread_pool_; ThreadPool* report_thread_pool_; + ThreadPool* heartbeat_thread_pool_; /// ChunkServer map ChunkServerManager* chunkserver_manager_; /// Block map