Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: jbk <[email protected]>

fix_enableSumInDir

Signed-off-by: jbk <[email protected]>

add_atomic

Signed-off-by: jbk <[email protected]>

fix

Signed-off-by: jbk <[email protected]>
  • Loading branch information
bit-dance committed May 28, 2023
1 parent 4bb0b52 commit 9dea224
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ build --cxxopt -Wno-error=format-security
build:gcc7-later --cxxopt -faligned-new
build --incompatible_blacklisted_protos_requires_proto_info=false
build --copt=-fdiagnostics-color=always
run --copt=-fdiagnostics-color=always
run --copt=-fdiagnostics-color=always
1 change: 1 addition & 0 deletions curvefs/proto/mds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ message RefreshSessionRequest {
message RefreshSessionResponse {
required FSStatusCode statusCode = 1;
repeated topology.PartitionTxId latestTxIdList = 2;
optional bool enableSumInDir = 3;
}

message DLockValue {
Expand Down
29 changes: 16 additions & 13 deletions curvefs/src/client/fuse_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
auto channelManager = std::make_shared<ChannelManager<MetaserverID>>();

leaseExecutor_ = absl::make_unique<LeaseExecutor>(option.leaseOpt,
metaCache, mdsClient_);
metaCache, mdsClient_,
&enableSumInDir_);

xattrManager_ = std::make_shared<XattrManager>(inodeManager_,
dentryManager_, option_.listDentryLimit, option_.listDentryThreads);
Expand Down Expand Up @@ -360,7 +361,7 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
inodeWrapper->MarkDirty();
}

if (enableSumInDir_ && length != 0) {
if (enableSumInDir_.load() && length != 0) {
// update parent summary info
const Inode *inode = inodeWrapper->GetInodeLocked();
XAttr xattr;
Expand Down Expand Up @@ -495,7 +496,7 @@ CURVEFS_ERROR FuseClient::MakeNode(fuse_req_t req, fuse_ino_t parent,
<< ", parent = " << parent << ", name = " << name
<< ", mode = " << mode;

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -568,7 +569,7 @@ CURVEFS_ERROR FuseClient::DeleteNode(uint64_t ino, fuse_ino_t parent,
<< ", parent = " << parent << ", name = " << name;
}

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -668,7 +669,7 @@ CURVEFS_ERROR FuseClient::CreateManageNode(fuse_req_t req, uint64_t parent,
<< ", parent = " << parent << ", name = " << name
<< ", mode = " << mode;

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -1025,7 +1026,7 @@ CURVEFS_ERROR FuseClient::FuseOpRename(fuse_req_t req, fuse_ino_t parent,
renameOp.UpdateInodeCtime();
renameOp.UpdateCache();

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
xattrManager_->UpdateParentXattrAfterRename(
parent, newparent, newname, &renameOp);
}
Expand Down Expand Up @@ -1117,7 +1118,7 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
inodeWrapper->GetInodeAttrLocked(&inodeAttr);
InodeAttr2ParamAttr(inodeAttr, attrOut);

if (enableSumInDir_ && changeSize != 0) {
if (enableSumInDir_.load()&& changeSize != 0) {
// update parent summary info
const Inode* inode = inodeWrapper->GetInodeLocked();
XAttr xattr;
Expand Down Expand Up @@ -1164,7 +1165,8 @@ CURVEFS_ERROR FuseClient::FuseOpGetXattr(fuse_req_t req, fuse_ino_t ino,
return ret;
}

ret = xattrManager_->GetXattr(name, value, &inodeAttr, enableSumInDir_);
ret = xattrManager_->GetXattr(name, value,
&inodeAttr, enableSumInDir_.load());
if (CURVEFS_ERROR::OK != ret) {
LOG(ERROR) << "xattrManager get xattr failed, name = " << name;
return ret;
Expand Down Expand Up @@ -1334,7 +1336,7 @@ CURVEFS_ERROR FuseClient::FuseOpSymlink(fuse_req_t req, const char *link,
return ret;
}

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -1405,7 +1407,7 @@ CURVEFS_ERROR FuseClient::FuseOpLink(fuse_req_t req, fuse_ino_t ino,
return ret;
}

if (enableSumInDir_) {
if (enableSumInDir_.load()) {
// update parent summary info
XAttr xattr;
xattr.mutable_xattrinfos()->insert({XATTRENTRIES, "1"});
Expand Down Expand Up @@ -1488,14 +1490,15 @@ FuseClient::SetMountStatus(const struct MountOption *mountOption) {
}
inodeManager_->SetFsId(fsInfo_->fsid());
dentryManager_->SetFsId(fsInfo_->fsid());
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
enableSumInDir_.store(fsInfo_->enablesumindir());
if (fsInfo_->has_recycletimehour()) {
enableSumInDir_ = enableSumInDir_ && (fsInfo_->recycletimehour() == 0);
enableSumInDir_.store(enableSumInDir_.load() &&
(fsInfo_->recycletimehour() == 0));
}

LOG(INFO) << "Mount " << fsName << " on " << mountpoint_.ShortDebugString()
<< " success!"
<< " enableSumInDir = " << enableSumInDir_;
<< " enableSumInDir = " << enableSumInDir_.load();

fsMetric_ = std::make_shared<FSMetric>(fsName);

Expand Down
3 changes: 2 additions & 1 deletion curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <list>
#include <utility>
#include <vector>
#include <atomic>

#include "curvefs/proto/common.pb.h"
#include "curvefs/proto/mds.pb.h"
Expand Down Expand Up @@ -394,7 +395,7 @@ class FuseClient {
bool init_;

// enable record summary info in dir inode xattr
bool enableSumInDir_;
std::atomic<bool> enableSumInDir_;

std::shared_ptr<FSMetric> fsMetric_;

Expand Down
5 changes: 3 additions & 2 deletions curvefs/src/client/lease/lease_excutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ LeaseExecutor::~LeaseExecutor() {
bool LeaseExecutor::Start() {
if (opt_.leaseTimeUs <= 0 || opt_.refreshTimesPerLease <= 0) {
LOG(ERROR) << "LeaseExecutor start fail. Invalid param in leaseopt, "
"leasTimesUs = "
"leaseTimeUs = "
<< opt_.leaseTimeUs
<< ", refreshTimePerLease = " << opt_.refreshTimesPerLease;
return false;
Expand Down Expand Up @@ -78,7 +78,8 @@ bool LeaseExecutor::RefreshLease() {
// refresh from mds
std::vector<PartitionTxId> latestTxIdList;
FSStatusCode ret = mdsCli_->RefreshSession(txIds, &latestTxIdList,
fsName_, mountpoint_);
fsName_, mountpoint_,
enableSumInDir_);
if (ret != FSStatusCode::OK) {
LOG(ERROR) << "LeaseExecutor refresh session fail, ret = " << ret
<< ", errorName = " << FSStatusCode_Name(ret);
Expand Down
8 changes: 6 additions & 2 deletions curvefs/src/client/lease/lease_excutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <memory>
#include <string>
#include <atomic>

#include "curvefs/src/client/rpcclient/metacache.h"
#include "curvefs/src/client/rpcclient/mds_client.h"
Expand All @@ -44,8 +45,10 @@ namespace client {
class LeaseExecutor : public LeaseExecutorBase {
public:
LeaseExecutor(const LeaseOpt &opt, std::shared_ptr<MetaCache> metaCache,
std::shared_ptr<MdsClient> mdsCli)
: opt_(opt), metaCache_(metaCache), mdsCli_(mdsCli) {}
std::shared_ptr<MdsClient> mdsCli,
std::atomic<bool>* enableSumInDir)
: opt_(opt), metaCache_(metaCache), mdsCli_(mdsCli),
enableSumInDir_(enableSumInDir) {}

~LeaseExecutor();

Expand Down Expand Up @@ -73,6 +76,7 @@ class LeaseExecutor : public LeaseExecutorBase {
std::unique_ptr<RefreshSessionTask> task_;
std::string fsName_;
Mountpoint mountpoint_;
std::atomic<bool>* enableSumInDir_;
};

} // namespace client
Expand Down
8 changes: 7 additions & 1 deletion curvefs/src/client/rpcclient/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,8 @@ FSStatusCode
MdsClientImpl::RefreshSession(const std::vector<PartitionTxId> &txIds,
std::vector<PartitionTxId> *latestTxIdList,
const std::string& fsName,
const Mountpoint& mountpoint) {
const Mountpoint& mountpoint,
std::atomic<bool>* enableSumInDir) {
auto task = RPCTask {
(void)addrindex;
(void)rpctimeoutMS;
Expand Down Expand Up @@ -532,6 +533,11 @@ MdsClientImpl::RefreshSession(const std::vector<PartitionTxId> &txIds,
LOG(INFO) << "RefreshSession need update partition txid list: "
<< response.DebugString();
}
if (enableSumInDir->load() && !response.enablesumindir()) {
enableSumInDir->store(response.enablesumindir());
LOG(INFO) << "update enableSumInDir to "
<< response.enablesumindir();
}

return ret;
};
Expand Down
7 changes: 5 additions & 2 deletions curvefs/src/client/rpcclient/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <memory>
#include <string>
#include <vector>
#include <atomic>

#include "curvefs/proto/mds.pb.h"
#include "curvefs/proto/topology.pb.h"
Expand Down Expand Up @@ -115,7 +116,8 @@ class MdsClient {
RefreshSession(const std::vector<PartitionTxId> &txIds,
std::vector<PartitionTxId> *latestTxIdList,
const std::string& fsName,
const Mountpoint& mountpoint) = 0;
const Mountpoint& mountpoint,
std::atomic<bool>* enableSumInDir) = 0;

virtual FSStatusCode GetLatestTxId(uint32_t fsId,
std::vector<PartitionTxId>* txIds) = 0;
Expand Down Expand Up @@ -202,7 +204,8 @@ class MdsClientImpl : public MdsClient {
FSStatusCode RefreshSession(const std::vector<PartitionTxId> &txIds,
std::vector<PartitionTxId> *latestTxIdList,
const std::string& fsName,
const Mountpoint& mountpoint) override;
const Mountpoint& mountpoint,
std::atomic<bool>* enableSumInDir) override;

FSStatusCode GetLatestTxId(uint32_t fsId,
std::vector<PartitionTxId>* txIds) override;
Expand Down
26 changes: 12 additions & 14 deletions curvefs/src/client/xattr_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,18 @@ CURVEFS_ERROR XattrManager::CalOneLayerSumInfo(InodeAttr *attr) {
summaryInfo.entries++;
summaryInfo.fbytes += it.length();
}
if (!(AddUllStringToFirst(
&(attr->mutable_xattr()->find(XATTRFILES)->second),
summaryInfo.files, true) &&
AddUllStringToFirst(
&(attr->mutable_xattr()->find(XATTRSUBDIRS)->second),
summaryInfo.subdirs, true) &&
AddUllStringToFirst(
&(attr->mutable_xattr()->find(XATTRENTRIES)->second),
summaryInfo.entries, true) &&
AddUllStringToFirst(
&(attr->mutable_xattr()->find(XATTRFBYTES)->second),
summaryInfo.fbytes + attr->length(), true))) {
ret = CURVEFS_ERROR::INTERNAL;
}
attr->mutable_xattr()->erase(XATTRFILES);
attr->mutable_xattr()->erase(XATTRSUBDIRS);
attr->mutable_xattr()->erase(XATTRENTRIES);
attr->mutable_xattr()->erase(XATTRFBYTES);
attr->mutable_xattr()->insert(
{XATTRFILES, std::to_string(summaryInfo.files)});
attr->mutable_xattr()->insert(
{XATTRSUBDIRS, std::to_string(summaryInfo.subdirs)});
attr->mutable_xattr()->insert(
{XATTRENTRIES, std::to_string(summaryInfo.entries)});
attr->mutable_xattr()->insert(
{XATTRFBYTES, std::to_string(summaryInfo.fbytes + attr->length())});
}
return ret;
}
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/mds/fs_info_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ void FsInfoWrapper::AddMountPoint(const Mountpoint& mp) {
*p = mp;

fsInfo_.set_mountnum(fsInfo_.mountnum() + 1);

if (fsInfo_.enablesumindir()&&fsInfo_.mountnum() > 1) {
fsInfo_.set_enablesumindir(false);
}
}

FSStatusCode FsInfoWrapper::DeleteMountPoint(const Mountpoint& mp) {
Expand Down
10 changes: 10 additions & 0 deletions curvefs/src/mds/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,16 @@ void FsManager::RefreshSession(const RefreshSessionRequest* request,

// update this client's alive time
UpdateClientAliveTime(request->mountpoint(), request->fsname());
FsInfoWrapper wrapper;
FSStatusCode ret = fsStorage_->Get(request->fsname(), &wrapper);
if (ret != FSStatusCode::OK) {
LOG(WARNING) << "GetFsInfo fail, get fs fail, fsName = "
<< request->fsname()
<< ", errCode = " << FSStatusCode_Name(ret);
return;
}

response->set_enablesumindir(wrapper.ProtoFsInfo().enablesumindir());
}

FSStatusCode FsManager::ReloadMountedFsVolumeSpace() {
Expand Down
14 changes: 10 additions & 4 deletions curvefs/test/client/lease/lease_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,19 @@ TEST_F(LeaseExecutorTest, test_start) {
{
LOG(INFO) << "### case1: invalid lease time ###";
opt_.leaseTimeUs = 0;
LeaseExecutor exec(opt_, metaCache_, mdsCli_);
std::atomic<bool> enableSumInDir;
enableSumInDir.store(true);
LeaseExecutor exec(opt_, metaCache_, mdsCli_, &enableSumInDir);
ASSERT_FALSE(exec.Start());
}

{
LOG(INFO) << "### case2: invalid refresh times per lease ###";
opt_.refreshTimesPerLease = 0;
opt_.leaseTimeUs = 20;
LeaseExecutor exec(opt_, metaCache_, mdsCli_);
std::atomic<bool> enableSumInDir;
enableSumInDir.store(true);
LeaseExecutor exec(opt_, metaCache_, mdsCli_, &enableSumInDir);
ASSERT_FALSE(exec.Start());
}
}
Expand All @@ -85,15 +89,17 @@ TEST_F(LeaseExecutorTest, test_start_stop) {
EXPECT_CALL(*metaCache_, GetAllTxIds(_))
.WillOnce(SetArgPointee<0>(std::vector<PartitionTxId>{}))
.WillRepeatedly(SetArgPointee<0>(txIds));
EXPECT_CALL(*mdsCli_, RefreshSession(_, _, _, _))
EXPECT_CALL(*mdsCli_, RefreshSession(_, _, _, _, _))
.WillOnce(Return(FSStatusCode::UNKNOWN_ERROR))
.WillRepeatedly(
DoAll(SetArgPointee<1>(txIds), Return(FSStatusCode::OK)));
EXPECT_CALL(*metaCache_, SetTxId(1, 2))
.Times(AtLeast(1));

// lease executor start
LeaseExecutor exec(opt_, metaCache_, mdsCli_);
std::atomic<bool> enableSumInDir;
enableSumInDir.store(true);
LeaseExecutor exec(opt_, metaCache_, mdsCli_, &enableSumInDir);
ASSERT_TRUE(exec.Start());

std::this_thread::sleep_for(std::chrono::milliseconds(200));
Expand Down
10 changes: 6 additions & 4 deletions curvefs/test/client/rpcclient/mds_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <google/protobuf/util/message_differencer.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <atomic>

#include "curvefs/src/client/rpcclient/mds_client.h"
#include "curvefs/test/client/rpcclient/mock_mds_base_client.h"
Expand Down Expand Up @@ -837,7 +838,7 @@ TEST_F(MdsClientImplTest, RefreshSession) {

// out
std::vector<PartitionTxId> out;

std::atomic<bool>* enableSumInDir = new std::atomic<bool> (true);
RefreshSessionResponse response;

{
Expand All @@ -846,7 +847,7 @@ TEST_F(MdsClientImplTest, RefreshSession) {
EXPECT_CALL(mockmdsbasecli_, RefreshSession(_, _, _, _))
.WillOnce(SetArgPointee<1>(response));
ASSERT_FALSE(mdsclient_.RefreshSession(txIds, &out,
fsName, mountpoint));
fsName, mountpoint, enableSumInDir));
ASSERT_TRUE(out.empty());
}

Expand All @@ -857,7 +858,7 @@ TEST_F(MdsClientImplTest, RefreshSession) {
EXPECT_CALL(mockmdsbasecli_, RefreshSession(_, _, _, _))
.WillOnce(SetArgPointee<1>(response));
ASSERT_FALSE(mdsclient_.RefreshSession(txIds, &out,
fsName, mountpoint));
fsName, mountpoint, enableSumInDir));
ASSERT_EQ(1, out.size());
ASSERT_TRUE(
google::protobuf::util::MessageDifferencer::Equals(out[0], tmp))
Expand All @@ -873,7 +874,8 @@ TEST_F(MdsClientImplTest, RefreshSession) {
EXPECT_CALL(mockmdsbasecli_, RefreshSession(_, _, _, _))
.WillRepeatedly(Invoke(RefreshSessionRpcFailed));
ASSERT_EQ(FSStatusCode::RPC_ERROR,
mdsclient_.RefreshSession(txIds, &out, fsName, mountpoint));
mdsclient_.RefreshSession(txIds, &out, fsName, mountpoint,
enableSumInDir));
}
}

Expand Down
Loading

0 comments on commit 9dea224

Please sign in to comment.