From 74cf62fbb86dd6285ea105ec9d63c9f4d7b2c29e Mon Sep 17 00:00:00 2001 From: jbk <2634358021@qq.com> Date: Tue, 14 Mar 2023 12:10:17 +0000 Subject: [PATCH] wip_enablesumindir --- curvefs/src/client/fuse_client.cpp | 3 + curvefs/src/client/fuse_client.h | 8 +-- curvefs/src/client/xattr_manager.cpp | 103 +++++++++++++++++++++++++++ curvefs/src/client/xattr_manager.h | 10 +++ 4 files changed, 120 insertions(+), 4 deletions(-) diff --git a/curvefs/src/client/fuse_client.cpp b/curvefs/src/client/fuse_client.cpp index f6146ad62e..1b7af5691a 100644 --- a/curvefs/src/client/fuse_client.cpp +++ b/curvefs/src/client/fuse_client.cpp @@ -205,6 +205,9 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata, LOG(INFO) << "Mount " << fsName << " on " << mountpoint_.ShortDebugString() << " success!" << " enableSumInDir = " << enableSumInDir_; + if (enableSumInDir_) { + xattrManager_->RefreshAllXAttr(); + } fsMetric_ = std::make_shared(fsName); diff --git a/curvefs/src/client/fuse_client.h b/curvefs/src/client/fuse_client.h index 6b16df6e01..7437a29119 100644 --- a/curvefs/src/client/fuse_client.h +++ b/curvefs/src/client/fuse_client.h @@ -131,10 +131,10 @@ class FuseClient { virtual void FuseOpDestroy(void* userdata); - virtual CURVEFS_ERROR FuseOpWrite(fuse_req_t req, fuse_ino_t ino, - const char* buf, size_t size, off_t off, - struct fuse_file_info* fi, - size_t* wSize) = 0; + virtual CURVEFS_ERROR FuseOpWrite(fuse_req_t req, fuse_ino_t ino, + const char* buf, size_t size, off_t off, + struct fuse_file_info* fi, + size_t* wSize) = 0; virtual CURVEFS_ERROR FuseOpRead(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, diff --git a/curvefs/src/client/xattr_manager.cpp b/curvefs/src/client/xattr_manager.cpp index e9f72edcc6..12b7ca473e 100644 --- a/curvefs/src/client/xattr_manager.cpp +++ b/curvefs/src/client/xattr_manager.cpp @@ -283,6 +283,109 @@ CURVEFS_ERROR XattrManager::CalAllLayerSumInfo(InodeAttr *attr) { return CURVEFS_ERROR::OK; } +CURVEFS_ERROR XattrManager::RefreshAllXAttr() { + std::stack iStack; + std::mutex stackMutex; + Atomic inflightNum(0); + Atomic ret(true); + iStack.emplace(ROOTINODEID); + std::vector threadpool; + for (auto i = listDentryThreads_; i > 0; i--) { + try { + threadpool.emplace_back( + Thread(&XattrManager::ConcurrentRefreshInodeXattr, this, + &iStack, &stackMutex, &inflightNum, &ret)); + } catch (const std::exception &e) { + LOG(WARNING) << "RefreshInodeXattr create thread failed," + << " err: " << e.what(); + } + } + + if (threadpool.empty()) { + return CURVEFS_ERROR::INTERNAL; + } + + for (auto &thread : threadpool) { + thread.join(); + } + + if (!ret.load()) { + return CURVEFS_ERROR::INTERNAL; + } + + return CURVEFS_ERROR::OK; +} + +void XattrManager::ConcurrentRefreshInodeXattr( + std::stack *iStack, std::mutex *stackMutex, Atomic *inflightNum, Atomic *ret) { + while (1) { + std::list dentryList; + std::set inodeIds; + std::list attrs; + auto tret = ConcurrentListDentry(&dentryList, iStack, stackMutex, true, + inflightNum, ret); + if (!tret) { + return; + } + { + std::lock_guard guard(*stackMutex); + for (const auto &it : dentryList) { + iStack->emplace(it.inodeid()); + inodeIds.emplace(it.inodeid()); + } + inflightNum->fetch_sub(1); + } + if (!inodeIds.empty()){ + auto tret = inodeManager_->BatchGetInodeAttr(&inodeIds, &attrs); + if (tret == CURVEFS_ERROR::OK) { + for (auto &it : attrs) { + if (it.type() == FsFileType::TYPE_DIRECTORY) { + RefreshInodeXAttr(&it); + } + } + } else { + ret->store(false); + return; + } + } + } +} + +CURVEFS_ERROR XattrManager::RefreshInodeXAttr(InodeAttr *attr) { + auto ret = CalAllLayerSumInfo(attr); + std::shared_ptr InodeWrapper; + CURVEFS_ERROR ret = inodeManager_->GetInode(attr->inodeid(), InodeWrapper); + if (ret != CURVEFS_ERROR::OK) { + LOG(ERROR) << "RefreshXAttr get root inode fail, ret = " << ret + << ", inodeid = " << ROOTINODEID; + return ret; + } + ::curve::common::UniqueLock lgGuard = InodeWrapper->GetUniqueLock(); + auto inodeXAttr = InodeWrapper->GetInodeLocked()->xattr(); + bool update = false; + for (const auto &it : *attr->mutable_xattr()) { + auto iter = inodeXAttr.find(it.first); + if (iter != inodeXAttr.end()) { + uint64_t dat = 0; + if (StringToUll(it.second, &dat)) { + if (!AddUllStringToFirst(&(iter->second), dat, true)) { + return CURVEFS_ERROR::INTERNAL; + } + } else { + LOG(ERROR) << "StringToUll failed, first = " << it.second; + return CURVEFS_ERROR::INTERNAL; + } + update = true; + } + } + if (update) { + InodeWrapper->MergeXAttrLocked(inodeXAttr); + inodeManager_->ShipToFlush(InodeWrapper); + } + + return ret; +} + void XattrManager::ConcurrentGetInodeXattr( std::stack *iStack, std::mutex *stackMutex, diff --git a/curvefs/src/client/xattr_manager.h b/curvefs/src/client/xattr_manager.h index ec36fbb2b3..31701410f8 100644 --- a/curvefs/src/client/xattr_manager.h +++ b/curvefs/src/client/xattr_manager.h @@ -74,6 +74,10 @@ class XattrManager { isStop_.store(true); } + CURVEFS_ERROR RefreshAllXAttr(); + + CURVEFS_ERROR RefreshInodeXAttr(InodeAttr *attr); + CURVEFS_ERROR GetXattr(const char* name, std::string *value, InodeAttr *attr, bool enableSumInDir); @@ -103,6 +107,12 @@ class XattrManager { Atomic *inflightNum, Atomic *ret); + void ConcurrentRefreshInodeXattr( + std::stack *iStack, + std::mutex *stackMutex, + Atomic *inflightNum, + Atomic *ret); + void ConcurrentGetInodeXattr( std::stack *iStack, std::mutex *stackMutex,