Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement resume-resharding tool #12796

Merged
merged 5 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions chain/chain/src/flat_storage_resharder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ impl FlatStorageResharder {
split_params.clone(),
)),
);
// Do not update parent flat head, to avoid overriding the resharding status.
// In any case, at the end of resharding the parent shard will completely disappear.
self.runtime
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to freeze the parent flat storage in the end, because otherwise something in chain was overriding its status.

.get_flat_storage_manager()
.get_flat_storage_for_shard(parent_shard)
.expect("flat storage of the parent shard must exist!")
.set_flat_head_update_mode(false);
store_update.set_flat_storage_status(
left_child_shard,
FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CreatingChild),
Expand Down Expand Up @@ -569,6 +576,10 @@ impl FlatStorageResharder {
parent_shard,
FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }),
);
self.runtime
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really needed, but included for completeness, same at line 898

.get_flat_storage_manager()
.get_flat_storage_for_shard(parent_shard)
.map(|flat_storage| flat_storage.set_flat_head_update_mode(true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not needed in FlatStorageReshardingTaskResult::Cancelled case too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good point you raised.
The operation of 'cancelling' resharing in the context of flat storage means that the node got SIGINT'ed. We want to cancel the tasks to shutdown quickly, but continue resharding on startup. So we keep flat storage locked to ensure the its state is locked in Resharding.

We could argue that unlocking flat storage is not needed also for Failed since we panic immediately, but well, maybe one day we won't panic

// Remove children shards entirely.
for child_shard in [left_child_shard, right_child_shard] {
store_update.remove_flat_storage(child_shard);
Expand Down Expand Up @@ -884,6 +895,10 @@ impl FlatStorageResharder {
flat_head: split_status.flat_head,
}),
);
self.runtime
.get_flat_storage_manager()
.get_flat_storage_for_shard(parent_shard)
.map(|flat_storage| flat_storage.set_flat_head_update_mode(true));
// Remove children shards status.
for child_shard in [split_status.left_child_shard, split_status.right_child_shard] {
store_update.remove_status(child_shard);
Expand Down
6 changes: 5 additions & 1 deletion core/store/src/flat/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use super::delta::{CachedFlatStateDelta, FlatStateDelta};
use super::metrics::FlatStorageMetrics;
use super::types::FlatStorageError;
use super::FlatStorageReshardingStatus;

/// FlatStorage stores information on which blocks flat storage current supports key lookups on.
/// Note that this struct is shared by multiple threads, the chain thread, threads that apply chunks,
Expand Down Expand Up @@ -235,6 +236,9 @@ impl FlatStorage {
let shard_id = shard_uid.shard_id();
let flat_head = match store.get_flat_storage_status(shard_uid) {
Ok(FlatStorageStatus::Ready(ready_status)) => ready_status.flat_head,
Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::SplittingParent(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to load the parent flat storage stuck in the "middle" of resharding

split_parent_status,
))) => split_parent_status.flat_head,
status => {
return Err(StorageError::StorageInconsistentState(format!(
"Cannot create flat storage for shard {shard_id} with status {status:?}"
Expand Down Expand Up @@ -492,7 +496,7 @@ impl FlatStorage {
}

/// Updates `move_head_enabled` and returns whether the change was done.
pub(crate) fn set_flat_head_update_mode(&self, enabled: bool) {
pub fn set_flat_head_update_mode(&self, enabled: bool) {
let mut guard = self.0.write().expect(crate::flat::POISONED_LOCK_ERR);
guard.move_head_enabled = enabled;
}
Expand Down
1 change: 1 addition & 0 deletions tools/flat-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ near-epoch-manager.workspace = true
near-primitives.workspace = true
near-store.workspace = true
nearcore.workspace = true
near-async.workspace = true
13 changes: 13 additions & 0 deletions tools/flat-storage/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::resume_resharding::resume_resharding;
/// Tools for modifying flat storage - should be used only for experimentation & debugging.
use borsh::BorshDeserialize;
use clap::Parser;
Expand All @@ -17,7 +18,7 @@
use near_store::{DBCol, Mode, NodeStorage, ShardUId, Store, StoreOpener};
use nearcore::{load_config, NearConfig, NightshadeRuntime, NightshadeRuntimeExt};
use std::{path::PathBuf, sync::Arc};
use tqdm::tqdm;

Check warning on line 21 in tools/flat-storage/src/commands.rs

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (tqdm)

Check warning on line 21 in tools/flat-storage/src/commands.rs

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (tqdm)

#[derive(Parser)]
pub struct FlatStorageCommand {
Expand Down Expand Up @@ -47,6 +48,9 @@

/// Move flat storage head.
MoveFlatHead(MoveFlatHeadCmd),

/// Resume an unfinished Flat storage resharding for a given shard.
ResumeResharding(ResumeReshardingCmd),
}

#[derive(Parser)]
Expand Down Expand Up @@ -124,6 +128,12 @@
mode: MoveFlatHeadMode,
}

#[derive(Parser)]
pub struct ResumeReshardingCmd {
#[clap(long)]
pub shard_id: ShardId,
}

fn print_delta(store: &FlatStoreAdapter, shard_uid: ShardUId, metadata: FlatStateDeltaMetadata) {
let changes = store.get_delta(shard_uid, metadata.block.hash).unwrap().unwrap();
println!("{:?}", FlatStateDelta { metadata, changes });
Expand Down Expand Up @@ -261,7 +271,7 @@

let head_hash = match hot_store
.get_flat_storage_status(shard_uid)
.expect("falied to read flat storage status")

Check warning on line 274 in tools/flat-storage/src/commands.rs

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (falied)
{
FlatStorageStatus::Ready(ready_status) => ready_status.flat_head.hash,
status => {
Expand Down Expand Up @@ -298,7 +308,7 @@
let trie_iter = trie.disk_iter()?;
let mut verified = 0;
let mut success = true;
for (item_trie, item_flat) in tqdm(std::iter::zip(trie_iter, flat_state_entries_iter)) {

Check warning on line 311 in tools/flat-storage/src/commands.rs

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (tqdm)
let item_flat = item_flat?;
let value_ref = item_flat.1.to_value_ref();
verified += 1;
Expand Down Expand Up @@ -549,6 +559,9 @@
SubCommand::MoveFlatHead(cmd) => {
self.move_flat_head(cmd, home_dir, &near_config, opener)
}
SubCommand::ResumeResharding(cmd) => {
resume_resharding(cmd, home_dir, &near_config, opener)
}
}
}
}
1 change: 1 addition & 0 deletions tools/flat-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod commands;
mod resume_resharding;
174 changes: 174 additions & 0 deletions tools/flat-storage/src/resume_resharding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
use crate::commands::ResumeReshardingCmd;
use anyhow::anyhow;
use near_async::messaging::{CanSend, IntoMultiSender};
use near_async::time::Clock;
use near_chain::flat_storage_resharder::FlatStorageReshardingTaskResult;
use near_chain::rayon_spawner::RayonAsyncComputationSpawner;
use near_chain::resharding::types::{
FlatStorageShardCatchupRequest, FlatStorageSplitShardRequest, MemtrieReloadRequest,
};
use near_chain::types::ChainConfig;
use near_chain::{Chain, ChainGenesis, ChainStore, DoomslugThresholdMode};
use near_chain_configs::MutableConfigValue;
use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::EpochManager;
use near_store::adapter::StoreAdapter;
use near_store::flat::FlatStorageReshardingStatus;
use near_store::{ShardUId, StoreOpener};
use nearcore::{NearConfig, NightshadeRuntime, NightshadeRuntimeExt};
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tracing::info;

pub(crate) fn resume_resharding(
cmd: &ResumeReshardingCmd,
home_dir: &PathBuf,
config: &NearConfig,
opener: StoreOpener,
) -> anyhow::Result<()> {
let (chain, executor) = create_chain_and_executor(config, home_dir, opener)?;
let resharder = chain.resharding_manager.flat_storage_resharder.clone();

let shard_uid = ShardUId::new(3, cmd.shard_id); // version is fixed at 3 in resharding V3
let resharding_status = get_resharding_status_for_shard(&shard_uid, &chain)?;

chain.runtime_adapter.get_flat_storage_manager().create_flat_storage_for_shard(shard_uid)?;

resharder.resume(shard_uid, &resharding_status)?;

while executor.run()? {
info!(target: "resharding", "running next task");
}

info!(target: "resharding", "operation completed");
Ok(())
}

fn create_chain_and_executor(
config: &NearConfig,
home_dir: &Path,
opener: StoreOpener,
) -> anyhow::Result<(Chain, Arc<SerialExecutor>)> {
let node_storage = opener.open_in_mode(near_store::Mode::ReadWriteExisting).unwrap();
let epoch_manager = EpochManager::new_arc_handle(
node_storage.get_hot_store(),
&config.genesis.config,
Some(home_dir),
);
let runtime_adapter = NightshadeRuntime::from_config(
home_dir,
node_storage.get_hot_store(),
&config,
epoch_manager.clone(),
)?;
let shard_tracker =
ShardTracker::new(TrackedConfig::from_config(&config.client_config), epoch_manager.clone());
let chain_genesis = ChainGenesis::new(&config.genesis.config);
let chain_config = ChainConfig {
save_trie_changes: config.client_config.save_trie_changes,
background_migration_threads: config.client_config.client_background_migration_threads,
resharding_config: config.client_config.resharding_config.clone(),
};
let executor = Arc::new(SerialExecutor::new(ChainStore::new(
node_storage.get_hot_store(),
chain_genesis.height,
false,
chain_genesis.transaction_validity_period,
)));
let chain = Chain::new(
Clock::real(),
epoch_manager,
shard_tracker,
runtime_adapter,
&chain_genesis,
DoomslugThresholdMode::TwoThirds,
chain_config,
None,
Arc::new(RayonAsyncComputationSpawner),
MutableConfigValue::new(None, "validator_signer"),
executor.as_multi_sender(),
)?;
Ok((chain, executor))
}

fn get_resharding_status_for_shard(
shard_uid: &ShardUId,
chain: &Chain,
) -> anyhow::Result<FlatStorageReshardingStatus> {
use near_store::flat::FlatStorageStatus::*;
let flat_storage_status =
chain.runtime_adapter.store().flat_store().get_flat_storage_status(*shard_uid)?;
match &flat_storage_status {
Disabled | Empty | Creation(_) | Ready(_) => Err(anyhow!(
"resharding is not in progress! flat storage status: {:?}",
flat_storage_status
)),
Resharding(status) => Ok(status.clone()),
}
}

/// Executor that runs tasks in sequence.
struct SerialExecutor {
chain_store: Arc<Mutex<ChainStore>>,
tasks: Mutex<VecDeque<Box<dyn Fn(&ChainStore) -> FlatStorageReshardingTaskResult + Send>>>,
}

impl SerialExecutor {
fn new(chain_store: ChainStore) -> Self {
Self { chain_store: Arc::new(Mutex::new(chain_store)), tasks: Mutex::new(VecDeque::new()) }
}

/// Runs the next task.
///
/// Returns false if there are no tasks.
fn run(&self) -> anyhow::Result<bool> {
let task = self.tasks.lock().unwrap().pop_front();
match task {
Some(task) => {
match task(&self.chain_store.lock().unwrap()) {
FlatStorageReshardingTaskResult::Successful { num_batches_done } => {
info!(target: "resharding", num_batches_done, "task completed");
}
FlatStorageReshardingTaskResult::Failed => {
return Err(anyhow!("resharding task has failed!"));
}
FlatStorageReshardingTaskResult::Cancelled => {
info!(target: "resharding", "task cancelled")
}
FlatStorageReshardingTaskResult::Postponed => {
info!(target: "resharding", "task postponed - retrying");
self.tasks.lock().unwrap().push_back(task);
}
}
Ok(true)
}
None => Ok(false),
}
}
}

impl CanSend<FlatStorageSplitShardRequest> for SerialExecutor {
fn send(&self, msg: FlatStorageSplitShardRequest) {
let resharder = msg.resharder.clone();
let task =
Box::new(move |chain_store: &ChainStore| resharder.split_shard_task(chain_store));
self.tasks.lock().unwrap().push_back(task);
}
}

impl CanSend<FlatStorageShardCatchupRequest> for SerialExecutor {
fn send(&self, msg: FlatStorageShardCatchupRequest) {
let resharder = msg.resharder.clone();
let task = Box::new(move |chain_store: &ChainStore| {
resharder.shard_catchup_task(msg.shard_uid, chain_store)
});
self.tasks.lock().unwrap().push_back(task);
}
}

impl CanSend<MemtrieReloadRequest> for SerialExecutor {
fn send(&self, _: MemtrieReloadRequest) {
// no op
}
}
Loading