Skip to content

Commit

Permalink
curvefs_tool: query-inode and list-partition
Browse files Browse the repository at this point in the history
1. use GetOrModifyS3ChunkInfoResponse to query inode
2. add list-partition for locating inodes
3. use GeIndode to get inode baseInfo
  • Loading branch information
Cyber-SiKu committed May 5, 2022
1 parent 7941955 commit 7bffef5
Show file tree
Hide file tree
Showing 16 changed files with 1,167 additions and 28 deletions.
2 changes: 2 additions & 0 deletions curvefs/conf/tools.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ mdsDummyAddr=127.0.0.1:7700 # __CURVEADM_TEMPLATE__ ${cluster_mds_dummy_addr} _
# rpc timeout
rpcTimeoutMs=10000
rpcRetryTimes=5
# rpc stream idle timeout
rpcStreamIdleTimeoutMs=10000
# topo file path
topoFilePath=curvefs/test/tools/topo_example.json # __CURVEADM_TEMPLATE__ /curvefs/tools/conf/topology.json __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ {{ project_root_dest }}/conf/topology.json __ANSIBLE_TEMPLATE__
# metaserver external address
Expand Down
13 changes: 0 additions & 13 deletions curvefs/proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -381,18 +381,6 @@ message CommitTxResponse {
required TopoStatusCode statusCode = 1;
}

message ListPartitionsRequest {
repeated uint32 fsId = 1;
}

message PartitionInfoList {
repeated common.PartitionInfo partitionInfoList = 1;
}

message ListPartitionsResponse {
required TopoStatusCode statusCode = 1;
map<uint32, PartitionInfoList> fsId2partitionList = 2;
}

message CopysetKey {
required uint32 poolId = 1;
Expand Down Expand Up @@ -485,7 +473,6 @@ service TopologyService {
rpc GetCopysetOfPartition(GetCopysetOfPartitionRequest) returns (GetCopysetOfPartitionResponse);
rpc CommitTx(CommitTxRequest) returns (CommitTxResponse);
rpc GetCopysetsInfo (GetCopysetsInfoRequest) returns (GetCopysetsInfoResponse);
rpc ListPartitions(ListPartitionsRequest) returns (ListPartitionsResponse);
rpc ListCopysetInfo(ListCopysetInfoRequest) returns (ListCopysetInfoResponse);
rpc StatMetadataUsage(StatMetadataUsageRequest) returns (StatMetadataUsageResponse);

Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/tools/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ cc_library(
"//curvefs/proto:space_cc_proto",
"//curvefs/src/mds/common:fs_mds_common",
"//curvefs/src/mds/topology:curvefs_deal_peerid",
"//curvefs/src/common:curvefs_common",
"//external:brpc",
"//external:gflags",
"//external:glog",
"//external:json",
"//src/common:curve_common",
"//src/mds/topology",
"@com_google_absl//absl/cleanup",
],
)
79 changes: 69 additions & 10 deletions curvefs/src/tools/curvefs_tool.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,25 @@
#include <utility>
#include <vector>

#include "absl/cleanup/cleanup.h"
#include "curvefs/src/common/rpc_stream.h"
#include "curvefs/src/tools/curvefs_tool_define.h"
#include "curvefs/src/tools/curvefs_tool_metric.h"
#include "src/common/configuration.h"

DECLARE_string(confPath);
DECLARE_uint32(rpcTimeoutMs);
DECLARE_uint32(rpcRetryTimes);
DECLARE_uint32(rpcStreamIdleTimeoutMs);

namespace curvefs {
namespace tools {

using ::curvefs::common::StreamClient;
using ::curvefs::common::StreamConnection;
using ::curvefs::common::StreamOptions;
using ::curvefs::common::StreamStatus;

class CurvefsTool {
public:
CurvefsTool() {}
Expand Down Expand Up @@ -120,13 +128,15 @@ class CurvefsToolRpc : public CurvefsTool {
const std::shared_ptr<ResponseT>& response,
const std::shared_ptr<ServiceT>& service_stub,
const std::function<void(ControllerT*, RequestT*, ResponseT*)>&
service_stub_func) {
service_stub_func,
const std::shared_ptr<StreamClient>& streamClient) {
channel_ = channel;
controller_ = controller;
requestQueue_ = requestQueue;
response_ = response;
service_stub_ = service_stub;
service_stub_func_ = service_stub_func;
streamClient_ = streamClient;
InitHostsAddr();
return 0;
}
Expand All @@ -136,6 +146,7 @@ class CurvefsToolRpc : public CurvefsTool {
controller_ = std::make_shared<ControllerT>();
response_ = std::make_shared<ResponseT>();
service_stub_ = std::make_shared<ServiceT>(channel_.get());
streamClient_ = std::make_shared<StreamClient>();

// add need update FlagInfos
AddUpdateFlags();
Expand Down Expand Up @@ -196,32 +207,60 @@ class CurvefsToolRpc : public CurvefsTool {
* as long as one succeeds, it returns true and ends sending
*/
virtual bool SendRequestToServices() {
uint32_t failHostNumner = 0;
uint32_t failHostNumber = 0;
bool ret = false;
for (const std::string& host : hostsAddr_) {
SetController();
if (channel_->Init(host.c_str(), nullptr) != 0) {
std::cerr << "fail init channel to host: " << host << std::endl;
brpc::ChannelOptions channelOpt;
if (isStreaming_) {
// set stream rpc client
channelOpt.connection_group = "streaming";
StreamOptions streamOpt(FLAGS_rpcStreamIdleTimeoutMs);
connection_ = streamClient_->Connect(
controller_.get(), receiveCallback_, streamOpt);
if (nullptr == connection_ || nullptr == receiveCallback_) {
errorOutput_ << "Stream connect " << host << " failed\n";
++failHostNumber;
continue;
}
}
if (channel_->Init(host.c_str(), &channelOpt) != 0) {
errorOutput_ << "fail init channel to host: " << host
<< std::endl;
++failHostNumber;
continue;
}
// if service_stub_func_ does not assign a value
// it will crash in there
service_stub_func_(controller_.get(), &requestQueue_.front(),
response_.get());
if (controller_->Failed()) {
++failHostNumner;
++failHostNumber;
}
if (isStreaming_) {
auto status = connection_->WaitAllDataReceived();
if (status != StreamStatus::STREAM_OK) {
errorOutput_ << "Receive stream data from " << host
<< " failed , status=" << status << std::endl;
}
}
if (AfterSendRequestToHost(host) == true) {
controller_->Reset();
return true;
ret = true;
break;
}
controller_->Reset();
if (isStreaming_ && connection_ != nullptr) {
streamClient_->Close(connection_);
connection_ = nullptr;
}
SetController();
}
if (hostsAddr_.size() != failHostNumner) {
if (hostsAddr_.size() != failHostNumber) {
errorOutput_.str("");
}
// send request to all host failed
return false;

return ret;
}

virtual void SetController() {
Expand Down Expand Up @@ -288,6 +327,10 @@ class CurvefsToolRpc : public CurvefsTool {
}
}

void SetStreamingRpc(bool isStreaming) {
isStreaming_ = isStreaming;
}

protected:
/**
* @brief save the host who will be sended request
Expand Down Expand Up @@ -317,7 +360,8 @@ class CurvefsToolRpc : public CurvefsTool {
* it is core function of this class
* make sure uint test cover SendRequestToServices
*/
std::function<void(ControllerT*, RequestT*, ResponseT*)> service_stub_func_;
std::function<void(ControllerT*, RequestT*, ResponseT*)>
service_stub_func_ = nullptr;
/**
* @brief save the functor which defined in curvefs_tool_define.h
*
Expand All @@ -326,6 +370,21 @@ class CurvefsToolRpc : public CurvefsTool {
std::vector<std::function<void(curve::common::Configuration*,
google::CommandLineFlagInfo*)>>
updateFlagsFunc_;
/**
* @brief whether to use stream rpc api
*/
bool isStreaming_ = false;
/**
* @brief rpc streaming client for too large data
*/
std::shared_ptr<StreamClient> streamClient_;
/**
* @brief rpc stream client callback function for processing received data
*
*/
std::function<bool(butil::IOBuf* buffer)> receiveCallback_ = nullptr;

std::shared_ptr<StreamConnection> connection_;
};

class CurvefsToolMetric : public CurvefsTool {
Expand Down
19 changes: 15 additions & 4 deletions curvefs/src/tools/curvefs_tool_define.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ DEFINE_bool(enableSumInDir, false, "statistic info in xattr");
DEFINE_uint64(capacity, (uint64_t)100 * 1024 * 1024 * 1024,
"capacity of fs, default 100G");
DEFINE_string(user, "anonymous", "user of request");
DEFINE_string(inodeId, "1,2,3", "inodes id");

// list-topology
DEFINE_string(jsonPath, "/tmp/topology.json", "output json path");
Expand All @@ -78,6 +79,8 @@ DEFINE_string(volumeBitmapLocation,
"AtStart",
"volume space bitmap location, support |AtStart| and |AtEnd|");

DEFINE_uint32(rpcStreamIdleTimeoutMs, 10000, "rpc stream idle timeout");

namespace curvefs {

namespace tools {
Expand Down Expand Up @@ -218,10 +221,14 @@ std::function<void(curve::common::Configuration*, google::CommandLineFlagInfo*)>
&FLAGS_s3_chunksize);

std::function<void(curve::common::Configuration*, google::CommandLineFlagInfo*)>
SetEnableSumInDir =
std::bind(&SetFlagInfo<bool>, std::placeholders::_1,
std::placeholders::_2, "enableSumInDir",
&FLAGS_enableSumInDir);
SetEnableSumInDir = std::bind(&SetFlagInfo<bool>, std::placeholders::_1,
std::placeholders::_2, "enableSumInDir",
&FLAGS_enableSumInDir);

std::function<void(curve::common::Configuration*, google::CommandLineFlagInfo*)>
SetRpcStreamIdleTimeoutMs = std::bind(
&SetFlagInfo<uint32_t>, std::placeholders::_1, std::placeholders::_2,
"rpcStreamIdleTimeoutMs", &FLAGS_rpcStreamIdleTimeoutMs);

/* check flag */
std::function<bool(google::CommandLineFlagInfo*)> CheckMetaserverIdDefault =
Expand Down Expand Up @@ -255,6 +262,10 @@ std::function<bool(google::CommandLineFlagInfo*)> CheckJsonPathDefault =
std::bind(&CheckFlagInfoDefault<fLS::clstring>, std::placeholders::_1,
"jsonPath");

std::function<bool(google::CommandLineFlagInfo*)> CheckInodeIdDefault =
std::bind(&CheckFlagInfoDefault<fLS::clstring>, std::placeholders::_1,
"inodeId");

/* translate to string */

auto StrVec2Str(const std::vector<std::string>& strVec) -> std::string {
Expand Down
11 changes: 11 additions & 0 deletions curvefs/src/tools/curvefs_tool_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,16 @@ const char kPartitionQueryCmd[] = "query-partition";
const char kMetaserverQueryCmd[] = "query-metaserver";
// fs-query
const char kFsQueryCmd[] = "query-fs";
// Inode-query
const char kInodeQueryCmd[] = "query-inode";
// fsinfo-list
const char kFsInfoListCmd[] = "list-fs";
// list-fs-copysetid
const char kCopysetInfoListCmd[] = "list-copysetInfo";
// list-topology
const char kTopologyListCmd[] = "list-topology";
// list-partition
const char kPartitionListCmd[] = "list-partition";
// no-invoke Used for commands that are not directly invoked
const char kNoInvokeCmd[] = "no-invoke";

Expand All @@ -99,6 +103,7 @@ const char kHelpStr[] =
"list-fs: list all fs in cluster\n"
"list-copysetInfo: list all copysetInfo in cluster\n"
"list-topology: list cluster's topology\n"
"list-topology: list partition by fsId\n"
"create-topology: create cluster topology based on topo.json\n"
"create-fs: create fs\n"
"umount-fs: umount curvefs from local and cluster\n"
Expand All @@ -109,6 +114,7 @@ const char kHelpStr[] =
"query-partition: query copyset in partition by partitionId\n"
"query-metaserver: query metaserver by metaserverId or metaserverName\n"
"query-fs: query fs by fsId or fsName\n"
"query-inode: query inode\n"
"You can specify the config path by -confPath to avoid typing too many "
"options\n"; // NOLINT

Expand Down Expand Up @@ -260,6 +266,9 @@ extern std::function<void(curve::common::Configuration*,
extern std::function<void(curve::common::Configuration*,
google::CommandLineFlagInfo*)>
SetEnableSumInDir;
extern std::function<void(curve::common::Configuration*,
google::CommandLineFlagInfo*)>
SetRpcStreamIdleTimeoutMs;

/* checkout the flag is default */
extern std::function<bool(google::CommandLineFlagInfo*)>
Expand All @@ -281,6 +290,8 @@ extern std::function<bool(google::CommandLineFlagInfo*)>

extern std::function<bool(google::CommandLineFlagInfo*)> CheckJsonPathDefault;

extern std::function<bool(google::CommandLineFlagInfo*)> CheckInodeIdDefault;

/* translate to string */
std::string StrVec2Str(const std::vector<std::string>&);

Expand Down
10 changes: 10 additions & 0 deletions curvefs/src/tools/curvefs_tool_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "curvefs/src/tools/list/curvefs_topology_list.h"
#include "curvefs/src/tools/query/curvefs_copyset_query.h"
#include "curvefs/src/tools/query/curvefs_fs_query.h"
#include "curvefs/src/tools/query/curvefs_inode_query.h"
#include "curvefs/src/tools/query/curvefs_metaserver_query.h"
#include "curvefs/src/tools/query/curvefs_partition_query.h"
#include "curvefs/src/tools/status/curvefs_copyset_status.h"
Expand All @@ -43,6 +44,7 @@
#include "curvefs/src/tools/umount/curvefs_umount_fs_tool.h"
#include "curvefs/src/tools/usage/curvefs_metadata_usage_tool.h"
#include "curvefs/src/tools/version/curvefs_version_tool.h"
#include "curvefs/src/tools/list/curvefs_partition_list.h"

namespace curvefs {
namespace tools {
Expand Down Expand Up @@ -99,6 +101,10 @@ CurvefsToolFactory::CurvefsToolFactory() {
RegisterCurvefsTool(std::string(kTopologyListCmd),
CurvefsToolCreator<list::TopologyListTool>::Create);

// list-partition
RegisterCurvefsTool(std::string(kPartitionListCmd),
CurvefsToolCreator<list::PartitionListTool>::Create);

// query-copyset
RegisterCurvefsTool(std::string(kCopysetQueryCmd),
CurvefsToolCreator<query::CopysetQueryTool>::Create);
Expand All @@ -115,6 +121,10 @@ CurvefsToolFactory::CurvefsToolFactory() {
RegisterCurvefsTool(std::string(kFsQueryCmd),
CurvefsToolCreator<query::FsQueryTool>::Create);

// query-inode
RegisterCurvefsTool(std::string(kInodeQueryCmd),
CurvefsToolCreator<query::InodeQueryTool>::Create);

// delete-fs
RegisterCurvefsTool(std::string(kDeleteFsCmd),
CurvefsToolCreator<delete_::DeleteFsTool>::Create);
Expand Down
Loading

0 comments on commit 7bffef5

Please sign in to comment.