Skip to content

Commit

Permalink
adjust code
Browse files Browse the repository at this point in the history
  • Loading branch information
buzhimingyonghu committed Jan 14, 2025
1 parent 7317faf commit 03d4a6e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 33 deletions.
8 changes: 6 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,12 @@ class PikaServer : public pstd::noncopyable {
/*
* Disk usage statistic
*/
uint64_t GetDBSize() const { return disk_statistic_.db_size_.load(); }
uint64_t GetLogSize() const { return disk_statistic_.log_size_.load(); }
uint64_t GetDBSize() const {
return disk_statistic_.db_size_.load();
}
uint64_t GetLogSize() const {
return disk_statistic_.log_size_.load();
}

/*
* Network Statistic used
Expand Down
64 changes: 36 additions & 28 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ extern std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;

/* Context */

Context::Context(std::string path) : path_(std::move(path)) {}
Context::Context(std::string path) : path_(std::move(path)) {}

Status Context::StableSave() {
char* p = save_->GetData();
Expand Down Expand Up @@ -81,7 +81,9 @@ void Context::Reset(const LogOffset& offset) {

/* SyncProgress */

std::string MakeSlaveKey(const std::string& ip, int port) { return ip + ":" + std::to_string(port); }
std::string MakeSlaveKey(const std::string& ip, int port) {
return ip + ":" + std::to_string(port);
}

std::shared_ptr<SlaveNode> SyncProgress::GetSlaveNode(const std::string& ip, int port) {
std::string slave_key = MakeSlaveKey(ip, port);
Expand Down Expand Up @@ -159,7 +161,7 @@ int SyncProgress::SlaveSize() {

/* MemLog */

MemLog::MemLog() = default;
MemLog::MemLog() = default;

int MemLog::Size() { return static_cast<int>(logs_.size()); }

Expand Down Expand Up @@ -217,7 +219,8 @@ int MemLog::InternalFindLogByBinlogOffset(const LogOffset& offset) {

/* ConsensusCoordinator */

ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name) : db_name_(db_name) {
ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name)
: db_name_(db_name) {
std::string db_log_path = g_pika_conf->log_path() + "log_" + db_name + "/";
std::string log_path = db_log_path;
context_ = std::make_shared<Context>(log_path + kContext);
Expand All @@ -237,8 +240,8 @@ void ConsensusCoordinator::Init() {
// load term_
term_ = stable_logger_->Logger()->term();

LOG(INFO) << DBInfo(db_name_).ToString() << "Restore applied index " << context_->applied_index_.ToString()
<< " current term " << term_;
LOG(INFO) << DBInfo(db_name_).ToString() << "Restore applied index "
<< context_->applied_index_.ToString() << " current term " << term_;
if (committed_index_ == LogOffset()) {
return;
}
Expand Down Expand Up @@ -300,7 +303,8 @@ Status ConsensusCoordinator::Reset(const LogOffset& offset) {
Status s = stable_logger_->Logger()->SetProducerStatus(offset.b_offset.filenum, offset.b_offset.offset,
offset.l_offset.term, offset.l_offset.index);
if (!s.ok()) {
LOG(WARNING) << DBInfo(db_name_).ToString() << "Consensus reset status failed " << s.ToString();
LOG(WARNING) << DBInfo(db_name_).ToString() << "Consensus reset status failed "
<< s.ToString();
return s;
}

Expand All @@ -316,8 +320,7 @@ Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
std::vector<std::string> keys = cmd_ptr->current_key();
// slotkey shouldn't add binlog
if (cmd_ptr->name() == kCmdNameSAdd && !keys.empty() &&
(keys[0].compare(0, SlotKeyPrefix.length(), SlotKeyPrefix) == 0 ||
keys[0].compare(0, SlotTagPrefix.length(), SlotTagPrefix) == 0)) {
(keys[0].compare(0, SlotKeyPrefix.length(), SlotKeyPrefix) == 0 || keys[0].compare(0, SlotTagPrefix.length(), SlotTagPrefix) == 0)) {
return Status::OK();
}

Expand All @@ -339,8 +342,8 @@ Status ConsensusCoordinator::InternalAppendLog(const std::shared_ptr<Cmd>& cmd_p
Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
LogOffset last_index = mem_logger_->last_offset();
if (attribute.logic_id() < last_index.l_offset.index) {
LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id " << attribute.logic_id()
<< " cur last index " << last_index.l_offset.index;
LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id "
<< attribute.logic_id() << " cur last index " << last_index.l_offset.index;
return Status::OK();
}

Expand Down Expand Up @@ -468,8 +471,8 @@ Status ConsensusCoordinator::TruncateTo(const LogOffset& offset) {
if (!s.ok()) {
return s;
}
LOG(INFO) << DBInfo(db_name_).ToString() << " Founded truncate pos " << founded_offset.ToString();
LogOffset committed = committed_index();
LOG(INFO) << DBInfo(db_name_).ToString() << " Founded truncate pos "
<< founded_offset.ToString(); LogOffset committed = committed_index();
stable_logger_->Logger()->Lock();
if (founded_offset.l_offset.index == committed.l_offset.index) {
mem_logger_->Reset(committed);
Expand Down Expand Up @@ -593,8 +596,8 @@ Status ConsensusCoordinator::FindBinlogFileNum(const std::map<uint32_t, std::str

Status ConsensusCoordinator::FindLogicOffsetBySearchingBinlog(const BinlogOffset& hint_offset, uint64_t target_index,
LogOffset* found_offset) {
LOG(INFO) << DBInfo(db_name_).ToString() << "FindLogicOffsetBySearchingBinlog hint offset " << hint_offset.ToString()
<< " target_index " << target_index;
LOG(INFO) << DBInfo(db_name_).ToString() << "FindLogicOffsetBySearchingBinlog hint offset "
<< hint_offset.ToString() << " target_index " << target_index;
BinlogOffset start_offset;
std::map<uint32_t, std::string> binlogs;
if (!stable_logger_->GetBinlogFiles(&binlogs)) {
Expand Down Expand Up @@ -626,7 +629,8 @@ Status ConsensusCoordinator::FindLogicOffsetBySearchingBinlog(const BinlogOffset
}
for (auto& offset : offsets) {
if (offset.l_offset.index == target_index) {
LOG(INFO) << DBInfo(db_name_).ToString() << "Founded " << target_index << " " << offset.ToString();
LOG(INFO) << DBInfo(db_name_).ToString() << "Founded " << target_index << " "
<< offset.ToString();
*found_offset = offset;
return Status::OK();
}
Expand All @@ -642,8 +646,8 @@ Status ConsensusCoordinator::FindLogicOffset(const BinlogOffset& start_offset, u
if (!s.ok()) {
LOG(INFO) << DBInfo(db_name_).ToString() << "GetBinlogOffset res: " << s.ToString();
} else {
LOG(INFO) << DBInfo(db_name_).ToString() << "GetBInlogOffset res: " << s.ToString() << " possible_offset "
<< possible_offset.ToString() << " target_index " << target_index;
LOG(INFO) << DBInfo(db_name_).ToString() << "GetBInlogOffset res: " << s.ToString()
<< " possible_offset " << possible_offset.ToString() << " target_index " << target_index;
}
return FindLogicOffsetBySearchingBinlog(start_offset, target_index, found_offset);
}
Expand Down Expand Up @@ -677,9 +681,9 @@ Status ConsensusCoordinator::GetLogsBefore(const BinlogOffset& start_offset, std
Status ConsensusCoordinator::LeaderNegotiate(const LogOffset& f_last_offset, bool* reject,
std::vector<LogOffset>* hints) {
uint64_t f_index = f_last_offset.l_offset.index;
LOG(INFO) << DBInfo(db_name_).ToString() << "LeaderNeotiate follower last offset " << f_last_offset.ToString()
<< " first_offsert " << stable_logger_->first_offset().ToString() << " last_offset "
<< mem_logger_->last_offset().ToString();
LOG(INFO) << DBInfo(db_name_).ToString() << "LeaderNeotiate follower last offset "
<< f_last_offset.ToString() << " first_offsert " << stable_logger_->first_offset().ToString()
<< " last_offset " << mem_logger_->last_offset().ToString();
*reject = true;
if (f_index > mem_logger_->last_offset().l_offset.index) {
// hints starts from last_offset() - 100;
Expand All @@ -689,7 +693,8 @@ Status ConsensusCoordinator::LeaderNegotiate(const LogOffset& f_last_offset, boo
<< " get logs before last index failed " << s.ToString();
return s;
}
LOG(INFO) << DBInfo(db_name_).ToString() << "follower index larger then last_offset index, get logs before "
LOG(INFO) << DBInfo(db_name_).ToString()
<< "follower index larger then last_offset index, get logs before "
<< mem_logger_->last_offset().ToString();
return Status::OK();
}
Expand All @@ -708,18 +713,21 @@ Status ConsensusCoordinator::LeaderNegotiate(const LogOffset& f_last_offset, boo
Status s = FindLogicOffset(f_last_offset.b_offset, f_index, &found_offset);
if (!s.ok()) {
if (s.IsNotFound()) {
LOG(INFO) << DBInfo(db_name_).ToString() << f_last_offset.ToString() << " not found " << s.ToString();
LOG(INFO) << DBInfo(db_name_).ToString() << f_last_offset.ToString() << " not found "
<< s.ToString();
return s;
} else {
LOG(WARNING) << DBInfo(db_name_).ToString() << "find logic offset failed" << s.ToString();
LOG(WARNING) << DBInfo(db_name_).ToString() << "find logic offset failed"
<< s.ToString();
return s;
}
}

if (found_offset.l_offset.term != f_last_offset.l_offset.term || !(f_last_offset.b_offset == found_offset.b_offset)) {
Status s = GetLogsBefore(found_offset.b_offset, hints);
if (!s.ok()) {
LOG(WARNING) << DBInfo(db_name_).ToString() << "Try to get logs before " << found_offset.ToString() << " failed";
LOG(WARNING) << DBInfo(db_name_).ToString() << "Try to get logs before "
<< found_offset.ToString() << " failed";
return s;
}
return Status::OK();
Expand All @@ -735,8 +743,8 @@ Status ConsensusCoordinator::FollowerNegotiate(const std::vector<LogOffset>& hin
if (hints.empty()) {
return Status::Corruption("hints empty");
}
LOG(INFO) << DBInfo(db_name_).ToString() << "FollowerNegotiate from " << hints[0].ToString() << " to "
<< hints[hints.size() - 1].ToString();
LOG(INFO) << DBInfo(db_name_).ToString() << "FollowerNegotiate from " << hints[0].ToString()
<< " to " << hints[hints.size() - 1].ToString();
if (mem_logger_->last_offset().l_offset.index < hints[0].l_offset.index) {
*reply_offset = mem_logger_->last_offset();
return Status::OK();
Expand All @@ -745,7 +753,7 @@ Status ConsensusCoordinator::FollowerNegotiate(const std::vector<LogOffset>& hin
return Status::Corruption("invalid hints all smaller than committed_index");
}
if (mem_logger_->last_offset().l_offset.index > hints[hints.size() - 1].l_offset.index) {
const auto& truncate_offset = hints[hints.size() - 1];
const auto &truncate_offset = hints[hints.size() - 1];
// trunck to hints end
Status s = TruncateTo(truncate_offset);
if (!s.ok()) {
Expand Down
3 changes: 2 additions & 1 deletion src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) {

std::string db_name = slot.db_name();
std::shared_ptr<SyncMasterDB> master_db =
g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name)); if (!master_db) {
g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
if (!master_db) {
LOG(WARNING) << "Sync Master DB: " << db_name << ", NotFound";
}
Status s = master_db->RemoveSlaveNode(node.ip(), node.port());
Expand Down
4 changes: 2 additions & 2 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,6 @@ pstd::Status PikaServer::GetDumpUUID(const std::string& db_name, std::string* sn
}

pstd::Status PikaServer::GetDumpMeta(const std::string& db_name, std::vector<std::string>* fileNames, std::string* snapshot_uuid) {

std::shared_ptr<DB> db = GetDB(db_name);
if (!db) {
LOG(WARNING) << "cannot find db for db_name " << db_name;
Expand All @@ -849,7 +848,8 @@ pstd::Status PikaServer::GetDumpMeta(const std::string& db_name, std::vector<std
}

void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& db_name,
int32_t top) { std::shared_ptr<DB> db = GetDB(db_name);
int32_t top) {
std::shared_ptr<DB> db = GetDB(db_name);
if (!db) {
LOG(WARNING) << "can not find DB : " << db_name
<< ", TryDBSync Failed";
Expand Down

0 comments on commit 03d4a6e

Please sign in to comment.