Skip to content

Commit

Permalink
feat(resharding): label metrics with the parent shard uid (near#10070)
Browse files Browse the repository at this point in the history
It actually makes sense to label metrics by parent shard uid. Otherwise
the same metric value is duplicated for the children. It's because the
processing also happens per parent.
  • Loading branch information
wacban authored Nov 2, 2023
2 parents f38d3bf + 2853e08 commit f9e6707
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 39 deletions.
40 changes: 24 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 10 additions & 16 deletions chain/chain/src/resharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ impl Chain {
let prev_block_header = self.get_block_header(prev_hash)?;
let prev_prev_hash = prev_block_header.prev_hash();
let state_root = *self.get_chunk_extra(&prev_hash, &shard_uid)?.state_root();
let child_shard_uids = next_epoch_shard_layout.get_split_shard_uids(shard_id);

state_split_scheduler(StateSplitRequest {
tries: Arc::new(self.runtime_adapter.get_tries()),
Expand All @@ -211,11 +210,9 @@ impl Chain {
config: self.state_split_config,
});

for shard_uid in child_shard_uids.unwrap_or_default() {
RESHARDING_STATUS
.with_label_values(&[&shard_uid.to_string()])
.set(ReshardingStatus::Scheduled.into());
}
RESHARDING_STATUS
.with_label_values(&[&shard_uid.to_string()])
.set(ReshardingStatus::Scheduled.into());
Ok(())
}

Expand Down Expand Up @@ -268,11 +265,9 @@ impl Chain {
let mut state_roots: HashMap<_, _> =
new_shards.iter().map(|shard_uid| (*shard_uid, Trie::EMPTY_ROOT)).collect();

for shard_uid in &new_shards {
RESHARDING_STATUS
.with_label_values(&[&shard_uid.to_string()])
.set(ReshardingStatus::BuildingState.into());
}
RESHARDING_STATUS
.with_label_values(&[&shard_uid.to_string()])
.set(ReshardingStatus::BuildingState.into());

// Build the required iterator from flat storage and delta changes. Note that we are
// working with iterators as we don't want to have all the state in memory at once.
Expand Down Expand Up @@ -382,6 +377,7 @@ impl Chain {

pub fn build_state_for_split_shards_postprocessing(
&mut self,
shard_uid: ShardUId,
sync_hash: &CryptoHash,
state_roots: HashMap<ShardUId, StateRoot>,
) -> Result<(), Error> {
Expand All @@ -400,11 +396,9 @@ impl Chain {
}
chain_store_update.commit()?;

for shard_uid in child_shard_uids {
RESHARDING_STATUS
.with_label_values(&[&shard_uid.to_string()])
.set(ReshardingStatus::Finished.into());
}
RESHARDING_STATUS
.with_label_values(&[&shard_uid.to_string()])
.set(ReshardingStatus::Finished.into());

Ok(())
}
Expand Down
20 changes: 13 additions & 7 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,18 @@ impl StateSync {
let prev_hash = *chain.get_block_header(&sync_hash)?.prev_hash();
let prev_epoch_id = chain.get_block_header(&prev_hash)?.epoch_id().clone();
let epoch_id = chain.get_block_header(&sync_hash)?.epoch_id().clone();
if epoch_manager.get_shard_layout(&prev_epoch_id)?
!= epoch_manager.get_shard_layout(&epoch_id)?
{
let prev_shard_layout = epoch_manager.get_shard_layout(&prev_epoch_id)?;
let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?;
if prev_shard_layout != shard_layout {
// This error message is used in tests to ensure node exists for the
// correct reason. When changing it please also update the tests.
panic!("cannot sync to the first epoch after sharding upgrade. Please wait for the next epoch or find peers that are more up to date");
}
let split_states = epoch_manager.will_shard_layout_change(&prev_hash)?;

for shard_id in tracking_shards {
let version = prev_shard_layout.version();
let shard_uid = ShardUId { version, shard_id: shard_id as u32 };
let mut download_timeout = false;
let mut run_shard_state_download = false;
let shard_sync_download = sync_status.entry(shard_id).or_insert_with(|| {
Expand Down Expand Up @@ -341,7 +343,7 @@ impl StateSync {
ShardSyncStatus::StateSplitApplying => {
debug_assert!(split_states);
shard_sync_done = self.sync_shards_state_split_applying_status(
shard_id,
shard_uid,
shard_sync_download,
sync_hash,
chain,
Expand Down Expand Up @@ -1048,15 +1050,19 @@ impl StateSync {
/// Returns whether the State Sync for the given shard is complete.
fn sync_shards_state_split_applying_status(
&mut self,
shard_id: ShardId,
shard_uid: ShardUId,
shard_sync_download: &mut ShardSyncDownload,
sync_hash: CryptoHash,
chain: &mut Chain,
) -> Result<bool, near_chain::Error> {
let result = self.split_state_roots.remove(&shard_id);
let result = self.split_state_roots.remove(&shard_uid.shard_id());
let mut shard_sync_done = false;
if let Some(state_roots) = result {
chain.build_state_for_split_shards_postprocessing(&sync_hash, state_roots?)?;
chain.build_state_for_split_shards_postprocessing(
shard_uid,
&sync_hash,
state_roots?,
)?;
*shard_sync_download =
ShardSyncDownload { downloads: vec![], status: ShardSyncStatus::StateSyncDone };
shard_sync_done = true;
Expand Down

0 comments on commit f9e6707

Please sign in to comment.