Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PacificA slave data replication Consistency scheme #2975

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include "include/pika_client_conn.h"
#include "include/pika_slave_node.h"
#include "include/pika_stable_log.h"
#include "include/pika_define.h"
#include "pstd/include/env.h"


class Context : public pstd::noncopyable {
public:
Expand Down Expand Up @@ -114,10 +117,16 @@ class ConsensusCoordinator {
pstd::Status AddSlaveNode(const std::string& ip, int port, int session_id);
pstd::Status RemoveSlaveNode(const std::string& ip, int port);
void UpdateTerm(uint32_t term);
void PutOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
offset_index[win_offset] = binlog_offset;
}
Comment on lines +120 to +122
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Pass LogOffset by const reference to improve performance

The PutOffsetIndex method accepts LogOffset win_offset by value. Passing it by const reference avoids unnecessary copies and improves performance.

Apply this diff:

-void PutOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
+void PutOffsetIndex(const LogOffset& win_offset, uint64_t binlog_offset){
    offset_index[win_offset] = binlog_offset;
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void PutOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
offset_index[win_offset] = binlog_offset;
}
void PutOffsetIndex(const LogOffset& win_offset,uint64_t binlog_offset){
offset_index[win_offset] = binlog_offset;
}

uint32_t term();

// invoked by follower
pstd::Status ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute, bool is_write_db=true);
pstd::Status ProcessLeaderDB(const uint64_t binlogoffset);
void GetwriteDBOffset(LogOffset& end_offset,LogOffset& begin_offset);


// Negotiate
pstd::Status LeaderNegotiate(const LogOffset& f_last_offset, bool* reject, std::vector<LogOffset>* hints);
Expand Down Expand Up @@ -199,5 +208,9 @@ class ConsensusCoordinator {
SyncProgress sync_pros_;
std::shared_ptr<StableLog> stable_logger_;
std::shared_ptr<MemLog> mem_logger_;
std::unordered_map<uint64_t, std::shared_ptr<Cmd>> binlog_index;
std::unordered_map<LogOffset,uint64_t,hash_db_write_info>offset_index;
LogOffset end_db_offset_=LogOffset();
LogOffset begin_db_offset_=LogOffset();
};
#endif // INCLUDE_PIKA_CONSENSUS_H_
18 changes: 16 additions & 2 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ struct BinlogChip {
}
};

// Wraps a single LogOffset. WriteTask stores one of these as its
// db_write_chip_ member.
struct DbWriteChip {
  LogOffset offset_;
  // Left non-explicit so any call site relying on the implicit conversion
  // from LogOffset keeps compiling.
  DbWriteChip(const LogOffset& offset) : offset_(offset) {}
  // Defaulted: copy-initializes offset_ directly instead of
  // default-constructing it and assigning afterwards.
  DbWriteChip(const DbWriteChip&) = default;
};

struct DBInfo {
DBInfo(std::string db_name)
: db_name_(std::move(db_name)) {}
Expand All @@ -210,7 +216,11 @@ struct hash_db_info {
return std::hash<std::string>()(n.db_name_);
}
};

struct hash_db_write_info {
size_t operator()(const LogOffset& n) const {
return std::hash<uint64_t>()(n.b_offset.offset);
}
};
class Node {
public:
Node(std::string ip, int port) : ip_(std::move(ip)), port_(port) {}
Expand Down Expand Up @@ -273,10 +283,14 @@ class RmNode : public Node {

// Bundles an RmNode (presumably the destination node -- confirm against
// callers) with either a binlog chip or a DB-write chip, plus a previous
// offset. is_db_write_ records which of the two constructors built the task.
struct WriteTask {
struct RmNode rm_node_;
struct BinlogChip binlog_chip_;
struct BinlogChip binlog_chip_ = BinlogChip(LogOffset(), "");
struct DbWriteChip db_write_chip_ = DbWriteChip(LogOffset());
// Stays false unless the DbWriteChip constructor below is used.
bool is_db_write_ = false;
LogOffset prev_offset_;
// Binlog task: db_write_chip_ and is_db_write_ keep their default initializers.
WriteTask(const RmNode& rm_node, const BinlogChip& binlog_chip, const LogOffset& prev_offset)
: rm_node_(rm_node), binlog_chip_(binlog_chip), prev_offset_(prev_offset) {}
// DB-write task: binlog_chip_ keeps its default initializer and is_db_write_ is set to true.
WriteTask(const RmNode& rm_node, const DbWriteChip& db_write_chip, const LogOffset& prev_offset)
: rm_node_(rm_node), db_write_chip_(db_write_chip), is_db_write_(true), prev_offset_(prev_offset) {}
};

// slowlog define
Expand Down
1 change: 1 addition & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class PikaReplBgWorker {
void Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back);
static void HandleBGWorkerWriteBinlog(void* arg);
static void HandleBGWorkerWriteDB(void* arg);
static void HandleBGWorkerDB(void* arg);
static void WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr);
void SetThreadName(const std::string& thread_name) {
bg_thread_.set_thread_name(thread_name);
Expand Down
1 change: 1 addition & 0 deletions include/pika_repl_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class PikaReplClientConn : public net::PbConn {
~PikaReplClientConn() override = default;

static void HandleMetaSyncResponse(void* arg);
static void HandleDbWriteResponse(void* arg);
static void HandleDBSyncResponse(void* arg);
static void HandleTrySyncResponse(void* arg);
static void HandleRemoveSlaveNodeResponse(void* arg);
Expand Down
5 changes: 5 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class SyncMasterDB : public SyncDB {
pstd::Status ConsensusUpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end);
pstd::Status ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status ConsensusProcessLeaderDB(const uint64_t offset);
void ConsensusGetwriteDBOffset(LogOffset& end_offset,LogOffset& begin_offset);
LogOffset ConsensusCommittedIndex();
LogOffset ConsensusLastIndex();

Expand All @@ -81,6 +83,9 @@ class SyncMasterDB : public SyncDB {
}
return coordinator_.StableLogger()->Logger();
}
void PutCoordinatorOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
coordinator_.PutOffsetIndex(win_offset,binlog_offset);
}
Comment on lines +86 to +88
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Pass LogOffset by const reference and fix formatting

  • Pass win_offset by const reference in PutCoordinatorOffsetIndex to avoid unnecessary copying.
  • Add spaces after commas for consistency.

Apply this diff:

-void PutCoordinatorOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
+void PutCoordinatorOffsetIndex(const LogOffset& win_offset, uint64_t binlog_offset){
    coordinator_.PutOffsetIndex(win_offset, binlog_offset);
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void PutCoordinatorOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
coordinator_.PutOffsetIndex(win_offset,binlog_offset);
}
void PutCoordinatorOffsetIndex(const LogOffset& win_offset, uint64_t binlog_offset){
coordinator_.PutOffsetIndex(win_offset, binlog_offset);
}


private:
// invoker need to hold slave_mu_
Expand Down
87 changes: 67 additions & 20 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,33 +338,80 @@ Status ConsensusCoordinator::InternalAppendLog(const std::shared_ptr<Cmd>& cmd_p
}

// precheck if prev_offset match && drop this log if this log exist
Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
// Handles one log entry received from the leader.
//
// Entries whose logic_id is below the mem logger's last index are dropped
// with a warning. Otherwise:
//   - is_write_db == false: the entry is appended and cmd_ptr is remembered
//     in binlog_index under attribute.offset(); the DB is not touched here.
//   - is_write_db == true: the entry is appended and applied to the DB
//     (asynchronously, or synchronously for flushdb).
// Always returns Status::OK(); the Status of InternalAppendLog is not inspected.
Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute, bool is_write_db) {
  const LogOffset last_index = mem_logger_->last_offset();
  if (attribute.logic_id() < last_index.l_offset.index) {
    LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id "
                 << attribute.logic_id() << " cur last index " << last_index.l_offset.index;
    return Status::OK();
  }

  // Log-only path: append now, keep the command for a later DB apply.
  if (!is_write_db) {
    Status s = InternalAppendLog(cmd_ptr);
    binlog_index[attribute.offset()] = cmd_ptr;
    return Status::OK();
  }

  auto opt = cmd_ptr->argv()[0];
  const bool is_flushdb = (pstd::StringToLower(opt) == kCmdNameFlushdb);
  if (!is_flushdb) {
    // binlog is applied synchronously, the DB asynchronously
    Status s = InternalAppendLog(cmd_ptr);
    InternalApplyFollower(cmd_ptr);
    return Status::OK();
  }

  // flushdb: both binlog and DB are applied synchronously. First wait until
  // every previously submitted async writeDB task has finished, backing off
  // from 250 ms up to a 3000 ms cap per sleep.
  int32_t wait_ms = 250;
  while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
    std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
    wait_ms *= 2;
    if (wait_ms > 3000) {
      wait_ms = 3000;
    }
  }
  Status s = InternalAppendLog(cmd_ptr);
  PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
  return Status::OK();
}
Comment on lines +341 to +375
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor ProcessLeaderLog method for better maintainability

The ProcessLeaderLog method contains deeply nested conditionals and repetitive code segments. Refactor the method to improve readability and maintainability.

Consider extracting the flushdb handling logic into a separate private method and simplifying the nested if-else blocks.

Comment on lines +348 to +375
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid potential infinite loop in waiting mechanism

The loop waiting for unfinished write DB tasks backs off exponentially (each sleep is capped at 3000 ms) but places no upper bound on the total wait time. If the unfinished task count never reaches zero, the loop never exits.

Apply this diff to implement a maximum total wait time:

 int32_t wait_ms = 250;
+int32_t total_wait_ms = 0;
 while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
   std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
+  total_wait_ms += wait_ms;
   wait_ms *= 2;
   wait_ms = wait_ms < 3000 ? wait_ms : 3000;
+  if (total_wait_ms >= 10000) { // Maximum total wait time of 10 seconds
+     LOG(WARNING) << "Exceeded maximum wait time for unfinished write tasks.";
+     break;
+  }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if(is_write_db){
auto opt = cmd_ptr->argv()[0];
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
// apply binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// apply db in async way
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all writeDB task that submitted before has finished before we exec this flushdb
int32_t wait_ms = 250;
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
}
// apply flushdb-binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// applyDB in sync way
PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
}
return Status::OK();
}
Status s = InternalAppendLog(cmd_ptr);
binlog_index[attribute.offset()] = cmd_ptr;
return Status::OK();
}
if(is_write_db){
auto opt = cmd_ptr->argv()[0];
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
// apply binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// apply db in async way
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all writeDB task that submitted before has finished before we exec this flushdb
int32_t wait_ms = 250;
int32_t total_wait_ms = 0;
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
total_wait_ms += wait_ms;
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
if (total_wait_ms >= 10000) { // Maximum total wait time of 10 seconds
LOG(WARNING) << "Exceeded maximum wait time for unfinished write tasks.";
break;
}
}
// apply flushdb-binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// applyDB in sync way
PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
}
return Status::OK();
}
Status s = InternalAppendLog(cmd_ptr);
binlog_index[attribute.offset()] = cmd_ptr;
return Status::OK();
}


auto opt = cmd_ptr->argv()[0];
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
// apply binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// apply db in async way
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all writeDB task that submitted before has finished before we exec this flushdb
int32_t wait_ms = 250;
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
}
// apply flushdb-binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// applyDB in sync way
PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
// Copies end_db_offset_ and begin_db_offset_ into the caller's out-parameters.
// Note the parameter order: end first, begin second.
void ConsensusCoordinator::GetwriteDBOffset(LogOffset& end_offset, LogOffset& begin_offset) {
  end_offset = end_db_offset_;
  begin_offset = begin_db_offset_;
}

Status ConsensusCoordinator::ProcessLeaderDB(const uint64_t offset) {
end_db_offset_=LogOffset();
begin_db_offset_=LogOffset();
begin_db_offset_.b_offset.offset=UINT64_MAX;
for (const auto& iter : offset_index) {
if (iter.first.b_offset.offset > offset || binlog_index.count(iter.second) == 0) {
continue;
}
auto cmd_ptr = binlog_index[iter.second];
auto opt = cmd_ptr->argv()[0];
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all writeDB task that submitted before has finished before we exec this flushdb
int32_t wait_ms = 250;
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
}
// apply flushdb-binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// applyDB in sync way
PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
}
if (iter.first.b_offset.offset > end_db_offset_.b_offset.offset) {
Comment on lines +383 to +409
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure thread safety when accessing shared resources

The method ProcessLeaderDB accesses offset_index and binlog_index without any synchronization mechanisms. If these maps are accessed concurrently, it could lead to race conditions.

Add appropriate mutex locks to ensure thread safety when accessing shared data structures.

end_db_offset_ = iter.first;
}
if (iter.first.b_offset.offset < begin_db_offset_.b_offset.offset) {
begin_db_offset_ = iter.first;
}
}
return Status::OK();
}
Expand Down
21 changes: 20 additions & 1 deletion src/pika_inner_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ enum Type {
kBinlogSync = 4;
kHeatBeat = 5;
kRemoveSlaveNode = 6;
kDbWrite = 7;
}

enum StatusCode {
Expand Down Expand Up @@ -86,6 +87,16 @@ message InnerRequest {
required bool first_send = 7;
}

// Request-side DB-write payload; InnerRequest carries it in its optional
// db_write_sync field (tag 8). Every field is declared `required`.
// NOTE(review): ack_range_start/ack_range_end presumably bound the offsets
// being acknowledged -- confirm against the sender.
message DbWriteSync {
required Node node = 1;
required string db_name = 2;
required uint32 slot_id = 3;
required BinlogOffset ack_range_start = 4;
required BinlogOffset ack_range_end = 5;
required int32 session_id = 6;
required bool succ = 7;
}

message RemoveSlaveNode {
required Node node = 1;
required Slot slot = 2;
Expand All @@ -98,6 +109,7 @@ message InnerRequest {
optional BinlogSync binlog_sync = 5;
repeated RemoveSlaveNode remove_slave_node = 6;
optional ConsensusMeta consensus_meta = 7;
optional DbWriteSync db_write_sync = 8;
}

message SlotInfo {
Expand Down Expand Up @@ -148,6 +160,12 @@ message InnerResponse {
required int32 session_id = 4;
}

// Response-side DB-write payload; InnerResponse carries it in its optional
// db_write_sync field (tag 10). Every field is declared `required`.
// PikaReplClientConn::HandleDbWriteResponse reads slot.db_name and
// db_write_offset.offset from this message.
message DbWriteSync {
required Slot slot = 1;
required BinlogOffset db_write_offset = 2;
required int32 session_id = 3;
}

message RemoveSlaveNode {
required Node node = 1;
required Slot slot = 2;
Expand All @@ -163,4 +181,5 @@ message InnerResponse {
repeated RemoveSlaveNode remove_slave_node = 8;
// consensus use
optional ConsensusMeta consensus_meta = 9;
}
optional DbWriteSync db_write_sync = 10;
}
10 changes: 9 additions & 1 deletion src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
LOG(WARNING) << "Slave DB " << db_name << " Not Found";
return;
}

bool is_write_db=true;
for (int i : *index) {
const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync(i);
// if pika are not current a slave or DB not in
Expand Down Expand Up @@ -139,6 +139,14 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
slave_db->SetReplState(ReplState::kTryConnect);
return;
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for binlog processing failures.

The code continues processing even if binlog item decode fails. Consider adding proper error handling and logging.

+    if (worker->binlog_item_.offset().empty()) {
+      LOG(WARNING) << "Invalid binlog offset after decode";
+      slave_db->SetReplState(ReplState::kTryConnect);
+      return;
+    }

Also applies to: 149-149

if(!is_write_db){
std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
LogOffset cur_logoffset;
ParseBinlogOffset(binlog_res.binlog_offset(),&cur_logoffset);
db->PutCoordinatorOffsetIndex(cur_logoffset,worker->binlog_item_.offset());
}
Comment on lines +143 to +148
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical: Potential race condition in coordinator offset update.

The coordinator offset index update block has several issues:

  1. Redundant DB lookup inside the loop
  2. No synchronization around offset update
  3. Inconsistent spacing in the code block

Consider refactoring to:

-    if(!is_write_db){
-    std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
-    LogOffset cur_logoffset;
-    ParseBinlogOffset(binlog_res.binlog_offset(),&cur_logoffset);
-    db->PutCoordinatorOffsetIndex(cur_logoffset,worker->binlog_item_.offset());
-    }
+    if (!is_write_db) {
+      LogOffset cur_logoffset;
+      ParseBinlogOffset(binlog_res.binlog_offset(), &cur_logoffset);
+      if (db) {  // Use existing db pointer from outer scope
+        std::lock_guard<std::mutex> lock(db->GetOffsetMutex());
+        db->PutCoordinatorOffsetIndex(cur_logoffset, worker->binlog_item_.offset());
+      }
+    }

Committable suggestion skipped: line range outside the PR's diff.


const char* redis_parser_start = binlog_res.binlog().data() + BINLOG_ENCODE_LEN;
int redis_parser_len = static_cast<int>(binlog_res.binlog().size()) - BINLOG_ENCODE_LEN;
int processed_len = 0;
Expand Down
31 changes: 31 additions & 0 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,43 @@ int PikaReplClientConn::DealMessage() {
static_cast<void*>(task_arg));
break;
}
case InnerMessage::kDbWrite: {
const std::string& db_name = response->try_sync().slot().db_name();
assert(!db_name.empty());
auto task_arg =
new ReplClientTaskArg(response, std::dynamic_pointer_cast<PikaReplClientConn>(shared_from_this()));
g_pika_rm->ScheduleReplClientBGTaskByDBName(&PikaReplClientConn::HandleDbWriteResponse,
static_cast<void*>(task_arg), db_name);
break;
}
default:
break;
}
return 0;
}

// Background-task handler for a kDbWrite response.
//
// arg is a ReplClientTaskArg* allocated with `new` by DealMessage; this
// function takes ownership and frees it on every return path.
//
// Looks up the SyncMasterDB named in the response, asks it to apply DB writes
// up to the received binlog offset, then sends a binlog-sync ack covering the
// begin/end offsets the coordinator reports. If the DB is unknown, a warning
// is logged and nothing else happens.
void PikaReplClientConn::HandleDbWriteResponse(void* arg) {
  std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
  std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

  const InnerMessage::InnerResponse_DbWriteSync& dbwrite_sync_response = response->db_write_sync();
  const std::string& db_name = dbwrite_sync_response.slot().db_name();
  const InnerMessage::BinlogOffset& binlog_offset = dbwrite_sync_response.db_write_offset();

  std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
  if (!db) {
    LOG(WARNING) << db_name << "Not found.";
    // Without this return the code below would dereference a null pointer.
    return;
  }

  db->ConsensusProcessLeaderDB(binlog_offset.offset());

  LogOffset ack_end;
  LogOffset ack_start;
  // ConsensusGetwriteDBOffset takes (end, begin), in that order.
  db->ConsensusGetwriteDBOffset(ack_end, ack_start);
  g_pika_rm->SendBinlogSyncAckRequest(db_name, ack_start, ack_end);
}
Comment on lines +100 to +120
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add proper error handling and validation

The current implementation has several issues:

  1. No proper error handling when db is null
  2. Potential null pointer dereference at line 115
  3. Missing validation of binlog_offset

Consider this safer implementation:

void PikaReplClientConn::HandleDbWriteResponse(void* arg) {
  std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
  std::shared_ptr<net::PbConn> conn = task_arg->conn;
  std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

  const InnerMessage::InnerResponse_DbWriteSync& dbwrite_sync_response = response->db_write_sync();
  int32_t session_id = dbwrite_sync_response.session_id();              
  const InnerMessage::Slot& db_response = dbwrite_sync_response.slot();  
  const std::string& db_name = db_response.db_name();                  
  const InnerMessage::BinlogOffset& binlog_offset = dbwrite_sync_response.db_write_offset();

  std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
  if (!db) {
    LOG(WARNING) << db_name << "Not found.";
+   return;
  }
+ 
+ if (binlog_offset.offset() == 0) {
+   LOG(WARNING) << "Invalid binlog offset received for " << db_name;
+   return;
+ }

  db->ConsensusProcessLeaderDB(binlog_offset.offset());
  LogOffset ack_end;
  LogOffset ack_start;
  db->ConsensusGetwriteDBOffset(ack_end,ack_start);
  g_pika_rm->SendBinlogSyncAckRequest(db_name, ack_start, ack_end);
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void PikaReplClientConn::HandleDbWriteResponse(void* arg) {
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
const InnerMessage::InnerResponse_DbWriteSync& dbwrite_sync_response = response->db_write_sync();
int32_t session_id = dbwrite_sync_response.session_id();
const InnerMessage::Slot& db_response = dbwrite_sync_response.slot();
const std::string& db_name = db_response.db_name();
const InnerMessage::BinlogOffset& binlog_offset = dbwrite_sync_response.db_write_offset();
std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
if (!db) {
LOG(WARNING) << db_name << "Not found.";
}
db->ConsensusProcessLeaderDB(binlog_offset.offset());
LogOffset ack_end;
LogOffset ack_start;
db->ConsensusGetwriteDBOffset(ack_end,ack_start);
g_pika_rm->SendBinlogSyncAckRequest(db_name, ack_start, ack_end);
}
void PikaReplClientConn::HandleDbWriteResponse(void* arg) {
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
const InnerMessage::InnerResponse_DbWriteSync& dbwrite_sync_response = response->db_write_sync();
int32_t session_id = dbwrite_sync_response.session_id();
const InnerMessage::Slot& db_response = dbwrite_sync_response.slot();
const std::string& db_name = db_response.db_name();
const InnerMessage::BinlogOffset& binlog_offset = dbwrite_sync_response.db_write_offset();
std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
if (!db) {
LOG(WARNING) << db_name << "Not found.";
return;
}
if (binlog_offset.offset() == 0) {
LOG(WARNING) << "Invalid binlog offset received for " << db_name;
return;
}
db->ConsensusProcessLeaderDB(binlog_offset.offset());
LogOffset ack_end;
LogOffset ack_start;
db->ConsensusGetwriteDBOffset(ack_end,ack_start);
g_pika_rm->SendBinlogSyncAckRequest(db_name, ack_start, ack_end);
}


void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
Expand Down
9 changes: 9 additions & 0 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,19 @@ Status SyncMasterDB::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
return coordinator_.ProposeLog(cmd_ptr);
}

// Fills the two out-parameters from the coordinator (end first, begin second).
void SyncMasterDB::ConsensusGetwriteDBOffset(LogOffset& end_offset, LogOffset& begin_offset) {
  coordinator_.GetwriteDBOffset(end_offset, begin_offset);
}

// Hands a leader log entry to the coordinator and returns its Status.
// Only two arguments are passed, so any defaulted trailing parameter of
// ProcessLeaderLog takes its declared default.
Status SyncMasterDB::ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr,
                                               const BinlogItem& attribute) {
  return coordinator_.ProcessLeaderLog(cmd_ptr, attribute);
}

// Asks the coordinator to process DB writes for the given offset and returns
// the coordinator's Status unchanged.
Status SyncMasterDB::ConsensusProcessLeaderDB(const uint64_t offset) {
  return coordinator_.ProcessLeaderDB(offset);
}

// Returns the coordinator's committed index.
LogOffset SyncMasterDB::ConsensusCommittedIndex() {
  return coordinator_.committed_index();
}

// Returns the last offset held by the coordinator's in-memory logger.
LogOffset SyncMasterDB::ConsensusLastIndex() {
  return coordinator_.MemLogger()->last_offset();
}
Expand Down
Loading