diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs index b5ee4a6652e..ba1b8b0ed78 100644 --- a/chain/chain/src/resharding/manager.rs +++ b/chain/chain/src/resharding/manager.rs @@ -185,11 +185,7 @@ impl ReshardingManager { ?block_hash, block_height, ?parent_shard_uid) .entered(); - // TODO(resharding): what if node doesn't have memtrie? just pause - // processing? - // TODO(resharding): fork handling. if epoch is finalized on different - // blocks, the second finalization will crash. - tries.freeze_mem_tries(parent_shard_uid, split_shard_event.children_shards())?; + tries.freeze_parent_memtrie(parent_shard_uid, split_shard_event.children_shards())?; let parent_chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?; let boundary_account = split_shard_event.boundary_account; @@ -221,7 +217,7 @@ impl ReshardingManager { let mem_trie_update = mem_tries.update(*parent_chunk_extra.state_root(), mode)?; let trie_changes = mem_trie_update.retain_split_shard(&boundary_account, retain_mode); - let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap(); + let mem_changes = trie_changes.memtrie_changes.as_ref().unwrap(); let new_state_root = mem_tries.apply_memtrie_changes(block_height, mem_changes); drop(mem_tries); diff --git a/core/store/src/trie/mem/mem_trie_update.rs b/core/store/src/trie/mem/mem_trie_update.rs index 71c14a52ef5..c31fbc1b8b6 100644 --- a/core/store/src/trie/mem/mem_trie_update.rs +++ b/core/store/src/trie/mem/mem_trie_update.rs @@ -448,7 +448,8 @@ impl<'a, M: ArenaMemory> MemTrieUpdate<'a, M> { .unwrap_or_default(), insertions, deletions, - mem_trie_changes: Some(mem_trie_changes), + memtrie_changes: Some(mem_trie_changes), + children_memtrie_changes: Default::default(), } } @@ -610,7 +611,7 @@ mod tests { let disk_changes = self.make_disk_changes_only(changes.clone()); let mut all_changes = self.make_all_changes(changes.clone()); - let mem_trie_changes_from_all_changes = all_changes.mem_trie_changes.take().unwrap(); + let mem_trie_changes_from_all_changes = all_changes.memtrie_changes.take().unwrap(); assert_eq!(memtrie_changes, mem_trie_changes_from_all_changes); assert_eq!(disk_changes, all_changes); diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index 67c6ce9f29b..435ba682c26 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -18,6 +18,7 @@ pub use crate::trie::trie_storage::{TrieCache, TrieCachingStorage, TrieDBStorage use crate::StorageError; use borsh::{BorshDeserialize, BorshSerialize}; pub use from_flat::construct_trie_from_flat; +use itertools::Itertools; use mem::mem_trie_update::{TrackingMode, UpdatedMemTrieNodeWithSize}; use mem::mem_tries::MemTries; use near_primitives::challenge::PartialState; @@ -37,7 +38,7 @@ use ops::interface::{GenericTrieValue, UpdatedNodeId}; use ops::resharding::{GenericTrieUpdateRetain, RetainMode}; pub use raw_node::{Children, RawTrieNode, RawTrieNodeWithSize}; use std::cell::RefCell; -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Write; use std::hash::Hash; use std::ops::DerefMut; @@ -344,6 +345,10 @@ impl std::fmt::Debug for TrieNode { pub struct Trie { storage: Arc, memtries: Option>>, + /// In super rare cases it's possible that we see forks close to the resharding boundary. + /// We would like to apply the same set of trie changes to the child memtrie to keep + /// a consistent view across forks. + children_memtries: HashMap>>, root: StateRoot, /// If present, flat storage is used to look up keys (if asked for). /// Otherwise, we would crawl through the trie. @@ -569,7 +574,12 @@ pub struct TrieChanges { deletions: Vec, // If Some, in-memory changes are applied as well. #[borsh(skip)] - pub mem_trie_changes: Option, + pub memtrie_changes: Option, + // In super rare cases it's possible that we see forks close to the resharding boundary. + // We would like to apply the same set of trie changes to the child memtrie to keep + // a consistent view across forks. + #[borsh(skip)] + pub children_memtrie_changes: HashMap, } impl TrieChanges { @@ -579,7 +589,8 @@ impl TrieChanges { new_root: old_root, insertions: vec![], deletions: vec![], - mem_trie_changes: Default::default(), + memtrie_changes: Default::default(), + children_memtrie_changes: Default::default(), } } @@ -678,12 +689,13 @@ impl Trie { root: StateRoot, flat_storage_chunk_view: Option, ) -> Self { - Self::new_with_memtries(storage, None, root, flat_storage_chunk_view) + Self::new_with_memtries(storage, None, Default::default(), root, flat_storage_chunk_view) } pub fn new_with_memtries( storage: Arc, memtries: Option>>, + children_memtries: HashMap>>, root: StateRoot, flat_storage_chunk_view: Option, ) -> Self { @@ -702,6 +714,7 @@ impl Trie { Trie { storage, memtries, + children_memtries, root, charge_gas_for_trie_node_access, flat_storage_chunk_view, @@ -732,6 +745,7 @@ impl Trie { let mut trie = Self::new_with_memtries( self.storage.clone(), self.memtries.clone(), + self.children_memtries.clone(), self.root, self.flat_storage_chunk_view.clone(), ); @@ -1667,50 +1681,94 @@ impl Trie { let _ = self.get(&trie_key.to_vec()); } - match &self.memtries { - Some(memtries) => { - let guard = memtries.read().unwrap(); - let mut recorder = self.recorder.as_ref().map(|recorder| recorder.borrow_mut()); - let tracking_mode = match &mut recorder { - Some(recorder) => TrackingMode::RefcountsAndAccesses(recorder.deref_mut()), - None => TrackingMode::Refcounts, - }; - - let mut trie_update = guard.update(self.root, tracking_mode)?; - for (key, value) in changes { - match value { - Some(arr) => trie_update.insert(&key, arr)?, - None => trie_update.generic_delete(0, &key)?, + if self.memtries.is_some() { + self.update_with_memtrie(changes) + } else { + self.update_with_trie_storage(changes) + } + } + + fn update_with_memtrie(&self, changes: I) -> Result + where + I: IntoIterator, Option>)>, + { + // Get trie_update for memtrie + let guard = self.memtries.as_ref().unwrap().read().unwrap(); + let mut recorder = self.recorder.as_ref().map(|recorder| recorder.borrow_mut()); + let tracking_mode = match &mut recorder { + Some(recorder) => TrackingMode::RefcountsAndAccesses(recorder.deref_mut()), + None => TrackingMode::Refcounts, + }; + let mut trie_update = guard.update(self.root, tracking_mode)?; + + // Get trie_update for all child memtries + let child_guards = self + .children_memtries + .iter() + .map(|(shard_uid, memtrie)| (shard_uid, memtrie.read().unwrap())) + .collect_vec(); + let mut child_updates = child_guards + .iter() + .map(|(shard_uid, memtrie)| { + (shard_uid, memtrie.update(self.root, TrackingMode::None).unwrap()) + }) + .collect_vec(); + + // Insert key, value into both the child memtries as well as the main memtrie + for (key, value) in changes { + match value { + Some(arr) => { + // Update all child memtries. This is a rare case where parent shard + // has forks after resharding. It is fine to clone the value here. + for trie_update in &mut child_updates { + trie_update.1.insert(&key, arr.clone())?; } + trie_update.insert(&key, arr)?; } - - Ok(trie_update.to_trie_changes()) - } - None => { - let mut trie_update = TrieStorageUpdate::new(&self); - let root_node = self.move_node_to_mutable(&mut trie_update, &self.root)?; - for (key, value) in changes { - match value { - Some(arr) => trie_update.generic_insert( - root_node.0, - &key, - GenericTrieValue::MemtrieAndDisk(arr), - )?, - None => trie_update.generic_delete(0, &key)?, - }; + None => { + // Update all child memtries. This is a rare case where parent shard + // has forks after resharding. + for trie_update in &mut child_updates { + trie_update.1.generic_delete(0, &key)?; + } + trie_update.generic_delete(0, &key)?; } + } + } - #[cfg(test)] - { - self.memory_usage_verify( - &trie_update, - GenericNodeOrIndex::Updated(root_node.0), - ); - } + let mut trie_changes = trie_update.to_trie_changes(); + for (shard_uid, trie_update) in child_updates { + trie_changes + .children_memtrie_changes + .insert(**shard_uid, trie_update.to_mem_trie_changes_only()); + } - trie_update.flatten_nodes(&self.root, root_node.0) - } + Ok(trie_changes) + } + + fn update_with_trie_storage(&self, changes: I) -> Result + where + I: IntoIterator, Option>)>, + { + let mut trie_update = TrieStorageUpdate::new(&self); + let root_node = self.move_node_to_mutable(&mut trie_update, &self.root)?; + for (key, value) in changes { + match value { + Some(arr) => trie_update.generic_insert( + root_node.0, + &key, + GenericTrieValue::MemtrieAndDisk(arr), + )?, + None => trie_update.generic_delete(0, &key)?, + }; } + + #[cfg(test)] + { + self.memory_usage_verify(&trie_update, GenericNodeOrIndex::Updated(root_node.0)); + } + + trie_update.flatten_nodes(&self.root, root_node.0) } /// Returns an iterator that can be used to traverse any range in the trie. @@ -2375,7 +2433,8 @@ mod borsh_compatibility_test { hash(b"e"), std::num::NonZeroU32::new(2).unwrap(), )], - mem_trie_changes: None, + memtrie_changes: None, + children_memtrie_changes: Default::default(), } ); } diff --git a/core/store/src/trie/ops/tests.rs b/core/store/src/trie/ops/tests.rs index ca8eb8dab88..9e52f27f0e6 100644 --- a/core/store/src/trie/ops/tests.rs +++ b/core/store/src/trie/ops/tests.rs @@ -109,7 +109,7 @@ fn run(initial_entries: Vec<(Vec, Vec)>, retain_multi_ranges: Vec>>, /// Configures how to make state snapshots. state_snapshot_config: StateSnapshotConfig, + /// Details on which shards are split into which shards. + /// The mapping is from parent_shard_uid to a list of child_shard_uids. + /// This mapping is only maintained for parent mem_tries that have split and + /// converted to hybrid mem_tries for the children. + /// + /// Once the child memtrie is recreated, we can remove it from this mapping. + /// + /// In super rare cases it's possible that we see forks close to the resharding boundary. + /// We would like to apply the same set of trie changes to the child memtrie to keep + /// a consistent view across forks. + temp_split_shard_map: RwLock>>, } #[derive(Clone)] @@ -61,13 +71,14 @@ impl ShardTries { ShardTries(Arc::new(ShardTriesInner { store, trie_config, - mem_tries: RwLock::new(HashMap::new()), + mem_tries: Default::default(), caches: Mutex::new(caches), view_caches: Mutex::new(view_caches), flat_storage_manager, prefetchers: Default::default(), - state_snapshot: Arc::new(RwLock::new(None)), + state_snapshot: Default::default(), state_snapshot_config, + temp_split_shard_map: Default::default(), })) } @@ -130,8 +141,33 @@ impl ShardTries { .and_then(|block_hash| self.0.flat_storage_manager.chunk_view(shard_uid, block_hash)); // Do not use memtries for view queries, for two reasons: memtries do not provide historical state, // and also this can introduce lock contention on memtries. - let memtries = if is_view { None } else { self.get_mem_tries(shard_uid) }; - Trie::new_with_memtries(storage, memtries, state_root, flat_storage_chunk_view) + if is_view { + Trie::new_with_memtries( + storage, + None, + Default::default(), + state_root, + flat_storage_chunk_view, + ) + } else { + let memtries = self.get_mem_tries(shard_uid); + let split_shard_map_guard = self.0.temp_split_shard_map.read().unwrap(); + let children_shard_uid = + split_shard_map_guard.get(&shard_uid).cloned().unwrap_or_default(); + let mut children_memtries = HashMap::new(); + for shard_uid in children_shard_uid { + if let Some(memtrie) = self.get_mem_tries(shard_uid) { + children_memtries.insert(shard_uid, memtrie); + } + } + Trie::new_with_memtries( + storage, + memtries, + children_memtries, + state_root, + flat_storage_chunk_view, + ) + } } pub fn get_trie_for_shard(&self, shard_uid: ShardUId, state_root: StateRoot) -> Trie { @@ -366,15 +402,30 @@ impl ShardTries { shard_uid: ShardUId, block_height: BlockHeight, ) -> Option { + // Apply children memtrie changes in case of forks on parent. Most of the time children_memtrie_changes + // will be empty. Lookup children_memtrie_changes for more context. + let split_shard_map_guard = self.0.temp_split_shard_map.read().unwrap(); + let children_shard_uid = split_shard_map_guard.get(&shard_uid).cloned().unwrap_or_default(); + for (shard_uid, memtrie_changes) in &trie_changes.children_memtrie_changes { + // Note that we should only be writing the changes to the child memtrie iff the child and parent + // share the frozen memtrie base. + // It's possible that while processing the block, the child memtrie was recreated and no longer + // shares the base with parent, in which case we skip writing the changes. + if children_shard_uid.contains(&shard_uid) { + let memtrie = self.get_mem_tries(*shard_uid).expect("Memtrie must exist"); + memtrie.write().unwrap().apply_memtrie_changes(block_height, memtrie_changes); + } + } + if let Some(memtries) = self.get_mem_tries(shard_uid) { let changes = trie_changes - .mem_trie_changes + .memtrie_changes .as_ref() .expect("Memtrie changes must be present if memtrie is loaded"); Some(memtries.write().unwrap().apply_memtrie_changes(block_height, changes)) } else { assert!( - trie_changes.mem_trie_changes.is_none(), + trie_changes.memtrie_changes.is_none(), "Memtrie changes must not be present if memtrie is not loaded" ); None @@ -395,17 +446,17 @@ impl ShardTries { /// Retains in-memory tries for given shards, i.e. unload tries from memory for shards that are NOT /// in the given list. Should be called to unload obsolete tries from memory. pub fn retain_mem_tries(&self, shard_uids: &[ShardUId]) { - info!(target: "memtrie", "Current memtries: {:?}. Keeping memtries for shards {:?}...", + tracing::info!(target: "memtrie", "Current memtries: {:?}. Keeping memtries for shards {:?}...", self.0.mem_tries.read().unwrap().keys(), shard_uids); self.0.mem_tries.write().unwrap().retain(|shard_uid, _| shard_uids.contains(shard_uid)); - info!(target: "memtrie", "Memtries retaining complete for shards {:?}", shard_uids); + tracing::info!(target: "memtrie", "Memtries retaining complete for shards {:?}", shard_uids); } /// Remove trie from memory for given shard. pub fn unload_mem_trie(&self, shard_uid: &ShardUId) { - info!(target: "memtrie", "Unloading trie from memory for shard {:?}...", shard_uid); + tracing::info!(target: "memtrie", "Unloading trie from memory for shard {:?}...", shard_uid); self.0.mem_tries.write().unwrap().remove(shard_uid); - info!(target: "memtrie", "Memtrie unloading complete for shard {:?}", shard_uid); + tracing::info!(target: "memtrie", "Memtrie unloading complete for shard {:?}", shard_uid); } /// Loads in-memory-trie for given shard and state root (if given). @@ -415,7 +466,7 @@ impl ShardTries { state_root: Option, parallelize: bool, ) -> Result<(), StorageError> { - info!(target: "memtrie", "Loading trie to memory for shard {:?}...", shard_uid); + tracing::info!(target: "memtrie", "Loading trie to memory for shard {:?}...", shard_uid); let mem_tries = load_trie_from_flat_state_and_delta( &self.0.store.store(), *shard_uid, @@ -423,7 +474,7 @@ impl ShardTries { parallelize, )?; self.0.mem_tries.write().unwrap().insert(*shard_uid, Arc::new(RwLock::new(mem_tries))); - info!(target: "memtrie", "Memtrie loading complete for shard {:?}", shard_uid); + tracing::info!(target: "memtrie", "Memtrie loading complete for shard {:?}", shard_uid); Ok(()) } @@ -479,13 +530,13 @@ impl ShardTries { ?shard_uids_pending_resharding, "Loading tries config" ); - info!(target: "memtrie", "Loading tries to memory for shards {:?}...", shard_uids_to_load); + tracing::info!(target: "memtrie", "Loading tries to memory for shards {:?}...", shard_uids_to_load); shard_uids_to_load .par_iter() .map(|shard_uid| self.load_mem_trie(shard_uid, None, parallelize)) .collect::>()?; - info!(target: "memtrie", "Memtries loading complete for shards {:?}", shard_uids_to_load); + tracing::info!(target: "memtrie", "Memtries loading complete for shards {:?}", shard_uids_to_load); Ok(()) } @@ -507,12 +558,18 @@ impl ShardTries { /// children shards. /// Needed to serve queries for these shards just after resharding, before /// proper memtries are loaded. - pub fn freeze_mem_tries( + pub fn freeze_parent_memtrie( &self, parent_shard_uid: ShardUId, children_shard_uids: Vec, ) -> Result<(), StorageError> { - info!( + let mut split_shard_map_guard = self.0.temp_split_shard_map.write().unwrap(); + if split_shard_map_guard.contains_key(&parent_shard_uid) { + // If the parent has already been split, then we don't do anything here. + return Ok(()); + } + + tracing::info!( target: "memtrie", ?parent_shard_uid, ?children_shard_uids, @@ -529,6 +586,7 @@ impl ShardTries { let memtries = std::mem::replace(&mut *guard, MemTries::new(parent_shard_uid)); let frozen_memtries = memtries.freeze(); + // Create hybrid memtrie for both parent and children shards. for shard_uid in [vec![parent_shard_uid], children_shard_uids.clone()].concat() { outer_guard.insert( shard_uid, @@ -539,12 +597,10 @@ impl ShardTries { ); } - info!( - target: "memtrie", - ?parent_shard_uid, - ?children_shard_uids, - "Memtries freezing complete" - ); + // Insert the mapping from parent to children shards. + split_shard_map_guard.insert(parent_shard_uid, children_shard_uids); + + tracing::info!(target: "memtrie", "Memtries freezing complete"); Ok(()) } } diff --git a/core/store/src/trie/state_parts.rs b/core/store/src/trie/state_parts.rs index 5044a432ff1..4f1ee100baf 100644 --- a/core/store/src/trie/state_parts.rs +++ b/core/store/src/trie/state_parts.rs @@ -476,7 +476,8 @@ impl Trie { new_root: *state_root, insertions, deletions, - mem_trie_changes: None, + memtrie_changes: None, + children_memtrie_changes: Default::default(), }, flat_state_delta, contract_codes, @@ -643,7 +644,8 @@ mod tests { new_root: *state_root, insertions, deletions: vec![], - mem_trie_changes: None, + memtrie_changes: None, + children_memtrie_changes: Default::default(), }) } @@ -900,7 +902,8 @@ mod tests { new_root, insertions, deletions, - mem_trie_changes: None, + memtrie_changes: None, + children_memtrie_changes: Default::default(), } } diff --git a/core/store/src/trie/trie_storage_update.rs b/core/store/src/trie/trie_storage_update.rs index 758a528df1d..68359a7d69e 100644 --- a/core/store/src/trie/trie_storage_update.rs +++ b/core/store/src/trie/trie_storage_update.rs @@ -246,7 +246,8 @@ impl TrieStorageUpdate<'_> { new_root: last_hash, insertions, deletions, - mem_trie_changes: None, + memtrie_changes: None, + children_memtrie_changes: Default::default(), }) } diff --git a/tools/protocol-schema-check/res/protocol_schema.toml b/tools/protocol-schema-check/res/protocol_schema.toml index dcb30b62353..66d9614aa41 100644 --- a/tools/protocol-schema-check/res/protocol_schema.toml +++ b/tools/protocol-schema-check/res/protocol_schema.toml @@ -273,7 +273,7 @@ TransactionReceipt = 968816131 TransactionV0 = 197396442 TransactionV1 = 2594686420 TransferAction = 1078380396 -TrieChanges = 3833039794 +TrieChanges = 2613580820 TrieKey = 1352104737 TrieQueueIndices = 2601394796 TrieRefcountAddition = 2117109883