Skip to content

Commit

Permalink
cloud_storage/topic_mount_handler: expose 2-phase mount
Browse files Browse the repository at this point in the history
Needed for topic migrations: prepare, replicate the fact we've prepared,
commit. Otherwise we cannot distinguish between two situations:
a) the cluster has mounted a topic but forgot about it as the calling
node went down shortly after
b) the topic was never available for mount
  • Loading branch information
bashtanov committed Aug 9, 2024
1 parent 51b01a2 commit 5b58baa
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 12 deletions.
20 changes: 14 additions & 6 deletions src/v/cloud_storage/tests/topic_mount_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ TEST_P(TopicMountHandlerFixture, TestMountTopicManifestDoesNotExist) {
auto handler = topic_mount_handler(bucket_name, remote.local());

retry_chain_node rtc(never_abort, 10s, 20ms);
auto mount_result = handler.mount_topic(topic_cfg, rtc).get();
ASSERT_EQ(mount_result, topic_mount_result::mount_manifest_does_not_exist);
auto prepare_result = handler.prepare_mount_topic(topic_cfg, rtc).get();
ASSERT_EQ(
prepare_result, topic_mount_result::mount_manifest_does_not_exist);
auto confirm_result = handler.confirm_mount_topic(topic_cfg, rtc).get();
ASSERT_EQ(
confirm_result, topic_mount_result::mount_manifest_does_not_exist);
}

TEST_P(TopicMountHandlerFixture, TestMountTopicManifestNotDeleted) {
Expand Down Expand Up @@ -151,8 +155,10 @@ TEST_P(TopicMountHandlerFixture, TestMountTopicManifestNotDeleted) {

auto handler = topic_mount_handler(bucket_name, remote.local());

auto mount_result = handler.mount_topic(topic_cfg, rtc).get();
ASSERT_EQ(mount_result, topic_mount_result::mount_manifest_not_deleted);
auto prepare_result = handler.prepare_mount_topic(topic_cfg, rtc).get();
ASSERT_EQ(prepare_result, topic_mount_result::mount_manifest_not_deleted);
auto confirm_result = handler.confirm_mount_topic(topic_cfg, rtc).get();
ASSERT_EQ(confirm_result, topic_mount_result::mount_manifest_not_deleted);

const auto exists_result
= remote.local()
Expand Down Expand Up @@ -197,8 +203,10 @@ TEST_P(TopicMountHandlerFixture, TestMountTopicSuccess) {

auto handler = topic_mount_handler(bucket_name, remote.local());

auto mount_result = handler.mount_topic(topic_cfg, rtc).get();
ASSERT_EQ(mount_result, topic_mount_result::success);
auto prepare_result = handler.prepare_mount_topic(topic_cfg, rtc).get();
ASSERT_EQ(prepare_result, topic_mount_result::success);
auto confirm_result = handler.confirm_mount_topic(topic_cfg, rtc).get();
ASSERT_EQ(confirm_result, topic_mount_result::success);

const auto exists_result
= remote.local()
Expand Down
22 changes: 19 additions & 3 deletions src/v/cloud_storage/topic_mount_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ ss::future<topic_mount_result> topic_mount_handler::commit_mount(
}

ss::future<topic_mount_result> topic_mount_handler::mount_topic(
const cluster::topic_configuration& topic_cfg, retry_chain_node& parent) {
const cluster::topic_configuration& topic_cfg,
bool prepare_only,
retry_chain_node& parent) {
const auto remote_tp_ns = topic_cfg.remote_tp_ns();
const auto path_provider = remote_path_provider(
topic_cfg.properties.remote_label, remote_tp_ns);
Expand All @@ -99,19 +101,23 @@ ss::future<topic_mount_result> topic_mount_handler::mount_topic(
manifest, path_provider, parent);
if (check_result != topic_mount_result::success) {
vlog(
cst_log.error,
cst_log.warn,
"Couldn't mount topic {}, check result was {}.",
topic_cfg.tp_ns,
check_result);
co_return check_result;
}

if (prepare_only) {
co_return check_result;
}

const auto commit_result = co_await commit_mount(
manifest, path_provider, parent);

if (commit_result != topic_mount_result::success) {
vlog(
cst_log.error,
cst_log.warn,
"Couldn't mount topic {}, commit result was {}.",
topic_cfg.tp_ns,
commit_result);
Expand Down Expand Up @@ -147,4 +153,14 @@ ss::future<topic_unmount_result> topic_mount_handler::unmount_topic(
co_return topic_unmount_result::success;
}

ss::future<topic_mount_result> topic_mount_handler::prepare_mount_topic(
const cluster::topic_configuration& topic_cfg, retry_chain_node& parent) {
return mount_topic(topic_cfg, true, parent);
}

ss::future<topic_mount_result> topic_mount_handler::confirm_mount_topic(
const cluster::topic_configuration& topic_cfg, retry_chain_node& parent) {
return mount_topic(topic_cfg, false, parent);
}

} // namespace cloud_storage
18 changes: 15 additions & 3 deletions src/v/cloud_storage/topic_mount_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,35 @@ class topic_mount_handler {
topic_mount_handler(
const cloud_storage_clients::bucket_name& bucket, remote& remote);

// Perform the unmounting process by uploading the topic mount manifest.
// Perform the first step of mounting process by checking the topic mount
// manifest exists.
ss::future<topic_mount_result> prepare_mount_topic(
const cluster::topic_configuration& topic_cfg, retry_chain_node& parent);

// Perform the second step of mounting process by deleting the topic mount
// manifest.
ss::future<topic_mount_result> confirm_mount_topic(
const cluster::topic_configuration& topic_cfg, retry_chain_node& parent);

// Perform the unmounting process by creating the topic mount manifest.
// topic_cfg should be the recovered topic configuration from a topic
// manifest in cloud storage. If it has a value, the remote_label stored in
// the topic properties is used as the "source" label. Otherwise, the
// default uuid (all zeros) is used.
ss::future<topic_unmount_result> unmount_topic(
const cluster::topic_configuration& topic_cfg, retry_chain_node& parent);

private:
// Perform the mounting process by deleting the topic mount manifest.
// topic_cfg should be the recovered topic configuration from a topic
// manifest in cloud storage. If it has a value, the remote_label stored in
// the topic properties is used as the "source" label. Otherwise, the
// default uuid (all zeros) is used.
ss::future<topic_mount_result> mount_topic(
const cluster::topic_configuration& topic_cfg, retry_chain_node& parent);
const cluster::topic_configuration& topic_cfg,
bool prepare_only,
retry_chain_node& parent);

private:
// Check for the existence of a topic mount manifest in tiered storage.
// If it exists, then the topic can be mounted.
ss::future<topic_mount_result> check_mount(
Expand Down

0 comments on commit 5b58baa

Please sign in to comment.