From 03d4a6ef3efdf6ea5828ae989da454d36dca5d7d Mon Sep 17 00:00:00 2001 From: buzhimingyonghu <1512049108@qq.com> Date: Tue, 14 Jan 2025 19:49:32 +0000 Subject: [PATCH] adjust code --- include/pika_server.h | 8 +++-- src/pika_consensus.cc | 64 ++++++++++++++++++++---------------- src/pika_repl_server_conn.cc | 3 +- src/pika_server.cc | 4 +-- 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/include/pika_server.h b/include/pika_server.h index 2e066957b..5128f1428 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -271,8 +271,12 @@ class PikaServer : public pstd::noncopyable { /* * Disk usage statistic */ - uint64_t GetDBSize() const { return disk_statistic_.db_size_.load(); } - uint64_t GetLogSize() const { return disk_statistic_.log_size_.load(); } + uint64_t GetDBSize() const { + return disk_statistic_.db_size_.load(); + } + uint64_t GetLogSize() const { + return disk_statistic_.log_size_.load(); + } /* * Network Statistic used diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index f16f67fc3..33f3602ce 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -22,7 +22,7 @@ extern std::unique_ptr g_pika_cmd_table_manager; /* Context */ -Context::Context(std::string path) : path_(std::move(path)) {} +Context::Context(std::string path) : path_(std::move(path)) {} Status Context::StableSave() { char* p = save_->GetData(); @@ -81,7 +81,9 @@ void Context::Reset(const LogOffset& offset) { /* SyncProgress */ -std::string MakeSlaveKey(const std::string& ip, int port) { return ip + ":" + std::to_string(port); } +std::string MakeSlaveKey(const std::string& ip, int port) { + return ip + ":" + std::to_string(port); +} std::shared_ptr SyncProgress::GetSlaveNode(const std::string& ip, int port) { std::string slave_key = MakeSlaveKey(ip, port); @@ -159,7 +161,7 @@ int SyncProgress::SlaveSize() { /* MemLog */ -MemLog::MemLog() = default; +MemLog::MemLog() = default; int MemLog::Size() { return static_cast(logs_.size()); } @@ -217,7 +219,8 @@ int MemLog::InternalFindLogByBinlogOffset(const LogOffset& offset) { /* ConsensusCoordinator */ -ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name) : db_name_(db_name) { +ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name) + : db_name_(db_name) { std::string db_log_path = g_pika_conf->log_path() + "log_" + db_name + "/"; std::string log_path = db_log_path; context_ = std::make_shared(log_path + kContext); @@ -237,8 +240,8 @@ void ConsensusCoordinator::Init() { // load term_ term_ = stable_logger_->Logger()->term(); - LOG(INFO) << DBInfo(db_name_).ToString() << "Restore applied index " << context_->applied_index_.ToString() - << " current term " << term_; + LOG(INFO) << DBInfo(db_name_).ToString() << "Restore applied index " + << context_->applied_index_.ToString() << " current term " << term_; if (committed_index_ == LogOffset()) { return; } @@ -300,7 +303,8 @@ Status ConsensusCoordinator::Reset(const LogOffset& offset) { Status s = stable_logger_->Logger()->SetProducerStatus(offset.b_offset.filenum, offset.b_offset.offset, offset.l_offset.term, offset.l_offset.index); if (!s.ok()) { - LOG(WARNING) << DBInfo(db_name_).ToString() << "Consensus reset status failed " << s.ToString(); + LOG(WARNING) << DBInfo(db_name_).ToString() << "Consensus reset status failed " + << s.ToString(); return s; } @@ -316,8 +320,7 @@ Status ConsensusCoordinator::ProposeLog(const std::shared_ptr& cmd_ptr) { std::vector keys = cmd_ptr->current_key(); // slotkey shouldn't add binlog if (cmd_ptr->name() == kCmdNameSAdd && !keys.empty() && - (keys[0].compare(0, SlotKeyPrefix.length(), SlotKeyPrefix) == 0 || - keys[0].compare(0, SlotTagPrefix.length(), SlotTagPrefix) == 0)) { + (keys[0].compare(0, SlotKeyPrefix.length(), SlotKeyPrefix) == 0 || keys[0].compare(0, SlotTagPrefix.length(), SlotTagPrefix) == 0)) { return Status::OK(); } @@ -339,8 +342,8 @@ Status ConsensusCoordinator::InternalAppendLog(const std::shared_ptr& cmd_p Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { LogOffset last_index = mem_logger_->last_offset(); if (attribute.logic_id() < last_index.l_offset.index) { - LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id " << attribute.logic_id() - << " cur last index " << last_index.l_offset.index; + LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id " + << attribute.logic_id() << " cur last index " << last_index.l_offset.index; return Status::OK(); } @@ -468,8 +471,8 @@ Status ConsensusCoordinator::TruncateTo(const LogOffset& offset) { if (!s.ok()) { return s; } - LOG(INFO) << DBInfo(db_name_).ToString() << " Founded truncate pos " << founded_offset.ToString(); - LogOffset committed = committed_index(); + LOG(INFO) << DBInfo(db_name_).ToString() << " Founded truncate pos " + << founded_offset.ToString(); LogOffset committed = committed_index(); stable_logger_->Logger()->Lock(); if (founded_offset.l_offset.index == committed.l_offset.index) { mem_logger_->Reset(committed); @@ -593,8 +596,8 @@ Status ConsensusCoordinator::FindBinlogFileNum(const std::map binlogs; if (!stable_logger_->GetBinlogFiles(&binlogs)) { @@ -626,7 +629,8 @@ Status ConsensusCoordinator::FindLogicOffsetBySearchingBinlog(const BinlogOffset } for (auto& offset : offsets) { if (offset.l_offset.index == target_index) { - LOG(INFO) << DBInfo(db_name_).ToString() << "Founded " << target_index << " " << offset.ToString(); + LOG(INFO) << DBInfo(db_name_).ToString() << "Founded " << target_index << " " + << offset.ToString(); *found_offset = offset; return Status::OK(); } @@ -642,8 +646,8 @@ Status ConsensusCoordinator::FindLogicOffset(const BinlogOffset& start_offset, u if (!s.ok()) { LOG(INFO) << DBInfo(db_name_).ToString() << "GetBinlogOffset res: " << s.ToString(); } else { - LOG(INFO) << DBInfo(db_name_).ToString() << "GetBInlogOffset res: " << s.ToString() << " possible_offset " - << possible_offset.ToString() << " target_index " << target_index; + LOG(INFO) << DBInfo(db_name_).ToString() << "GetBInlogOffset res: " << s.ToString() + << " possible_offset " << possible_offset.ToString() << " target_index " << target_index; } return FindLogicOffsetBySearchingBinlog(start_offset, target_index, found_offset); } @@ -677,9 +681,9 @@ Status ConsensusCoordinator::GetLogsBefore(const BinlogOffset& start_offset, std Status ConsensusCoordinator::LeaderNegotiate(const LogOffset& f_last_offset, bool* reject, std::vector* hints) { uint64_t f_index = f_last_offset.l_offset.index; - LOG(INFO) << DBInfo(db_name_).ToString() << "LeaderNeotiate follower last offset " << f_last_offset.ToString() - << " first_offsert " << stable_logger_->first_offset().ToString() << " last_offset " - << mem_logger_->last_offset().ToString(); + LOG(INFO) << DBInfo(db_name_).ToString() << "LeaderNeotiate follower last offset " + << f_last_offset.ToString() << " first_offsert " << stable_logger_->first_offset().ToString() + << " last_offset " << mem_logger_->last_offset().ToString(); *reject = true; if (f_index > mem_logger_->last_offset().l_offset.index) { // hints starts from last_offset() - 100; @@ -689,7 +693,8 @@ Status ConsensusCoordinator::LeaderNegotiate(const LogOffset& f_last_offset, boo << " get logs before last index failed " << s.ToString(); return s; } - LOG(INFO) << DBInfo(db_name_).ToString() << "follower index larger then last_offset index, get logs before " + LOG(INFO) << DBInfo(db_name_).ToString() + << "follower index larger then last_offset index, get logs before " << mem_logger_->last_offset().ToString(); return Status::OK(); } @@ -708,10 +713,12 @@ Status ConsensusCoordinator::LeaderNegotiate(const LogOffset& f_last_offset, boo Status s = FindLogicOffset(f_last_offset.b_offset, f_index, &found_offset); if (!s.ok()) { if (s.IsNotFound()) { - LOG(INFO) << DBInfo(db_name_).ToString() << f_last_offset.ToString() << " not found " << s.ToString(); + LOG(INFO) << DBInfo(db_name_).ToString() << f_last_offset.ToString() << " not found " + << s.ToString(); return s; } else { - LOG(WARNING) << DBInfo(db_name_).ToString() << "find logic offset failed" << s.ToString(); + LOG(WARNING) << DBInfo(db_name_).ToString() << "find logic offset failed" + << s.ToString(); return s; } } @@ -719,7 +726,8 @@ Status ConsensusCoordinator::LeaderNegotiate(const LogOffset& f_last_offset, boo if (found_offset.l_offset.term != f_last_offset.l_offset.term || !(f_last_offset.b_offset == found_offset.b_offset)) { Status s = GetLogsBefore(found_offset.b_offset, hints); if (!s.ok()) { - LOG(WARNING) << DBInfo(db_name_).ToString() << "Try to get logs before " << found_offset.ToString() << " failed"; + LOG(WARNING) << DBInfo(db_name_).ToString() << "Try to get logs before " + << found_offset.ToString() << " failed"; return s; } return Status::OK(); @@ -735,8 +743,8 @@ Status ConsensusCoordinator::FollowerNegotiate(const std::vector& hin if (hints.empty()) { return Status::Corruption("hints empty"); } - LOG(INFO) << DBInfo(db_name_).ToString() << "FollowerNegotiate from " << hints[0].ToString() << " to " - << hints[hints.size() - 1].ToString(); + LOG(INFO) << DBInfo(db_name_).ToString() << "FollowerNegotiate from " << hints[0].ToString() + << " to " << hints[hints.size() - 1].ToString(); if (mem_logger_->last_offset().l_offset.index < hints[0].l_offset.index) { *reply_offset = mem_logger_->last_offset(); return Status::OK(); @@ -745,7 +753,7 @@ Status ConsensusCoordinator::FollowerNegotiate(const std::vector& hin return Status::Corruption("invalid hints all smaller than committed_index"); } if (mem_logger_->last_offset().l_offset.index > hints[hints.size() - 1].l_offset.index) { - const auto& truncate_offset = hints[hints.size() - 1]; + const auto &truncate_offset = hints[hints.size() - 1]; // trunck to hints end Status s = TruncateTo(truncate_offset); if (!s.ok()) { diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 562654ddb..a75a2455e 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -450,7 +450,8 @@ void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) { std::string db_name = slot.db_name(); std::shared_ptr master_db = - g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name)); if (!master_db) { + g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name)); + if (!master_db) { LOG(WARNING) << "Sync Master DB: " << db_name << ", NotFound"; } Status s = master_db->RemoveSlaveNode(node.ip(), node.port()); diff --git a/src/pika_server.cc b/src/pika_server.cc index 5eaad4c55..6d49e1b8f 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -838,7 +838,6 @@ pstd::Status PikaServer::GetDumpUUID(const std::string& db_name, std::string* sn } pstd::Status PikaServer::GetDumpMeta(const std::string& db_name, std::vector* fileNames, std::string* snapshot_uuid) { - std::shared_ptr db = GetDB(db_name); if (!db) { LOG(WARNING) << "cannot find db for db_name " << db_name; @@ -849,7 +848,8 @@ pstd::Status PikaServer::GetDumpMeta(const std::string& db_name, std::vector db = GetDB(db_name); + int32_t top) { + std::shared_ptr db = GetDB(db_name); if (!db) { LOG(WARNING) << "can not find DB : " << db_name << ", TryDBSync Failed";