Skip to content

Commit

Permalink
adjust code
Browse files Browse the repository at this point in the history
  • Loading branch information
buzhimingyonghu committed Jan 14, 2025
1 parent a8aa80a commit 7317faf
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 70 deletions.
11 changes: 6 additions & 5 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,19 +231,20 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
LogOffset master_prepared_id(BinlogOffset(prepared_id.filenum(),prepared_id.offset()),LogicOffset(prepared_id.term(),prepared_id.index()));
LOG(INFO)<<"PacificA master TrySync Response master_prepared_id: "<<master_prepared_id.ToString();
LOG(INFO)<<"PacificA slave cur_prepared_id: "<<db->GetPreparedId().ToString();
if(master_prepared_id<db->GetPreparedId()){
if(master_prepared_id<db->GetCommittedId()){

if(master_prepared_id < db->GetPreparedId()){
if(master_prepared_id < db->GetCommittedId()){
slave_db->SetReplState(ReplState::kError);
LOG(WARNING) << "DB: " << db_name << " master committedId > slave committedId";
return;
}
db->SetPreparedId(master_prepared_id);
// 向主的preparedid看齐,多余的裁剪掉
db->Truncate(master_prepared_id);
}else{
LOG(WARNING) << "consistency master trySync no preparedID";
return ;
}
}else{
LOG(WARNING) << "consistency master trySync no preparedID";
return ;
}
}
g_pika_rm->SendBinlogSyncAckRequest(db_name, offset, offset, true);
Expand Down
39 changes: 23 additions & 16 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr<SyncMasterDB>&
return false;
}
PikaBinlogReader reader;

LOG(INFO)<<"db->GetISConsistency(): "<<db->GetISConsistency()<<", try_sync_request.has_committed_id: "<<try_sync_request.has_committed_id();
if(db->GetISConsistency()){
if(try_sync_request.has_committed_id()){
//Compare committedID offset
Expand All @@ -238,6 +238,7 @@ bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr<SyncMasterDB>&
<< " s_committedID: " << committed_id.ToString();
return false;
}
LOG(INFO)<<"master_CommittedId >= slave committed_id";
reader.Seek(db->Logger(), committed_id.b_offset.filenum, committed_id.b_offset.offset);
BinlogOffset seeked_offset;
reader.GetReaderStatus(&(seeked_offset.filenum), &(seeked_offset.offset));
Expand Down Expand Up @@ -267,8 +268,8 @@ bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr<SyncMasterDB>&
if (seeked_offset.filenum != slave_boffset.filenum() || seeked_offset.offset != slave_boffset.offset()) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
LOG(WARNING) << "Slave offset is not a start point of cur log, Slave ip: " << node.ip()
<< ", Slave port: " << node.port() << ", DB: " << db_name
<< " closest start point, filenum: " << seeked_offset.filenum << ", offset: " << seeked_offset.offset;
<< ", Slave port: " << node.port() << ", DB: " << db_name << " closest start point, filenum: "
<< seeked_offset.filenum << ", offset: " << seeked_offset.offset;
return false;
}
return true;
Expand Down Expand Up @@ -300,7 +301,8 @@ void PikaReplServerConn::HandleDBSyncRequest(void* arg) {

LOG(INFO) << "Handle DBSync Request";
bool prior_success = true;
std::shared_ptr<SyncMasterDB> master_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
std::shared_ptr<SyncMasterDB> master_db =
g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
if (!master_db) {
LOG(WARNING) << "Sync Master DB: " << db_name << ", NotFound";
prior_success = false;
Expand Down Expand Up @@ -377,9 +379,10 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {
LogOffset range_start(b_range_start, l_range_start);
LogOffset range_end(b_range_end, l_range_end);

std::shared_ptr<SyncMasterDB> master_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
std::shared_ptr<SyncMasterDB> master_db =
g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
if (!master_db) {
LOG(WARNING) << "Sync Master DB: " << db_name << ", NotFound";
LOG(WARNING) << "Sync Master DB: " << db_name << ", NotFound";
return;
}

Expand All @@ -393,8 +396,7 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) {

Status s = master_db->SetLastRecvTime(node.ip(), node.port(), pstd::NowMicros());
if (!s.ok()) {
LOG(WARNING) << "SetMasterLastRecvTime failed " << node.ip() << ":" << node.port() << ", " << db_name << " "
<< s.ToString();
LOG(WARNING) << "SetMasterLastRecvTime failed " << node.ip() << ":" << node.port() << ", " << db_name << " " << s.ToString();
conn->NotifyClose();
return;
}
Expand Down Expand Up @@ -447,8 +449,8 @@ void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) {
const InnerMessage::Slot& slot = remove_slave_node_req.slot();

std::string db_name = slot.db_name();
std::shared_ptr<SyncMasterDB> master_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name));
if (!master_db) {
std::shared_ptr<SyncMasterDB> master_db =
g_pika_rm->GetSyncMasterDBByName(DBInfo(db_name)); if (!master_db) {
LOG(WARNING) << "Sync Master DB: " << db_name << ", NotFound";
}
Status s = master_db->RemoveSlaveNode(node.ip(), node.port());
Expand All @@ -457,7 +459,7 @@ void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) {
response.set_code(InnerMessage::kOk);
response.set_type(InnerMessage::Type::kRemoveSlaveNode);
InnerMessage::InnerResponse::RemoveSlaveNode* remove_slave_node_response = response.add_remove_slave_node();
InnerMessage::Slot* db_response = remove_slave_node_response->mutable_slot();
InnerMessage::Slot* db_response = remove_slave_node_response->mutable_slot ();
db_response->set_db_name(db_name);
/*
* Since the slot field is written in protobuffer,
Expand Down Expand Up @@ -487,27 +489,32 @@ int PikaReplServerConn::DealMessage() {
}
switch (req->type()) {
case InnerMessage::kMetaSync: {
auto task_arg = new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
auto task_arg =
new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
g_pika_rm->ScheduleReplServerBGTask(&PikaReplServerConn::HandleMetaSyncRequest, task_arg);
break;
}
case InnerMessage::kTrySync: {
auto task_arg = new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
auto task_arg =
new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
g_pika_rm->ScheduleReplServerBGTask(&PikaReplServerConn::HandleTrySyncRequest, task_arg);
break;
}
case InnerMessage::kDBSync: {
auto task_arg = new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
auto task_arg =
new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
g_pika_rm->ScheduleReplServerBGTask(&PikaReplServerConn::HandleDBSyncRequest, task_arg);
break;
}
case InnerMessage::kBinlogSync: {
auto task_arg = new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
auto task_arg =
new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
g_pika_rm->ScheduleReplServerBGTask(&PikaReplServerConn::HandleBinlogSyncRequest, task_arg);
break;
}
case InnerMessage::kRemoveSlaveNode: {
auto task_arg = new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
auto task_arg =
new ReplServerTaskArg(req, std::dynamic_pointer_cast<PikaReplServerConn>(shared_from_this()));
g_pika_rm->ScheduleReplServerBGTask(&PikaReplServerConn::HandleRemoveSlaveNodeRequest, task_arg);
break;
}
Expand Down
Loading

0 comments on commit 7317faf

Please sign in to comment.