
feat: PacificA slave data replication Consistency scheme #2975

Closed

Conversation

@buzhimingyonghu (Contributor) commented Dec 15, 2024

Per the current requirement from the master side, after receiving a write-binlog request the slave should not write to the DB immediately; it waits until the master instance has received the acknowledgments and replied, after which a separate write-DB request is sent.

My current handling logic: after a binlog request is received, the entry is added to a prepared list. When the next write-DB request arrives, all binlogs at or before the offset carried by that request are written to the DB. In other words, once the master has received "prepared" from all slave instances, it sets the commit state and replies to every slave, telling them the binlogs in their prepared lists may now be committed.

For strong master-slave consistency, I added a "switch" to the slave-side handling. Currently, when a binlog request is received, both the binlog and the DB are written; the DB write is skipped only when the parsed binlog carries a do-not-write-DB flag (to be negotiated and added once the master side is finished).
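The prepare/commit flow described above can be sketched roughly as follows. This is a minimal illustration only — the class and method names (`PreparedList`, `Prepare`, `CommitUpTo`) are hypothetical stand-ins, not the actual Pika types, and the "DB write" is stubbed out:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical sketch of the slave-side prepared list: binlog entries are
// buffered on receipt and only applied to the DB once the master signals
// commit up to a given offset.
class PreparedList {
 public:
  // Called when a write-binlog request arrives: buffer the entry,
  // do not touch the DB yet.
  void Prepare(uint64_t offset, const std::string& entry) {
    prepared_[offset] = entry;
  }

  // Called when the master's write-DB request arrives: apply (and drop)
  // every buffered entry at or before the committed offset, in order.
  std::vector<std::string> CommitUpTo(uint64_t committed_offset) {
    std::vector<std::string> applied;
    auto it = prepared_.begin();
    while (it != prepared_.end() && it->first <= committed_offset) {
      applied.push_back(it->second);  // stand-in for the real DB write
      it = prepared_.erase(it);
    }
    return applied;
  }

  size_t pending() const { return prepared_.size(); }

 private:
  std::map<uint64_t, std::string> prepared_;  // ordered by binlog offset
};
```

Because the map is ordered by offset, a single commit notification drains every entry up to the committed point, which matches the "write everything before this offset" behavior described above.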

Summary by CodeRabbit

  • New Features

    • Introduced new methods for handling database write operations and offsets in the consensus mechanism.
    • Added support for a new message type related to database write synchronization.
  • Bug Fixes

    • Enhanced error handling and logging for database operations.
  • Documentation

    • Updated protocol definitions to include new message types and fields for database write operations.
  • Chores

    • Various internal logic adjustments to improve control flow and state validation in database handling.

coderabbitai bot commented Dec 15, 2024

Warning

There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure.

🔧 buf (1.47.2)
src/pika_inner_message.proto

Config file YAML parsing error: yaml: unmarshal errors:
line 1: cannot unmarshal !!str lint:{u... into bufconfig.externalFileVersion. Please check your buf configuration file for YAML syntax errors.

Walkthrough

This pull request introduces enhancements to Pika's consensus and replication mechanisms, focusing on improving database write synchronization and logging. The changes span multiple files, including consensus coordination, replication background worker, client connection, and protocol definition. The modifications introduce new methods for processing leader logs, managing database write offsets, and handling database write synchronization across different components of the system.

Changes

File Change Summary
include/pika_consensus.h Added methods PutOffsetIndex, ProcessLeaderDB, GetwriteDBOffset; Updated ProcessLeaderLog method signature; Added new member variables for tracking database write offsets
include/pika_define.h Introduced new structs DbWriteChip and hash_db_write_info; Modified WriteTask to include database write tracking
include/pika_repl_bgworker.h Added static method HandleBGWorkerDB
include/pika_repl_client_conn.h Added static method HandleDbWriteResponse
include/pika_rm.h Added methods ConsensusProcessLeaderDB, ConsensusGetwriteDBOffset, PutCoordinatorOffsetIndex
src/pika_inner_message.proto Added new enum value kDbWrite; Introduced DbWriteSync message for database write synchronization
src/pika_repl_bgworker.cc Modified background worker logic to handle database write operations
src/pika_repl_client_conn.cc Added handler for database write response messages
src/pika_rm.cc Implemented new methods for consensus database offset management

Possibly related issues

  • Pika一致性开发 #2944: This PR directly addresses the consensus development tasks outlined in the issue, specifically improving incremental synchronization and database write consistency mechanisms.

Suggested labels

✏️ Feature

Suggested reviewers

  • cheniujh
  • wangshao1
  • Mixficsol

Poem

🐰 In the realm of bits and bytes so bright,
Consensus dances with log's delight
Offsets sync, databases align
A rabbit's code, a protocol's design
Pika leaps forward, consensus takes flight! 🚀


@github-actions github-actions bot added the ✏️ Feature New feature or request label Dec 15, 2024

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 8

🧹 Nitpick comments (9)
include/pika_consensus.h (3)

127-127: Rename parameter for clarity and consistency

In the ProcessLeaderDB method, consider renaming the parameter binlogoffset to binlog_offset to improve readability and adhere to naming conventions.

Apply this diff:

-pstd::Status ProcessLeaderDB(const uint64_t binlogoffset);
+pstd::Status ProcessLeaderDB(const uint64_t binlog_offset);

128-128: Correct method name to follow CamelCase convention

Rename GetwriteDBOffset to GetWriteDBOffset to maintain consistent CamelCase naming throughout the codebase.

Apply this diff:

-void GetwriteDBOffset(LogOffset& end_offset,LogOffset& begin_offset);
+void GetWriteDBOffset(LogOffset& end_offset, LogOffset& begin_offset);

213-214: Use uniform initialization for member variables

Consider initializing end_db_offset_ and begin_db_offset_ using uniform initialization for consistency.

Apply this diff:

-LogOffset end_db_offset_=LogOffset();
-LogOffset begin_db_offset_=LogOffset();
+LogOffset end_db_offset_{};
+LogOffset begin_db_offset_{};
include/pika_rm.h (1)

73-74: Improve parameter naming and formatting

  • In ConsensusProcessLeaderDB, rename the parameter offset to binlog_offset for clarity.
  • In ConsensusGetwriteDBOffset, correct the method name to ConsensusGetWriteDBOffset and add a space after the comma.

Apply this diff:

-pstd::Status ConsensusProcessLeaderDB(const uint64_t offset);
+pstd::Status ConsensusProcessLeaderDB(const uint64_t binlog_offset);

-void ConsensusGetwriteDBOffset(LogOffset& end_offset,LogOffset& begin_offset);
+void ConsensusGetWriteDBOffset(LogOffset& end_offset, LogOffset& begin_offset);
src/pika_consensus.cc (1)

377-382: Correct method naming to match convention and add spaces

Rename GetwriteDBOffset to GetWriteDBOffset and add spaces after commas for consistency and readability.

Apply this diff:

-void ConsensusCoordinator::GetwriteDBOffset(LogOffset& end_offset,LogOffset& begin_offset)
+void ConsensusCoordinator::GetWriteDBOffset(LogOffset& end_offset, LogOffset& begin_offset)
include/pika_repl_bgworker.h (1)

37-37: Consider adding documentation for the new method.

The new HandleBGWorkerDB method appears to be part of the consistency scheme implementation, but its purpose and usage are not immediately clear from the declaration.

Add a documentation comment explaining:

  • The purpose of this method
  • The expected format/content of the void* arg parameter
  • Any assumptions or preconditions
src/pika_inner_message.proto (1)

163-167: Consider adding validation rules for db_write_offset.

The response message structure is good, but consider adding validation rules or documentation for acceptable ranges of db_write_offset.

src/pika_repl_bgworker.cc (1)

113-113: Initialize is_write_db with a named constant.

The boolean flag controls critical write behavior. Consider using a named constant for better maintainability and clarity.

-  bool is_write_db=true;
+  static const bool kDefaultWriteDBEnabled = true;
+  bool is_write_db = kDefaultWriteDBEnabled;
include/pika_define.h (1)

219-223: Consider enhancing hash function to reduce collision probability

The current hash function only uses b_offset.offset, ignoring filenum and logic offset fields. This could lead to hash collisions when different log entries have the same offset but different file numbers.

Consider incorporating more fields in the hash calculation:

struct hash_db_write_info {
  size_t operator()(const LogOffset& n) const {
-    return std::hash<uint64_t>()(n.b_offset.offset);
+    size_t h1 = std::hash<uint32_t>()(n.b_offset.filenum);
+    size_t h2 = std::hash<uint64_t>()(n.b_offset.offset);
+    return h1 ^ (h2 << 1);
  }
};
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0906644 and 917050d.

📒 Files selected for processing (10)
  • include/pika_consensus.h (3 hunks)
  • include/pika_define.h (3 hunks)
  • include/pika_repl_bgworker.h (1 hunks)
  • include/pika_repl_client_conn.h (1 hunks)
  • include/pika_rm.h (2 hunks)
  • src/pika_consensus.cc (1 hunks)
  • src/pika_inner_message.proto (5 hunks)
  • src/pika_repl_bgworker.cc (2 hunks)
  • src/pika_repl_client_conn.cc (1 hunks)
  • src/pika_rm.cc (1 hunks)
🔇 Additional comments (11)
include/pika_consensus.h (2)

126-126: Add space after comma for consistent formatting

In the ProcessLeaderLog method declaration, add a space after the comma for better readability and to maintain consistent coding style.


Apply this diff:

-pstd::Status ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute, bool is_write_db=true);
+pstd::Status ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute, bool is_write_db = true);

211-212: Fix formatting issues for better readability

  • Add spaces after commas and between the type and variable name.
  • Correct the indentation and spacing for improved code clarity.


Apply this diff:

-std::unordered_map<uint64_t, std::shared_ptr<Cmd>> binlog_index;
-std::unordered_map<LogOffset,uint64_t,hash_db_write_info>offset_index;
+std::unordered_map<uint64_t, std::shared_ptr<Cmd>> binlog_index;
+std::unordered_map<LogOffset, uint64_t, hash_db_write_info> offset_index;
src/pika_consensus.cc (1)

388-392: 🛠️ Refactor suggestion

Check for existence before accessing map elements

Before accessing binlog_index[iter.second], confirm that the key exists to prevent potential exceptions.

Apply this diff:

 if (iter.first.b_offset.offset > offset || binlog_index.count(iter.second) == 0) {
   continue;
 }
 auto cmd_ptr = binlog_index[iter.second];

Likely invalid or redundant comment.

include/pika_repl_client_conn.h (1)

27-27: Add new handler method HandleDbWriteResponse

The method HandleDbWriteResponse is appropriately added to handle database write responses. Ensure that it is implemented consistently with existing handler methods.

src/pika_inner_message.proto (2)

11-11: LGTM: New message type for database write operations.

The addition of kDbWrite enum value properly extends the protocol to support the new consistency scheme.


90-98: Verify field requirements in DbWriteSync request message.

The message structure looks good, but consider:

  1. Should succ field be optional since it indicates operation success/failure?
  2. Consider adding documentation for each field's purpose
✅ Verification successful

The previous command failed because the file type wasn't recognized. Let's search more broadly to understand the usage of DbWriteSync and find all proto files in the codebase.


The succ field should remain required, and documentation would be beneficial

The succ field should remain as required because:

  1. It's used in a database write synchronization context where the success/failure status is crucial
  2. The field is part of a response message (evidenced by usage in pika_repl_client_conn.cc) where the operation result must always be communicated
  3. Making it optional could lead to ambiguous states where the success status is unknown

Suggested improvements:

  • Add field documentation to explain:
    • node: The node involved in the sync operation
    • db_name: Target database name
    • slot_id: Slot identifier for the operation
    • ack_range_start and ack_range_end: Binlog offset range being acknowledged
    • session_id: Session identifier for the sync operation
    • succ: Operation success status
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if any other proto files reference these fields
rg "db_write_sync" --type proto

Length of output: 61


Script:

#!/bin/bash
# Search for references to DbWriteSync in all files
rg "DbWriteSync" -A 3

# Search for proto files in the repository
fd "\.proto$"

Length of output: 1514

include/pika_define.h (2)

187-191: LGTM: Clean implementation of DbWriteChip struct

The struct follows the established pattern of the codebase and properly implements required constructors.


286-293: LGTM: Clean extension of WriteTask struct

The modifications properly integrate database write functionality while maintaining backward compatibility.

src/pika_repl_client_conn.cc (1)

85-93: LGTM: Consistent implementation of DB write message handling

The new case follows the established pattern for message handling and includes proper validation.

src/pika_rm.cc (2)

387-391: LGTM: Clean implementation of ConsensusGetwriteDBOffset

The method properly delegates to the coordinator while maintaining consistent parameter passing.


396-399: LGTM: Clean implementation of ConsensusProcessLeaderDB

The method properly delegates to the coordinator while maintaining consistent parameter types.

Comment on lines +120 to +122
void PutOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
offset_index[win_offset] = binlog_offset;
}

🛠️ Refactor suggestion

Pass LogOffset by const reference to improve performance

The PutOffsetIndex method accepts LogOffset win_offset by value. Passing it by const reference avoids unnecessary copies and improves performance.

Apply this diff:

-void PutOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
+void PutOffsetIndex(const LogOffset& win_offset, uint64_t binlog_offset){
    offset_index[win_offset] = binlog_offset;
  }

Comment on lines +86 to +88
void PutCoordinatorOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
coordinator_.PutOffsetIndex(win_offset,binlog_offset);
}

🛠️ Refactor suggestion

Pass LogOffset by const reference and fix formatting

  • Pass win_offset by const reference in PutCoordinatorOffsetIndex to avoid unnecessary copying.
  • Add spaces after commas for consistency.

Apply this diff:

-void PutCoordinatorOffsetIndex(LogOffset win_offset,uint64_t binlog_offset){
+void PutCoordinatorOffsetIndex(const LogOffset& win_offset, uint64_t binlog_offset){
    coordinator_.PutOffsetIndex(win_offset, binlog_offset);
  }

Comment on lines +341 to +375
Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute, bool is_write_db) {
LogOffset last_index = mem_logger_->last_offset();
if (attribute.logic_id() < last_index.l_offset.index) {
LOG(WARNING) << DBInfo(db_name_).ToString() << "Drop log from leader logic_id "
<< attribute.logic_id() << " cur last index " << last_index.l_offset.index;
return Status::OK();
}
if(is_write_db){
auto opt = cmd_ptr->argv()[0];
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
// apply binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// apply db in async way
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all writeDB task that submitted before has finished before we exec this flushdb
int32_t wait_ms = 250;
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
}
// apply flushdb-binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// applyDB in sync way
PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
}
return Status::OK();
}
Status s = InternalAppendLog(cmd_ptr);
binlog_index[attribute.offset()] = cmd_ptr;
return Status::OK();

}

🛠️ Refactor suggestion

Refactor ProcessLeaderLog method for better maintainability

The ProcessLeaderLog method contains deeply nested conditionals and repetitive code segments. Refactor the method to improve readability and maintainability.

Consider extracting the flushdb handling logic into a separate private method and simplifying the nested if-else blocks.
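One possible shape for that extraction is sketched below. This is illustrative only — `WaitForPendingWrites` is a hypothetical helper name, and the pending-task counter is stubbed with a callable rather than the real `g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount` call; it also folds in the bounded total wait suggested elsewhere in this review:

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical helper: wait for outstanding async write-DB tasks to drain,
// doubling the sleep interval up to a 3s per-iteration cap and giving up
// after a bounded total wait. Returns true if the tasks drained in time.
bool WaitForPendingWrites(const std::function<int()>& pending_count,
                          int32_t max_total_ms = 10000) {
  int32_t wait_ms = 250;
  int32_t total_ms = 0;
  while (pending_count() > 0) {
    if (total_ms >= max_total_ms) {
      return false;  // caller decides whether to log a warning or abort
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
    total_ms += wait_ms;
    wait_ms = std::min(wait_ms * 2, 3000);
  }
  return true;
}
```

With a helper like this, both ProcessLeaderLog and ProcessLeaderDB could share one implementation of the flushdb drain-and-wait step instead of duplicating the loop.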

Comment on lines +383 to +409
Status ConsensusCoordinator::ProcessLeaderDB(const uint64_t offset) {
end_db_offset_=LogOffset();
begin_db_offset_=LogOffset();
begin_db_offset_.b_offset.offset=UINT64_MAX;
for (const auto& iter : offset_index) {
if (iter.first.b_offset.offset > offset || binlog_index.count(iter.second) == 0) {
continue;
}
auto cmd_ptr = binlog_index[iter.second];
auto opt = cmd_ptr->argv()[0];
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all writeDB task that submitted before has finished before we exec this flushdb
int32_t wait_ms = 250;
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
}
// apply flushdb-binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// applyDB in sync way
PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
}
if (iter.first.b_offset.offset > end_db_offset_.b_offset.offset) {

⚠️ Potential issue

Ensure thread safety when accessing shared resources

The method ProcessLeaderDB accesses offset_index and binlog_index without any synchronization mechanisms. If these maps are accessed concurrently, it could lead to race conditions.

Add appropriate mutex locks to ensure thread safety when accessing shared data structures.
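A minimal sketch of what guarded access could look like is shown below. This is an assumption-laden illustration, not the actual Pika design: the wrapper class, the mutex member, and the simplified `uint64_t` key (standing in for LogOffset) are all hypothetical:

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <unordered_map>

// Hypothetical guarded wrapper around the offset index: every reader and
// writer takes the same mutex, so concurrent PutOffsetIndex-style updates
// and ProcessLeaderDB-style scans cannot race on the underlying map.
class GuardedOffsetIndex {
 public:
  void Put(uint64_t win_offset, uint64_t binlog_offset) {
    std::lock_guard<std::mutex> lock(mu_);
    index_[win_offset] = binlog_offset;
  }

  // Returns true and fills *out if the key exists. Performing the existence
  // check and the read under the same lock avoids the check-then-access
  // race this comment warns about.
  bool Get(uint64_t win_offset, uint64_t* out) const {
    std::lock_guard<std::mutex> lock(mu_);
    auto it = index_.find(win_offset);
    if (it == index_.end()) return false;
    *out = it->second;
    return true;
  }

 private:
  mutable std::mutex mu_;
  std::unordered_map<uint64_t, uint64_t> index_;
};
```

In the real code the same effect could be achieved with a mutex member on ConsensusCoordinator taken in PutOffsetIndex, ProcessLeaderLog, and ProcessLeaderDB, rather than a separate wrapper type.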

Comment on lines +348 to +375
if(is_write_db){
auto opt = cmd_ptr->argv()[0];
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
// apply binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// apply db in async way
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all writeDB task that submitted before has finished before we exec this flushdb
int32_t wait_ms = 250;
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
}
// apply flushdb-binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// applyDB in sync way
PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
}
return Status::OK();
}
Status s = InternalAppendLog(cmd_ptr);
binlog_index[attribute.offset()] = cmd_ptr;
return Status::OK();

}

⚠️ Potential issue

Avoid potential infinite loop in waiting mechanism

The loop waiting for unfinished write DB tasks doubles wait_ms each time without an upper bound on total wait time. This could lead to prolonged waits or an infinite loop.

Apply this diff to implement a maximum total wait time:

 int32_t wait_ms = 250;
+int32_t total_wait_ms = 0;
 while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
   std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
+  total_wait_ms += wait_ms;
   wait_ms *= 2;
   wait_ms = wait_ms < 3000 ? wait_ms : 3000;
+  if (total_wait_ms >= 10000) { // Maximum total wait time of 10 seconds
+     LOG(WARNING) << "Exceeded maximum wait time for unfinished write tasks.";
+     break;
+  }
 }

@@ -139,6 +139,14 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
slave_db->SetReplState(ReplState::kTryConnect);
return;
}


🛠️ Refactor suggestion

Add error handling for binlog processing failures.

The code continues processing even if binlog item decode fails. Consider adding proper error handling and logging.

+    if (worker->binlog_item_.offset().empty()) {
+      LOG(WARNING) << "Invalid binlog offset after decode";
+      slave_db->SetReplState(ReplState::kTryConnect);
+      return;
+    }

Also applies to: 149-149

Comment on lines +143 to +148
if(!is_write_db){
std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
LogOffset cur_logoffset;
ParseBinlogOffset(binlog_res.binlog_offset(),&cur_logoffset);
db->PutCoordinatorOffsetIndex(cur_logoffset,worker->binlog_item_.offset());
}

⚠️ Potential issue

Critical: Potential race condition in coordinator offset update.

The coordinator offset index update block has several issues:

  1. Redundant DB lookup inside the loop
  2. No synchronization around offset update
  3. Inconsistent spacing in the code block

Consider refactoring to:

-    if(!is_write_db){
-    std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
-    LogOffset cur_logoffset;
-    ParseBinlogOffset(binlog_res.binlog_offset(),&cur_logoffset);
-    db->PutCoordinatorOffsetIndex(cur_logoffset,worker->binlog_item_.offset());
-    }
+    if (!is_write_db) {
+      LogOffset cur_logoffset;
+      ParseBinlogOffset(binlog_res.binlog_offset(), &cur_logoffset);
+      if (db) {  // Use existing db pointer from outer scope
+        std::lock_guard<std::mutex> lock(db->GetOffsetMutex());
+        db->PutCoordinatorOffsetIndex(cur_logoffset, worker->binlog_item_.offset());
+      }
+    }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +100 to +120
void PikaReplClientConn::HandleDbWriteResponse(void* arg) {
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

const InnerMessage::InnerResponse_DbWriteSync& dbwrite_sync_response = response->db_write_sync();
int32_t session_id = dbwrite_sync_response.session_id();
const InnerMessage::Slot& db_response = dbwrite_sync_response.slot();
const std::string& db_name = db_response.db_name();
const InnerMessage::BinlogOffset& binlog_offset = dbwrite_sync_response.db_write_offset();

std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
if (!db) {
LOG(WARNING) << db_name << "Not found.";
}
db->ConsensusProcessLeaderDB(binlog_offset.offset());
LogOffset ack_end;
LogOffset ack_start;
db->ConsensusGetwriteDBOffset(ack_end,ack_start);
g_pika_rm->SendBinlogSyncAckRequest(db_name, ack_start, ack_end);
}

⚠️ Potential issue

Add proper error handling and validation

The current implementation has several issues:

  1. No proper error handling when db is null
  2. Potential null pointer dereference at line 115
  3. Missing validation of binlog_offset

Consider this safer implementation:

void PikaReplClientConn::HandleDbWriteResponse(void* arg) {
  std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
  std::shared_ptr<net::PbConn> conn = task_arg->conn;
  std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

  const InnerMessage::InnerResponse_DbWriteSync& dbwrite_sync_response = response->db_write_sync();
  int32_t session_id = dbwrite_sync_response.session_id();              
  const InnerMessage::Slot& db_response = dbwrite_sync_response.slot();  
  const std::string& db_name = db_response.db_name();                  
  const InnerMessage::BinlogOffset& binlog_offset = dbwrite_sync_response.db_write_offset();

  std::shared_ptr<SyncMasterDB> db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
  if (!db) {
    LOG(WARNING) << db_name << "Not found.";
+   return;
  }
+ 
+ if (binlog_offset.offset() == 0) {
+   LOG(WARNING) << "Invalid binlog offset received for " << db_name;
+   return;
+ }

  db->ConsensusProcessLeaderDB(binlog_offset.offset());
  LogOffset ack_end;
  LogOffset ack_start;
  db->ConsensusGetwriteDBOffset(ack_end,ack_start);
  g_pika_rm->SendBinlogSyncAckRequest(db_name, ack_start, ack_end);
}

@cheniujh (Collaborator) commented:

The logic on the master side needs to be reworked; after that the code will need to change as well, which may take some time @hengsheng



Labels
✏️ Feature New feature or request