diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index ae736ebd245..2447ac06b99 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -246,7 +246,7 @@ impl Client { state_sync_adapter: Arc>, runtime_adapter: Arc, network_adapter: PeerManagerAdapter, - shards_manager_adapter: Sender, + shards_manager_sender: Sender, validator_signer: MutableValidatorSigner, enable_doomslug: bool, rng_seed: RngSeed, @@ -390,7 +390,7 @@ impl Client { epoch_manager, shard_tracker, runtime_adapter, - shards_manager_adapter, + shards_manager_adapter: shards_manager_sender, sharded_tx_pool, network_adapter, validator_signer, diff --git a/core/chain-configs/src/test_genesis.rs b/core/chain-configs/src/test_genesis.rs index 255896fe63b..65856252ea5 100644 --- a/core/chain-configs/src/test_genesis.rs +++ b/core/chain-configs/src/test_genesis.rs @@ -238,11 +238,23 @@ impl TestGenesisBuilder { self } - pub fn kickouts_standard_90_percent(&mut self) -> &mut Self { + /// Validators with performance below 80% are kicked out, similarly to + /// mainnet as of 28 Jun 2024. + pub fn kickouts_standard_80_percent(&mut self) -> &mut Self { self.kickouts_config = Some(KickoutsConfig { - block_producer_kickout_threshold: 90, - chunk_producer_kickout_threshold: 90, - chunk_validator_only_kickout_threshold: 90, + block_producer_kickout_threshold: 80, + chunk_producer_kickout_threshold: 80, + chunk_validator_only_kickout_threshold: 80, + }); + self + } + + /// Only chunk validator-only nodes can be kicked out. 
+ pub fn kickouts_for_chunk_validators_only(&mut self) -> &mut Self { + self.kickouts_config = Some(KickoutsConfig { + block_producer_kickout_threshold: 0, + chunk_producer_kickout_threshold: 0, + chunk_validator_only_kickout_threshold: 50, }); self } diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index c8cad859e01..155984f1535 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use near_async::futures::FutureSpawner; use near_async::messaging::{noop, IntoMultiSender, IntoSender, LateBoundSender}; @@ -23,8 +23,9 @@ use near_client::sync_jobs_actor::SyncJobsActor; use near_client::test_utils::test_loop::test_loop_sync_actor_maker; use near_client::{Client, PartialWitnessActor, SyncAdapter}; use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; -use near_epoch_manager::EpochManager; +use near_epoch_manager::{EpochManager, EpochManagerAdapter}; use near_network::test_loop::TestLoopPeerManagerActor; +use near_network::types::NetworkRequests; use near_primitives::network::PeerId; use near_primitives::test_utils::create_test_signer; use near_primitives::types::AccountId; @@ -36,18 +37,29 @@ use near_vm_runner::{ContractRuntimeCache, FilesystemContractRuntimeCache}; use nearcore::state_sync::StateSyncDumper; use tempfile::TempDir; -use super::env::{TestData, TestLoopEnv}; +use super::env::{ClientToShardsManagerSender, TestData, TestLoopChunksStorage, TestLoopEnv}; pub struct TestLoopBuilder { test_loop: TestLoopV2, genesis: Option, clients: Vec, + /// Will store all chunks produced within the test loop. + chunks_storage: Arc>, + /// Whether test loop should drop all chunks validated by the given account. 
+ drop_chunks_validated_by: Option, gc: bool, } impl TestLoopBuilder { pub fn new() -> Self { - Self { test_loop: TestLoopV2::new(), genesis: None, clients: vec![], gc: true } + Self { + test_loop: TestLoopV2::new(), + genesis: None, + clients: vec![], + chunks_storage: Default::default(), + drop_chunks_validated_by: None, + gc: true, + } } /// Get the clock for the test loop. @@ -67,6 +79,11 @@ impl TestLoopBuilder { self } + pub fn drop_chunks_validated_by(mut self, account_id: &str) -> Self { + self.drop_chunks_validated_by = Some(account_id.parse().unwrap()); + self + } + /// Disable garbage collection for the nodes. /// TODO(#11605): should always be enabled, if it doesn't work, it's a bug. pub fn disable_gc(mut self) -> Self { @@ -92,13 +109,15 @@ impl TestLoopBuilder { fn build_impl(mut self) -> TestLoopEnv { let mut datas = Vec::new(); let mut network_adapters = Vec::new(); + let mut epoch_manager_adapters = Vec::new(); let tempdir = tempfile::tempdir().unwrap(); for idx in 0..self.clients.len() { - let (data, network_adapter) = self.setup_client(idx, &tempdir); + let (data, network_adapter, epoch_manager_adapter) = self.setup_client(idx, &tempdir); datas.push(data); network_adapters.push(network_adapter); + epoch_manager_adapters.push(epoch_manager_adapter); } - self.setup_network(&datas, &network_adapters); + self.setup_network(&datas, &network_adapters, &epoch_manager_adapters); let env = TestLoopEnv { test_loop: self.test_loop, datas }; env.warmup() @@ -108,11 +127,14 @@ impl TestLoopBuilder { &mut self, idx: usize, tempdir: &TempDir, - ) -> (TestData, Arc>>) { + ) -> ( + TestData, + Arc>>, + Arc, + ) { let client_adapter = LateBoundSender::new(); let network_adapter = LateBoundSender::new(); let state_snapshot_adapter = LateBoundSender::new(); - let shards_manager_adapter = LateBoundSender::new(); let partial_witness_adapter = LateBoundSender::new(); let sync_jobs_adapter = LateBoundSender::new(); @@ -194,6 +216,13 @@ impl TestLoopBuilder { 
Some(Arc::new(create_test_signer(self.clients[idx].as_str()))), "validator_signer", ); + + let shards_manager_adapter = LateBoundSender::new(); + let client_to_shards_manager_sender = Arc::new(ClientToShardsManagerSender { + sender: shards_manager_adapter.clone(), + chunks_storage: self.chunks_storage.clone(), + }); + let client = Client::new( self.test_loop.clock(), client_config.clone(), @@ -203,7 +232,7 @@ impl TestLoopBuilder { state_sync_adapter, runtime_adapter.clone(), network_adapter.as_multi_sender(), - shards_manager_adapter.as_sender(), + client_to_shards_manager_sender.as_sender(), validator_signer.clone(), true, [0; 32], @@ -269,7 +298,7 @@ impl TestLoopBuilder { clock: self.test_loop.clock(), client_config, chain_genesis, - epoch_manager, + epoch_manager: epoch_manager.clone(), shard_tracker, runtime: runtime_adapter, validator: validator_signer, @@ -312,17 +341,29 @@ impl TestLoopBuilder { partial_witness_sender, state_sync_dumper_handle, }; - (data, network_adapter) + (data, network_adapter, epoch_manager) } + // TODO: we assume that all `Vec`s have the same length, consider + // joining them into one structure. fn setup_network( &mut self, datas: &Vec, network_adapters: &Vec>>>, + epoch_manager_adapters: &Vec>, ) { for (idx, data) in datas.iter().enumerate() { - let peer_manager_actor = + let mut peer_manager_actor = TestLoopPeerManagerActor::new(self.test_loop.clock(), &data.account_id, datas); + + if let Some(account_id) = &self.drop_chunks_validated_by { + peer_manager_actor.register_override_handler(partial_encoded_chunks_dropper( + self.chunks_storage.clone(), + epoch_manager_adapters[idx].clone(), + account_id.clone(), + )); + } + self.test_loop.register_actor_for_index( idx, peer_manager_actor, @@ -331,3 +372,75 @@ impl TestLoopBuilder { } } } + +/// Handler to drop all network messages relevant to chunk validated by +/// `validator_of_chunks_to_drop`. 
If number of nodes on chain is significant +/// enough (at least three?), this is enough to prevent the chunk from being +/// included. +/// +/// This logic can be easily extended to drop chunks based on any rule. +pub fn partial_encoded_chunks_dropper( + chunks_storage: Arc>, + epoch_manager_adapter: Arc, + validator_of_chunks_to_drop: AccountId, +) -> Arc Option> { + Arc::new(move |request| { + // Select only messages related to distributing a chunk in the + // network; extract `chunk_hash` from the message. + let chunk_hash = match &request { + NetworkRequests::PartialEncodedChunkRequest { request, .. } => { + Some(request.chunk_hash.clone()) + } + NetworkRequests::PartialEncodedChunkResponse { response, .. } => { + Some(response.chunk_hash.clone()) + } + NetworkRequests::PartialEncodedChunkMessage { partial_encoded_chunk, .. } => { + Some(partial_encoded_chunk.header.chunk_hash()) + } + NetworkRequests::PartialEncodedChunkForward { forward, .. } => { + Some(forward.chunk_hash.clone()) + } + _ => None, + }; + + let Some(chunk_hash) = chunk_hash else { + return Some(request); + }; + + let chunk = { + let chunks_storage = chunks_storage.lock().unwrap(); + let chunk = chunks_storage.get(&chunk_hash).unwrap().clone(); + let can_drop_chunk = chunks_storage.can_drop_chunk(&chunk); + + if !can_drop_chunk { + return Some(request); + } + + chunk + }; + + let prev_block_hash = chunk.prev_block_hash(); + let shard_id = chunk.shard_id(); + let height_created = chunk.height_created(); + + // If we don't have the block on top of which the chunk is built, we can't + // retrieve the epoch id, so the message is let through. + // This case appears to be too rare to interfere with the goal of + // dropping the chunk. + let Ok(epoch_id) = epoch_manager_adapter.get_epoch_id_from_prev_block(prev_block_hash) + else { + return Some(request); + }; + + // Finally, we drop the message if the given account is present in the list + // of the chunk's validators. 
+ let chunk_validators = epoch_manager_adapter + .get_chunk_validator_assignments(&epoch_id, shard_id, height_created) + .unwrap(); + if !chunk_validators.contains(&validator_of_chunks_to_drop) { + return Some(request); + } + + return None; + }) +} diff --git a/integration-tests/src/test_loop/env.rs b/integration-tests/src/test_loop/env.rs index 09d1d0a19f9..8655bf317ab 100644 --- a/integration-tests/src/test_loop/env.rs +++ b/integration-tests/src/test_loop/env.rs @@ -1,16 +1,21 @@ -use near_async::messaging::{IntoMultiSender, IntoSender, Sender}; +use near_async::messaging::{CanSend, IntoMultiSender, IntoSender, LateBoundSender, Sender}; use near_async::test_loop::data::{TestLoopData, TestLoopDataHandle}; use near_async::test_loop::sender::TestLoopSender; use near_async::test_loop::TestLoopV2; use near_async::time::Duration; +use near_chunks::adapter::ShardsManagerRequestFromClient; use near_chunks::shards_manager_actor::ShardsManagerActor; use near_client::client_actor::ClientActorInner; use near_client::PartialWitnessActor; use near_network::shards_manager::ShardsManagerRequestFromNetwork; use near_network::state_witness::PartialWitnessSenderForNetwork; use near_network::test_loop::ClientSenderForTestLoopNetwork; +use near_primitives::sharding::{ChunkHash, ShardChunkHeader}; use near_primitives::types::AccountId; +use near_primitives_core::types::BlockHeight; use nearcore::state_sync::StateSyncDumper; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; const NETWORK_DELAY: Duration = Duration::milliseconds(10); @@ -67,6 +72,64 @@ impl TestLoopEnv { } } +/// Stores headers of all chunks produced in a test loop. Determines if a chunk +/// can be dropped within that test loop. +/// +/// Needed because intercepted network messages carry the chunk hash only, while +/// deciding whether to drop them requires more details, like shard id. +#[derive(Default)] +pub struct TestLoopChunksStorage { + /// Mapping from chunk hashes to headers. 
+ storage: HashMap, + /// Minimal `height_created` among all inserted chunk headers. + min_chunk_height: Option, +} + +impl TestLoopChunksStorage { + pub fn insert(&mut self, chunk_header: ShardChunkHeader) { + let chunk_height = chunk_header.height_created(); + self.min_chunk_height = Some( + self.min_chunk_height + .map_or(chunk_height, |current_height| current_height.min(chunk_height)), + ); + self.storage.insert(chunk_header.chunk_hash(), chunk_header); + } + + pub fn get(&self, chunk_hash: &ChunkHash) -> Option<&ShardChunkHeader> { + self.storage.get(chunk_hash) + } + + /// A chunk can be dropped only if it was created at least 3 heights above the + /// minimal observed height; lower chunks are kept to let the chain warm up. + pub fn can_drop_chunk(&self, chunk_header: &ShardChunkHeader) -> bool { + self.min_chunk_height + .is_some_and(|min_height| chunk_header.height_created() >= min_height + 3) + } +} + +/// Custom implementation of `Sender` for messages from `Client` to +/// `ShardsManagerActor` that intercepts all messages indicating chunk +/// production and stores the headers of the produced chunks. +pub struct ClientToShardsManagerSender { + pub sender: Arc>>, + /// Storage of chunks shared between all test loop nodes. + pub chunks_storage: Arc>, +} + +impl CanSend for ClientToShardsManagerSender { + fn send(&self, message: ShardsManagerRequestFromClient) { + // `DistributeEncodedChunk` indicates that a certain chunk was produced. + if let ShardsManagerRequestFromClient::DistributeEncodedChunk { partial_chunk, .. } = + &message + { + let mut chunks_storage = self.chunks_storage.lock().unwrap(); + chunks_storage.insert(partial_chunk.cloned_header()); + } + // Whether or not a chunk was stored, forward the message as usual. 
+ self.sender.send(message); + } +} + pub struct TestData { pub account_id: AccountId, pub client_sender: TestLoopSender, diff --git a/integration-tests/src/test_loop/tests/chunk_validator_kickout.rs b/integration-tests/src/test_loop/tests/chunk_validator_kickout.rs new file mode 100644 index 00000000000..0d642e11144 --- /dev/null +++ b/integration-tests/src/test_loop/tests/chunk_validator_kickout.rs @@ -0,0 +1,122 @@ +use crate::test_loop::builder::TestLoopBuilder; +use crate::test_loop::env::TestLoopEnv; +use crate::test_loop::utils::ONE_NEAR; +use itertools::Itertools; +use near_async::test_loop::data::TestLoopData; +use near_async::time::Duration; +use near_chain_configs::test_genesis::TestGenesisBuilder; +use near_client::Client; +use near_o11y::testonly::init_test_logger; +use near_primitives::types::AccountId; +use near_primitives_core::checked_feature; +use near_primitives_core::version::PROTOCOL_VERSION; +use std::string::ToString; + +fn run_test_chunk_validator_kickout(select_chunk_validator_only: bool) { + if !checked_feature!("stable", StatelessValidationV0, PROTOCOL_VERSION) { + println!("Test not applicable without StatelessValidation enabled"); + return; + } + + init_test_logger(); + let builder = TestLoopBuilder::new(); + + let initial_balance = 10000 * ONE_NEAR; + let epoch_length = 10; + let accounts = + (0..8).map(|i| format!("account{}", i).parse().unwrap()).collect::>(); + let clients = accounts.iter().cloned().collect_vec(); + let accounts_str = accounts.iter().map(|a| a.as_str()).collect_vec(); + let (block_and_chunk_producers, chunk_validators_only) = accounts_str.split_at(6); + + // Select the account to kick out. + // Only chunk validator-only node can be kicked out for low endorsement + // stats. 
+ let account_id = if select_chunk_validator_only { + chunk_validators_only[0] + } else { + block_and_chunk_producers[3] + }; + let expect_kickout = select_chunk_validator_only; + + let mut genesis_builder = TestGenesisBuilder::new(); + genesis_builder + .genesis_time_from_clock(&builder.clock()) + .shard_layout_simple_v1(&["account2", "account4", "account6"]) + .epoch_length(epoch_length) + // Select 6 block&chunk producers and 2 chunk validators. + .validators_desired_roles(block_and_chunk_producers, chunk_validators_only) + // Set up config to kick out only chunk validators for low performance. + .kickouts_for_chunk_validators_only() + // Target giving one mandate to each chunk validator, which results in + // every chunk validator validating only one shard in most cases. + .target_validator_mandates_per_shard(1); + for account in &accounts { + genesis_builder.add_user_account_simple(account.clone(), initial_balance); + } + let genesis = genesis_builder.build(); + + let TestLoopEnv { mut test_loop, datas: node_datas } = builder + .genesis(genesis) + .clients(clients) + // Drop only chunks validated by `account_id`. + // By how our endorsement stats are computed, this will count as this + // validator validating zero chunks. + .drop_chunks_validated_by(account_id) + .build(); + + // Run chain until our targeted chunk validator is (not) kicked out. 
+ let client_handle = node_datas[0].client_sender.actor_handle(); + let initial_validators = get_epoch_all_validators(&test_loop.data.get(&client_handle).client); + assert_eq!(initial_validators.len(), 8); + assert!(initial_validators.contains(&account_id.to_string())); + let success_condition = |test_loop_data: &mut TestLoopData| -> bool { + let client = &test_loop_data.get(&client_handle).client; + let validators = get_epoch_all_validators(client); + let tip = client.chain.head().unwrap(); + let epoch_height = + client.epoch_manager.get_epoch_height_from_prev_block(&tip.prev_block_hash).unwrap(); + + if expect_kickout { + assert!(epoch_height < 4); + return if validators.len() == 7 { + assert!(!validators.contains(&account_id.to_string())); + true + } else { + false + }; + } else { + assert_eq!(validators.len(), 8, "No kickouts are expected"); + epoch_height >= 4 + } + }; + + test_loop.run_until( + success_condition, + // Timeout at producing 5 epochs, approximately. + Duration::seconds((5 * epoch_length) as i64), + ); + + TestLoopEnv { test_loop, datas: node_datas } + .shutdown_and_drain_remaining_events(Duration::seconds(20)); +} + +/// Get all validator account names for the latest epoch. +fn get_epoch_all_validators(client: &Client) -> Vec { + let tip = client.chain.head().unwrap(); + let epoch_id = tip.epoch_id; + let all_validators = client.epoch_manager.get_epoch_all_validators(&epoch_id).unwrap(); + all_validators.into_iter().map(|vs| vs.account_id().to_string()).collect() +} + +/// Checks that chunk validator with low endorsement stats is kicked out. +#[test] +fn test_chunk_validator_kicked_out() { + run_test_chunk_validator_kickout(true); +} + +/// Checks that block producer with low chunk endorsement stats is not kicked out. 
+#[test] +fn test_block_producer_not_kicked_out() { + run_test_chunk_validator_kickout(false); +} diff --git a/integration-tests/src/test_loop/tests/mod.rs b/integration-tests/src/test_loop/tests/mod.rs index 762ad3ea757..36fb080c385 100644 --- a/integration-tests/src/test_loop/tests/mod.rs +++ b/integration-tests/src/test_loop/tests/mod.rs @@ -1,3 +1,4 @@ +mod chunk_validator_kickout; pub mod in_memory_tries; pub mod multinode_stateless_validators; pub mod multinode_test_loop_example; diff --git a/integration-tests/src/test_loop/utils.rs b/integration-tests/src/test_loop/utils.rs index 1028cec0b32..5af89cf1f47 100644 --- a/integration-tests/src/test_loop/utils.rs +++ b/integration-tests/src/test_loop/utils.rs @@ -16,6 +16,8 @@ pub(crate) const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000; /// Runs chain long enough for the transfers to be optimistically executed. /// Used to generate state changes and check that chain is able to update /// balances correctly. +/// TODO: consider resending transactions which may be dropped because of +/// missing chunks. pub(crate) fn execute_money_transfers( test_loop: &mut TestLoopV2, node_data: &[TestData],