Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify user code about follower loss #517

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/libnuraft/callback.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ public:
*/
ResignationFromLeader = 27,

/**
* When a peer RPC errors count exceeds raft_server::limits.warning_limit_, or
* a peer doesn't respond for a long time (raft_params::leadership_expiry_),
* the peer is considered lost.
* ctx: null.
*/
FollowerLost = 28,
};

struct Param {
Expand Down
11 changes: 11 additions & 0 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public:
, reconn_backoff_(0)
, suppress_following_error_(false)
, abandoned_(false)
, lost_by_leader_(false)
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, l_(logger)
Expand Down Expand Up @@ -302,6 +303,10 @@ public:
ptr<req_msg> get_rsv_msg() const { return rsv_msg_; }
rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; }

bool is_lost() const { return lost_by_leader_; }
void set_lost() { lost_by_leader_ = true; }
void set_recovered() { lost_by_leader_ = false; }

private:
void handle_rpc_result(ptr<peer> myself,
ptr<rpc_client> my_rpc_client,
Expand Down Expand Up @@ -498,6 +503,12 @@ private:
*/
std::atomic<bool> abandoned_;

/**
* If `true`, this peer is considered unresponsive
* and treated as if it has been lost.
*/
std::atomic<bool> lost_by_leader_;

/**
* Reserved message that should be sent next time.
*/
Expand Down
5 changes: 4 additions & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -886,9 +886,12 @@ protected:
int32 get_quorum_for_election();
int32 get_quorum_for_commit();
int32 get_leadership_expiry();
size_t get_not_responding_peers();
std::list<ptr<peer>> get_not_responding_peers();
size_t get_not_responding_peers_count();
size_t get_num_stale_peers();

void apply_to_not_responding_peers(const std::function<void(const ptr<peer>&)>&);

ptr<resp_msg> handle_append_entries(req_msg& req);
ptr<resp_msg> handle_prevote_req(req_msg& req);
ptr<resp_msg> handle_vote_req(req_msg& req);
Expand Down
4 changes: 2 additions & 2 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ bool raft_server::request_append_entries(ptr<peer> p) {
chk_timer.timeout_and_reset() ) {
// If auto adjust mode is on for 2-node cluster, and
// the follower is not responding, adjust the quorum.
size_t num_not_responding_peers = get_not_responding_peers();
size_t num_not_responding_peers = get_not_responding_peers_count();
size_t cur_quorum_size = get_quorum_for_commit();
size_t num_stale_peers = get_num_stale_peers();
if (cur_quorum_size >= 1) {
Expand Down Expand Up @@ -1191,7 +1191,7 @@ ulong raft_server::get_expected_committed_log_idx() {

size_t quorum_idx = get_quorum_for_commit();
if (ctx_->get_params()->use_full_consensus_among_healthy_members_) {
size_t not_responding_peers = get_not_responding_peers();
size_t not_responding_peers = get_not_responding_peers_count();
if (not_responding_peers < voting_members - quorum_idx) {
// If full consensus option is on, commit should be
// agreed by all healthy members, and the number of
Expand Down
64 changes: 51 additions & 13 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -608,27 +608,43 @@ int32 raft_server::get_leadership_expiry() {
return expiry;
}

size_t raft_server::get_not_responding_peers() {
// Check if quorum nodes are not responding
// (i.e., don't respond 20x heartbeat time long).
std::list<ptr<peer>> raft_server::get_not_responding_peers() {
std::list<ptr<peer>> rs;
auto cb = [&rs](const ptr<peer>& peer_ptr) {
rs.push_back(peer_ptr);
};
apply_to_not_responding_peers(cb);
return rs;
}

size_t raft_server::get_not_responding_peers_count() {
size_t num_not_resp_nodes = 0;
auto cb = [&num_not_resp_nodes](const ptr<peer>&) {
++num_not_resp_nodes;
};
apply_to_not_responding_peers(cb);
return num_not_resp_nodes;
}

void raft_server::apply_to_not_responding_peers(
const std::function<void(const ptr<peer>&)>& callback) {
// Check if quorum nodes are not responding
// (i.e., don't respond 20x heartbeat time long).
ptr<raft_params> params = ctx_->get_params();
int expiry = params->heart_beat_interval_ *
raft_server::raft_limits_.response_limit_;
int expiry = params->heart_beat_interval_ * raft_server::raft_limits_.response_limit_;

// Check the number of not responding peers.
// Check not responding peers.
for (auto& entry: peers_) {
ptr<peer> p = entry.second;
const auto& peer_ptr = entry.second;

if (!is_regular_member(p)) continue;
if (!is_regular_member(peer_ptr)) continue;

int32 resp_elapsed_ms = (int32)(p->get_resp_timer_us() / 1000);
if ( resp_elapsed_ms > expiry ) {
num_not_resp_nodes++;
const auto resp_elapsed_ms =
static_cast<int32>(peer_ptr->get_resp_timer_us() / 1000);
if (resp_elapsed_ms > expiry) {
callback(peer_ptr);
}
}
return num_not_resp_nodes;
}

size_t raft_server::get_num_stale_peers() {
Expand Down Expand Up @@ -804,6 +820,14 @@ void raft_server::handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err)
} else if (rpc_errs == raft_server::raft_limits_.warning_limit_) {
p_wn("too verbose RPC error on peer (%d), "
"will suppress it from now", peer_id);
if (!pp || !pp->is_lost()) {
if (pp) {
pp->set_lost();
}
cb_func::Param param(id_, leader_, peer_id);
const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, &param);
assert(rc == cb_func::ReturnCode::Ok);
}
}

if (pp && pp->is_leave_flag_set()) {
Expand Down Expand Up @@ -840,6 +864,7 @@ void raft_server::handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err)
p_wn("recovered from RPC failure from peer %d, %d errors",
resp->get_src(), rpc_errs);
}
pp->set_recovered();
pp->reset_rpc_errs();
pp->reset_resp_timer();
}
Expand Down Expand Up @@ -1019,6 +1044,7 @@ void raft_server::become_leader() {

pp->set_next_log_idx(log_store_->next_slot());
enable_hb_for_peer(*pp);
pp->set_recovered();
}

// If there are uncommitted logs, search if conf log exists.
Expand Down Expand Up @@ -1085,7 +1111,7 @@ bool raft_server::check_leadership_validity() {
int32 num_voting_members = get_num_voting_members();

int leadership_expiry = get_leadership_expiry();
int32 nr_peers = (int32)get_not_responding_peers();
int32 nr_peers = (int32)get_not_responding_peers_count();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since get_not_responding_peers is also used in the same function, can we just call it once?

const auto nr_peers_list = get_not_responding_peers();
int32 nr_peers = nr_peers_list.size();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding this point:

  • on one hand, yes nothing prevents us from using get_not_responding_peers() once;
  • but on another hand, this way we can get an overhead: creating an std::list, filling it up with ptr<peer>s, returning it (though, RVO should work here) - and we need the list only in the case if the condition in the if below is true ((num_voting_members - nr_peers < min_quorum_size) == true), in all other cases we don't need the list; I assume that in the majority of cases, in "normal" circumstances, that condition should be false

But if you are ok with this overhead, plese let me know and I will change the code

Also, please pay attention, I have changed the code of the get_not_responding_peers_count/get_not_responding_peers functions. They contained a duplicated part of code, which could potentially lead to some bugs in the future (if someone change the code of one function, but not another). Now I extracted that common part into a separate function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, your change looks good.

if (leadership_expiry < 0) {
// Negative expiry: leadership will never expire.
nr_peers = 0;
Expand All @@ -1102,6 +1128,18 @@ bool raft_server::check_leadership_validity() {
get_leadership_expiry(),
min_quorum_size);

const auto nr_peers_list = get_not_responding_peers();
assert(nr_peers_list.size() == static_cast<std::size_t>(nr_peers));
for (auto& peer : nr_peers_list) {
if (peer->is_lost()) {
continue;
}
peer->set_lost();
cb_func::Param param(id_, leader_, peer->get_id());
const auto rc = ctx_->cb_func_.call(cb_func::FollowerLost, &param);
assert(rc == cb_func::ReturnCode::Ok);
}

// NOTE:
// For a cluster where the number of members is the same
// as the size of quorum, we should not expire leadership,
Expand Down
Loading