diff --git a/tools/benchmark_client/CMakeLists.txt b/tools/benchmark_client/CMakeLists.txt index ac4c652edd..8cdd683174 100644 --- a/tools/benchmark_client/CMakeLists.txt +++ b/tools/benchmark_client/CMakeLists.txt @@ -1,5 +1,5 @@ -set(WARNING_FLAGS "-W -Wextra -Wall -Wsign-compare \ --Wno-unused-parameter -Wno-redundant-decls -Wwrite-strings \ +set(WARNING_FLAGS "-g -W -Wextra -Wall -Wsign-compare \ +-Wno-unused-parameter -Wno-redundant-decls -Wwrite-strings -Wno-unused-result \ -Wpointer-arith -Wreorder -Wswitch -Wsign-promo \ -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers") @@ -10,9 +10,11 @@ aux_source_directory(${SRC_DIR} BASE_OBJS) add_executable(benchmark_client ${BASE_OBJS}) -target_include_directories(benchmark_client PRIVATE ${INSTALL_INCLUDEDIR} PRIVATE ${PROJECT_SOURCE_DIR}) +target_include_directories(benchmark_client PRIVATE ${INSTALL_INCLUDEDIR} PRIVATE ${PROJECT_SOURCE_DIR} ${ROCKSDB_SOURCE_DIR} ${GLOG_INCLUDE_DIR}) target_link_libraries(benchmark_client pthread) +target_link_libraries(benchmark_client ${ROCKSDB_LIBRARY}) +target_link_libraries(benchmark_client ${GLOG_LIBRARY}) target_link_libraries(benchmark_client ${SNAPPY_LIBRARY}) target_link_libraries(benchmark_client pstd) target_link_libraries(benchmark_client ${HIREDIS_LIBRARY}) @@ -20,4 +22,4 @@ set_target_properties(benchmark_client PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR} CMAKE_COMPILER_IS_GNUCXX TRUE COMPILE_FLAGS ${CXXFLAGS}) -add_dependencies(benchmark_client snappy hiredis) \ No newline at end of file +add_dependencies(benchmark_client snappy hiredis) diff --git a/tools/benchmark_client/README.md b/tools/benchmark_client/README.md new file mode 100644 index 0000000000..83fdb1ee5e --- /dev/null +++ b/tools/benchmark_client/README.md @@ -0,0 +1,86 @@ +## 参数 +所有的命令行参数如下所示: +``` + -command (command to execute, eg: generate/get/set/zadd) type: string default: "generate" + -compare_value (whether compare result or not) type: bool default: false + -count (request counts per thread) type: int32 default: 100000 + -dbs (dbs name, eg: 0,1,2) type: string default: "0" + -element_count (elements number in hash/list/set/zset) type: int32 default: 1 + -host (target server's host) type: string default: "127.0.0.1" + -key_size (key size int bytes) type: int32 default: 50 + -password (password) type: string default: "" + -port (target server's listen port) type: int32 default: 9221 + -thread_num (concurrent thread num) type: int32 default: 10 + -timeout (request timeout) type: int32 default: 1000 + -value_size (value size in bytes) type: int32 default: 100 +``` + +compare_value: 是否进行数据校验。如需进行数据校验,set和set操作都需要设置为true。如果compare_value为true,执行set命令时的value值即为key值拼接得到,如果哦compare_value为false,value为随机值。 + +count: 每个线程请求的key个数。 + +element_count: list/zset/set/hash 每个pkey下的member个数。 + +目前支持的command包括:get,set,hset,hgetall,sadd,smembers,lpush,lrange,zadd,zrange + +## 使用方式 +需要先执行generate方式生成待请求的key,如: +``` +./benchmark_client --command=generate --count=2 --element_count=10 --port=9271 --thread_num=2 --key_size=10 --value_size=25 --host=127.0.0.1 --compare_value=1 +``` +执行完成后会在当前目录生成两个benchmark_keyfile_*文件(每个线程生成一个),每个文件行数为20(element_count * count),每个key长度为10. + +接下来可以执行redis API命令,命令执行时,会首先从benchmark_keyfile_*文件中读取到key,在根据compare_value值设置value值。 + +set命令: +``` +./benchmark_client --command=set --count=2 --port=9271 --thread_num=2 --key_size=10 --value_size=25 --host=127.0.0.1 --compare_value=1 +``` + +get命令: +``` +./benchmark_client --command=get --count=2 --port=9271 --thread_num=2 --key_size=10 --value_size=25 --host=127.0.0.1 --compare_value=1 +``` + +hset命令: +``` +//将向pika写入共4个hash pkey,每个pkey包含10个member。 +./benchmark_client --command=set --count=2 --element_count=10 --port=9271 --thread_num=2 --key_size=10 --value_size=25 --host=127.0.0.1 --compare_value=1 +``` + +hgetall命令: +``` +./benchmark_client --command=hgetall --count=2 --element_count=10 --port=9271 --thread_num=2 --key_size=10 --value_size=25 --host=127.0.0.1 --compare_value=1 +``` + +## 执行结果 +``` +=================== Benchmark Client =================== +Server host name: 127.0.0.1 +Server port: 9271 +command: set +Thread num : 2 +Payload size : 25 +Number of request : 2 +Transmit mode: No Pipeline +Collection of dbs: 0 +Elements num: 1 +CompareValue : 1 +Startup Time : Tue Dec 19 20:21:44 2023 + +======================================================== +Table 0 Thread 0 Select DB Success, start to write data... +Table 0 Thread 1 Select DB Success, start to write data... +finish 0 request +finish 0 request +Finish Time : Tue Dec 19 20:21:44 2023 +Total Time Cost : 0 hours 0 minutes 0 seconds +Timeout Count: 0 Error Count: 0 +stats: Count: 4 Average: 166.5000 StdDev: 24.64 +Min: 0 Median: 202.0000 Max: 202 +Percentiles: P50: 202.00 P75: 202.00 P99: 202.00 P99.9: 202.00 P99.99: 202.00 +------------------------------------------------------ + +``` +包括两部分,第一部分是本次benchmark描述信息。第二部分是统计信息,包括超时请求个数,错误请求个数,请求耗时的统计信息。 + diff --git a/tools/benchmark_client/benchmark_client.cc b/tools/benchmark_client/benchmark_client.cc index c34cf34382..2dbd2bb34e 100644 --- a/tools/benchmark_client/benchmark_client.cc +++ b/tools/benchmark_client/benchmark_client.cc @@ -7,43 +7,152 @@ #include #include #include +#include #include #include +#include "gflags/gflags.h" +#include "glog/logging.h" +#include "monitoring/histogram.h" #include "hiredis/hiredis.h" + #include "pstd/include/pstd_status.h" #include "pstd/include/pstd_string.h" +#include "pstd/include/env.h" + +DEFINE_string(command, "generate", "command to execute, eg: generate/get/set/zadd"); +DEFINE_bool(pipeline, false, "whether to enable pipeline"); +DEFINE_string(host, "127.0.0.1", "target server's host"); +DEFINE_int32(port, 9221, "target server's listen port"); +DEFINE_int32(timeout, 1000, "request timeout"); +DEFINE_string(password, "", "password"); +DEFINE_int32(key_size, 50, "key size int bytes"); +DEFINE_int32(value_size, 100, "value size in bytes"); +DEFINE_int32(count, 100000, "request counts"); +DEFINE_int32(thread_num, 10, "concurrent thread num"); +DEFINE_string(dbs, "0", "dbs name, eg: 0,1,2"); +DEFINE_int32(element_count, 1, "elements number in hash/list/set/zset"); +DEFINE_bool(compare_value, false, "whether compare result or not"); -#define TIME_OF_LOOP 1000000 - -using pstd::Status; using std::default_random_engine; +using pstd::Status; -Status RunSetCommand(redisContext* c); -Status RunSetCommandPipeline(redisContext* c); -Status RunZAddCommand(redisContext* c); +struct RequestStat { + int success_cnt = 0; + int timeout_cnt = 0; + int error_cnt = 0; + RequestStat operator+(const RequestStat& stat) { + RequestStat res; + res.success_cnt = this->success_cnt + stat.success_cnt; + res.timeout_cnt = this->timeout_cnt + stat.timeout_cnt; + res.error_cnt = this->error_cnt + stat.error_cnt; + return res; + } +}; struct ThreadArg { + ThreadArg(pthread_t t, const std::string& tn, int i) + : idx(i), tid(t), table_name(tn), stat() {} + int idx; pthread_t tid; std::string table_name; - size_t idx; + RequestStat stat; }; -enum TransmitMode { kNormal = 0, kPipeline = 1 }; +thread_local int last_seed = 0; +std::vector thread_args; +std::vector tables; +std::unique_ptr hist; +int pipeline_num = 0; -static int32_t last_seed = 0; +bool CompareValue(std::set expect, const redisReply* reply) { + if (!FLAGS_compare_value) { + return true; + } + if (reply == nullptr || + reply->type != REDIS_REPLY_ARRAY || + reply->elements != expect.size()) { + return false; + } + for (size_t i = 0; i < reply->elements; i++) { + std::string key = reply->element[i]->str; + auto it = expect.find(key); + if (it == expect.end()) { + return false; + } + expect.erase(key); + } + return expect.size() == 0; +} -std::string tables_str = "0"; -std::vector tables; +// for hash type +bool CompareValue(std::map expect, const redisReply* reply) { + if (!FLAGS_compare_value) { + return true; + } + if (reply == nullptr || + reply->type != REDIS_REPLY_ARRAY || + reply->elements != expect.size() * 2) { + return false; + } + std::unordered_map actual; + for (size_t i = 0; i < reply->elements; i++) { + std::string key = reply->element[i]->str; + std::string value = reply->element[++i]->str; + auto it = expect.find(key); + if (it == expect.end() || + it->second != value) { + return false; + } + expect.erase(key); + } + return expect.size() == 0; +} -std::string hostname = "127.0.0.1"; -int port = 9221; -std::string password = ""; -uint32_t payload_size = 50; -uint32_t number_of_request = 100000; -uint32_t thread_num_each_table = 1; -TransmitMode transmit_mode = kNormal; -int pipeline_num = 0; +// for string type +bool CompareValue(const std::string& expect, const std::string& actual) { + if (!FLAGS_compare_value) { + return true; + } + return expect == actual; +} + +void PrepareKeys(int suffix, std::vector* keys) { + keys->resize(FLAGS_count); + std::string filename = "benchmark_keyfile_" + std::to_string(suffix); + FILE* fp = fopen(filename.c_str(), "r"); + for (int idx = 0; idx < FLAGS_count; ++idx) { + char* key = new char[FLAGS_key_size + 2]; + fgets(key, FLAGS_key_size + 2, fp); + key[FLAGS_key_size] = '\0'; + (*keys)[idx] = std::string(key); + delete []key; + } + fclose(fp); +} + +void PreparePkeyMembers(int suffix, std::vector>>* keys) { + keys->resize(FLAGS_count); + std::string filename = "benchmark_keyfile_" + std::to_string(suffix); + FILE* fp = fopen(filename.c_str(), "r"); + for (int idx = 0; idx < FLAGS_count; ++idx) { + char* key = new char[FLAGS_key_size + 2]; + fgets(key, FLAGS_key_size + 2, fp); + key[FLAGS_key_size] = '\0'; + std::set elements; + elements.insert(std::string(key)); + for (int idy = 1; idy < FLAGS_element_count; ++idy) { + char* element = new char[FLAGS_key_size + 2]; + fgets(element, FLAGS_key_size + 2, fp); + element[FLAGS_key_size] = '\0'; + elements.insert(std::string(element)); + delete []element; + } + (*keys)[idx] = std::make_pair(std::string(key), elements); + delete []key; + } + fclose(fp); +} void GenerateRandomString(int32_t len, std::string* target) { target->clear(); @@ -61,65 +170,85 @@ void GenerateRandomString(int32_t len, std::string* target) { } } +void GenerateValue(const std::string& key, int32_t len, std::string* value) { + if (!FLAGS_compare_value) { + return GenerateRandomString(len, value); + } + value->resize(0); + while (len - value->size() != 0) { + size_t min_len = len - value->size(); + if (min_len > key.size()) { + min_len = key.size(); + } + value->append(key.substr(0, min_len)); + } + return; +} + void PrintInfo(const std::time_t& now) { std::cout << "=================== Benchmark Client ===================" << std::endl; - std::cout << "Server host name: " << hostname << std::endl; - std::cout << "Server port: " << port << std::endl; - std::cout << "Thread num : " << thread_num_each_table << std::endl; - std::cout << "Payload size : " << payload_size << std::endl; - std::cout << "Number of request : " << number_of_request << std::endl; - std::cout << "Transmit mode: " << (transmit_mode == kNormal ? "No Pipeline" : "Pipeline") << std::endl; - std::cout << "Collection of tables: " << tables_str << std::endl; - std::cout << "Startup Time : " << asctime(localtime(&now)); + std::cout << "Server host name: " << FLAGS_host << std::endl; + std::cout << "Server port: " << FLAGS_port << std::endl; + std::cout << "command: " << FLAGS_command << std::endl; + std::cout << "Thread num : " << FLAGS_thread_num << std::endl; + std::cout << "Payload size : " << FLAGS_value_size << std::endl; + std::cout << "Number of request : " << FLAGS_count << std::endl; + std::cout << "Transmit mode: " << (FLAGS_pipeline ? "Pipeline" : "No Pipeline") << std::endl; + std::cout << "Collection of dbs: " << FLAGS_dbs << std::endl; + std::cout << "Elements num: " << FLAGS_element_count << std::endl; + std::cout << "CompareValue : " << FLAGS_compare_value << std::endl; + std::cout << "Startup Time : " << asctime(localtime(&now)) << std::endl; std::cout << "========================================================" << std::endl; } -void Usage() { - std::cout << "Usage: " << std::endl; - std::cout << "\tBenchmark_client writes data to the specified table" << std::endl; - std::cout << "\t-h -- server hostname (default 127.0.0.1)" << std::endl; - std::cout << "\t-p -- server port (default 9221)" << std::endl; - std::cout << "\t-t -- thread num of each table (default 1)" << std::endl; - std::cout << "\t-c -- collection of table names (default db1)" << std::endl; - std::cout << "\t-d -- data size of SET value in bytes (default 50)" << std::endl; - std::cout << "\t-n -- number of requests single thread (default 100000)" << std::endl; - std::cout << "\t-P -- pipeline requests. (default no pipeline)" << std::endl; - std::cout << "\texample: ./benchmark_client -t 3 -c db1,db2 -d 1024" << std::endl; +void RunGenerateCommand(int index) { + std::string filename = "benchmark_keyfile_" + std::to_string(index); + FILE* fp = fopen(filename.c_str(), "w+"); + if (fp == nullptr) { + LOG(INFO) << "open file error"; + return; + } + for (int i = 0; i < FLAGS_count * FLAGS_element_count; i++) { + std::string key; + GenerateRandomString(FLAGS_key_size, &key); + key.append("\n"); + fwrite(key.data(), sizeof(char), key.size(), fp); + } + fclose(fp); } -std::vector thread_args; - -void* ThreadMain(void* arg) { - ThreadArg* ta = reinterpret_cast(arg); - redisContext* c; - redisReply* res = nullptr; - struct timeval timeout = {1, 500000}; // 1.5 seconds - c = redisConnectWithTimeout(hostname.data(), port, timeout); - +redisContext* Prepare(ThreadArg* arg) { + int index = arg->idx; + std::string table = arg->table_name; + struct timeval timeout = {FLAGS_timeout / 1000, (FLAGS_timeout % 1000) * 1000}; + redisContext* c = redisConnectWithTimeout(FLAGS_host.data(), FLAGS_port, timeout); if (!c || c->err) { if (c) { - printf("Thread %lu, Connection error: %s\n", ta->tid, c->errstr); + printf("Table: %s Thread %d, Connection error: %s\n", + table.c_str(), index, c->errstr); redisFree(c); } else { - printf("Thread %lu, Connection error: can't allocate redis context\n", ta->tid); + printf("Table %s Thread %d, Connection error: " + "can't allocate redis context\n", table.c_str(), index); } return nullptr; } - if (!password.empty()) { - const char* auth_argv[2] = {"AUTH", password.data()}; - size_t auth_argv_len[2] = {4, password.size()}; - res = reinterpret_cast(redisCommandArgv(c, 2, reinterpret_cast(auth_argv), - reinterpret_cast(auth_argv_len))); + if (!FLAGS_password.empty()) { + const char* auth_argv[2] = {"AUTH", FLAGS_password.data()}; + size_t auth_argv_len[2] = {4, FLAGS_password.size()}; + auto res = reinterpret_cast( + redisCommandArgv(c, 2, reinterpret_cast(auth_argv), + reinterpret_cast(auth_argv_len))); if (!res) { - printf("Thread %lu Auth Failed, Get reply Error\n", ta->tid); + printf("Table %s Thread %d Auth Failed: Get reply Error\n", table.c_str(), index); freeReplyObject(res); redisFree(c); return nullptr; } else { if (!strcasecmp(res->str, "OK")) { } else { - printf("Thread %lu Auth Failed: %s, thread exit...\n", ta->idx, res->str); + printf("Table %s Thread %d Auth Failed: %s\n", table.c_str(), index, res->str); freeReplyObject(res); redisFree(c); return nullptr; @@ -128,106 +257,302 @@ void* ThreadMain(void* arg) { freeReplyObject(res); } - const char* select_argv[2] = {"SELECT", ta->table_name.data()}; - size_t select_argv_len[2] = {6, ta->table_name.size()}; - res = reinterpret_cast(redisCommandArgv(c, 2, reinterpret_cast(select_argv), - reinterpret_cast(select_argv_len))); + const char* select_argv[2] = {"SELECT", arg->table_name.data()}; + size_t select_argv_len[2] = {6, arg->table_name.size()}; + auto res = reinterpret_cast( + redisCommandArgv(c, 2, reinterpret_cast(select_argv), + reinterpret_cast(select_argv_len))); if (!res) { - printf("Thread %lu Select Table %s Failed, Get reply Error\n", ta->tid, ta->table_name.data()); + printf("Thread %d Select Table %s Failed, Get reply Error\n", index, table.c_str()); freeReplyObject(res); redisFree(c); return nullptr; } else { if (!strcasecmp(res->str, "OK")) { - printf("Table %s Thread %lu Select DB Success, start to write data...\n", ta->table_name.data(), ta->idx); + printf("Table %s Thread %d Select DB Success, start to write data...\n", + table.c_str(), index); } else { - printf("Table %s Thread %lu Select DB Failed: %s, thread exit...\n", ta->table_name.data(), ta->idx, res->str); + printf("Table %s Thread %d Select DB Failed: %s, thread exit...\n", + table.c_str(), index, res->str); freeReplyObject(res); redisFree(c); return nullptr; } } - freeReplyObject(res); + return c; +} - if (transmit_mode == kNormal) { - Status s = RunSetCommand(c); - if (!s.ok()) { - std::string thread_info = "Table " + ta->table_name + ", Thread " + std::to_string(ta->idx); - printf("%s, %s, thread exit...\n", thread_info.c_str(), s.ToString().c_str()); - redisFree(c); - return nullptr; +Status RunGetCommand(redisContext*& c, ThreadArg* arg) { + redisReply* res = nullptr; + std::vector keys; + PrepareKeys(arg->idx, &keys); + + for (int idx = 0; idx < FLAGS_count; ++idx) { + if (idx % 10000 == 0) { + LOG(INFO) << "finish " << idx << " request"; } - } else if (transmit_mode == kPipeline) { - Status s = RunSetCommandPipeline(c); - if (!s.ok()) { - std::string thread_info = "Table " + ta->table_name + ", Thread " + std::to_string(ta->idx); - printf("%s, %s, thread exit...\n", thread_info.c_str(), s.ToString().c_str()); + const char* argv[2]; + size_t argvlen[2]; + std::string value; + std::string key = keys[idx]; + argv[0] = "get"; + argvlen[0] = 3; + argv[1] = key.c_str(); + argvlen[1] = key.size(); + GenerateValue(key, FLAGS_value_size, &value); + + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( + redisCommandArgv(c, 2, reinterpret_cast(argv), + reinterpret_cast(argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << key; + arg->stat.timeout_cnt++; redisFree(c); - return nullptr; + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); + } + } else if (res->type != REDIS_REPLY_STRING) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << key; + arg->stat.error_cnt++; + } else { + if (CompareValue(value, std::string(res->str))) { + arg->stat.success_cnt++; + } else { + LOG(INFO) << FLAGS_command << " key: " << key + << " compare value failed"; + arg->stat.error_cnt++; + } } + + freeReplyObject(res); } + return Status::OK(); +} - redisFree(c); - return nullptr; +Status RunSAddCommand(redisContext*& c, ThreadArg* arg) { + redisReply* res = nullptr; + std::vector>> keys; + PreparePkeyMembers(arg->idx, &keys); + + for (int idx = 0; idx < FLAGS_count; ++idx) { + const char* argv[3]; + size_t argvlen[3]; + std::string pkey = keys[idx].first; + argv[0] = "sadd"; + argvlen[0] = 4; + argv[1] = pkey.c_str(); + argvlen[1] = pkey.size(); + for (const auto& member : keys[idx].second) { + argv[2] = member.c_str(); + argvlen[2] = member.size(); + + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( + redisCommandArgv(c, 3, reinterpret_cast(argv), + reinterpret_cast(argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << pkey; + arg->stat.timeout_cnt++; + redisFree(c); + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); + } + } else if (res->type != REDIS_REPLY_INTEGER) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << pkey; + arg->stat.error_cnt++; + } else { + arg->stat.success_cnt++; + } + freeReplyObject(res); + } + } + return Status::OK(); } -Status RunSetCommandPipeline(redisContext* c) { +Status RunSMembersCommand(redisContext*& c, ThreadArg* arg) { redisReply* res = nullptr; - for (size_t idx = 0; idx < number_of_request; (idx += pipeline_num)) { - const char* argv[3] = {"SET", nullptr, nullptr}; - size_t argv_len[3] = {3, 0, 0}; - for (int32_t batch_idx = 0; batch_idx < pipeline_num; ++batch_idx) { - std::string key; - std::string value; + std::vector>> keys; + PreparePkeyMembers(arg->idx, &keys); - default_random_engine e; - e.seed(last_seed); - last_seed = e(); - int32_t rand_num = last_seed % 10 + 1; - GenerateRandomString(rand_num, &key); - GenerateRandomString(payload_size, &value); - - argv[1] = key.c_str(); - argv_len[1] = key.size(); - argv[2] = value.c_str(); - argv_len[2] = value.size(); - - if (redisAppendCommandArgv(c, 3, reinterpret_cast(argv), - reinterpret_cast(argv_len)) == REDIS_ERR) { - return Status::Corruption("Redis Append Command Argv Error"); + for (int idx = 0; idx < FLAGS_count; ++idx) { + if (idx % 10000 == 0) { + LOG(INFO) << "finish " << idx << " request"; + } + const char* argv[2]; + size_t argvlen[2]; + std::string pkey = keys[idx].first; + auto elements = keys[idx].second; + + argv[0] = "smembers"; + argvlen[0] = 8; + argv[1] = pkey.c_str(); + argvlen[1] = pkey.size(); + + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( + redisCommandArgv(c, 2, reinterpret_cast(argv), + reinterpret_cast(argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << pkey; + arg->stat.timeout_cnt++; + redisFree(c); + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); } + } else if (res->type != REDIS_REPLY_ARRAY) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << pkey; + arg->stat.error_cnt++; + } else { + if (CompareValue(elements, res)) { + arg->stat.success_cnt++; + } else { + LOG(INFO) << FLAGS_command << " key: " << pkey + << " compare value failed"; + arg->stat.error_cnt++; + } + } + freeReplyObject(res); + } + return Status::OK(); +} + +Status RunHGetAllCommand(redisContext*& c, ThreadArg* arg) { + redisReply* res = nullptr; + std::vector>> keys; + PreparePkeyMembers(arg->idx, &keys); + + for (int idx = 0; idx < FLAGS_count; ++idx) { + if (idx % 10000 == 0) { + LOG(INFO) << "finish " << idx << " request"; + } + const char* argv[2]; + size_t argvlen[2]; + std::string pkey = keys[idx].first; + auto elements = keys[idx].second; + std::map m; + for (const auto& ele : elements) { + std::string value; + GenerateValue(ele, FLAGS_value_size, &value); + m[ele] = value; } - for (int32_t batch_idx = 0; batch_idx < pipeline_num; ++batch_idx) { - if (redisGetReply(c, reinterpret_cast(&res)) == REDIS_ERR) { - return Status::Corruption("Redis Pipeline Get Reply Error"); + argv[0] = "hgetall"; + argvlen[0] = 7; + argv[1] = pkey.c_str(); + argvlen[1] = pkey.size(); + + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( + redisCommandArgv(c, 2, reinterpret_cast(argv), + reinterpret_cast(argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << pkey; + arg->stat.timeout_cnt++; + redisFree(c); + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); + } + } else if (res->type != REDIS_REPLY_ARRAY) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << pkey; + arg->stat.error_cnt++; + } else { + if (CompareValue(m, res)) { + arg->stat.success_cnt++; } else { - if (!res || strcasecmp(res->str, "OK")) { - std::string res_str = "Exec command error: " + (res != nullptr ? std::string(res->str) : ""); - freeReplyObject(res); - return Status::Corruption(res_str); + LOG(INFO) << FLAGS_command << " key: " << pkey + << " compare value failed"; + arg->stat.error_cnt++; + } + } + + freeReplyObject(res); + } + return Status::OK(); +} + +Status RunHSetCommand(redisContext*& c, ThreadArg* arg) { + redisReply* res = nullptr; + std::vector>> keys; + PreparePkeyMembers(arg->idx, &keys); + + for (int idx = 0; idx < FLAGS_count; ++idx) { + const char* set_argv[4]; + size_t set_argvlen[4]; + std::string pkey; + std::string value; + + pkey = keys[idx].first; + set_argv[0] = "hset"; + set_argvlen[0] = 4; + set_argv[1] = pkey.c_str(); + set_argvlen[1] = pkey.size(); + + for (const auto& member : keys[idx].second) { + GenerateValue(member, FLAGS_value_size, &value); + set_argv[2] = member.c_str(); + set_argvlen[2] = member.size(); + set_argv[3] = value.c_str(); + set_argvlen[3] = value.size(); + + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( + redisCommandArgv(c, 4, reinterpret_cast(set_argv), + reinterpret_cast(set_argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << pkey; + arg->stat.timeout_cnt++; + redisFree(c); + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); } - freeReplyObject(res); + } else if (res->type != REDIS_REPLY_INTEGER) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << pkey; + arg->stat.error_cnt++; + } else { + arg->stat.success_cnt++; } + freeReplyObject(res); } } return Status::OK(); } -Status RunSetCommand(redisContext* c) { +Status RunSetCommand(redisContext*& c, ThreadArg* arg) { redisReply* res = nullptr; - for (size_t idx = 0; idx < number_of_request; ++idx) { + std::vector keys; + PrepareKeys(arg->idx, &keys); + + for (int idx = 0; idx < FLAGS_count; ++idx) { + if (idx % 10000 == 0) { + LOG(INFO) << "finish " << idx << " request"; + } const char* set_argv[3]; size_t set_argvlen[3]; - std::string key; - std::string value; - default_random_engine e; - e.seed(last_seed); - last_seed = e(); - int32_t rand_num = last_seed % 10 + 1; - GenerateRandomString(rand_num, &key); - GenerateRandomString(payload_size, &value); + std::string value; + std::string key = keys[idx]; + GenerateValue(key, FLAGS_value_size, &value); set_argv[0] = "set"; set_argvlen[0] = 3; @@ -236,46 +561,175 @@ Status RunSetCommand(redisContext* c) { set_argv[2] = value.c_str(); set_argvlen[2] = value.size(); + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( - redisCommandArgv(c, 3, reinterpret_cast(set_argv), reinterpret_cast(set_argvlen))); - if (!res || strcasecmp(res->str, "OK")) { - std::string res_str = "Exec command error: " + (res != nullptr ? std::string(res->str) : ""); + redisCommandArgv(c, 3, reinterpret_cast(set_argv), + reinterpret_cast(set_argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << key; + arg->stat.timeout_cnt++; + redisFree(c); + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); + } + } else if (res->type != REDIS_REPLY_STATUS) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << key; + arg->stat.error_cnt++; + } else { + arg->stat.success_cnt++; + } + freeReplyObject(res); + } + return Status::OK(); +} + +Status RunZAddCommand(redisContext*& c, ThreadArg* arg) { + redisReply* res = nullptr; + std::vector>> keys; + PreparePkeyMembers(arg->idx, &keys); + + for (int idx = 0; idx < FLAGS_count; ++idx) { + const char* argv[4]; + size_t argvlen[4]; + std::string pkey = keys[idx].first; + argv[0] = "zadd"; + argvlen[0] = 4; + argv[1] = pkey.c_str(); + argvlen[1] = pkey.size(); + int score = 0; + for (const auto& member : keys[idx].second) { + score++; + argv[2] = std::to_string(score).c_str(); + argvlen[2] = std::to_string(score).size(); + argv[3] = member.c_str(); + argvlen[3] = member.size(); + + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( + redisCommandArgv(c, 4, reinterpret_cast(argv), + reinterpret_cast(argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << pkey; + arg->stat.timeout_cnt++; + redisFree(c); + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); + } + } else if (res->type != REDIS_REPLY_INTEGER) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << pkey; + arg->stat.error_cnt++; + } else { + arg->stat.success_cnt++; + } + freeReplyObject(res); - return Status::Corruption(res_str); } + } + return Status::OK(); +} + +Status RunZRangeCommand(redisContext*& c, ThreadArg* arg) { + redisReply* res = nullptr; + std::vector>> keys; + PreparePkeyMembers(arg->idx, &keys); + + for (int idx = 0; idx < FLAGS_count; ++idx) { + if (idx % 10000 == 0) { + LOG(INFO) << "finish " << idx << " request"; + } + const char* argv[4]; + size_t argvlen[4]; + std::string pkey = keys[idx].first; + auto elements = keys[idx].second; + + argv[0] = "zrange"; + argvlen[0] = 6; + argv[1] = pkey.c_str(); + argvlen[1] = pkey.size(); + argv[2] = "0"; + argvlen[2] = 1; + argv[3] = "-1"; + argvlen[3] = 2; + + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( + redisCommandArgv(c, 4, reinterpret_cast(argv), + reinterpret_cast(argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << pkey; + arg->stat.timeout_cnt++; + redisFree(c); + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); + } + } else if (res->type != REDIS_REPLY_ARRAY) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << pkey; + arg->stat.error_cnt++; + } else { + if (CompareValue(elements, res)) { + arg->stat.success_cnt++; + } else { + LOG(INFO) << FLAGS_command << " key: " << pkey + << " compare value failed"; + arg->stat.error_cnt++; + } + } + freeReplyObject(res); } return Status::OK(); } -Status RunZAddCommand(redisContext* c) { +Status RunLPushCommand(redisContext*& c, ThreadArg* arg) { redisReply* res = nullptr; - for (size_t idx = 0; idx < 1; ++idx) { - const char* zadd_argv[4]; - size_t zadd_argvlen[4]; - std::string key; - std::string score; - std::string member; - GenerateRandomString(10, &key); - - zadd_argv[0] = "zadd"; - zadd_argvlen[0] = 4; - zadd_argv[1] = key.c_str(); - zadd_argvlen[1] = key.size(); - for (size_t sidx = 0; sidx < 10000; ++sidx) { - score = std::to_string(sidx * 2); - GenerateRandomString(payload_size, &member); - zadd_argv[2] = score.c_str(); - zadd_argvlen[2] = score.size(); - zadd_argv[3] = member.c_str(); - zadd_argvlen[3] = member.size(); - - res = reinterpret_cast(redisCommandArgv(c, 4, reinterpret_cast(zadd_argv), - reinterpret_cast(zadd_argvlen))); - if (!res || !res->integer) { - std::string res_str = "Exec command error: " + (res != nullptr ? std::string(res->str) : ""); - freeReplyObject(res); - return Status::Corruption(res_str); + std::vector>> keys; + PreparePkeyMembers(arg->idx, &keys); + + for (int idx = 0; idx < FLAGS_count; ++idx) { + const char* argv[3]; + size_t argvlen[3]; + std::string pkey = keys[idx].first; + argv[0] = "lpush"; + argvlen[0] = 5; + argv[1] = pkey.c_str(); + argvlen[1] = pkey.size(); + for (const auto& member : keys[idx].second) { + argv[2] = member.c_str(); + argvlen[2] = member.size(); + + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( + redisCommandArgv(c, 3, reinterpret_cast(argv), + reinterpret_cast(argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << pkey; + arg->stat.timeout_cnt++; + redisFree(c); + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); + } + } else if (res->type != REDIS_REPLY_INTEGER) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << pkey; + arg->stat.error_cnt++; + } else { + arg->stat.success_cnt++; } freeReplyObject(res); } @@ -283,57 +737,126 @@ Status RunZAddCommand(redisContext* c) { return Status::OK(); } -// ./benchmark_client -// ./benchmark_client -h -// ./benchmark_client -b db1:5:10000,db2:3:10000 -int main(int argc, char* argv[]) { - int opt; - while ((opt = getopt(argc, argv, "P:h:p:a:t:c:d:n:")) != -1) { - switch (opt) { - case 'h': - hostname = std::string(optarg); - break; - case 'p': - port = atoi(optarg); - break; - case 'P': - transmit_mode = kPipeline; - pipeline_num = atoi(optarg); - break; - case 'a': - password = std::string(optarg); - break; - case 't': - thread_num_each_table = atoi(optarg); - break; - case 'c': - tables_str = std::string(optarg); - break; - case 'd': - payload_size = atoi(optarg); - break; - case 'n': - number_of_request = atoi(optarg); - break; - default: - Usage(); - exit(-1); +Status RunLRangeCommand(redisContext*& c, ThreadArg* arg) { + redisReply* res = nullptr; + std::vector>> keys; + PreparePkeyMembers(arg->idx, &keys); + + for (int idx = 0; idx < FLAGS_count; ++idx) { + if (idx % 10000 == 0) { + LOG(INFO) << "finish " << idx << " request"; } + const char* argv[4]; + size_t argvlen[4]; + std::string pkey = keys[idx].first; + auto elements = keys[idx].second; + + argv[0] = "lrange"; + argvlen[0] = 6; + argv[1] = pkey.c_str(); + argvlen[1] = pkey.size(); + argv[2] = "0"; + argvlen[2] = 1; + argv[3] = "-1"; + argvlen[3] = 2; + + uint64_t begin = pstd::NowMicros(); + res = reinterpret_cast( + redisCommandArgv(c, 4, reinterpret_cast(argv), + reinterpret_cast(argvlen))); + hist->Add(pstd::NowMicros() - begin); + + if (!res) { + LOG(INFO) << FLAGS_command << " timeout, key: " << pkey; + arg->stat.timeout_cnt++; + redisFree(c); + c = Prepare(arg); + if (!c) { + return Status::InvalidArgument("reconnect failed"); + } + } else if (res->type != REDIS_REPLY_ARRAY) { + LOG(INFO) << FLAGS_command << " invalid type: " << res->type + << " key: " << pkey; + arg->stat.error_cnt++; + } else { + if (CompareValue(elements, res)) { + arg->stat.success_cnt++; + } else { + LOG(INFO) << FLAGS_command << " key: " << pkey + << " compare value failed"; + arg->stat.error_cnt++; + } + } + + freeReplyObject(res); + } + return Status::OK(); +} + +void* ThreadMain(void* arg) { + ThreadArg* ta = reinterpret_cast(arg); + last_seed = ta->idx; + + if (FLAGS_command == "generate") { + RunGenerateCommand(ta->idx); + return nullptr; } - pstd::StringSplit(tables_str, ',', tables); + redisContext* c = Prepare(ta); + if (!c) { + return nullptr; + } + + Status s; + if (FLAGS_command == "get") { + s = RunGetCommand(c, ta); + } else if (FLAGS_command == "set") { + s = RunSetCommand(c, ta); + } else if (FLAGS_command == "hset") { + s = RunHSetCommand(c, ta); + } else if (FLAGS_command == "hgetall") { + s = RunHGetAllCommand(c,ta); + } else if (FLAGS_command == "sadd") { + s = RunSAddCommand(c, ta); + } else if (FLAGS_command == "smembers") { + s = RunSMembersCommand(c,ta); + } else if (FLAGS_command == "zadd") { + s = RunZAddCommand(c, ta); + } else if (FLAGS_command == "zrange") { + s = RunZRangeCommand(c,ta); + } else if (FLAGS_command == "lpush") { + s = RunLPushCommand(c, ta); + } else if (FLAGS_command == "lrange") { + s = RunLRangeCommand(c,ta); + } + + if (!s.ok()) { + std::string thread_info = "Table " + ta->table_name + ", Thread " + std::to_string(ta->idx); + printf("%s, %s, thread exit...\n", thread_info.c_str(), s.ToString().c_str()); + } + redisFree(c); + return nullptr; +} +int main(int argc, char* argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + pstd::StringSplit(FLAGS_dbs, ',', tables); if (tables.empty()) { - Usage(); exit(-1); } + FLAGS_logtostdout = true; + FLAGS_minloglevel = 0; + FLAGS_logbufsecs = 0; + ::google::InitGoogleLogging("benchmark_client"); + + hist.reset(new rocksdb::HistogramImpl()); std::chrono::system_clock::time_point start_time = std::chrono::system_clock::now(); std::time_t now = std::chrono::system_clock::to_time_t(start_time); PrintInfo(now); for (const auto& table : tables) { - for (size_t idx = 0; idx < thread_num_each_table; ++idx) { + for (int idx = 0; idx < FLAGS_thread_num; ++idx) { thread_args.push_back({0, table, idx}); } } @@ -342,8 +865,10 @@ int main(int argc, char* argv[]) { pthread_create(&thread_args[idx].tid, nullptr, ThreadMain, &thread_args[idx]); } + RequestStat stat; for (size_t idx = 0; idx < thread_args.size(); ++idx) { pthread_join(thread_args[idx].tid, nullptr); + stat = stat + thread_args[idx].stat; } std::chrono::system_clock::time_point end_time = std::chrono::system_clock::now(); @@ -354,7 +879,10 @@ int main(int argc, char* argv[]) { auto minutes = std::chrono::duration_cast(end_time - start_time).count(); auto seconds = std::chrono::duration_cast(end_time - start_time).count(); + std::cout << "Total Time Cost : " << hours << " hours " << minutes % 60 << " minutes " << seconds % 60 << " seconds " << std::endl; + std::cout << "Timeout Count: " << stat.timeout_cnt << " Error Count: " << stat.error_cnt << std::endl; + std::cout << "stats: " << hist->ToString() << std::endl; return 0; }