Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into 240624_use_fwrite_binlog
Browse files Browse the repository at this point in the history
  • Loading branch information
cheniujh authored Jul 1, 2024
2 parents a2f3158 + c259401 commit b7d5e0f
Show file tree
Hide file tree
Showing 17 changed files with 998 additions and 53 deletions.
9 changes: 9 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,18 @@ slow-cmd-pool : no
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
admin-thread-pool-size : 2

# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed.
# Default commands: info, ping, monitor
# This parameter is only supported by the CONFIG GET command and not by CONFIG SET.
admin-cmd-list : info, ping, monitor

# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6
Expand Down
29 changes: 29 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slow_cmd_thread_pool_size_;
}
int admin_thread_pool_size() {
std::shared_lock l(rwlock_);
return admin_thread_pool_size_;
}
int sync_thread_num() {
std::shared_lock l(rwlock_);
return sync_thread_num_;
Expand Down Expand Up @@ -441,6 +445,12 @@ class PikaConf : public pstd::BaseConf {
return pstd::Set2String(slow_cmd_set_, ',');
}

// Admin Commands configuration
const std::string GetAdminCmd() {
std::shared_lock l(rwlock_);
return pstd::Set2String(admin_cmd_set_, ',');
}

const std::string GetUserBlackList() {
std::shared_lock l(rwlock_);
return userblacklist_;
Expand All @@ -451,6 +461,10 @@ class PikaConf : public pstd::BaseConf {
return slow_cmd_set_.find(cmd) != slow_cmd_set_.end();
}

bool is_admin_cmd(const std::string& cmd) {
return admin_cmd_set_.find(cmd) != admin_cmd_set_.end();
}

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
Expand Down Expand Up @@ -489,6 +503,11 @@ class PikaConf : public pstd::BaseConf {
slow_cmd_thread_pool_size_ = value;
}

void SetAdminThreadPoolSize(const int value) {
std::lock_guard l(rwlock_);
admin_thread_pool_size_ = value;
}

void SetSlaveof(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slaveof", value);
Expand Down Expand Up @@ -814,6 +833,14 @@ class PikaConf : public pstd::BaseConf {
pstd::StringSplit2Set(lower_value, ',', slow_cmd_set_);
}

void SetAdminCmd(const std::string& value) {
std::lock_guard l(rwlock_);
std::string lower_value = value;
pstd::StringToLower(lower_value);
TryPushDiffCommands("admin-cmd-list", lower_value);
pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_);
}

void SetCacheType(const std::string &value);
void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
int zset_cache_start_direction() { return zset_cache_start_direction_; }
Expand All @@ -832,7 +859,9 @@ class PikaConf : public pstd::BaseConf {
int thread_num_ = 0;
int thread_pool_size_ = 0;
int slow_cmd_thread_pool_size_ = 0;
int admin_thread_pool_size_ = 0;
std::unordered_set<std::string> slow_cmd_set_;
std::unordered_set<std::string> admin_cmd_set_ = {"info", "ping", "monitor"};
int sync_thread_num_ = 0;
int sync_binlog_thread_num_ = 0;
int expire_dump_days_ = 3;
Expand Down
3 changes: 2 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class PikaServer : public pstd::noncopyable {
/*
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd);
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd);

// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
Expand Down Expand Up @@ -554,6 +554,7 @@ class PikaServer : public pstd::noncopyable {
int worker_num_ = 0;
std::unique_ptr<PikaClientProcessor> pika_client_processor_;
std::unique_ptr<net::ThreadPool> pika_slow_cmd_thread_pool_;
std::unique_ptr<net::ThreadPool> pika_admin_cmd_thread_pool_;
std::unique_ptr<PikaDispatchThread> pika_dispatch_thread_ = nullptr;

/*
Expand Down
13 changes: 12 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,13 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "slow-cmd-thread-pool-size");
EncodeNumber(&config_body, g_pika_conf->slow_cmd_thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "admin-thread-pool-size", 1) != 0) {
elements += 2;
EncodeString(&config_body, "admin-thread-pool-size");
EncodeNumber(&config_body, g_pika_conf->admin_thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "userblacklist", 1) != 0) {
elements += 2;
EncodeString(&config_body, "userblacklist");
Expand All @@ -1506,7 +1513,11 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "slow-cmd-list");
EncodeString(&config_body, g_pika_conf->GetSlowCmd());
}

if (pstd::stringmatch(pattern.data(), "admin-cmd-list", 1) != 0) {
elements += 2;
EncodeString(&config_body, "admin-cmd-list");
EncodeString(&config_body, g_pika_conf->GetAdminCmd());
}
if (pstd::stringmatch(pattern.data(), "sync-thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "sync-thread-num");
Expand Down
3 changes: 2 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
std::string opt = argvs[0][0];
pstd::StringToLower(opt);
bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt);
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd);
bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt);
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
BatchExecRedisCmd(argvs);
Expand Down
17 changes: 16 additions & 1 deletion src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,25 @@ int PikaConf::Load() {
slow_cmd_thread_pool_size_ = 50;
}

GetConfInt("admin-thread-pool-size", &admin_thread_pool_size_);
if (admin_thread_pool_size_ <= 0) {
admin_thread_pool_size_ = 2;
}
if (admin_thread_pool_size_ > 4) {
admin_thread_pool_size_ = 4;
}

std::string slow_cmd_list;
GetConfStr("slow-cmd-list", &slow_cmd_list);
SetSlowCmd(slow_cmd_list);

std::string admin_cmd_list;
GetConfStr("admin-cmd-list", &admin_cmd_list);
if (admin_cmd_list == ""){
admin_cmd_list = "info, monitor, ping";
SetAdminCmd(admin_cmd_list);
}

GetConfInt("sync-thread-num", &sync_thread_num_);
if (sync_thread_num_ <= 0) {
sync_thread_num_ = 3;
Expand Down Expand Up @@ -647,7 +662,7 @@ int PikaConf::Load() {

// rocksdb blob configure
GetConfBool("enable-blob-files", &enable_blob_files_);
GetConfInt64("min-blob-size", &min_blob_size_);
GetConfInt64Human("min-blob-size", &min_blob_size_);
if (min_blob_size_ <= 0) {
min_blob_size_ = 4096;
}
Expand Down
45 changes: 34 additions & 11 deletions src/pika_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "pstd/include/pstd_string.h"

#include "include/pika_geohash_helper.h"
#include "rocksdb/status.h"

void GeoAddCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand Down Expand Up @@ -59,7 +60,7 @@ void GeoAddCmd::Do() {
rocksdb::Status s = db_->storage()->ZAdd(key_, score_members, &count);
if (s.ok()) {
res_.AppendInteger(count);
} else if (s_.IsInvalidArgument()) {
} else if (s.IsInvalidArgument()) {
res_.SetRes(CmdRes::kMultiKey);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
Expand Down Expand Up @@ -103,7 +104,7 @@ void GeoPosCmd::Do() {
} else if (s.IsNotFound()) {
res_.AppendStringLen(-1);
continue;
} else if (s_.IsInvalidArgument()) {
} else if (s.IsInvalidArgument()) {
res_.SetRes(CmdRes::kMultiKey);
continue;
} else {
Expand Down Expand Up @@ -163,13 +164,14 @@ void GeoDistCmd::Do() {
double first_xy[2];
double second_xy[2];
rocksdb::Status s = db_->storage()->ZScore(key_, first_pos_, &first_score);

if (s.ok()) {
GeoHashBits hash = {.bits = static_cast<uint64_t>(first_score), .step = GEO_STEP_MAX};
geohashDecodeToLongLatWGS84(hash, first_xy);
} else if (s.IsNotFound()) {
res_.AppendStringLen(-1);
return;
} else if (s_.IsInvalidArgument()) {
} else if (s.IsInvalidArgument()) {
res_.SetRes(CmdRes::kMultiKey);
return;
} else {
Expand Down Expand Up @@ -241,7 +243,7 @@ void GeoHashCmd::Do() {
} else if (s.IsNotFound()) {
res_.AppendStringLen(-1);
continue;
} else if (s_.IsInvalidArgument()) {
} else if (s.IsInvalidArgument()) {
res_.SetRes(CmdRes::kMultiKey);
continue;
} else {
Expand Down Expand Up @@ -300,6 +302,7 @@ static void GetAllNeighbors(const std::shared_ptr<DB>& db, std::string& key, Geo
if (HASHISZERO(neighbors[i])) {
continue;
}

min = geohashAlign52Bits(neighbors[i]);
neighbors[i].bits++;
max = geohashAlign52Bits(neighbors[i]);
Expand All @@ -312,8 +315,13 @@ static void GetAllNeighbors(const std::shared_ptr<DB>& db, std::string& key, Geo
std::vector<storage::ScoreMember> score_members;
s = db->storage()->ZRangebyscore(key, static_cast<double>(min), static_cast<double>(max), true, true, &score_members);
if (!s.ok() && !s.IsNotFound()) {
res.SetRes(CmdRes::kErrOther, s.ToString());
return;
if (s.IsInvalidArgument()) {
res.SetRes(CmdRes::kMultiKey);
return;
} else {
res.SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
// Insert into result only if the point is within the search area.
for (auto & score_member : score_members) {
Expand All @@ -339,12 +347,14 @@ static void GetAllNeighbors(const std::shared_ptr<DB>& db, std::string& key, Geo
count_limit = static_cast<int32_t>(result.size());
}
// If using sort option
if (range.sort == Asc) {
std::sort(result.begin(), result.end(), sort_distance_asc);
} else if (range.sort == Desc) {
std::sort(result.begin(), result.end(), sort_distance_desc);
if (range.sort != Unsort) {
if (range.sort == Asc) {
std::sort(result.begin(), result.end(), sort_distance_asc);
} else if (range.sort == Desc) {
std::sort(result.begin(), result.end(), sort_distance_desc);
}
}

if (range.store || range.storedist) {
// Target key, create a sorted set with the results.
std::vector<storage::ScoreMember> score_members;
Expand All @@ -354,10 +364,18 @@ static void GetAllNeighbors(const std::shared_ptr<DB>& db, std::string& key, Geo
score_members.push_back({score, result[i].member});
}
int32_t count = 0;
int32_t card = db->storage()->Exists({range.storekey});
if (card) {
if (db->storage()->Del({range.storekey}) > 0){
db->cache()->Del({range.storekey});
}
}
s = db->storage()->ZAdd(range.storekey, score_members, &count);
if (!s.ok()) {
res.SetRes(CmdRes::kErrOther, s.ToString());
return;
} else {
s = db->cache()->ZAdd(range.storekey, score_members);
}
res.AppendInteger(count_limit);
return;
Expand Down Expand Up @@ -426,6 +444,7 @@ void GeoRadiusCmd::DoInitial() {
return;
}
size_t pos = 6;
range_.sort = Asc;
while (pos < argv_.size()) {
if (strcasecmp(argv_[pos].c_str(), "withdist") == 0) {
range_.withdist = true;
Expand Down Expand Up @@ -555,6 +574,10 @@ void GeoRadiusByMemberCmd::DoInitial() {
void GeoRadiusByMemberCmd::Do() {
double score = 0.0;
rocksdb::Status s = db_->storage()->ZScore(key_, range_.member, &score);
if (s.IsNotFound() && !s.ToString().compare("NotFound: Invalid member")) {
res_.SetRes(CmdRes::kErrOther, "could not decode requested zset member");
return;
}
if (s.ok()) {
double xy[2];
GeoHashBits hash = {.bits = static_cast<uint64_t>(score), .step = GEO_STEP_MAX};
Expand Down
Loading

0 comments on commit b7d5e0f

Please sign in to comment.