Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement partition store restore-from-snapshot #2353

Merged
merged 12 commits into from
Dec 6, 2024
46 changes: 27 additions & 19 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;

use rocksdb::ExportImportFilesMetaData;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use tracing::{debug, info, warn};

use crate::cf_options;
use crate::snapshots::LocalPartitionSnapshot;
Expand Down Expand Up @@ -86,9 +86,11 @@ impl PartitionStoreManager {
})
}

pub async fn has_partition(&self, partition_id: PartitionId) -> bool {
let guard = self.lookup.lock().await;
guard.live.contains_key(&partition_id)
/// Check whether we have a partition store for the given partition id, irrespective of whether
/// the store is open or not.
pub async fn has_partition_store(&self, partition_id: PartitionId) -> bool {
let cf_name = cf_for_partition(partition_id);
self.rocksdb.inner().cf_handle(&cf_name).is_some()
}

pub async fn get_partition_store(&self, partition_id: PartitionId) -> Option<PartitionStore> {
Expand Down Expand Up @@ -147,8 +149,7 @@ impl PartitionStoreManager {
let mut guard = self.lookup.lock().await;
if guard.live.contains_key(&partition_id) {
warn!(
?partition_id,
?snapshot,
%partition_id,
"The partition store is already open, refusing to import snapshot"
);
return Err(RocksError::AlreadyOpen);
Expand All @@ -158,32 +159,39 @@ impl PartitionStoreManager {
let cf_exists = self.rocksdb.inner().cf_handle(&cf_name).is_some();
if cf_exists {
warn!(
?partition_id,
?cf_name,
?snapshot,
%partition_id,
%cf_name,
"The column family for partition already exists in the database, cannot import snapshot"
);
return Err(RocksError::ColumnFamilyExists);
}

if snapshot.key_range.start() > partition_key_range.start()
|| snapshot.key_range.end() < partition_key_range.end()
{
warn!(
%partition_id,
snapshot_range = ?snapshot.key_range,
partition_range = ?partition_key_range,
"The snapshot key range does not fully cover the partition key range"
);
return Err(RocksError::SnapshotKeyRangeMismatch);
}
Comment on lines +169 to +179
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good check :-)


let mut import_metadata = ExportImportFilesMetaData::default();
import_metadata.set_db_comparator_name(snapshot.db_comparator_name.as_str());
import_metadata.set_files(&snapshot.files);

info!(
?partition_id,
min_applied_lsn = ?snapshot.min_applied_lsn,
"Initializing partition store from snapshot"
%partition_id,
min_lsn = %snapshot.min_applied_lsn,
path = ?snapshot.base_dir,
"Importing partition store snapshot"
);

if let Err(e) = self
.rocksdb
self.rocksdb
.import_cf(cf_name.clone(), opts, import_metadata)
.await
{
error!(?partition_id, "Failed to import snapshot");
return Err(e);
}
.await?;

assert!(self.rocksdb.inner().cf_handle(&cf_name).is_some());
let partition_store = PartitionStore::new(
Expand Down
3 changes: 3 additions & 0 deletions crates/rocksdb/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub enum RocksError {
#[error("already exists")]
#[code(unknown)]
ColumnFamilyExists,
#[error("invalid key range for partition")]
#[code(unknown)]
SnapshotKeyRangeMismatch,
#[error(transparent)]
#[code(unknown)]
Other(#[from] rocksdb::Error),
Expand Down
4 changes: 3 additions & 1 deletion crates/rocksdb/src/rock_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub trait RocksAccess {
default_cf_options: rocksdb::Options,
cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>,
) -> Result<(), RocksError>;
/// Create a column family from a snapshot. The data files referenced by
/// `metadata` will be moved into the RocksDB data directory.
fn import_cf(
&self,
name: CfName,
Expand Down Expand Up @@ -163,7 +165,7 @@ impl RocksAccess for rocksdb::DB {
let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?;

let mut import_opts = ImportColumnFamilyOptions::default();
import_opts.set_move_files(false); // keep the snapshot files intact
import_opts.set_move_files(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice :-)


Ok(Self::create_column_family_with_import(
self,
Expand Down
3 changes: 2 additions & 1 deletion crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tokio-util = { workspace = true, features = ["io-util"] }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
ulid = { workspace = true }
Expand Down
10 changes: 7 additions & 3 deletions crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,13 @@ impl Worker {
partition_store_manager.clone(),
router_builder,
bifrost,
SnapshotRepository::create_if_configured(snapshots_options)
.await
.map_err(BuildError::SnapshotRepository)?,
SnapshotRepository::create_if_configured(
snapshots_options,
config.common.base_dir().join("pp-snapshots"),
config.common.cluster_name().to_owned(),
)
.await
.map_err(BuildError::SnapshotRepository)?,
);

// handle RPCs
Expand Down
Loading
Loading