Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into multi_get
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored Jun 5, 2024
2 parents 71b3e33 + 1705916 commit 8b089c3
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 11 deletions.
2 changes: 2 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class InfoCmd : public Cmd {
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new InfoCmd(*this); }
void Execute() override;

private:
InfoSection info_section_;
Expand Down Expand Up @@ -324,6 +325,7 @@ class ConfigCmd : public Cmd {
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new ConfigCmd(*this); }
void Execute() override;

private:
std::vector<std::string> config_args_v_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_cmd_table_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PikaCmdTableManager {
std::shared_ptr<Cmd> GetCmd(const std::string& opt);
bool CmdExist(const std::string& cmd) const;
CmdTable* GetCmdTable();
uint32_t GetCmdId();
uint32_t GetMaxCmdId();

std::vector<std::string> GetAclCategoryCmdNames(uint32_t flag);

Expand Down
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
std::shared_ptr<std::string> GetResp();

void SetStage(CmdStage stage);
void SetCmdId(uint32_t cmdId){cmdId_ = cmdId;}

virtual void DoBinlog();

Expand Down
1 change: 1 addition & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const std::string kDefaultRsyncAuth = "default";

/* Rsync */
const int kMaxRsyncParallelNum = 4;
constexpr int kMaxRsyncInitReTryTimes = 64;

struct DBStruct {
DBStruct(std::string tn, int32_t inst_num)
Expand Down
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class SyncSlaveDB : public SyncDB {

private:
std::unique_ptr<rsync::RsyncClient> rsync_cli_;
int32_t rsync_init_retry_count_{0};
pstd::Mutex db_mu_;
RmNode m_info_;
ReplState repl_state_{kNoConnect};
Expand Down
1 change: 1 addition & 0 deletions include/pika_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class WatchCmd : public Cmd {
Cmd* Clone() override { return new WatchCmd(*this); }
void Merge() override {}
std::vector<std::string> current_key() const override { return keys_; }
void Execute() override;

private:
void DoInitial() override;
Expand Down
8 changes: 8 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,11 @@ const std::string InfoCmd::kDebugSection = "debug";
const std::string InfoCmd::kCommandStatsSection = "commandstats";
const std::string InfoCmd::kCacheSection = "cache";

void InfoCmd::Execute() {
std::shared_ptr<DB> db = g_pika_server->GetDB(db_name_);
Do();
}

void InfoCmd::DoInitial() {
size_t argc = argv_.size();
if (argc > 4) {
Expand Down Expand Up @@ -1386,6 +1391,9 @@ std::string InfoCmd::CacheStatusToString(int status) {
return std::string("Unknown");
}
}
void ConfigCmd::Execute() {
Do();
}

void ConfigCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand Down
3 changes: 2 additions & 1 deletion src/pika_cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ void PikaCmdTableManager::InitCmdTable(void) {
CommandStatistics statistics;
for (auto& iter : *cmds_) {
cmdstat_map_.emplace(iter.first, statistics);
iter.second->SetCmdId(cmdId_++);
}
}

Expand Down Expand Up @@ -81,7 +82,7 @@ std::shared_ptr<Cmd> PikaCmdTableManager::NewCommand(const std::string& opt) {

CmdTable* PikaCmdTableManager::GetCmdTable() { return cmds_.get(); }

uint32_t PikaCmdTableManager::GetCmdId() { return ++cmdId_; }
uint32_t PikaCmdTableManager::GetMaxCmdId() { return cmdId_; }

bool PikaCmdTableManager::CheckCurrentThreadDistributionMapExist(const std::thread::id& tid) {
std::shared_lock l(map_protector_);
Expand Down
3 changes: 0 additions & 3 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "include/pika_acl.h"
#include "include/pika_admin.h"
#include "include/pika_bit.h"
#include "include/pika_cmd_table_manager.h"
#include "include/pika_command.h"
#include "include/pika_geo.h"
#include "include/pika_hash.h"
Expand Down Expand Up @@ -830,8 +829,6 @@ bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_)

Cmd::Cmd(std::string name, int arity, uint32_t flag, uint32_t aclCategory)
: name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory) {
// assign cmd id
cmdId_ = g_pika_cmd_table_manager->GetCmdId();
}

void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name) {
Expand Down
15 changes: 10 additions & 5 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -507,13 +507,19 @@ pstd::Status SyncSlaveDB::ActivateRsync() {
if (!rsync_cli_->IsIdle()) {
return s;
}
LOG(WARNING) << "ActivateRsync ...";
LOG(WARNING) << "Slave DB: " << DBName() << " Activating Rsync ... (retry count:" << rsync_init_retry_count_ << ")";
if (rsync_cli_->Init()) {
rsync_init_retry_count_ = 0;
rsync_cli_->Start();
return s;
} else {
SetReplState(ReplState::kError);
return Status::Error("rsync client init failed!");;
rsync_init_retry_count_ += 1;
if (rsync_init_retry_count_ >= kMaxRsyncInitReTryTimes) {
SetReplState(ReplState::kError);
LOG(ERROR) << "Full Sync Stage - Rsync Init failed: Slave failed to pull meta info(generated by bgsave task in Master) from Master after MaxRsyncInitReTryTimes("
<< kMaxRsyncInitReTryTimes << " times) is reached. This usually means the Master's bgsave task has costed an unexpected-long time.";
}
return Status::Error("rsync client init failed!");
}
}

Expand Down Expand Up @@ -977,8 +983,7 @@ Status PikaReplicaManager::RunSyncSlaveDBStateMachine() {
} else if (s_db->State() == ReplState::kWaitDBSync) {
Status s = s_db->ActivateRsync();
if (!s.ok()) {
g_pika_server->SetForceFullSync(true);
LOG(WARNING) << "Slave DB: " << s_db->DBName() << " rsync failed! full synchronization will be retried later, error info:" << s.ToString();
LOG(WARNING) << "Slave DB: " << s_db->DBName() << " rsync failed! full synchronization will be retried later";
continue;
}

Expand Down
4 changes: 4 additions & 0 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ void ExecCmd::ServeToBLrPopWithKeys() {
}
}

void WatchCmd::Execute() {
Do();
}

void WatchCmd::Do() {
auto mp = std::map<storage::DataType, storage::Status>{};
for (const auto& key : keys_) {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/rsync_dynamic_reconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var _ = Describe("Rsync Reconfig Test", func() {
slave1.FlushDB(ctx)
master1.FlushDB(ctx)
time.Sleep(3 * time.Second)
RefillMaster(MASTERADDR, 128, ctx)
RefillMaster(MASTERADDR, 64, ctx)
key1 := "45vs45f4s5d6"
value1 := "afd54g5s4f545"
//set key before sync happened, slave is supposed to fetch it when sync done
Expand Down

0 comments on commit 8b089c3

Please sign in to comment.