Skip to content

Commit

Permalink
Merge pull request redpanda-data#19964 from mmaslankaprv/fix-assert-s…
Browse files Browse the repository at this point in the history
…napshot

Always use the latest configuration for followers metadata
  • Loading branch information
mmaslankaprv authored Jun 26, 2024
2 parents fc80eab + e73aa28 commit 41a2e05
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 4 deletions.
6 changes: 2 additions & 4 deletions src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,7 @@ ss::future<> consensus::hydrate_snapshot() {
co_await truncate_to_latest_snapshot(truncate_cfg.value());
}
_snapshot_size = co_await _snapshot_mgr.get_snapshot_size();
update_follower_stats(_configuration_manager.get_latest());
}

std::optional<storage::truncate_prefix_config>
Expand Down Expand Up @@ -2331,7 +2332,6 @@ void consensus::update_offset_from_snapshot(
_last_snapshot_index = metadata.last_included_index;
_last_snapshot_term = metadata.last_included_term;

// TODO: add applying snapshot content to state machine
auto prev_commit_index = _commit_index;
_commit_index = std::max(_last_snapshot_index, _commit_index);
maybe_update_last_visible_index(_commit_index);
Expand All @@ -2340,8 +2340,6 @@ void consensus::update_offset_from_snapshot(
_replication_monitor.notify_committed();
_event_manager.notify_commit_index();
}

update_follower_stats(metadata.latest_configuration);
}

ss::future<install_snapshot_reply>
Expand All @@ -2366,10 +2364,10 @@ consensus::do_install_snapshot(install_snapshot_request r) {
_hbeat = clock_type::now();

// request received from new leader
do_step_down("install_snapshot_received");
if (r.term > _term) {
_term = r.term;
_voted_for = {};
do_step_down("install_snapshot_term_greater");
maybe_update_leader(r.source_node());
co_return co_await do_install_snapshot(std::move(r));
}
Expand Down
163 changes: 163 additions & 0 deletions src/v/raft/tests/basic_raft_fixture_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -533,3 +533,166 @@ TEST_F_CORO(raft_fixture, test_prioritizing_longest_log) {

co_await wait_for_visible_offset(visible_offset, 10s);
}

/**
 * Regression test for handling of a stale ("delayed") install_snapshot
 * request.
 *
 * Scenario exercised by the code below:
 *  1. create a group via create_simple_group(3) and replicate some data,
 *  2. shrink the group to a single member (all_vnodes()[0], revision 1) and
 *     replicate more data,
 *  3. grow the group back to all_vnodes() (revision 2) and replicate again,
 *  4. hand-craft an install_snapshot request whose metadata carries the
 *     single-member configuration and deliver it to a follower,
 *  5. verify the follower keeps its 3 node configuration, tracks 2 follower
 *     stats entries and prefix-truncated its log up to the snapshot offset,
 *  6. deliver an install_snapshot request to the current leader and wait for
 *     the term to advance.
 */
TEST_F_CORO(raft_fixture, test_delayed_snapshot_request) {
co_await create_simple_group(3);
// Helper: replicates batches through whichever node is currently the leader
// using quorum_ack consistency. The outcome is only logged - a replication
// error does not fail the test by itself.
auto replicate_some_data = [&] {
return retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replicate(
make_batches(10, 10, 128),
replicate_options(consistency_level::quorum_ack));
})
.then([&](result<replicate_result> result) {
if (result) {
vlog(
tstlog.info,
"replication result last offset: {}",
result.value().last_offset);
} else {
vlog(
tstlog.info,
"replication error: {}",
result.error().message());
}
});
};

co_await replicate_some_data();

// shrink the group to a single member: the first vnode, revision 1
co_await retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replace_configuration(
{all_vnodes()[0]}, model::revision_id{1});
});
// Helper: waits (up to 10s) until every node in nodes() reports a
// configuration that contains exactly `expected_nodes` nodes and is in the
// `simple` state.
// NOTE(review): `expected_nodes` is an int compared against size() -
// presumably an unsigned type, which would make this a signed/unsigned
// comparison; confirm and consider size_t.
auto wait_for_reconfiguration = [&](int expected_nodes) {
return tests::cooperative_spin_wait_with_timeout(
10s, [&, expected_nodes] {
return std::all_of(
nodes().begin(),
nodes().end(),
[expected_nodes](const auto& p) {
return p.second->raft()->config().all_nodes().size()
== expected_nodes
&& p.second->raft()->config().get_state()
== configuration_state::simple;
});
});
};

co_await wait_for_reconfiguration(1);

// Capture the leader of the single-member group. This reference is reused
// further down as the *source* of the delayed snapshot request, i.e. after
// the group has been grown back and a (possibly different) leader is in
// charge.
auto leader_node_id = get_leader();
ASSERT_TRUE_CORO(leader_node_id.has_value());
auto& leader_node = node(leader_node_id.value());
ASSERT_EQ_CORO(leader_node.raft()->config().all_nodes().size(), 1);

co_await replicate_some_data();

// grow the group back to all vnodes, revision 2
co_await retry_with_leader(
10s + model::timeout_clock::now(),
[this](raft_node_instance& leader_node) {
return leader_node.raft()->replace_configuration(
{all_vnodes()}, model::revision_id{2});
});

// wait for reconfiguration
co_await wait_for_reconfiguration(3);

co_await replicate_some_data();

auto new_leader_node_id = get_leader();
ASSERT_TRUE_CORO(new_leader_node_id.has_value());
auto& new_leader_node = node(new_leader_node_id.value());
ASSERT_EQ_CORO(new_leader_node.raft()->config().all_nodes().size(), 3);

// Pick the first node that is not the current leader as the follower.
// NOTE(review): the iterator is dereferenced without an end() check; this
// relies on nodes() containing at least one non-leader node.
// NOTE(review): nothing here prevents the chosen follower from being the
// same node as `leader_node` (the old single-member leader) that is used as
// the request source below - confirm this is intended.
const auto& p = std::find_if(nodes().begin(), nodes().end(), [&](auto& p) {
return p.second->get_vnode() != new_leader_node.get_vnode();
});
auto& follower_node = p->second;
auto leader_proto = new_leader_node.get_protocol();
// simulate a delayed install snapshot request sent to the follower
install_snapshot_request request;
request.target_node_id = follower_node->get_vnode();
request.node_id = leader_node.get_vnode();
request.group = follower_node->raft()->group();

/**
 * The snapshot request represents a state from the point in time when the
 * group had only one member. Currently the follower is already using a
 * configuration with 3 members.
 */
// presumably the [105, 199] range maps to offsets written while the group had
// a single member - TODO confirm against the batches replicated above
auto last_included = model::offset(random_generators::get_int(105, 199));
request.last_included_index = last_included;
request.dirty_offset = leader_node.raft()->dirty_offset();
request.term = leader_node.raft()->term();

// snapshot metadata deliberately carries the outdated single-member
// configuration (revision 1)
snapshot_metadata metadata{
.last_included_index = request.last_included_index,
.last_included_term = leader_node.raft()->term(),
.latest_configuration = raft::group_configuration(
{all_vnodes()[0]}, model::revision_id(1)),
.log_start_delta = offset_translator_delta(2),
};

iobuf snapshot;
// using snapshot writer to populate all relevant snapshot metadata i.e.
// header and crc
storage::snapshot_writer writer(make_iobuf_ref_output_stream(snapshot));

// metadata followed by an empty payload
co_await writer.write_metadata(reflection::to_iobuf(std::move(metadata)));
co_await write_iobuf_to_output_stream(iobuf{}, writer.output());
co_await writer.close();
// copy rather than move: `snapshot` is reused for the second request below
request.chunk = snapshot.copy();
request.file_offset = 0;
// the whole snapshot is delivered in a single chunk
request.done = true;

auto reply = co_await leader_proto->install_snapshot(
follower_node->get_vnode().id(),
std::move(request),
rpc::client_opts(10s));
ASSERT_TRUE_CORO(reply.has_value());
vlog(tstlog.info, "snapshot reply from follower: {}", reply.value());

// the snapshot contains a configuration with one node which is older than
// the current one the follower has. latest configuration MUST remain
// unchanged

ASSERT_EQ_CORO(follower_node->raft()->config().all_nodes().size(), 3);
// non-fatal check: with 3 members the follower is expected to track the 2
// other nodes
EXPECT_EQ(follower_node->raft()->get_follower_stats().size(), 2);
// entries in follower log should be truncated.
ASSERT_EQ_CORO(
follower_node->raft()->start_offset(), model::next_offset(last_included));

/**
 * Make sure the leader steps down when it receives an install snapshot
 * request
 */

auto follower_proto = follower_node->get_protocol();
install_snapshot_request request_for_leader;

// this time the follower is the source and the current leader the target
request_for_leader.group = follower_node->raft()->group();
request_for_leader.target_node_id = new_leader_node.get_vnode();
request_for_leader.node_id = follower_node->get_vnode();
// NOTE(review): a fresh random index is drawn here, independent of the
// last_included_index encoded in the reused snapshot chunk's metadata
request_for_leader.last_included_index = model::offset(
random_generators::get_int(105, 199));
request_for_leader.dirty_offset = leader_node.raft()->dirty_offset();
request_for_leader.term = leader_node.raft()->term();
request_for_leader.chunk = std::move(snapshot);
request_for_leader.done = true;
// term observed before the request is sent; used as the baseline for the
// wait below
auto term_snapshot = leader_node.raft()->term();
auto leader_reply = co_await follower_proto->install_snapshot(
new_leader_node.get_vnode().id(),
std::move(request_for_leader),
rpc::client_opts(10s));

ASSERT_TRUE_CORO(leader_reply.has_value());
vlog(tstlog.info, "snapshot reply from leader: {}", leader_reply.value());
// Wait until the term advances past the baseline.
// NOTE(review): only the first node in nodes() is polled, not specifically
// the node that received the request - confirm this is sufficient.
co_await tests::cooperative_spin_wait_with_timeout(10s, [&] {
return nodes().begin()->second->raft()->term() > term_snapshot;
});
}
2 changes: 2 additions & 0 deletions src/v/raft/tests/raft_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class raft_node_instance : public ss::weakly_referencable<raft_node_instance> {
/// dispatched.
void on_dispatch(dispatch_callback_t);

/// Returns a copy of the shared pointer to the in-memory test protocol held
/// by this node (`_protocol`), so that a test can issue RPCs on behalf of
/// this node directly.
ss::shared_ptr<in_memory_test_protocol> get_protocol() {
    return _protocol;
}

private:
model::node_id _id;
model::revision_id _revision;
Expand Down

0 comments on commit 41a2e05

Please sign in to comment.