Skip to content

Commit

Permalink
curvefs: support replacing a server which doesn't work anymore.
Browse files Browse the repository at this point in the history
If a server with some metaservers on it has crashed, a new server needs to
be added into the cluster to replace it.
  • Loading branch information
SeanHai authored and YunhuiChen committed Jan 14, 2022
1 parent 7e0227c commit 29f51ac
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 8 deletions.
2 changes: 1 addition & 1 deletion curvefs/proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ enum TopoStatusCode {
TOPO_IP_PORT_DUPLICATED = 14;
TOPO_NAME_DUPLICATED = 15;
TOPO_CREATE_COPYSET_ON_METASERVER_FAIL = 16;
TOPO_CANNOT_REMOVE_NOT_RETIRED = 17;
TOPO_CANNOT_REMOVE_NOT_OFFLINE = 17;
TOPO_POOL_EXIST = 18;
TOPO_LEADER_NOT_FOUND = 19;
TOPO_PARTITION_NOT_FOUND = 20;
Expand Down
13 changes: 13 additions & 0 deletions curvefs/src/mds/topology/topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ TopoStatusCode TopologyImpl::RemoveMetaServer(MetaServerIdType id) {
WriteLockGuard wlockMetaServer(metaServerMutex_);
auto it = metaServerMap_.find(id);
if (it != metaServerMap_.end()) {
uint64_t metaserverCapacity =
it->second.GetMetaServerSpace().GetDiskCapacity();
if (!storage_->DeleteMetaServer(id)) {
return TopoStatusCode::TOPO_STORGE_FAIL;
}
Expand All @@ -230,6 +232,17 @@ TopoStatusCode TopologyImpl::RemoveMetaServer(MetaServerIdType id) {
ix->second.RemoveMetaServer(id);
}
metaServerMap_.erase(it);

// update pool
WriteLockGuard wlockPool(poolMutex_);
PoolIdType poolId = ix->second.GetPoolId();
auto it = poolMap_.find(poolId);
if (it != poolMap_.end()) {
it->second.SetDiskCapacity(it->second.GetDiskCapacity() -
metaserverCapacity);
} else {
return TopoStatusCode::TOPO_POOL_NOT_FOUND;
}
return TopoStatusCode::TOPO_OK;
} else {
return TopoStatusCode::TOPO_METASERVER_NOT_FOUND;
Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/mds/topology/topology_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ void TopologyManager::DeleteServer(const DeleteServerRequest *request,
<< ", serverId = " << request->serverid();
response->set_statuscode(TopoStatusCode::TOPO_INTERNAL_ERROR);
return;
} else if (OnlineState::OFFLINE != ms.GetOnlineState()) {
LOG(ERROR) << "Can not delete server which have "
<< "metaserver not offline.";
response->set_statuscode(
TopoStatusCode::TOPO_CANNOT_REMOVE_NOT_OFFLINE);
return;
} else {
errcode = topology_->RemoveMetaServer(msId);
if (errcode != TopoStatusCode::TOPO_OK) {
Expand Down
138 changes: 131 additions & 7 deletions curvefs/src/tools/create/curvefs_create_topology_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ int CurvefsBuildTopologyTool::HandleBuildCluster() {
return DealFailedRet(ret, "scan cluster");
}

ret = RemoveServersNotInNewTopo();
if (ret != 0) {
return DealFailedRet(ret, "remove server");
}

ret = RemoveZonesNotInNewTopo();
if (ret != 0) {
return DealFailedRet(ret, "remove zone");
}

ret = RemovePoolsNotInNewTopo();
if (ret != 0) {
return DealFailedRet(ret, "remove pool");
}

ret = CreatePool();
if (ret != 0) {
return DealFailedRet(ret, "create pool");
Expand Down Expand Up @@ -270,6 +285,8 @@ int CurvefsBuildTopologyTool::ScanCluster() {
[it](Pool& data) { return data.name == it->poolname(); });
if (ix != poolDatas.end()) {
poolDatas.erase(ix);
} else {
poolToDel.emplace_back(it->poolid());
}
}

Expand All @@ -291,6 +308,8 @@ int CurvefsBuildTopologyTool::ScanCluster() {
});
if (ix != zoneDatas.end()) {
zoneDatas.erase(ix);
} else {
zoneToDel.emplace_back(it->zoneid());
}
}

Expand All @@ -313,6 +332,8 @@ int CurvefsBuildTopologyTool::ScanCluster() {
});
if (ix != serverDatas.end()) {
serverDatas.erase(ix);
} else {
serverToDel.emplace_back(it->serverid());
}
}

Expand All @@ -325,7 +346,6 @@ int CurvefsBuildTopologyTool::ListPool(std::list<PoolInfo>* poolInfos) {
ListPoolResponse response;
brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeoutMs);
cntl.set_log_id(1);

LOG(INFO) << "ListPool send request: " << request.DebugString();
stub.ListPool(&cntl, &request, &response, nullptr);
Expand Down Expand Up @@ -357,7 +377,6 @@ int CurvefsBuildTopologyTool::GetZonesInPool(PoolIdType poolid,

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeoutMs);
cntl.set_log_id(1);

LOG(INFO) << "ListZoneInPool, send request: " << request.DebugString();

Expand Down Expand Up @@ -390,7 +409,6 @@ int CurvefsBuildTopologyTool::GetServersInZone(
request.set_zoneid(zoneid);
brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeoutMs);
cntl.set_log_id(1);

LOG(INFO) << "ListZoneServer, send request: " << request.DebugString();

Expand All @@ -415,6 +433,115 @@ int CurvefsBuildTopologyTool::GetServersInZone(
return 0;
}

int CurvefsBuildTopologyTool::RemovePoolsNotInNewTopo() {
TopologyService_Stub stub(&channel_);
for (auto it : poolToDel) {
DeletePoolRequest request;
DeletePoolResponse response;
request.set_poolid(it);

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeoutMs);

LOG(INFO) << "ClearPool, send request: " << request.DebugString();

stub.DeletePool(&cntl, &request, &response, nullptr);

if (cntl.ErrorCode() == EHOSTDOWN ||
cntl.ErrorCode() == brpc::ELOGOFF) {
return kRetCodeRedirectMds;
} else if (cntl.Failed()) {
LOG(ERROR) << "ClearPool errcorde = " << response.statuscode()
<< ", error content:" << cntl.ErrorText()
<< " , poolId = " << it;
return kRetCodeCommonErr;
}

if (response.statuscode() != TopoStatusCode::TOPO_OK) {
LOG(ERROR) << "ClearPool rpc response fail. "
<< "Message is :" << response.DebugString()
<< " , poolId =" << it;
return response.statuscode();
} else {
LOG(INFO) << "Received ClearPool response success, "
<< response.DebugString();
}
}
return 0;
}

int CurvefsBuildTopologyTool::RemoveZonesNotInNewTopo() {
TopologyService_Stub stub(&channel_);
for (auto it : zoneToDel) {
DeleteZoneRequest request;
DeleteZoneResponse response;
request.set_zoneid(it);

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeoutMs);

LOG(INFO) << "ClearZone, send request: " << request.DebugString();

stub.DeleteZone(&cntl, &request, &response, nullptr);

if (cntl.ErrorCode() == EHOSTDOWN ||
cntl.ErrorCode() == brpc::ELOGOFF) {
return kRetCodeRedirectMds;
} else if (cntl.Failed()) {
LOG(ERROR) << "ClearZone, errcorde = " << response.statuscode()
<< ", error content:" << cntl.ErrorText()
<< " , zoneId = " << it;
return kRetCodeCommonErr;
}
if (response.statuscode() != TopoStatusCode::TOPO_OK) {
LOG(ERROR) << "ClearZone Rpc response fail. "
<< "Message is :" << response.DebugString()
<< " , zoneId = " << it;
return response.statuscode();
} else {
LOG(INFO) << "Received ClearZone Rpc success, "
<< response.DebugString();
}
}
return 0;
}

int CurvefsBuildTopologyTool::RemoveServersNotInNewTopo() {
TopologyService_Stub stub(&channel_);
for (auto it : serverToDel) {
DeleteServerRequest request;
DeleteServerResponse response;
request.set_serverid(it);

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeoutMs);

LOG(INFO) << "ClearServer, send request: " << request.DebugString();

stub.DeleteServer(&cntl, &request, &response, nullptr);

if (cntl.ErrorCode() == EHOSTDOWN ||
cntl.ErrorCode() == brpc::ELOGOFF) {
return kRetCodeRedirectMds;
} else if (cntl.Failed()) {
LOG(ERROR) << "ClearServer, errcorde = " << response.statuscode()
<< ", error content : " << cntl.ErrorText()
<< " , serverId = " << it;
return kRetCodeCommonErr;
}
if (response.statuscode() != TopoStatusCode::TOPO_OK) {
LOG(ERROR) << "ClearServer Rpc response fail. "
<< "Message is :" << response.DebugString()
<< " , serverId = " << it;
return response.statuscode();
} else {
LOG(INFO) << "Received ClearServer Rpc success, "
<< response.DebugString();
}
}
return 0;
}

int CurvefsBuildTopologyTool::CreatePool() {
TopologyService_Stub stub(&channel_);
for (auto it : poolDatas) {
Expand All @@ -431,7 +558,6 @@ int CurvefsBuildTopologyTool::CreatePool() {

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeoutMs);
cntl.set_log_id(1);

LOG(INFO) << "CreatePool, send request: " << request.DebugString();

Expand Down Expand Up @@ -470,7 +596,6 @@ int CurvefsBuildTopologyTool::CreateZone() {

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeoutMs);
cntl.set_log_id(1);

LOG(INFO) << "CreateZone, send request: " << request.DebugString();

Expand All @@ -485,7 +610,7 @@ int CurvefsBuildTopologyTool::CreateZone() {
<< " , zoneName = " << it.name;
return kRetCodeCommonErr;
}
if (response.statuscode() != 0) {
if (response.statuscode() != TopoStatusCode::TOPO_OK) {
LOG(ERROR) << "CreateZone Rpc response fail. "
<< "Message is :" << response.DebugString()
<< " , zoneName = " << it.name;
Expand Down Expand Up @@ -513,7 +638,6 @@ int CurvefsBuildTopologyTool::CreateServer() {

brpc::Controller cntl;
cntl.set_timeout_ms(FLAGS_rpcTimeoutMs);
cntl.set_log_id(1);

LOG(INFO) << "CreateServer, send request: " << request.DebugString();

Expand Down
7 changes: 7 additions & 0 deletions curvefs/src/tools/create/curvefs_create_topology_tool.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class CurvefsBuildTopologyTool : public curvefs::tools::CurvefsTool {
int InitPoolData();
int ScanCluster();
int ScanPool();
int RemovePoolsNotInNewTopo();
int RemoveZonesNotInNewTopo();
int RemoveServersNotInNewTopo();
int CreatePool();
int CreateZone();
int CreateServer();
Expand All @@ -150,6 +153,10 @@ class CurvefsBuildTopologyTool : public curvefs::tools::CurvefsTool {
std::list<Zone> zoneDatas;
std::list<Pool> poolDatas;

std::list<ServerIdType> serverToDel;
std::list<ZoneIdType> zoneToDel;
std::list<PoolIdType> poolToDel;

std::vector<std::string> mdsAddressStr_;
int mdsAddressIndex_;
brpc::Channel channel_;
Expand Down
42 changes: 42 additions & 0 deletions curvefs/test/mds/topology/test_topology_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,48 @@ TEST_F(TestTopologyManager, test_DeleteServer_success) {
ASSERT_EQ(TopoStatusCode::TOPO_OK, response.statuscode());
}

// A server that hosts a single OFFLINE metaserver can be deleted: the
// storage mock is expected to see one DeleteMetaServer and one DeleteServer
// call, and the response must carry TOPO_OK.
TEST_F(TestTopologyManager, test_DeleteServerHaveMetaserver_success) {
    // Build the topology: pool -> zone -> server -> offline metaserver.
    const PoolIdType poolId = 0x11;
    const ZoneIdType zoneId = 0x21;
    const ServerIdType serverId = 0x31;
    PrepareAddPool(poolId);
    PrepareAddZone(zoneId);
    PrepareAddServer(serverId, "hostname1", "ip1", 0, "ip2", 0, zoneId,
                     poolId);
    PrepareAddMetaServer(0x41, "ms1", "token1", serverId, "ip1", 0, "ip2",
                         8888, OnlineState::OFFLINE);

    // Both storage deletions are expected exactly once and report success.
    EXPECT_CALL(*storage_, DeleteMetaServer(_)).WillOnce(Return(true));
    EXPECT_CALL(*storage_, DeleteServer(_)).WillOnce(Return(true));

    DeleteServerResponse response;
    DeleteServerRequest request;
    request.set_serverid(serverId);
    serviceManager_->DeleteServer(&request, &response);

    ASSERT_EQ(TopoStatusCode::TOPO_OK, response.statuscode());
}

// A server whose metaserver was added without an explicit OFFLINE state
// must not be deletable: the response is expected to carry
// TOPO_CANNOT_REMOVE_NOT_OFFLINE.
// NOTE(review): relies on PrepareAddMetaServer defaulting to a non-offline
// state - confirm against the fixture.
TEST_F(TestTopologyManager, test_DeleteServerHaveMetaserver_fail) {
    // Build the topology: pool -> zone -> server -> metaserver.
    const PoolIdType poolId = 0x11;
    const ZoneIdType zoneId = 0x21;
    const ServerIdType serverId = 0x31;
    PrepareAddPool(poolId);
    PrepareAddZone(zoneId);
    PrepareAddServer(serverId, "hostname1", "ip1", 0, "ip2", 0, zoneId,
                     poolId);
    PrepareAddMetaServer(0x41, "ms1", "token1", serverId, "ip1", 0, "ip2",
                         8888);

    DeleteServerResponse response;
    DeleteServerRequest request;
    request.set_serverid(serverId);
    serviceManager_->DeleteServer(&request, &response);

    ASSERT_EQ(TopoStatusCode::TOPO_CANNOT_REMOVE_NOT_OFFLINE,
              response.statuscode());
}

TEST_F(TestTopologyManager, test_ListZoneServer_ByIdSuccess) {
PoolIdType poolId = 0x11;
ZoneIdType zoneId = 0x21;
Expand Down

0 comments on commit 29f51ac

Please sign in to comment.