Skip to content

Commit

Permalink
test(state-sync): shard swap in single shard tracking (near#12108)
Browse files Browse the repository at this point in the history
Pytest to check decentralised state sync of nodes tracking one shard.
Keep shard shuffling off until the implementation is done.

This test sets a dumper node only for the state sync headers.
Sets 4 validator nodes, each tracking 1 shard.
Sets an RPC node to handle the random traffic that changes the state.
Only allow validator nodes to share parts by enabling the state
snapshot.

Check if the network can got for 6 epochs while shard shuffling is on. 
Validator nodes are expected to download parts from each other.

Check http://127.0.0.1:3040/debug/pages/epoch_info for the validator
assignment rotation.
  • Loading branch information
VanBarbascu authored Sep 23, 2024
1 parent 523837b commit 70acea6
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 12 deletions.
5 changes: 5 additions & 0 deletions nightly/pytest-sanity.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ pytest --timeout=3600 sanity/state_sync_massive_validator.py
pytest --timeout=3600 sanity/state_sync_massive.py --features nightly
pytest --timeout=3600 sanity/state_sync_massive_validator.py --features nightly

# TODO(#12108): Enable the test again once decentralized state sync is implemented.
# pytest sanity/state_sync_decentralized.py
# TODO(#12108): Enable the test again once decentralized state sync is implemented.
# pytest sanity/state_sync_decentralized.py --features nightly

pytest sanity/sync_chunks_from_archival.py
pytest sanity/sync_chunks_from_archival.py --features nightly
pytest sanity/rpc_tx_forwarding.py
Expand Down
23 changes: 11 additions & 12 deletions pytest/lib/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1004,18 +1004,17 @@ def apply_config_changes(node_dir: str,
# ClientConfig keys which are valid but may be missing from the config.json
# file. Those are often Option<T> types which are not stored in JSON file
# when None.
allowed_missing_configs = ('archive', 'consensus.block_fetch_horizon',
'consensus.min_block_production_delay',
'consensus.max_block_production_delay',
'consensus.max_block_wait_delay',
'consensus.state_sync_timeout',
'expected_shutdown', 'log_summary_period',
'max_gas_burnt_view', 'rosetta_rpc',
'save_trie_changes', 'split_storage',
'state_sync', 'state_sync_enabled',
'store.state_snapshot_enabled',
'tracked_shard_schedule', 'cold_store',
'store.load_mem_tries_for_tracked_shards')
allowed_missing_configs = (
'archive', 'consensus.block_fetch_horizon',
'consensus.min_block_production_delay',
'consensus.max_block_production_delay',
'consensus.max_block_wait_delay', 'consensus.state_sync_timeout',
'expected_shutdown', 'log_summary_period', 'max_gas_burnt_view',
'rosetta_rpc', 'save_trie_changes', 'split_storage', 'state_sync',
'state_sync_enabled', 'store.state_snapshot_enabled',
'store.state_snapshot_config.state_snapshot_type',
'tracked_shard_schedule', 'cold_store',
'store.load_mem_tries_for_tracked_shards')

for k, v in client_config_change.items():
if not (k in allowed_missing_configs or k in config_json):
Expand Down
141 changes: 141 additions & 0 deletions pytest/tests/sanity/state_sync_decentralized.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#!/usr/bin/env python3
# Spins up 4 validating nodes. Let validators track a single shard.
# Add a dumper node for the state sync headers.
# Add an RPC node to issue tx and change the state.
# Send random transactions between accounts in different shards.
# Shuffle the shard assignment of validators and check if they can sync up.

import unittest
import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from configured_logger import logger
from cluster import start_cluster
import state_sync_lib
from utils import wait_for_blocks
import simple_test

EPOCH_LENGTH = 10

NUM_VALIDATORS = 4

# Shard layout with 5 roughly equal size shards for convenience.
SHARD_LAYOUT = {
"V1": {
"boundary_accounts": [
"fff",
"lll",
"rrr",
],
"version": 2,
"shards_split_map": [],
"to_parent_shard_map": [],
}
}

NUM_SHARDS = len(SHARD_LAYOUT["V1"]["boundary_accounts"]) + 1

ALL_ACCOUNTS_PREFIXES = [
"aaa",
"ggg",
"lll",
"sss",
]


class StateSyncValidatorShardSwap(unittest.TestCase):

def _prepare_cluster(self, with_rpc=False, shuffle_shard_assignment=False):
(node_config_dump,
node_config_sync) = state_sync_lib.get_state_sync_configs_pair(
tracked_shards=None)

# State snapshot is disabled for dumper. We only want to dump the headers.
node_config_dump["store.state_snapshot_enabled"] = False
node_config_dump[
"store.state_snapshot_config.state_snapshot_type"] = "ForReshardingOnly"

# State snapshot is enabled for validators. They will share parts of the state.
node_config_sync["store.state_snapshot_enabled"] = True
node_config_sync["tracked_shards"] = []

# Validators
configs = {x: node_config_sync.copy() for x in range(NUM_VALIDATORS)}

# Dumper
configs[NUM_VALIDATORS] = node_config_dump

if with_rpc:
# RPC
configs[NUM_VALIDATORS + 1] = node_config_sync.copy()
# RPC tracks all shards.
configs[NUM_VALIDATORS + 1]["tracked_shards"] = [0]
# RPC node does not participate in state parts distribution.
configs[NUM_VALIDATORS + 1]["store.state_snapshot_enabled"] = False
configs[NUM_VALIDATORS + 1][
"store.state_snapshot_config.state_snapshot_type"] = "ForReshardingOnly"

nodes = start_cluster(
num_nodes=NUM_VALIDATORS,
num_observers=1 + (1 if with_rpc else 0),
num_shards=NUM_SHARDS,
config=None,
genesis_config_changes=[
["epoch_length", EPOCH_LENGTH], ["shard_layout", SHARD_LAYOUT],
[
"shuffle_shard_assignment_for_chunk_producers",
shuffle_shard_assignment
], ["block_producer_kickout_threshold", 0],
["chunk_producer_kickout_threshold", 0]
],
client_config_changes=configs)

for node in nodes:
node.stop_checking_store()

self.dumper_node = nodes[NUM_VALIDATORS]
self.rpc_node = nodes[NUM_VALIDATORS +
1] if with_rpc else self.dumper_node
self.nodes = nodes
self.validators = nodes[:NUM_VALIDATORS]

def _prepare_simple_transfers(self):
self.testcase = simple_test.SimpleTransferBetweenAccounts(
nodes=self.nodes,
rpc_node=self.rpc_node,
account_prefixes=ALL_ACCOUNTS_PREFIXES,
epoch_length=EPOCH_LENGTH)

self.testcase.wait_for_blocks(3)

self.testcase.create_accounts()

self.testcase.deploy_contracts()

def _clear_cluster(self):
self.testcase = None
for node in self.nodes:
node.cleanup()

def test_state_sync_with_shard_swap(self):
# Dumper node will not track any shard. So we need a dedicated RPC node.
# TODO: enable shuffle_shard_assignment after decentralized state sync is implemented.
self._prepare_cluster(with_rpc=True, shuffle_shard_assignment=False)
self._prepare_simple_transfers()

target_height = 6 * EPOCH_LENGTH
self.testcase.random_workload_until(target_height)

# Wait for all nodes to reach epoch 6.
for n in self.validators:
wait_for_blocks(n, target=target_height)
logger.info("Test ended")

def tearDown(self):
self._clear_cluster()


if __name__ == '__main__':
unittest.main()

0 comments on commit 70acea6

Please sign in to comment.