Skip to content

Commit

Permalink
feat: implement resume-resharding tool (#12796)
Browse files Browse the repository at this point in the history
This PR adds a new tool for flat storage, to resume resharding if the
node is stopped or crashes before an ongoing resharding can terminate.


Tested in this way:
- Grab a forknet
- Schedule resharding
- Stop the node while resharding is in progress
- Start the node, verify it fails
- Run the tool `.near/neard-runner/binaries/neard0 flat-storage
resume-resharding --shard-id 0`
- Verify flat state for parent and children is correct on disk
- Restart the node and verify it can rejoin the network
  • Loading branch information
Trisfald authored Jan 24, 2025
1 parent 0c3fdd0 commit 5581f65
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions chain/chain/src/flat_storage_resharder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ impl FlatStorageResharder {
split_params.clone(),
)),
);
// Do not update parent flat head, to avoid overriding the resharding status.
// In any case, at the end of resharding the parent shard will completely disappear.
self.runtime
.get_flat_storage_manager()
.get_flat_storage_for_shard(parent_shard)
.expect("flat storage of the parent shard must exist!")
.set_flat_head_update_mode(false);
store_update.set_flat_storage_status(
left_child_shard,
FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CreatingChild),
Expand Down Expand Up @@ -569,6 +576,10 @@ impl FlatStorageResharder {
parent_shard,
FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }),
);
self.runtime
.get_flat_storage_manager()
.get_flat_storage_for_shard(parent_shard)
.map(|flat_storage| flat_storage.set_flat_head_update_mode(true));
// Remove children shards entirely.
for child_shard in [left_child_shard, right_child_shard] {
store_update.remove_flat_storage(child_shard);
Expand Down Expand Up @@ -884,6 +895,10 @@ impl FlatStorageResharder {
flat_head: split_status.flat_head,
}),
);
self.runtime
.get_flat_storage_manager()
.get_flat_storage_for_shard(parent_shard)
.map(|flat_storage| flat_storage.set_flat_head_update_mode(true));
// Remove children shards status.
for child_shard in [split_status.left_child_shard, split_status.right_child_shard] {
store_update.remove_status(child_shard);
Expand Down
6 changes: 5 additions & 1 deletion core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use super::delta::{CachedFlatStateDelta, FlatStateDelta};
use super::metrics::FlatStorageMetrics;
use super::types::FlatStorageError;
use super::FlatStorageReshardingStatus;

/// FlatStorage stores information on which blocks flat storage currently supports key lookups on.
/// Note that this struct is shared by multiple threads, the chain thread, threads that apply chunks,
Expand Down Expand Up @@ -235,6 +236,9 @@ impl FlatStorage {
let shard_id = shard_uid.shard_id();
let flat_head = match store.get_flat_storage_status(shard_uid) {
Ok(FlatStorageStatus::Ready(ready_status)) => ready_status.flat_head,
Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::SplittingParent(
split_parent_status,
))) => split_parent_status.flat_head,
status => {
return Err(StorageError::StorageInconsistentState(format!(
"Cannot create flat storage for shard {shard_id} with status {status:?}"
Expand Down Expand Up @@ -492,7 +496,7 @@ impl FlatStorage {
}

/// Sets `move_head_enabled` to `enabled`.
pub(crate) fn set_flat_head_update_mode(&self, enabled: bool) {
pub fn set_flat_head_update_mode(&self, enabled: bool) {
let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR);
guard.move_head_enabled = enabled;
}
Expand Down
1 change: 1 addition & 0 deletions runtime/runtime/src/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2026,6 +2026,7 @@ mod tests {
check("hello", 5, "hello");
check("hello", 6, "hello");
check("hello", 10, "hello");
// cspell:ignore привет
check("привет", 3, "п");
}
}
1 change: 1 addition & 0 deletions tools/flat-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ near-epoch-manager.workspace = true
near-primitives.workspace = true
near-store.workspace = true
nearcore.workspace = true
near-async.workspace = true
16 changes: 15 additions & 1 deletion tools/flat-storage/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::resume_resharding::resume_resharding;
/// Tools for modifying flat storage - should be used only for experimentation & debugging.
use borsh::BorshDeserialize;
use clap::Parser;
Expand All @@ -17,6 +18,7 @@ use near_store::flat::{
use near_store::{DBCol, Mode, NodeStorage, ShardUId, Store, StoreOpener};
use nearcore::{load_config, NearConfig, NightshadeRuntime, NightshadeRuntimeExt};
use std::{path::PathBuf, sync::Arc};
// cspell:ignore tqdm
use tqdm::tqdm;

#[derive(Parser)]
Expand Down Expand Up @@ -47,6 +49,9 @@ enum SubCommand {

/// Move flat storage head.
MoveFlatHead(MoveFlatHeadCmd),

/// Resume an unfinished Flat storage resharding for a given shard.
ResumeResharding(ResumeReshardingCmd),
}

#[derive(Parser)]
Expand Down Expand Up @@ -124,6 +129,12 @@ pub struct MoveFlatHeadCmd {
mode: MoveFlatHeadMode,
}

// Arguments of the `resume-resharding` subcommand, which resumes an unfinished
// flat storage resharding for a given shard.
// NOTE: plain `//` comments are used on purpose; clap turns `///` doc comments
// into help text, which would change the CLI output.
#[derive(Parser)]
pub struct ResumeReshardingCmd {
// Id of the shard whose interrupted resharding should be resumed
// (passed on the command line as `--shard-id`).
#[clap(long)]
pub shard_id: ShardId,
}

fn print_delta(store: &FlatStoreAdapter, shard_uid: ShardUId, metadata: FlatStateDeltaMetadata) {
let changes = store.get_delta(shard_uid, metadata.block.hash).unwrap().unwrap();
println!("{:?}", FlatStateDelta { metadata, changes });
Expand Down Expand Up @@ -261,7 +272,7 @@ impl FlatStorageCommand {

let head_hash = match hot_store
.get_flat_storage_status(shard_uid)
.expect("falied to read flat storage status")
.expect("failed to read flat storage status")
{
FlatStorageStatus::Ready(ready_status) => ready_status.flat_head.hash,
status => {
Expand Down Expand Up @@ -549,6 +560,9 @@ impl FlatStorageCommand {
SubCommand::MoveFlatHead(cmd) => {
self.move_flat_head(cmd, home_dir, &near_config, opener)
}
SubCommand::ResumeResharding(cmd) => {
resume_resharding(cmd, home_dir, &near_config, opener)
}
}
}
}
1 change: 1 addition & 0 deletions tools/flat-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod commands;
mod resume_resharding;
172 changes: 172 additions & 0 deletions tools/flat-storage/src/resume_resharding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use crate::commands::ResumeReshardingCmd;
use anyhow::anyhow;
use near_async::messaging::{CanSend, IntoMultiSender};
use near_async::time::Clock;
use near_chain::flat_storage_resharder::FlatStorageReshardingTaskResult;
use near_chain::rayon_spawner::RayonAsyncComputationSpawner;
use near_chain::resharding::types::{
FlatStorageShardCatchupRequest, FlatStorageSplitShardRequest, MemtrieReloadRequest,
};
use near_chain::types::ChainConfig;
use near_chain::{Chain, ChainGenesis, ChainStore, DoomslugThresholdMode};
use near_chain_configs::MutableConfigValue;
use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::EpochManager;
use near_store::adapter::StoreAdapter;
use near_store::flat::FlatStorageReshardingStatus;
use near_store::{ShardUId, StoreOpener};
use nearcore::{NearConfig, NightshadeRuntime, NightshadeRuntimeExt};
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tracing::info;

/// Resumes an interrupted flat storage resharding for the shard named in `cmd`.
///
/// The function builds a chain and a serial executor on top of the existing
/// storage, reads the persisted resharding status of the shard, recreates its
/// flat storage, asks the resharder to resume, and then drains the executor
/// queue one task at a time.
///
/// # Errors
///
/// Returns an error if the chain cannot be created, if the shard's flat
/// storage is not in a `Resharding` status, if the flat storage cannot be
/// created, if resuming fails, or if any executed resharding task fails.
pub(crate) fn resume_resharding(
    cmd: &ResumeReshardingCmd,
    home_dir: &PathBuf,
    config: &NearConfig,
    opener: StoreOpener,
) -> anyhow::Result<()> {
    let (chain, executor) = create_chain_and_executor(config, home_dir, opener)?;

    // version is fixed at 3 in resharding V3
    let target_shard = ShardUId::new(3, cmd.shard_id);
    let persisted_status = get_resharding_status_for_shard(&target_shard, &chain)?;

    // The in-memory flat storage of the shard has to exist before resuming.
    chain.runtime_adapter.get_flat_storage_manager().create_flat_storage_for_shard(target_shard)?;

    let resharder = chain.resharding_manager.flat_storage_resharder.clone();
    resharder.resume(target_shard, &persisted_status)?;

    // Keep executing queued tasks until the executor reports an empty queue.
    loop {
        let task_was_run = executor.run()?;
        if !task_was_run {
            break;
        }
        info!(target: "resharding", "running next task");
    }

    info!(target: "resharding", "operation completed");
    Ok(())
}

fn create_chain_and_executor(
config: &NearConfig,
home_dir: &Path,
opener: StoreOpener,
) -> anyhow::Result<(Chain, Arc<SerialExecutor>)> {
let node_storage = opener.open_in_mode(near_store::Mode::ReadWriteExisting).unwrap();
let epoch_manager = EpochManager::new_arc_handle(
node_storage.get_hot_store(),
&config.genesis.config,
Some(home_dir),
);
let runtime_adapter = NightshadeRuntime::from_config(
home_dir,
node_storage.get_hot_store(),
&config,
epoch_manager.clone(),
)?;
let shard_tracker =
ShardTracker::new(TrackedConfig::from_config(&config.client_config), epoch_manager.clone());
let chain_genesis = ChainGenesis::new(&config.genesis.config);
let chain_config = ChainConfig {
save_trie_changes: config.client_config.save_trie_changes,
background_migration_threads: config.client_config.client_background_migration_threads,
resharding_config: config.client_config.resharding_config.clone(),
};
let executor = Arc::new(SerialExecutor::new(ChainStore::new(
node_storage.get_hot_store(),
chain_genesis.height,
false,
chain_genesis.transaction_validity_period,
)));
let chain = Chain::new(
Clock::real(),
epoch_manager,
shard_tracker,
runtime_adapter,
&chain_genesis,
DoomslugThresholdMode::TwoThirds,
chain_config,
None,
Arc::new(RayonAsyncComputationSpawner),
MutableConfigValue::new(None, "validator_signer"),
executor.as_multi_sender(),
)?;
Ok((chain, executor))
}

fn get_resharding_status_for_shard(
shard_uid: &ShardUId,
chain: &Chain,
) -> anyhow::Result<FlatStorageReshardingStatus> {
use near_store::flat::FlatStorageStatus::*;
let flat_storage_status =
chain.runtime_adapter.store().flat_store().get_flat_storage_status(*shard_uid)?;
match &flat_storage_status {
Disabled | Empty | Creation(_) | Ready(_) => Err(anyhow!(
"resharding is not in progress! flat storage status: {:?}",
flat_storage_status
)),
Resharding(status) => Ok(status.clone()),
}
}

/// A queued unit of resharding work: given the chain store, it runs one
/// resharding task and reports the task's outcome.
type ReshardingTask = Box<dyn Fn(&ChainStore) -> FlatStorageReshardingTaskResult + Send>;

/// Executor that runs tasks in sequence.
///
/// Tasks are kept in a FIFO queue and each one is invoked with a reference to
/// the executor's chain store.
struct SerialExecutor {
    /// Chain store handed to every task when it runs.
    chain_store: Arc<Mutex<ChainStore>>,
    /// Pending tasks, executed front to back.
    tasks: Mutex<VecDeque<ReshardingTask>>,
}

impl SerialExecutor {
    /// Creates an executor that owns `chain_store` and starts with an empty task queue.
    fn new(chain_store: ChainStore) -> Self {
        let tasks = Mutex::new(VecDeque::new());
        let chain_store = Arc::new(Mutex::new(chain_store));
        Self { chain_store, tasks }
    }

    /// Runs the next task.
    ///
    /// Returns `Ok(false)` if there are no tasks, `Ok(true)` if a task was
    /// executed (including cancelled and postponed ones), and an error if the
    /// task reported a failure. A postponed task is put back at the end of the
    /// queue so that it is retried later.
    fn run(&self) -> anyhow::Result<bool> {
        let next_task = match self.tasks.lock().unwrap().pop_front() {
            Some(queued) => queued,
            None => return Ok(false),
        };

        let store_guard = self.chain_store.lock().unwrap();
        match next_task(&store_guard) {
            FlatStorageReshardingTaskResult::Successful { num_batches_done } => {
                info!(target: "resharding", num_batches_done, "task completed");
            }
            FlatStorageReshardingTaskResult::Failed => {
                return Err(anyhow!("resharding task has failed!"));
            }
            FlatStorageReshardingTaskResult::Cancelled => {
                info!(target: "resharding", "task cancelled");
            }
            FlatStorageReshardingTaskResult::Postponed => {
                info!(target: "resharding", "task postponed - retrying");
                self.tasks.lock().unwrap().push_back(next_task);
            }
        }
        Ok(true)
    }
}

impl CanSend<FlatStorageSplitShardRequest> for SerialExecutor {
    /// Queues the split-shard request instead of executing it right away.
    ///
    /// The queued task calls `split_shard_task` on the request's resharder.
    fn send(&self, msg: FlatStorageSplitShardRequest) {
        let mut queue = self.tasks.lock().unwrap();
        queue.push_back(Box::new(move |store: &ChainStore| msg.resharder.split_shard_task(store)));
    }
}

impl CanSend<FlatStorageShardCatchupRequest> for SerialExecutor {
    /// Queues the shard catchup request instead of executing it right away.
    ///
    /// The queued task calls `shard_catchup_task` on the request's resharder
    /// for the shard named in the request.
    fn send(&self, msg: FlatStorageShardCatchupRequest) {
        let mut queue = self.tasks.lock().unwrap();
        queue.push_back(Box::new(move |store: &ChainStore| {
            msg.resharder.shard_catchup_task(msg.shard_uid, store)
        }));
    }
}

impl CanSend<MemtrieReloadRequest> for SerialExecutor {
    /// Memtrie reload requests are intentionally dropped: nothing is queued.
    fn send(&self, _request: MemtrieReloadRequest) {}
}

0 comments on commit 5581f65

Please sign in to comment.