From 951dc63c652d940e33db266bae7fa520c51b7f2b Mon Sep 17 00:00:00 2001 From: buzhimingyonghu <1512049108@qq.com> Date: Sun, 15 Dec 2024 17:05:47 +0000 Subject: [PATCH 1/2] Preliminary completion slave write db --- include/pika_consensus.h | 15 +++++- include/pika_define.h | 18 ++++++- include/pika_repl_bgworker.h | 1 + include/pika_repl_client_conn.h | 1 + include/pika_rm.h | 5 ++ src/pika_consensus.cc | 87 +++++++++++++++++++++++++-------- src/pika_inner_message.proto | 21 +++++++- src/pika_repl_bgworker.cc | 6 +++ src/pika_repl_client_conn.cc | 31 ++++++++++++ src/pika_rm.cc | 9 ++++ 10 files changed, 170 insertions(+), 24 deletions(-) diff --git a/include/pika_consensus.h b/include/pika_consensus.h index bb774b5e3b..cae952f9d2 100644 --- a/include/pika_consensus.h +++ b/include/pika_consensus.h @@ -13,6 +13,9 @@ #include "include/pika_client_conn.h" #include "include/pika_slave_node.h" #include "include/pika_stable_log.h" +#include "include/pika_define.h" +#include "pstd/include/env.h" + class Context : public pstd::noncopyable { public: @@ -114,10 +117,16 @@ class ConsensusCoordinator { pstd::Status AddSlaveNode(const std::string& ip, int port, int session_id); pstd::Status RemoveSlaveNode(const std::string& ip, int port); void UpdateTerm(uint32_t term); + void PutOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){ + offset_index[win_offset] = binlog_offset; + } uint32_t term(); // invoked by follower - pstd::Status ProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute); + pstd::Status ProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute, bool is_write_db=true); + pstd::Status ProcessLeaderDB(const uint64_t binlogoffset); + void GetwriteDBOffset(LogOffset& end_offset,LogOffset& begin_offset); + // Negotiate pstd::Status LeaderNegotiate(const LogOffset& f_last_offset, bool* reject, std::vector* hints); @@ -199,5 +208,9 @@ class ConsensusCoordinator { SyncProgress sync_pros_; std::shared_ptr stable_logger_; std::shared_ptr mem_logger_; + std::unordered_map> binlog_index; + std::unordered_mapoffset_index; + LogOffset end_db_offset_=LogOffset(); + LogOffset begin_db_offset_=LogOffset(); }; #endif // INCLUDE_PIKA_CONSENSUS_H_ diff --git a/include/pika_define.h b/include/pika_define.h index 3968f9072f..81d99c3f53 100644 --- a/include/pika_define.h +++ b/include/pika_define.h @@ -184,6 +184,12 @@ struct BinlogChip { } }; +struct DbWriteChip { + LogOffset offset_; + DbWriteChip(const LogOffset& offset) : offset_(offset) {} + DbWriteChip(const DbWriteChip& chip) { offset_ = chip.offset_; } +}; + struct DBInfo { DBInfo(std::string db_name) : db_name_(std::move(db_name)) {} @@ -210,7 +216,11 @@ struct hash_db_info { return std::hash()(n.db_name_); } }; - +struct hash_db_write_info { + size_t operator()(const LogOffset& n) const { + return std::hash()(n.b_offset.offset); + } +}; class Node { public: Node(std::string ip, int port) : ip_(std::move(ip)), port_(port) {} @@ -273,10 +283,14 @@ class RmNode : public Node { struct WriteTask { struct RmNode rm_node_; - struct BinlogChip binlog_chip_; + struct BinlogChip binlog_chip_ = BinlogChip(LogOffset(), ""); + struct DbWriteChip db_write_chip_ = DbWriteChip(LogOffset()); + bool is_db_write_ = false; LogOffset prev_offset_; WriteTask(const RmNode& rm_node, const BinlogChip& binlog_chip, const LogOffset& prev_offset) : rm_node_(rm_node), binlog_chip_(binlog_chip), prev_offset_(prev_offset) {} + WriteTask(const RmNode& rm_node, const DbWriteChip& db_write_chip, const LogOffset& prev_offset) + : rm_node_(rm_node), db_write_chip_(db_write_chip), is_db_write_(true), prev_offset_(prev_offset) {} }; // slowlog define diff --git a/include/pika_repl_bgworker.h b/include/pika_repl_bgworker.h index dd62622fb9..fc72749fa9 100644 --- a/include/pika_repl_bgworker.h +++ b/include/pika_repl_bgworker.h @@ -34,6 +34,7 @@ class PikaReplBgWorker { void Schedule(net::TaskFunc func, void* arg, std::function& call_back); static void HandleBGWorkerWriteBinlog(void* arg); static void HandleBGWorkerWriteDB(void* arg); + static void HandleBGWorkerDB(void* arg); static void WriteDBInSyncWay(const std::shared_ptr& c_ptr); void SetThreadName(const std::string& thread_name) { bg_thread_.set_thread_name(thread_name); diff --git a/include/pika_repl_client_conn.h b/include/pika_repl_client_conn.h index bfd697dfa0..31d4ac11bb 100644 --- a/include/pika_repl_client_conn.h +++ b/include/pika_repl_client_conn.h @@ -24,6 +24,7 @@ class PikaReplClientConn : public net::PbConn { ~PikaReplClientConn() override = default; static void HandleMetaSyncResponse(void* arg); + static void HandleDbWriteResponse(void* arg); static void HandleDBSyncResponse(void* arg); static void HandleTrySyncResponse(void* arg); static void HandleRemoveSlaveNodeResponse(void* arg); diff --git a/include/pika_rm.h b/include/pika_rm.h index ec80c1ff58..39b4a0a16f 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -70,6 +70,8 @@ class SyncMasterDB : public SyncDB { pstd::Status ConsensusUpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end); pstd::Status ConsensusProposeLog(const std::shared_ptr& cmd_ptr); pstd::Status ConsensusProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute); + pstd::Status ConsensusProcessLeaderDB(const uint64_t offset); + void ConsensusGetwriteDBOffset(LogOffset& end_offset,LogOffset& begin_offset); LogOffset ConsensusCommittedIndex(); LogOffset ConsensusLastIndex(); @@ -81,6 +83,9 @@ class SyncMasterDB : public SyncDB { } return coordinator_.StableLogger()->Logger(); } + void PutCoordinatorOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){ + coordinator_.PutOffsetIndex(win_offset,binlog_offset); + } private: // invoker need to hold slave_mu_ diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index 89f10e0317..ac87e5b0f7 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -338,33 +338,80 @@ Status ConsensusCoordinator::InternalAppendLog(const std::shared_ptr& cmd_p } // precheck if prev_offset match && drop this log if this log exist -Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { +Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute, bool is_write_db) { LogOffset last_index = mem_logger_->last_offset(); if (attribute.logic_id() < last_index.l_offset.index) { LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id " << attribute.logic_id() << " cur last index " << last_index.l_offset.index; return Status::OK(); } + if(is_write_db){ + auto opt = cmd_ptr->argv()[0]; + if (pstd::StringToLower(opt) != kCmdNameFlushdb) { + // apply binlog in sync way + Status s = InternalAppendLog(cmd_ptr); + // apply db in async way + InternalApplyFollower(cmd_ptr); + } else { + // this is a flushdb-binlog, both apply binlog and apply db are in sync way + // ensure all writeDB task that submitted before has finished before we exec this flushdb + int32_t wait_ms = 250; + while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms)); + wait_ms *= 2; + wait_ms = wait_ms < 3000 ? wait_ms : 3000; + } + // apply flushdb-binlog in sync way + Status s = InternalAppendLog(cmd_ptr); + // applyDB in sync way + PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); + } + return Status::OK(); + } + Status s = InternalAppendLog(cmd_ptr); + binlog_index[attribute.offset()] = cmd_ptr; + return Status::OK(); + +} - auto opt = cmd_ptr->argv()[0]; - if (pstd::StringToLower(opt) != kCmdNameFlushdb) { - // apply binlog in sync way - Status s = InternalAppendLog(cmd_ptr); - // apply db in async way - InternalApplyFollower(cmd_ptr); - } else { - // this is a flushdb-binlog, both apply binlog and apply db are in sync way - // ensure all writeDB task that submitted before has finished before we exec this flushdb - int32_t wait_ms = 250; - while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms)); - wait_ms *= 2; - wait_ms = wait_ms < 3000 ? wait_ms : 3000; - } - // apply flushdb-binlog in sync way - Status s = InternalAppendLog(cmd_ptr); - // applyDB in sync way - PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); +void ConsensusCoordinator::GetwriteDBOffset(LogOffset& end_offset,LogOffset& begin_offset) +{ + end_offset =end_db_offset_; + begin_offset=begin_db_offset_; +} + +Status ConsensusCoordinator::ProcessLeaderDB(const uint64_t offset) { + end_db_offset_=LogOffset(); + begin_db_offset_=LogOffset(); + begin_db_offset_.b_offset.offset=UINT64_MAX; + for (const auto& iter : offset_index) { + if (iter.first.b_offset.offset > offset || binlog_index.count(iter.second) == 0) { + continue; + } + auto cmd_ptr = binlog_index[iter.second]; + auto opt = cmd_ptr->argv()[0]; + if (pstd::StringToLower(opt) != kCmdNameFlushdb) { + InternalApplyFollower(cmd_ptr); + } else { + // this is a flushdb-binlog, both apply binlog and apply db are in sync way + // ensure all writeDB task that submitted before has finished before we exec this flushdb + int32_t wait_ms = 250; + while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms)); + wait_ms *= 2; + wait_ms = wait_ms < 3000 ? wait_ms : 3000; + } + // apply flushdb-binlog in sync way + Status s = InternalAppendLog(cmd_ptr); + // applyDB in sync way + PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr); + } + if (iter.first.b_offset.offset > end_db_offset_.b_offset.offset) { + end_db_offset_ = iter.first; + } + if (iter.first.b_offset.offset < begin_db_offset_.b_offset.offset) { + begin_db_offset_ = iter.first; + } } return Status::OK(); } diff --git a/src/pika_inner_message.proto b/src/pika_inner_message.proto index 9e2a3ef04c..a3406bab2e 100644 --- a/src/pika_inner_message.proto +++ b/src/pika_inner_message.proto @@ -8,6 +8,7 @@ enum Type { kBinlogSync = 4; kHeatBeat = 5; kRemoveSlaveNode = 6; + kDbWrite = 7; } enum StatusCode { @@ -86,6 +87,16 @@ message InnerRequest { required bool first_send = 7; } + message DbWriteSync { + required Node node = 1; + required string db_name = 2; + required uint32 slot_id = 3; + required BinlogOffset ack_range_start = 4; + required BinlogOffset ack_range_end = 5; + required int32 session_id = 6; + required bool succ = 7; + } + message RemoveSlaveNode { required Node node = 1; required Slot slot = 2; @@ -98,6 +109,7 @@ message InnerRequest { optional BinlogSync binlog_sync = 5; repeated RemoveSlaveNode remove_slave_node = 6; optional ConsensusMeta consensus_meta = 7; + optional DbWriteSync db_write_sync = 8; } message SlotInfo { @@ -148,6 +160,12 @@ message InnerResponse { required int32 session_id = 4; } + message DbWriteSync { + required Slot slot = 1; + required BinlogOffset db_write_offset = 2; + required int32 session_id = 3; + } + message RemoveSlaveNode { required Node node = 1; required Slot slot = 2; @@ -163,4 +181,5 @@ message InnerResponse { repeated RemoveSlaveNode remove_slave_node = 8; // consensus use optional ConsensusMeta consensus_meta = 9; -} + optional DbWriteSync db_write_sync = 10; +} \ No newline at end of file diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 1e12ffdf0a..583548b04f 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -139,6 +139,12 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { slave_db->SetReplState(ReplState::kTryConnect); return; } + + std::shared_ptr db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name)); + LogOffset cur_logoffset; + ParseBinlogOffset(binlog_res.binlog_offset(),&cur_logoffset); + db->PutCoordinatorOffsetIndex(cur_logoffset,worker->binlog_item_.offset()); + const char* redis_parser_start = binlog_res.binlog().data() + BINLOG_ENCODE_LEN; int redis_parser_len = static_cast(binlog_res.binlog().size()) - BINLOG_ENCODE_LEN; int processed_len = 0; diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index 8fb30d9306..7facd6a19b 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -82,12 +82,43 @@ int PikaReplClientConn::DealMessage() { static_cast(task_arg)); break; } + case InnerMessage::kDbWrite: { + const std::string& db_name = response->try_sync().slot().db_name(); + assert(!db_name.empty()); + auto task_arg = + new ReplClientTaskArg(response, std::dynamic_pointer_cast(shared_from_this())); + g_pika_rm->ScheduleReplClientBGTaskByDBName(&PikaReplClientConn::HandleDbWriteResponse, + static_cast(task_arg), db_name); + break; + } default: break; } return 0; } +void PikaReplClientConn::HandleDbWriteResponse(void* arg) { + std::unique_ptr task_arg(static_cast(arg)); + std::shared_ptr conn = task_arg->conn; + std::shared_ptr response = task_arg->res; + + const InnerMessage::InnerResponse_DbWriteSync& dbwrite_sync_response = response->db_write_sync(); + int32_t session_id = dbwrite_sync_response.session_id(); + const InnerMessage::Slot& db_response = dbwrite_sync_response.slot(); + const std::string& db_name = db_response.db_name(); + const InnerMessage::BinlogOffset& binlog_offset = dbwrite_sync_response.db_write_offset(); + + std::shared_ptr db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name)); + if (!db) { + LOG(WARNING) << db_name << "Not found."; + } + db->ConsensusProcessLeaderDB(binlog_offset.offset()); + LogOffset ack_end; + LogOffset ack_start; + db->ConsensusGetwriteDBOffset(ack_end,ack_start); + g_pika_rm->SendBinlogSyncAckRequest(db_name, ack_start, ack_end); +} + void PikaReplClientConn::HandleMetaSyncResponse(void* arg) { std::unique_ptr task_arg(static_cast(arg)); std::shared_ptr conn = task_arg->conn; diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 9df7b82101..156fd49378 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -384,10 +384,19 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr& cmd_ptr) { return coordinator_.ProposeLog(cmd_ptr); } +void SyncMasterDB::ConsensusGetwriteDBOffset(LogOffset& end_offset,LogOffset& begin_offset) +{ + coordinator_.GetwriteDBOffset(end_offset, begin_offset); +} + Status SyncMasterDB::ConsensusProcessLeaderLog(const std::shared_ptr& cmd_ptr, const BinlogItem& attribute) { return coordinator_.ProcessLeaderLog(cmd_ptr, attribute); } +Status SyncMasterDB::ConsensusProcessLeaderDB(const uint64_t offset) { + return coordinator_.ProcessLeaderDB(offset); +} + LogOffset SyncMasterDB::ConsensusCommittedIndex() { return coordinator_.committed_index(); } LogOffset SyncMasterDB::ConsensusLastIndex() { return coordinator_.MemLogger()->last_offset(); } From 917050db1781b2e31be53cacafba88609ede9542 Mon Sep 17 00:00:00 2001 From: buzhimingyonghu <1512049108@qq.com> Date: Sun, 15 Dec 2024 17:28:44 +0000 Subject: [PATCH 2/2] add is_write_db --- src/pika_repl_bgworker.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 583548b04f..0b1e8e2c79 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -110,7 +110,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { LOG(WARNING) << "Slave DB " << db_name << " Not Found"; return; } - + bool is_write_db=true; for (int i : *index) { const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync(i); // if pika are not current a slave or DB not in @@ -139,12 +139,14 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { slave_db->SetReplState(ReplState::kTryConnect); return; } - + + if(!is_write_db){ std::shared_ptr db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name)); LogOffset cur_logoffset; ParseBinlogOffset(binlog_res.binlog_offset(),&cur_logoffset); db->PutCoordinatorOffsetIndex(cur_logoffset,worker->binlog_item_.offset()); - + } + const char* redis_parser_start = binlog_res.binlog().data() + BINLOG_ENCODE_LEN; int redis_parser_len = static_cast(binlog_res.binlog().size()) - BINLOG_ENCODE_LEN; int processed_len = 0;