[resharding] Handling forks in memtrie during resharding #12714

Open · wants to merge 4 commits into base: master
8 changes: 2 additions & 6 deletions chain/chain/src/resharding/manager.rs
@@ -185,11 +185,7 @@ impl ReshardingManager {
?block_hash, block_height, ?parent_shard_uid)
.entered();

// TODO(resharding): what if node doesn't have memtrie? just pause
// processing?
// TODO(resharding): fork handling. if epoch is finalized on different
// blocks, the second finalization will crash.
tries.freeze_mem_tries(parent_shard_uid, split_shard_event.children_shards())?;
tries.freeze_parent_memtrie(parent_shard_uid, split_shard_event.children_shards())?;

let parent_chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?;
let boundary_account = split_shard_event.boundary_account;
@@ -221,7 +217,7 @@ impl ReshardingManager {
let mem_trie_update = mem_tries.update(*parent_chunk_extra.state_root(), mode)?;

let trie_changes = mem_trie_update.retain_split_shard(&boundary_account, retain_mode);
let mem_changes = trie_changes.mem_trie_changes.as_ref().unwrap();
let mem_changes = trie_changes.memtrie_changes.as_ref().unwrap();
let new_state_root = mem_tries.apply_memtrie_changes(block_height, mem_changes);
drop(mem_tries);

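To make the intent of the hunk above concrete — `freeze_parent_memtrie` followed by `retain_split_shard(&boundary_account, retain_mode)` derives each child shard's state from the frozen parent — here is a minimal sketch of the split-by-boundary idea on a plain `BTreeMap`. Everything below (the `ShardState` alias, the `RetainMode` enum, the function body) is an illustrative assumption, not the nearcore implementation, which operates on trie nodes and returns `TrieChanges`:

```rust
use std::collections::BTreeMap;

/// Toy stand-in for a shard's key/value state; the real code works on memtrie nodes.
type ShardState = BTreeMap<Vec<u8>, Vec<u8>>;

/// Which side of the boundary account a child keeps (mirroring the Left/Right
/// idea of `RetainMode`; this enum is an assumption, not the nearcore type).
enum RetainMode {
    Left,  // keys strictly below the boundary account
    Right, // keys at or above the boundary account
}

/// Derive a child's state from the frozen parent by retaining one side of the boundary.
fn retain_split_shard(parent: &ShardState, boundary_account: &[u8], mode: RetainMode) -> ShardState {
    parent
        .iter()
        .filter(|(key, _)| match mode {
            RetainMode::Left => key.as_slice() < boundary_account,
            RetainMode::Right => key.as_slice() >= boundary_account,
        })
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect()
}

fn main() {
    let parent: ShardState =
        [(b"alice".to_vec(), b"1".to_vec()), (b"carol".to_vec(), b"2".to_vec())].into();
    let left = retain_split_shard(&parent, b"bob", RetainMode::Left);
    let right = retain_split_shard(&parent, b"bob", RetainMode::Right);
    assert!(left.contains_key(b"alice".as_slice()) && !left.contains_key(b"carol".as_slice()));
    assert!(right.contains_key(b"carol".as_slice()) && !right.contains_key(b"alice".as_slice()));
}
```

The diff suggests that freezing the parent memtrie first lets both children be derived from the same consistent parent view.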
5 changes: 3 additions & 2 deletions core/store/src/trie/mem/mem_trie_update.rs
@@ -448,7 +448,8 @@ impl<'a, M: ArenaMemory> MemTrieUpdate<'a, M> {
.unwrap_or_default(),
insertions,
deletions,
mem_trie_changes: Some(mem_trie_changes),
memtrie_changes: Some(mem_trie_changes),
children_memtrie_changes: Default::default(),
}
}

@@ -610,7 +611,7 @@ mod tests {
let disk_changes = self.make_disk_changes_only(changes.clone());
let mut all_changes = self.make_all_changes(changes.clone());

let mem_trie_changes_from_all_changes = all_changes.mem_trie_changes.take().unwrap();
let mem_trie_changes_from_all_changes = all_changes.memtrie_changes.take().unwrap();
assert_eq!(memtrie_changes, mem_trie_changes_from_all_changes);
assert_eq!(disk_changes, all_changes);

145 changes: 102 additions & 43 deletions core/store/src/trie/mod.rs
@@ -18,6 +18,7 @@ pub use crate::trie::trie_storage::{TrieCache, TrieCachingStorage, TrieDBStorage
use crate::StorageError;
use borsh::{BorshDeserialize, BorshSerialize};
pub use from_flat::construct_trie_from_flat;
use itertools::Itertools;
use mem::mem_trie_update::{TrackingMode, UpdatedMemTrieNodeWithSize};
use mem::mem_tries::MemTries;
use near_primitives::challenge::PartialState;
@@ -37,7 +38,7 @@ use ops::interface::{GenericTrieValue, UpdatedNodeId};
use ops::resharding::{GenericTrieUpdateRetain, RetainMode};
pub use raw_node::{Children, RawTrieNode, RawTrieNodeWithSize};
use std::cell::RefCell;
use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Write;
use std::hash::Hash;
use std::ops::DerefMut;
@@ -344,6 +345,10 @@ impl std::fmt::Debug for TrieNode {
pub struct Trie {
storage: Arc<dyn TrieStorage>,
memtries: Option<Arc<RwLock<MemTries>>>,
/// In super rare cases it's possible that we see forks close to the resharding boundary.
/// We would like to apply the same set of trie changes to the child memtrie to keep
/// a consistent view across forks.
children_memtries: HashMap<ShardUId, Arc<RwLock<MemTries>>>,
root: StateRoot,
/// If present, flat storage is used to look up keys (if asked for).
/// Otherwise, we would crawl through the trie.
@@ -569,7 +574,12 @@ pub struct TrieChanges {
deletions: Vec<TrieRefcountSubtraction>,
// If Some, in-memory changes are applied as well.
#[borsh(skip)]
pub mem_trie_changes: Option<MemTrieChanges>,
pub memtrie_changes: Option<MemTrieChanges>,
// In super rare cases it's possible that we see forks close to the resharding boundary.
// We would like to apply the same set of trie changes to the child memtrie to keep
// a consistent view across forks.
#[borsh(skip)]
pub children_memtrie_changes: HashMap<ShardUId, MemTrieChanges>,
}

impl TrieChanges {
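Because `children_memtrie_changes` is marked `#[borsh(skip)]`, the per-child deltas exist only in memory and still have to be applied to the matching child memtries when the parent's changes are committed. That commit path is not part of this diff, so the sketch below is only an assumption of how such a step could look, with toy stand-ins for `MemTries`, `MemTrieChanges`, and `ShardUId`:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Toy stand-ins; the real MemTries / MemTrieChanges live in core/store/src/trie/mem
// and ShardUId in near_primitives. Shapes and signatures here are assumptions.
type ShardUId = u32;
type StateRoot = [u8; 32];
struct MemTrieChanges; // opaque in-memory delta
struct MemTries;

impl MemTries {
    // The diff shows `apply_memtrie_changes(block_height, changes)` being used;
    // the exact signature here is a guess.
    fn apply_memtrie_changes(&mut self, _block_height: u64, _changes: &MemTrieChanges) -> StateRoot {
        [0; 32]
    }
}

struct TrieChanges {
    memtrie_changes: Option<MemTrieChanges>,
    children_memtrie_changes: HashMap<ShardUId, MemTrieChanges>,
}

/// Hypothetical commit step: apply the main delta to the parent memtrie and each
/// per-child delta to the matching child memtrie, so a fork on the parent shard
/// is reflected in the children as well.
fn commit(
    block_height: u64,
    changes: &TrieChanges,
    parent: &Arc<RwLock<MemTries>>,
    children: &HashMap<ShardUId, Arc<RwLock<MemTries>>>,
) {
    if let Some(main) = &changes.memtrie_changes {
        parent.write().unwrap().apply_memtrie_changes(block_height, main);
    }
    for (shard_uid, child_changes) in &changes.children_memtrie_changes {
        if let Some(child) = children.get(shard_uid) {
            child.write().unwrap().apply_memtrie_changes(block_height, child_changes);
        }
    }
}

fn main() {
    let parent = Arc::new(RwLock::new(MemTries));
    let children = HashMap::from([(3, Arc::new(RwLock::new(MemTries)))]);
    let changes = TrieChanges {
        memtrie_changes: Some(MemTrieChanges),
        children_memtrie_changes: HashMap::from([(3, MemTrieChanges)]),
    };
    commit(100, &changes, &parent, &children);
}
```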
@@ -579,7 +589,8 @@ impl TrieChanges {
new_root: old_root,
insertions: vec![],
deletions: vec![],
mem_trie_changes: Default::default(),
memtrie_changes: Default::default(),
children_memtrie_changes: Default::default(),
}
}

@@ -678,12 +689,13 @@ impl Trie {
root: StateRoot,
flat_storage_chunk_view: Option<FlatStorageChunkView>,
) -> Self {
Self::new_with_memtries(storage, None, root, flat_storage_chunk_view)
Self::new_with_memtries(storage, None, Default::default(), root, flat_storage_chunk_view)
}

pub fn new_with_memtries(
storage: Arc<dyn TrieStorage>,
memtries: Option<Arc<RwLock<MemTries>>>,
children_memtries: HashMap<ShardUId, Arc<RwLock<MemTries>>>,
root: StateRoot,
flat_storage_chunk_view: Option<FlatStorageChunkView>,
) -> Self {
@@ -702,6 +714,7 @@ impl Trie {
Trie {
storage,
memtries,
children_memtries,
root,
charge_gas_for_trie_node_access,
flat_storage_chunk_view,
@@ -732,6 +745,7 @@ impl Trie {
let mut trie = Self::new_with_memtries(
self.storage.clone(),
self.memtries.clone(),
self.children_memtries.clone(),
self.root,
self.flat_storage_chunk_view.clone(),
);
@@ -1667,50 +1681,94 @@ impl Trie {
let _ = self.get(&trie_key.to_vec());
}

match &self.memtries {
Some(memtries) => {
let guard = memtries.read().unwrap();
let mut recorder = self.recorder.as_ref().map(|recorder| recorder.borrow_mut());
let tracking_mode = match &mut recorder {
Some(recorder) => TrackingMode::RefcountsAndAccesses(recorder.deref_mut()),
None => TrackingMode::Refcounts,
};

let mut trie_update = guard.update(self.root, tracking_mode)?;
for (key, value) in changes {
match value {
Some(arr) => trie_update.insert(&key, arr)?,
None => trie_update.generic_delete(0, &key)?,
if self.memtries.is_some() {
self.update_with_memtrie(changes)
} else {
self.update_with_trie_storage(changes)
}
}

fn update_with_memtrie<I>(&self, changes: I) -> Result<TrieChanges, StorageError>
where
I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
{
// Get trie_update for memtrie
let guard = self.memtries.as_ref().unwrap().read().unwrap();
let mut recorder = self.recorder.as_ref().map(|recorder| recorder.borrow_mut());
let tracking_mode = match &mut recorder {
Some(recorder) => TrackingMode::RefcountsAndAccesses(recorder.deref_mut()),
None => TrackingMode::Refcounts,
};
let mut trie_update = guard.update(self.root, tracking_mode)?;

// Get trie_update for all child memtries
let child_guards = self
.children_memtries
.iter()
.map(|(shard_uid, memtrie)| (shard_uid, memtrie.read().unwrap()))
.collect_vec();
let mut child_updates = child_guards
.iter()
.map(|(shard_uid, memtrie)| {
(shard_uid, memtrie.update(self.root, TrackingMode::None).unwrap())
})
.collect_vec();

// Insert the key and value into both the child memtries and the main memtrie
for (key, value) in changes {
match value {
Some(arr) => {
// Update all child memtries. This is a rare case where the parent shard
// has forks after resharding. It is fine to clone the value here.
for trie_update in &mut child_updates {
trie_update.1.insert(&key, arr.clone())?;
}
trie_update.insert(&key, arr)?;
}

Ok(trie_update.to_trie_changes())
}
None => {
let mut trie_update = TrieStorageUpdate::new(&self);
let root_node = self.move_node_to_mutable(&mut trie_update, &self.root)?;
for (key, value) in changes {
match value {
Some(arr) => trie_update.generic_insert(
root_node.0,
&key,
GenericTrieValue::MemtrieAndDisk(arr),
)?,
None => trie_update.generic_delete(0, &key)?,
};
None => {
// Update all child memtries. This is a rare case where the parent shard
// has forks after resharding.
for trie_update in &mut child_updates {
trie_update.1.generic_delete(0, &key)?;
}
trie_update.generic_delete(0, &key)?;
}
}
}

#[cfg(test)]
{
self.memory_usage_verify(
&trie_update,
GenericNodeOrIndex::Updated(root_node.0),
);
}
let mut trie_changes = trie_update.to_trie_changes();
for (shard_uid, trie_update) in child_updates {
trie_changes
.children_memtrie_changes
.insert(**shard_uid, trie_update.to_mem_trie_changes_only());
}

trie_update.flatten_nodes(&self.root, root_node.0)
}
Ok(trie_changes)
}

fn update_with_trie_storage<I>(&self, changes: I) -> Result<TrieChanges, StorageError>
where
I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
{
let mut trie_update = TrieStorageUpdate::new(&self);
let root_node = self.move_node_to_mutable(&mut trie_update, &self.root)?;
for (key, value) in changes {
match value {
Some(arr) => trie_update.generic_insert(
root_node.0,
&key,
GenericTrieValue::MemtrieAndDisk(arr),
)?,
None => trie_update.generic_delete(0, &key)?,
};
}

#[cfg(test)]
{
self.memory_usage_verify(&trie_update, GenericNodeOrIndex::Updated(root_node.0));
}

trie_update.flatten_nodes(&self.root, root_node.0)
}

/// Returns an iterator that can be used to traverse any range in the trie.
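The loops over `child_updates` above are the heart of the fork handling: while a fork near the resharding boundary keeps producing blocks for the parent shard, every insert and delete applied to the parent memtrie is mirrored into each child memtrie so that all views stay consistent. A stripped-down sketch of that idea with plain maps standing in for the memtries (the `ShardUId` alias and `ForkAwareState` type below are simplified assumptions, not the actual `Trie`/`MemTries` types):

```rust
use std::collections::{BTreeMap, HashMap};

// Simplified stand-ins: the real code uses ShardUId from near_primitives and
// in-memory tries (MemTries) instead of plain maps.
type ShardUId = u32;
type State = BTreeMap<Vec<u8>, Vec<u8>>;

/// Toy model of a parent shard's state plus the child states derived from it.
struct ForkAwareState {
    parent: State,
    children: HashMap<ShardUId, State>,
}

impl ForkAwareState {
    /// Apply one batch of key/value changes (None = deletion) to the parent and
    /// mirror the same batch into every child, the way `update_with_memtrie`
    /// mirrors changes into `children_memtries` when a fork touches the parent
    /// after the split.
    fn update(&mut self, changes: Vec<(Vec<u8>, Option<Vec<u8>>)>) {
        for (key, value) in changes {
            for child in self.children.values_mut() {
                match &value {
                    Some(v) => {
                        child.insert(key.clone(), v.clone());
                    }
                    None => {
                        child.remove(&key);
                    }
                }
            }
            match value {
                Some(v) => {
                    self.parent.insert(key, v);
                }
                None => {
                    self.parent.remove(&key);
                }
            }
        }
    }
}

fn main() {
    let mut state = ForkAwareState {
        parent: State::new(),
        children: HashMap::from([(3, State::new()), (4, State::new())]),
    };
    // A fork block still updates the parent after resharding: both children
    // observe the same change, so all three views stay consistent.
    state.update(vec![(b"alice".to_vec(), Some(b"10".to_vec()))]);
    assert!(state.children.values().all(|c| c.contains_key(b"alice".as_slice())));
    assert!(state.parent.contains_key(b"alice".as_slice()));
}
```

In the diff itself the mirrored updates are not applied immediately: each child `MemTrieUpdate` is converted with `to_mem_trie_changes_only()` and stored in `TrieChanges::children_memtrie_changes`, to be applied to the child memtries later.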
@@ -2375,7 +2433,8 @@ mod borsh_compatibility_test {
hash(b"e"),
std::num::NonZeroU32::new(2).unwrap(),
)],
mem_trie_changes: None,
memtrie_changes: None,
children_memtrie_changes: Default::default(),
}
);
}
2 changes: 1 addition & 1 deletion core/store/src/trie/ops/tests.rs
@@ -109,7 +109,7 @@ fn run(initial_entries: Vec<(Vec<u8>, Vec<u8>)>, retain_multi_ranges: Vec<Range<
let mut update = memtries.update(initial_state_root, mode).unwrap();
retain_split_shard_custom_ranges(&mut update, &retain_multi_ranges);
let mut trie_changes = update.to_trie_changes();
let memtrie_changes = trie_changes.mem_trie_changes.take().unwrap();
let memtrie_changes = trie_changes.memtrie_changes.take().unwrap();
let mem_state_root = memtries.apply_memtrie_changes(1, &memtrie_changes);
let proof = trie_recorder.recorded_storage();
