Skip to content

Commit

Permalink
async nameserver rpc service
Browse files Browse the repository at this point in the history
  • Loading branch information
00k committed Aug 5, 2016
1 parent 5b093fb commit ea85403
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ thirdsrc
thirdparty
output
.build
src/version.cc
1 change: 1 addition & 0 deletions src/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ DEFINE_int32(recover_timeout, 180, "Recover timeout for one chunkserver");
DEFINE_bool(clean_redundancy, false, "Clean redundant replica");
DEFINE_int32(nameserver_report_thread_num, 20, "Threads to handle block report");
DEFINE_int32(nameserver_work_thread_num, 20, "Work threads num");
DEFINE_int32(nameserver_heartbeat_thread_num, 5, "Heartbeat handle threads num");
DEFINE_bool(select_chunkserver_by_zone, false, "Select chunkserver by zone");
DEFINE_double(select_chunkserver_local_factor, 0.1, "Weighting factors of locality");
DEFINE_int32(blockmapping_bucket_num, 19, "Partation num of blockmapping");
Expand Down
82 changes: 58 additions & 24 deletions src/nameserver/nameserver_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DECLARE_int32(nameserver_safemode_time);
DECLARE_int32(chunkserver_max_pending_buffers);
DECLARE_int32(nameserver_report_thread_num);
DECLARE_int32(nameserver_work_thread_num);
DECLARE_int32(nameserver_heartbeat_thread_num);
DECLARE_int32(blockmapping_bucket_num);

namespace baidu {
Expand All @@ -50,6 +51,7 @@ NameServerImpl::NameServerImpl(Sync* sync) : safe_mode_(FLAGS_nameserver_safemod
block_mapping_manager_ = new BlockMappingManager(FLAGS_blockmapping_bucket_num);
report_thread_pool_ = new common::ThreadPool(FLAGS_nameserver_report_thread_num);
work_thread_pool_ = new common::ThreadPool(FLAGS_nameserver_work_thread_num);
heartbeat_thread_pool_ = new common::ThreadPool(FLAGS_nameserver_heartbeat_thread_num);
chunkserver_manager_ = new ChunkServerManager(work_thread_pool_, block_mapping_manager_);
namespace_ = new NameSpace(false);
if (sync_) {
Expand Down Expand Up @@ -173,14 +175,8 @@ void NameServerImpl::BlockReceived(::google::protobuf::RpcController* controller
done->Run();
return;
}
if (!response->has_sequence_id()) {
response->set_sequence_id(request->sequence_id());
boost::function<void ()> task =
boost::bind(&NameServerImpl::BlockReceived, this, controller, request, response, done);
work_thread_pool_->AddTask(task);
return;
}
g_block_report.Inc();
response->set_sequence_id(request->sequence_id());
int32_t cs_id = request->chunkserver_id();
LOG(INFO, "BlockReceived from C%d, %s, %d blocks",
cs_id, request->chunkserver_addr().c_str(), request->blocks_size());
Expand Down Expand Up @@ -227,14 +223,6 @@ void NameServerImpl::BlockReport(::google::protobuf::RpcController* controller,
return;
}
g_block_report.Inc();
if (!response->has_sequence_id()) {
int64_t receive_report_time = common::timer::get_micros();
response->set_sequence_id(receive_report_time);
boost::function<void ()> task =
boost::bind(&NameServerImpl::BlockReport, this, controller, request, response, done);
report_thread_pool_->AddTask(task);
return;
}
int32_t cs_id = request->chunkserver_id();
LOG(INFO, "Report from C%d %s %d blocks\n",
cs_id, request->chunkserver_addr().c_str(), request->blocks_size());
Expand Down Expand Up @@ -295,9 +283,9 @@ void NameServerImpl::BlockReport(::google::protobuf::RpcController* controller,
block_mapping_manager_->GetCloseBlocks(cs_id, response->mutable_close_blocks());
int64_t end_report = common::timer::get_micros();
if (end_report - start_report > 100 * 1000) {
LOG(WARNING, "C%d report use %d micors, update use %d micors, add block use %d micors, wait %d micros",
LOG(WARNING, "C%d report use %d micors, update use %d micors, add block use %d micors",
cs_id, end_report - start_report,
before_add_block - before_update, after_add_block - before_add_block, start_report - response->sequence_id());
before_add_block - before_update, after_add_block - before_add_block);
}
response->set_status(kOK);
done->Run();
Expand Down Expand Up @@ -579,13 +567,6 @@ void NameServerImpl::ListDirectory(::google::protobuf::RpcController* controller
done->Run();
return;
}
if (!response->has_sequence_id()) {
response->set_sequence_id(request->sequence_id());
boost::function<void ()> task =
boost::bind(&NameServerImpl::ListDirectory, this, controller, request, response, done);
work_thread_pool_->AddTask(task);
return;
}
g_list_dir.Inc();
response->set_sequence_id(request->sequence_id());
std::string path = NameSpace::NormalizePath(request->path());
Expand Down Expand Up @@ -1111,6 +1092,59 @@ bool NameServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request,
return true;
}

static void CallMethodHelper(NameServerImpl* impl,
const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done) {
impl->NameServer::CallMethod(method, controller, request, response, done);
}

void NameServerImpl::CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done) {
// the sequence of following list must correspond to the sequence of rpc in
// 'service NameServer { ... }' at nameserver.proto file
static std::pair<std::string, ThreadPool*> ThreadPoolOfMethod[] = {
std::make_pair("CreateFile", work_thread_pool_),
std::make_pair("AddBlock", work_thread_pool_),
std::make_pair("GetFileLocation", work_thread_pool_),
std::make_pair("ListDirectory", work_thread_pool_),
std::make_pair("Stat", work_thread_pool_),
std::make_pair("Rename", work_thread_pool_),
std::make_pair("FinishBlock", work_thread_pool_),
std::make_pair("Unlink", work_thread_pool_),
std::make_pair("DeleteDirectory", work_thread_pool_),
std::make_pair("ChangeReplicaNum", work_thread_pool_),
std::make_pair("ShutdownChunkServer", work_thread_pool_),
std::make_pair("ShutdownChunkServerStat", work_thread_pool_),
std::make_pair("DiskUsage", work_thread_pool_),
std::make_pair("Register", work_thread_pool_),
std::make_pair("HeartBeat", heartbeat_thread_pool_),
std::make_pair("BlockReport", report_thread_pool_),
std::make_pair("BlockReceived", work_thread_pool_),
std::make_pair("PushBlockReport", work_thread_pool_),
std::make_pair("SysStat", work_thread_pool_)
};
static int method_num = sizeof(ThreadPoolOfMethod) /
sizeof(std::pair<std::string, ThreadPool*>);
int id = method->index();
assert(id < method_num);
assert(method->name() == ThreadPoolOfMethod[id].first);

ThreadPool* thread_pool = ThreadPoolOfMethod[id].second;
if (thread_pool != NULL) {
boost::function<void ()> task =
boost::bind(&CallMethodHelper, this, method, controller, request, response, done);
thread_pool->AddTask(task);
} else {
NameServer::CallMethod(method, controller, request, response, done);
}
}

} // namespace bfs
} // namespace baidu

Expand Down
6 changes: 6 additions & 0 deletions src/nameserver/nameserver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,16 @@ class NameServerImpl : public NameServer {
void TransToString(const std::map<int32_t, std::set<int64_t> >& chk_set,
std::string* output);
void TransToString(const std::set<int64_t>& block_set, std::string* output);
void CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done);
private:
/// Global thread pool
ThreadPool* work_thread_pool_;
ThreadPool* report_thread_pool_;
ThreadPool* heartbeat_thread_pool_;
/// ChunkServer map
ChunkServerManager* chunkserver_manager_;
/// Block map
Expand Down

0 comments on commit ea85403

Please sign in to comment.