From a81c673a50f4974b23407e5444b4b6f32fd4d217 Mon Sep 17 00:00:00 2001 From: Hanqing Wu Date: Sun, 10 Dec 2023 16:06:21 +0800 Subject: [PATCH] curvefs(metaserver): Use DoublyBufferedData to replace read-write locks to manage copysets Signed-off-by: Hanqing Wu --- curvefs/src/metaserver/copyset/copyset_node.h | 6 + .../copyset/copyset_node_manager.cpp | 223 +++++++++++------- .../metaserver/copyset/copyset_node_manager.h | 43 ++-- 3 files changed, 175 insertions(+), 97 deletions(-) diff --git a/curvefs/src/metaserver/copyset/copyset_node.h b/curvefs/src/metaserver/copyset/copyset_node.h index b1a6f14760..3f895a3ed8 100644 --- a/curvefs/src/metaserver/copyset/copyset_node.h +++ b/curvefs/src/metaserver/copyset/copyset_node.h @@ -102,6 +102,8 @@ class CopysetNode : public braft::StateMachine { virtual PeerId GetLeaderId() const; + GroupId GetGroupId() const; + MetaStore* GetMetaStore() const; virtual uint64_t GetConfEpoch() const; @@ -344,6 +346,10 @@ inline bool CopysetNode::IsLoading() const { return isLoading_.load(std::memory_order_acquire); } +inline GroupId CopysetNode::GetGroupId() const { + return groupId_; +} + } // namespace copyset } // namespace metaserver } // namespace curvefs diff --git a/curvefs/src/metaserver/copyset/copyset_node_manager.cpp b/curvefs/src/metaserver/copyset/copyset_node_manager.cpp index a3ceeb5b04..75786135db 100644 --- a/curvefs/src/metaserver/copyset/copyset_node_manager.cpp +++ b/curvefs/src/metaserver/copyset/copyset_node_manager.cpp @@ -32,58 +32,48 @@ #include "curvefs/src/metaserver/copyset/utils.h" #include "src/common/timeutility.h" +#include "src/common/concurrent/generic_name_lock.h" + namespace curvefs { namespace metaserver { namespace copyset { using ::curve::common::TimeUtility; +using NameLockGuard = curve::common::GenericNameLockGuard; bool CopysetNodeManager::IsLoadFinished() const { return loadFinished_.load(std::memory_order_acquire); } -bool CopysetNodeManager::DeleteCopysetNodeInternal(PoolId poolId, - CopysetId copysetId, - bool removeData) { - GroupId groupId = ToGroupId(poolId, copysetId); - - // stop copyset node first - { - ReadLockGuard lock(lock_); - auto it = copysets_.find(groupId); - if (it != copysets_.end()) { - it->second->Stop(); - } else { - LOG(WARNING) << "Delete copyset failed, copyset " - << ToGroupIdString(poolId, copysetId) << " not found"; - return false; - } +bool CopysetNodeManager::DeleteCopysetNodeInternalLocked(PoolId poolId, + CopysetId copysetId, + bool removeData) { + auto node = GetSharedCopysetNode(poolId, copysetId); + if (node == nullptr) { + LOG(WARNING) << "Delete copyset failed, copyset: " + << ToGroupIdString(poolId, copysetId) << " not found"; + return false; } - // remove copyset node - { - WriteLockGuard lock(lock_); - auto it = copysets_.find(groupId); - if (it != copysets_.end()) { - bool ret = true; - if (removeData) { - std::string copysetDataDir = it->second->GetCopysetDataDir(); - if (!trash_.RecycleCopyset(copysetDataDir)) { - LOG(WARNING) << "Recycle copyset remote data failed, " - "copyset data path: '" - << copysetDataDir << "'"; - ret = false; - } - } - - copysets_.erase(it); - LOG(INFO) << "Delete copyset " << ToGroupIdString(poolId, copysetId) - << " success"; - return ret; + node->Stop(); + LOG(INFO) << "Copyset " << ToGroupIdString(poolId, copysetId) << " stopped"; + + bool ret = true; + if (removeData) { + std::string copysetDataDir = node->GetCopysetDataDir(); + if (!trash_.RecycleCopyset(copysetDataDir)) { + LOG(WARNING) << "Recycle copyset remove data failed, " + "copyset data path: '" + << copysetDataDir << "'"; + ret = false; } } - return false; + // delete node + copysets_.Modify(RemoveCopysetNode, node); + LOG(INFO) << "Delete copyset " << ToGroupIdString(poolId, copysetId) + << " success"; + return ret; } bool CopysetNodeManager::Init(const CopysetNodeOptions& options) { @@ -107,11 +97,11 @@ bool CopysetNodeManager::Start() { loadFinished_.store(true, std::memory_order_release); LOG(INFO) << "Reload copysets success"; return true; - } else { - running_.store(false, std::memory_order_release); - LOG(ERROR) << "Reload copysets failed"; - return false; } + + running_.store(false, std::memory_order_release); + LOG(ERROR) << "Reload copysets failed"; + return false; } bool CopysetNodeManager::Stop() { @@ -123,16 +113,24 @@ bool CopysetNodeManager::Stop() { loadFinished_.store(false); { - ReadLockGuard lock(lock_); - for (auto& copyset : copysets_) { + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(ERROR) << "Fail to get copyset nodes"; + return false; + } + + for (const auto& copyset : *ptr) { copyset.second->Stop(); } } - { - WriteLockGuard lock(lock_); - copysets_.clear(); - } + auto clear = [](CopysetNodeMap& map) -> size_t { + map.clear(); + return 1; + }; + + // clear copysets + copysets_.Modify(clear); if (!trash_.Stop()) { LOG(ERROR) << "Stop trash failed"; @@ -146,54 +144,105 @@ bool CopysetNodeManager::Stop() { CopysetNode* CopysetNodeManager::GetCopysetNode(PoolId poolId, CopysetId copysetId) { - ReadLockGuard lock(lock_); + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(poolId, copysetId); + return nullptr; + } - auto it = copysets_.find(ToGroupId(poolId, copysetId)); - if (it != copysets_.end()) { + auto it = ptr->find(ToGroupId(poolId, copysetId)); + if (it != ptr->end()) { return it->second.get(); } + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(poolId, copysetId); return nullptr; } std::shared_ptr CopysetNodeManager::GetSharedCopysetNode( PoolId poolId, CopysetId copysetId) { - ReadLockGuard lock(lock_); + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(poolId, copysetId); + return nullptr; + } - auto it = copysets_.find(ToGroupId(poolId, copysetId)); - if (it != copysets_.end()) { + auto it = ptr->find(ToGroupId(poolId, copysetId)); + if (it != ptr->end()) { return it->second; } + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(poolId, copysetId); return nullptr; } int CopysetNodeManager::IsCopysetNodeExist( const CreateCopysetRequest::Copyset& copyset) { - ReadLockGuard lock(lock_); - auto iter = copysets_.find(ToGroupId(copyset.poolid(), - copyset.copysetid())); - if (iter == copysets_.end()) { + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(WARNING) << "Fail to get copyset: " + << ToGroupIdString(copyset.poolid(), copyset.copysetid()); + return 0; + } + + auto iter = ptr->find(ToGroupId(copyset.poolid(), copyset.copysetid())); + if (iter == ptr->end()) { return 0; - } else { - auto copysetNode = iter->second.get(); - std::vector peers; - copysetNode->ListPeers(&peers); - if (peers.size() != static_cast(copyset.peers_size())) { + } + + auto* copysetNode = iter->second.get(); + std::vector peers; + copysetNode->ListPeers(&peers); + if (peers.size() != static_cast(copyset.peers_size())) { + return -1; + } + + for (int i = 0; i < copyset.peers_size(); i++) { + const auto& cspeer = copyset.peers(i); + auto iter = + std::find_if(peers.begin(), peers.end(), [&cspeer](const Peer& p) { + return cspeer.address() == p.address(); + }); + if (iter == peers.end()) { return -1; } + } - for (int i = 0; i < copyset.peers_size(); i++) { - const auto& cspeer = copyset.peers(i); - auto iter = std::find_if(peers.begin(), peers.end(), - [&cspeer](const Peer& p) { - return cspeer.address() == p.address(); - }); - if (iter == peers.end()) { - return -1; - } - } + return 1; +} + +size_t CopysetNodeManager::AddCopysetNode( + CopysetNodeMap& map, const std::shared_ptr& copysetNode) { + assert(copysetNode != nullptr); + + auto groupId = copysetNode->GetGroupId(); + auto it = map.find(groupId); + if (it != map.end()) { + LOG(WARNING) << "Copyset node already exists: " << groupId; + return 0; } + + auto ret = map.emplace(groupId, copysetNode); + CHECK(ret.second); + return 1; +} + +size_t CopysetNodeManager::RemoveCopysetNode( + CopysetNodeMap& map, const std::shared_ptr& copysetNode) { + assert(copysetNode != nullptr); + + auto groupId = copysetNode->GetGroupId(); + auto it = map.find(groupId); + if (it == map.end()) { + LOG(WARNING) << "Copyset node not found: " << groupId; + return 0; + } + + map.erase(it); return 1; } @@ -209,9 +258,11 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId, braft::GroupId groupId = ToGroupId(poolId, copysetId); std::shared_ptr copysetNode; + NameLockGuard guard(copysetLifetimeNameLock_, groupId); + { - WriteLockGuard lock(lock_); - if (copysets_.count(groupId) != 0) { + auto* exist = GetCopysetNode(poolId, copysetId); + if (exist != nullptr) { LOG(WARNING) << "Copyset node already exists: " << ToGroupIdString(poolId, copysetId); return false; @@ -225,7 +276,7 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId, return false; } - copysets_.emplace(groupId, copysetNode); + copysets_.Modify(AddCopysetNode, copysetNode); } // node start maybe time-consuming @@ -234,7 +285,7 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId, // restart, in this case we should not automaticaly remove copyset's // data const bool removeData = checkLoadFinish; - DeleteCopysetNodeInternal(poolId, copysetId, removeData); + DeleteCopysetNodeInternalLocked(poolId, copysetId, removeData); LOG(ERROR) << "Copyset " << ToGroupIdString(poolId, copysetId) << " start failed"; return false; @@ -246,10 +297,16 @@ bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId, } void CopysetNodeManager::GetAllCopysets( - std::vector *nodes) const { + std::vector* nodes) const { nodes->clear(); - ReadLockGuard lock(lock_); - for (auto& copyset : copysets_) { + butil::DoublyBufferedData::ScopedPtr ptr; + if (copysets_.Read(&ptr) != 0) { + LOG(WARNING) << "Fail to get all copysets"; + return; + } + + nodes->reserve(ptr->size()); + for (const auto& copyset : *ptr) { nodes->push_back(copyset.second.get()); } } @@ -266,11 +323,15 @@ void CopysetNodeManager::AddService(brpc::Server* server, } bool CopysetNodeManager::DeleteCopysetNode(PoolId poolId, CopysetId copysetId) { - return DeleteCopysetNodeInternal(poolId, copysetId, false); + GroupId groupId = ToGroupId(poolId, copysetId); + NameLockGuard guard(copysetLifetimeNameLock_, groupId); + return DeleteCopysetNodeInternalLocked(poolId, copysetId, false); } bool CopysetNodeManager::PurgeCopysetNode(PoolId poolId, CopysetId copysetId) { - return DeleteCopysetNodeInternal(poolId, copysetId, true); + GroupId groupId = ToGroupId(poolId, copysetId); + NameLockGuard guard(copysetLifetimeNameLock_, groupId); + return DeleteCopysetNodeInternalLocked(poolId, copysetId, true); } } // namespace copyset diff --git a/curvefs/src/metaserver/copyset/copyset_node_manager.h b/curvefs/src/metaserver/copyset/copyset_node_manager.h index 41b58dde67..a0f23a4df6 100644 --- a/curvefs/src/metaserver/copyset/copyset_node_manager.h +++ b/curvefs/src/metaserver/copyset/copyset_node_manager.h @@ -29,10 +29,14 @@ #include #include +#include "butil/containers/doubly_buffered_data.h" + #include "curvefs/src/metaserver/common/types.h" #include "curvefs/src/metaserver/copyset/copyset_node.h" #include "curvefs/src/metaserver/copyset/types.h" +#include "src/common/concurrent/generic_name_lock.h" + namespace curvefs { namespace metaserver { namespace copyset { @@ -79,12 +83,7 @@ class CopysetNodeManager { virtual bool IsLoadFinished() const; public: - CopysetNodeManager() - : options_(), - running_(false), - loadFinished_(false), - lock_(), - copysets_() {} + CopysetNodeManager() = default; public: /** @@ -92,28 +91,40 @@ class CopysetNodeManager { */ void AddService(brpc::Server* server, const butil::EndPoint& listenAddr); - private: - bool DeleteCopysetNodeInternal(PoolId poolId, CopysetId copysetId, - bool removeData); - private: using CopysetNodeMap = std::unordered_map>; + bool DeleteCopysetNodeInternalLocked(PoolId poolId, CopysetId copysetId, + bool removeData); + + // Add copyset node to copyset map. + // Return 1 if success, 0 otherwise + static size_t AddCopysetNode( + CopysetNodeMap& map, // NOLINT(runtime/references) + const std::shared_ptr& copysetNode); + + // Remove copyset node from copyset map. + // Return 1 if success, 0 otherwise + static size_t RemoveCopysetNode( + CopysetNodeMap& map, // NOLINT(runtime/references) + const std::shared_ptr& copysetNode); + + private: CopysetNodeOptions options_; - std::atomic running_; + std::atomic running_{false}; // whether copyset is loaded finished, manager will reject create copyset // request if load unfinished - std::atomic loadFinished_; + std::atomic loadFinished_{false}; - // protected copysets_ - mutable RWLock lock_; + CopysetTrash trash_; - CopysetNodeMap copysets_; + mutable butil::DoublyBufferedData copysets_; - CopysetTrash trash_; + // copyset name lock + curve::common::GenericNameLock copysetLifetimeNameLock_; }; } // namespace copyset