Skip to content

Commit

Permalink
r/tests: added test validating delayed install snapshot request delivery
Browse files Browse the repository at this point in the history
When an install snapshot request is delayed and delivered to a node after
the node has already made progress, it must not lead to state
inconsistencies. Added a test validating the handling of delayed
`install_snapshot` requests. The test validates the behavior of both the
follower and the leader.

Signed-off-by: Michał Maślanka <[email protected]>
  • Loading branch information
mmaslankaprv committed Jun 24, 2024
1 parent 571aa19 commit e73aa28
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 0 deletions.
163 changes: 163 additions & 0 deletions src/v/raft/tests/basic_raft_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -533,3 +533,166 @@ TEST_F_CORO(raft_fixture, test_prioritizing_longest_log) {

co_await wait_for_visible_offset(visible_offset, 10s);
}

/**
 * Validates handling of a delayed `install_snapshot` request.
 *
 * Scenario exercised by the test:
 *  1. create a group with `create_simple_group(3)` and replicate some data,
 *  2. shrink the configuration to a single node (`all_vnodes()[0]`),
 *     replicate again, then grow it back to all nodes,
 *  3. hand-craft an `install_snapshot_request` whose metadata carries the
 *     single node configuration and deliver it to a follower that already
 *     uses the 3 node configuration,
 *  4. deliver a similar request to the current leader and wait for a term
 *     change.
 */
TEST_F_CORO(raft_fixture, test_delayed_snapshot_request) {
co_await create_simple_group(3);
// Helper replicating a set of batches through whichever node is currently
// the leader (retried for up to 10s). The outcome is only logged, a
// replication error does not fail the test at this point.
auto replicate_some_data = [&] {
return retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replicate(
make_batches(10, 10, 128),
replicate_options(consistency_level::quorum_ack));
})
.then([&](result<replicate_result> result) {
if (result) {
vlog(
tstlog.info,
"replication result last offset: {}",
result.value().last_offset);
} else {
vlog(
tstlog.info,
"replication error: {}",
result.error().message());
}
});
};

co_await replicate_some_data();

// shrink the group to a single member: the first vnode, revision 1
co_await retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replace_configuration(
{all_vnodes()[0]}, model::revision_id{1});
});
// Helper waiting (up to 10s) until every node in the fixture reports a
// configuration that has `expected_nodes` members and is in the `simple`
// state.
auto wait_for_reconfiguration = [&](int expected_nodes) {
return tests::cooperative_spin_wait_with_timeout(
10s, [&, expected_nodes] {
return std::all_of(
nodes().begin(),
nodes().end(),
[expected_nodes](const auto& p) {
return p.second->raft()->config().all_nodes().size()
== expected_nodes
&& p.second->raft()->config().get_state()
== configuration_state::simple;
});
});
};

co_await wait_for_reconfiguration(1);

// leader of the single node configuration; it is referenced later on as the
// source of the delayed request
auto leader_node_id = get_leader();
ASSERT_TRUE_CORO(leader_node_id.has_value());
auto& leader_node = node(leader_node_id.value());
ASSERT_EQ_CORO(leader_node.raft()->config().all_nodes().size(), 1);

co_await replicate_some_data();

// grow the group back to all vnodes, revision 2
co_await retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replace_configuration(
{all_vnodes()}, model::revision_id{2});
});

// wait for reconfiguration
co_await wait_for_reconfiguration(3);

co_await replicate_some_data();

// leader of the 3 node configuration
// NOTE(review): presumably this is the same node as `leader_node` unless an
// election happened in between - confirm, both references are used below.
auto new_leader_node_id = get_leader();
ASSERT_TRUE_CORO(new_leader_node_id.has_value());
auto& new_leader_node = node(new_leader_node_id.value());
ASSERT_EQ_CORO(new_leader_node.raft()->config().all_nodes().size(), 3);

// pick the first node that is not the current leader as the follower under
// test
// NOTE(review): the iterator is not compared against nodes().end() before
// it is dereferenced; this relies on the group having a non-leader node.
const auto& p = std::find_if(nodes().begin(), nodes().end(), [&](auto& p) {
return p.second->get_vnode() != new_leader_node.get_vnode();
});
auto& follower_node = p->second;
auto leader_proto = new_leader_node.get_protocol();
// simulate delayed install snapshot request sent to follower
// NOTE(review): the request is sent through the protocol of
// `new_leader_node`, while its node id, dirty offset and term are read from
// `leader_node` (the leader of the single node configuration) - confirm
// this mix is intended.
install_snapshot_request request;
request.target_node_id = follower_node->get_vnode();
request.node_id = leader_node.get_vnode();
request.group = follower_node->raft()->group();

/**
 * A snapshot request represents a state from the point in time when the
 * group had only one member. Currently the follower is already using a
 * configuration with 3 members.
 */
// randomly chosen last included offset of the simulated snapshot
// NOTE(review): presumably the 105..199 range maps to offsets written while
// the group had a single member - verify against the batches replicated
// above.
auto last_included = model::offset(random_generators::get_int(105, 199));
request.last_included_index = last_included;
request.dirty_offset = leader_node.raft()->dirty_offset();
request.term = leader_node.raft()->term();

// snapshot metadata carrying the single node configuration (revision 1)
snapshot_metadata metadata{
.last_included_index = request.last_included_index,
.last_included_term = leader_node.raft()->term(),
.latest_configuration = raft::group_configuration(
{all_vnodes()[0]}, model::revision_id(1)),
.log_start_delta = offset_translator_delta(2),
};

iobuf snapshot;
// using snapshot writer to populate all relevant snapshot metadata i.e.
// header and crc
storage::snapshot_writer writer(make_iobuf_ref_output_stream(snapshot));

co_await writer.write_metadata(reflection::to_iobuf(std::move(metadata)));
// the snapshot payload itself is an empty iobuf
co_await write_iobuf_to_output_stream(iobuf{}, writer.output());
co_await writer.close();
// the chunk is a copy as `snapshot` is reused for the request sent to the
// leader further down
request.chunk = snapshot.copy();
request.file_offset = 0;
// the whole snapshot is delivered in a single chunk
request.done = true;

auto reply = co_await leader_proto->install_snapshot(
follower_node->get_vnode().id(),
std::move(request),
rpc::client_opts(10s));
ASSERT_TRUE_CORO(reply.has_value());
vlog(tstlog.info, "snapshot reply from follower: {}", reply.value());

// the snapshot contains a configuration with one node which is older than
// the current one the follower has. latest configuration MUST remain
// unchanged

ASSERT_EQ_CORO(follower_node->raft()->config().all_nodes().size(), 3);
EXPECT_EQ(follower_node->raft()->get_follower_stats().size(), 2);
// entries in follower log should be truncated, the log is expected to start
// right after the last offset included in the snapshot.
ASSERT_EQ_CORO(
follower_node->raft()->start_offset(), model::next_offset(last_included));

/**
 * Make sure the leader steps down when it receives an install snapshot
 * request
 */

// this time the request travels in the opposite direction: it is sent
// through the follower's protocol and targets the current leader
auto follower_proto = follower_node->get_protocol();
install_snapshot_request request_for_leader;

request_for_leader.group = follower_node->raft()->group();
request_for_leader.target_node_id = new_leader_node.get_vnode();
request_for_leader.node_id = follower_node->get_vnode();
request_for_leader.last_included_index = model::offset(
random_generators::get_int(105, 199));
request_for_leader.dirty_offset = leader_node.raft()->dirty_offset();
request_for_leader.term = leader_node.raft()->term();
// `snapshot` is not needed after this point, hence it is moved
request_for_leader.chunk = std::move(snapshot);
request_for_leader.done = true;
// term captured before the request is sent, used as the baseline for the
// final wait
auto term_snapshot = leader_node.raft()->term();
auto leader_reply = co_await follower_proto->install_snapshot(
new_leader_node.get_vnode().id(),
std::move(request_for_leader),
rpc::client_opts(10s));

ASSERT_TRUE_CORO(leader_reply.has_value());
vlog(tstlog.info, "snapshot reply from leader: {}", leader_reply.value());
// wait (up to 10s) for a term greater than the captured one
// NOTE(review): the term is read from the first entry of nodes(), not
// explicitly from the node that received the request - presumably any node
// observing a newer term is treated as evidence of the step down; confirm.
co_await tests::cooperative_spin_wait_with_timeout(10s, [&] {
return nodes().begin()->second->raft()->term() > term_snapshot;
});
}
2 changes: 2 additions & 0 deletions src/v/raft/tests/raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
/// dispatched.
void on_dispatch(dispatch_callback_t);

/// Returns the `_protocol` handle of this node instance, allowing tests to
/// send requests through it directly.
ss::shared_ptr<in_memory_test_protocol> get_protocol() {
    return _protocol;
}

private:
model::node_id _id;
model::revision_id _revision;
Expand Down

0 comments on commit e73aa28

Please sign in to comment.