From a1341855b12ace1afdbe634ace86049d298fd4de Mon Sep 17 00:00:00 2001
From: Aleksandr Logunov <alex.logunov@near.org>
Date: Fri, 28 Jun 2024 19:36:30 +0400
Subject: [PATCH] test: chunk validator kickout (#11672)

Express the power of TestLoop by implementing scenario where chunk
validator-only node is kicked out due to low endorsement stats.

The logic is to simply prevent all chunks validated by selected accounts
from appearing on chain. But implementing this without adding extra
logic to `Client`, `ShardsManagerActor` and `Network` is a challenge.
TestLoop, however, can add overrides to itself, which are generic enough
to use for testing other scenarios, like ones in `chunks_management`
tests. New components are:

* `TestLoopChunksStorage` - global storage for all chunks ever observed
by monitoring client messages. Other options would be to extend network
messages content or understand how to query `ShardsManagerActor` on
network message override. However, global test loop storage makes sense
to me as it can serve for measuring health of the whole chain, if we
test other disruption scenarios.
* `partial_encoded_chunks_dropper` - overrides processing of network
messages related to chunks; if chunk is validated by the given account,
the message is dropped. The logic is generic enough to be extended to
kickout specific block/chunk producers, drop chunks for specific
heights/shards, etc.

---------

Co-authored-by: Bowen Wang <bowen@near.org>
---
 chain/client/src/client.rs                    |   4 +-
 core/chain-configs/src/test_genesis.rs        |  20 ++-
 integration-tests/src/test_loop/builder.rs    | 137 ++++++++++++++++--
 integration-tests/src/test_loop/env.rs        |  65 ++++++++-
 .../tests/chunk_validator_kickout.rs          | 122 ++++++++++++++++
 integration-tests/src/test_loop/tests/mod.rs  |   1 +
 integration-tests/src/test_loop/utils.rs      |   2 +
 7 files changed, 332 insertions(+), 19 deletions(-)
 create mode 100644 integration-tests/src/test_loop/tests/chunk_validator_kickout.rs

diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index ae736ebd245..2447ac06b99 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -246,7 +246,7 @@ impl Client {
         state_sync_adapter: Arc<RwLock<SyncAdapter>>,
         runtime_adapter: Arc<dyn RuntimeAdapter>,
         network_adapter: PeerManagerAdapter,
-        shards_manager_adapter: Sender<ShardsManagerRequestFromClient>,
+        shards_manager_sender: Sender<ShardsManagerRequestFromClient>,
         validator_signer: MutableValidatorSigner,
         enable_doomslug: bool,
         rng_seed: RngSeed,
@@ -390,7 +390,7 @@ impl Client {
             epoch_manager,
             shard_tracker,
             runtime_adapter,
-            shards_manager_adapter,
+            shards_manager_adapter: shards_manager_sender,
             sharded_tx_pool,
             network_adapter,
             validator_signer,
diff --git a/core/chain-configs/src/test_genesis.rs b/core/chain-configs/src/test_genesis.rs
index 255896fe63b..65856252ea5 100644
--- a/core/chain-configs/src/test_genesis.rs
+++ b/core/chain-configs/src/test_genesis.rs
@@ -238,11 +238,23 @@ impl TestGenesisBuilder {
         self
     }
 
-    pub fn kickouts_standard_90_percent(&mut self) -> &mut Self {
+    /// Validators with performance below 80% are kicked out, similarly to
+    /// mainnet as of 28 Jun 2024.
+    pub fn kickouts_standard_80_percent(&mut self) -> &mut Self {
         self.kickouts_config = Some(KickoutsConfig {
-            block_producer_kickout_threshold: 90,
-            chunk_producer_kickout_threshold: 90,
-            chunk_validator_only_kickout_threshold: 90,
+            block_producer_kickout_threshold: 80,
+            chunk_producer_kickout_threshold: 80,
+            chunk_validator_only_kickout_threshold: 80,
+        });
+        self
+    }
+
+    /// Only chunk validator-only nodes can be kicked out.
+    pub fn kickouts_for_chunk_validators_only(&mut self) -> &mut Self {
+        self.kickouts_config = Some(KickoutsConfig {
+            block_producer_kickout_threshold: 0,
+            chunk_producer_kickout_threshold: 0,
+            chunk_validator_only_kickout_threshold: 50,
         });
         self
     }
diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs
index c8cad859e01..155984f1535 100644
--- a/integration-tests/src/test_loop/builder.rs
+++ b/integration-tests/src/test_loop/builder.rs
@@ -1,4 +1,4 @@
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
 
 use near_async::futures::FutureSpawner;
 use near_async::messaging::{noop, IntoMultiSender, IntoSender, LateBoundSender};
@@ -23,8 +23,9 @@ use near_client::sync_jobs_actor::SyncJobsActor;
 use near_client::test_utils::test_loop::test_loop_sync_actor_maker;
 use near_client::{Client, PartialWitnessActor, SyncAdapter};
 use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
-use near_epoch_manager::EpochManager;
+use near_epoch_manager::{EpochManager, EpochManagerAdapter};
 use near_network::test_loop::TestLoopPeerManagerActor;
+use near_network::types::NetworkRequests;
 use near_primitives::network::PeerId;
 use near_primitives::test_utils::create_test_signer;
 use near_primitives::types::AccountId;
@@ -36,18 +37,29 @@ use near_vm_runner::{ContractRuntimeCache, FilesystemContractRuntimeCache};
 use nearcore::state_sync::StateSyncDumper;
 use tempfile::TempDir;
 
-use super::env::{TestData, TestLoopEnv};
+use super::env::{ClientToShardsManagerSender, TestData, TestLoopChunksStorage, TestLoopEnv};
 
 pub struct TestLoopBuilder {
     test_loop: TestLoopV2,
     genesis: Option<Genesis>,
     clients: Vec<AccountId>,
+    /// Will store all chunks produced within the test loop.
+    chunks_storage: Arc<Mutex<TestLoopChunksStorage>>,
+    /// Whether test loop should drop all chunks validated by the given account.
+    drop_chunks_validated_by: Option<AccountId>,
     gc: bool,
 }
 
 impl TestLoopBuilder {
     pub fn new() -> Self {
-        Self { test_loop: TestLoopV2::new(), genesis: None, clients: vec![], gc: true }
+        Self {
+            test_loop: TestLoopV2::new(),
+            genesis: None,
+            clients: vec![],
+            chunks_storage: Default::default(),
+            drop_chunks_validated_by: None,
+            gc: true,
+        }
     }
 
     /// Get the clock for the test loop.
@@ -67,6 +79,11 @@ impl TestLoopBuilder {
         self
     }
 
+    pub fn drop_chunks_validated_by(mut self, account_id: &str) -> Self {
+        self.drop_chunks_validated_by = Some(account_id.parse().unwrap());
+        self
+    }
+
     /// Disable garbage collection for the nodes.
     /// TODO(#11605): should always be enabled, if it doesn't work, it's a bug.
     pub fn disable_gc(mut self) -> Self {
@@ -92,13 +109,15 @@ impl TestLoopBuilder {
     fn build_impl(mut self) -> TestLoopEnv {
         let mut datas = Vec::new();
         let mut network_adapters = Vec::new();
+        let mut epoch_manager_adapters = Vec::new();
         let tempdir = tempfile::tempdir().unwrap();
         for idx in 0..self.clients.len() {
-            let (data, network_adapter) = self.setup_client(idx, &tempdir);
+            let (data, network_adapter, epoch_manager_adapter) = self.setup_client(idx, &tempdir);
             datas.push(data);
             network_adapters.push(network_adapter);
+            epoch_manager_adapters.push(epoch_manager_adapter);
         }
-        self.setup_network(&datas, &network_adapters);
+        self.setup_network(&datas, &network_adapters, &epoch_manager_adapters);
 
         let env = TestLoopEnv { test_loop: self.test_loop, datas };
         env.warmup()
@@ -108,11 +127,14 @@ impl TestLoopBuilder {
         &mut self,
         idx: usize,
         tempdir: &TempDir,
-    ) -> (TestData, Arc<LateBoundSender<TestLoopSender<TestLoopPeerManagerActor>>>) {
+    ) -> (
+        TestData,
+        Arc<LateBoundSender<TestLoopSender<TestLoopPeerManagerActor>>>,
+        Arc<dyn EpochManagerAdapter>,
+    ) {
         let client_adapter = LateBoundSender::new();
         let network_adapter = LateBoundSender::new();
         let state_snapshot_adapter = LateBoundSender::new();
-        let shards_manager_adapter = LateBoundSender::new();
         let partial_witness_adapter = LateBoundSender::new();
         let sync_jobs_adapter = LateBoundSender::new();
 
@@ -194,6 +216,13 @@ impl TestLoopBuilder {
             Some(Arc::new(create_test_signer(self.clients[idx].as_str()))),
             "validator_signer",
         );
+
+        let shards_manager_adapter = LateBoundSender::new();
+        let client_to_shards_manager_sender = Arc::new(ClientToShardsManagerSender {
+            sender: shards_manager_adapter.clone(),
+            chunks_storage: self.chunks_storage.clone(),
+        });
+
         let client = Client::new(
             self.test_loop.clock(),
             client_config.clone(),
@@ -203,7 +232,7 @@ impl TestLoopBuilder {
             state_sync_adapter,
             runtime_adapter.clone(),
             network_adapter.as_multi_sender(),
-            shards_manager_adapter.as_sender(),
+            client_to_shards_manager_sender.as_sender(),
             validator_signer.clone(),
             true,
             [0; 32],
@@ -269,7 +298,7 @@ impl TestLoopBuilder {
             clock: self.test_loop.clock(),
             client_config,
             chain_genesis,
-            epoch_manager,
+            epoch_manager: epoch_manager.clone(),
             shard_tracker,
             runtime: runtime_adapter,
             validator: validator_signer,
@@ -312,17 +341,29 @@ impl TestLoopBuilder {
             partial_witness_sender,
             state_sync_dumper_handle,
         };
-        (data, network_adapter)
+        (data, network_adapter, epoch_manager)
     }
 
+    // TODO: we assume that all `Vec`s have the same length, consider
+    // joining them into one structure.
     fn setup_network(
         &mut self,
         datas: &Vec<TestData>,
         network_adapters: &Vec<Arc<LateBoundSender<TestLoopSender<TestLoopPeerManagerActor>>>>,
+        epoch_manager_adapters: &Vec<Arc<dyn EpochManagerAdapter>>,
     ) {
         for (idx, data) in datas.iter().enumerate() {
-            let peer_manager_actor =
+            let mut peer_manager_actor =
                 TestLoopPeerManagerActor::new(self.test_loop.clock(), &data.account_id, datas);
+
+            if let Some(account_id) = &self.drop_chunks_validated_by {
+                peer_manager_actor.register_override_handler(partial_encoded_chunks_dropper(
+                    self.chunks_storage.clone(),
+                    epoch_manager_adapters[idx].clone(),
+                    account_id.clone(),
+                ));
+            }
+
             self.test_loop.register_actor_for_index(
                 idx,
                 peer_manager_actor,
@@ -331,3 +372,75 @@ impl TestLoopBuilder {
         }
     }
 }
+
+/// Handler to drop all network messages relevant to chunk validated by
+/// `validator_of_chunks_to_drop`. If number of nodes on chain is significant
+/// enough (at least three?), this is enough to prevent chunk from being
+/// included.
+///
+/// This logic can be easily extended to dropping chunk based on any rule.
+pub fn partial_encoded_chunks_dropper(
+    chunks_storage: Arc<Mutex<TestLoopChunksStorage>>,
+    epoch_manager_adapter: Arc<dyn EpochManagerAdapter>,
+    validator_of_chunks_to_drop: AccountId,
+) -> Arc<dyn Fn(NetworkRequests) -> Option<NetworkRequests>> {
+    Arc::new(move |request| {
+        // Filter out only messages related to distributing chunk in the
+        // network; extract `chunk_hash` from the message.
+        let chunk_hash = match &request {
+            NetworkRequests::PartialEncodedChunkRequest { request, .. } => {
+                Some(request.chunk_hash.clone())
+            }
+            NetworkRequests::PartialEncodedChunkResponse { response, .. } => {
+                Some(response.chunk_hash.clone())
+            }
+            NetworkRequests::PartialEncodedChunkMessage { partial_encoded_chunk, .. } => {
+                Some(partial_encoded_chunk.header.chunk_hash())
+            }
+            NetworkRequests::PartialEncodedChunkForward { forward, .. } => {
+                Some(forward.chunk_hash.clone())
+            }
+            _ => None,
+        };
+
+        let Some(chunk_hash) = chunk_hash else {
+            return Some(request);
+        };
+
+        let chunk = {
+            let chunks_storage = chunks_storage.lock().unwrap();
+            let chunk = chunks_storage.get(&chunk_hash).unwrap().clone();
+            let can_drop_chunk = chunks_storage.can_drop_chunk(&chunk);
+
+            if !can_drop_chunk {
+                return Some(request);
+            }
+
+            chunk
+        };
+
+        let prev_block_hash = chunk.prev_block_hash();
+        let shard_id = chunk.shard_id();
+        let height_created = chunk.height_created();
+
+        // If we don't have block on top of which chunk is built, we can't
+        // retrieve epoch id.
+        // This case appears to be too rare to interfere with the goal of
+        // dropping chunk.
+        let Ok(epoch_id) = epoch_manager_adapter.get_epoch_id_from_prev_block(prev_block_hash)
+        else {
+            return Some(request);
+        };
+
+        // Finally, we drop chunk if the given account is present in the list
+        // of its validators.
+        let chunk_validators = epoch_manager_adapter
+            .get_chunk_validator_assignments(&epoch_id, shard_id, height_created)
+            .unwrap();
+        if !chunk_validators.contains(&validator_of_chunks_to_drop) {
+            return Some(request);
+        }
+
+        return None;
+    })
+}
diff --git a/integration-tests/src/test_loop/env.rs b/integration-tests/src/test_loop/env.rs
index 09d1d0a19f9..8655bf317ab 100644
--- a/integration-tests/src/test_loop/env.rs
+++ b/integration-tests/src/test_loop/env.rs
@@ -1,16 +1,21 @@
-use near_async::messaging::{IntoMultiSender, IntoSender, Sender};
+use near_async::messaging::{CanSend, IntoMultiSender, IntoSender, LateBoundSender, Sender};
 use near_async::test_loop::data::{TestLoopData, TestLoopDataHandle};
 use near_async::test_loop::sender::TestLoopSender;
 use near_async::test_loop::TestLoopV2;
 use near_async::time::Duration;
+use near_chunks::adapter::ShardsManagerRequestFromClient;
 use near_chunks::shards_manager_actor::ShardsManagerActor;
 use near_client::client_actor::ClientActorInner;
 use near_client::PartialWitnessActor;
 use near_network::shards_manager::ShardsManagerRequestFromNetwork;
 use near_network::state_witness::PartialWitnessSenderForNetwork;
 use near_network::test_loop::ClientSenderForTestLoopNetwork;
+use near_primitives::sharding::{ChunkHash, ShardChunkHeader};
 use near_primitives::types::AccountId;
+use near_primitives_core::types::BlockHeight;
 use nearcore::state_sync::StateSyncDumper;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
 
 const NETWORK_DELAY: Duration = Duration::milliseconds(10);
 
@@ -67,6 +72,64 @@ impl TestLoopEnv {
     }
 }
 
+/// Stores all chunks ever observed on chain. Determines if a chunk can be
+/// dropped within a test loop.
+///
+/// Needed to intercept network messages storing chunk hash only, while
+/// interception requires more detailed information like shard id.
+#[derive(Default)]
+pub struct TestLoopChunksStorage {
+    /// Mapping from chunk hashes to headers.
+    storage: HashMap<ChunkHash, ShardChunkHeader>,
+    /// Minimal chunk height ever observed.
+    min_chunk_height: Option<BlockHeight>,
+}
+
+impl TestLoopChunksStorage {
+    pub fn insert(&mut self, chunk_header: ShardChunkHeader) {
+        let chunk_height = chunk_header.height_created();
+        self.min_chunk_height = Some(
+            self.min_chunk_height
+                .map_or(chunk_height, |current_height| current_height.min(chunk_height)),
+        );
+        self.storage.insert(chunk_header.chunk_hash(), chunk_header);
+    }
+
+    pub fn get(&self, chunk_hash: &ChunkHash) -> Option<&ShardChunkHeader> {
+        self.storage.get(chunk_hash)
+    }
+
+    /// If chunk height is too low, don't drop chunk, allow the chain to warm
+    /// up.
+    pub fn can_drop_chunk(&self, chunk_header: &ShardChunkHeader) -> bool {
+        self.min_chunk_height
+            .is_some_and(|min_height| chunk_header.height_created() >= min_height + 3)
+    }
+}
+
+/// Custom implementation of `Sender` for messages from `Client` to
+/// `ShardsManagerActor` that allows to intercept all messages indicating
+/// any chunk production and storing all chunks.
+pub struct ClientToShardsManagerSender {
+    pub sender: Arc<LateBoundSender<TestLoopSender<ShardsManagerActor>>>,
+    /// Storage of chunks shared between all test loop nodes.
+    pub chunks_storage: Arc<Mutex<TestLoopChunksStorage>>,
+}
+
+impl CanSend<ShardsManagerRequestFromClient> for ClientToShardsManagerSender {
+    fn send(&self, message: ShardsManagerRequestFromClient) {
+        // `DistributeEncodedChunk` indicates that a certain chunk was produced.
+        if let ShardsManagerRequestFromClient::DistributeEncodedChunk { partial_chunk, .. } =
+            &message
+        {
+            let mut chunks_storage = self.chunks_storage.lock().unwrap();
+            chunks_storage.insert(partial_chunk.cloned_header());
+        }
+        // After maybe storing the chunk, send the message as usual.
+        self.sender.send(message);
+    }
+}
+
 pub struct TestData {
     pub account_id: AccountId,
     pub client_sender: TestLoopSender<ClientActorInner>,
diff --git a/integration-tests/src/test_loop/tests/chunk_validator_kickout.rs b/integration-tests/src/test_loop/tests/chunk_validator_kickout.rs
new file mode 100644
index 00000000000..0d642e11144
--- /dev/null
+++ b/integration-tests/src/test_loop/tests/chunk_validator_kickout.rs
@@ -0,0 +1,122 @@
+use crate::test_loop::builder::TestLoopBuilder;
+use crate::test_loop::env::TestLoopEnv;
+use crate::test_loop::utils::ONE_NEAR;
+use itertools::Itertools;
+use near_async::test_loop::data::TestLoopData;
+use near_async::time::Duration;
+use near_chain_configs::test_genesis::TestGenesisBuilder;
+use near_client::Client;
+use near_o11y::testonly::init_test_logger;
+use near_primitives::types::AccountId;
+use near_primitives_core::checked_feature;
+use near_primitives_core::version::PROTOCOL_VERSION;
+use std::string::ToString;
+
+fn run_test_chunk_validator_kickout(select_chunk_validator_only: bool) {
+    if !checked_feature!("stable", StatelessValidationV0, PROTOCOL_VERSION) {
+        println!("Test not applicable without StatelessValidation enabled");
+        return;
+    }
+
+    init_test_logger();
+    let builder = TestLoopBuilder::new();
+
+    let initial_balance = 10000 * ONE_NEAR;
+    let epoch_length = 10;
+    let accounts =
+        (0..8).map(|i| format!("account{}", i).parse().unwrap()).collect::<Vec<AccountId>>();
+    let clients = accounts.iter().cloned().collect_vec();
+    let accounts_str = accounts.iter().map(|a| a.as_str()).collect_vec();
+    let (block_and_chunk_producers, chunk_validators_only) = accounts_str.split_at(6);
+
+    // Select the account to kick out.
+    // Only chunk validator-only node can be kicked out for low endorsement
+    // stats.
+    let account_id = if select_chunk_validator_only {
+        chunk_validators_only[0]
+    } else {
+        block_and_chunk_producers[3]
+    };
+    let expect_kickout = select_chunk_validator_only;
+
+    let mut genesis_builder = TestGenesisBuilder::new();
+    genesis_builder
+        .genesis_time_from_clock(&builder.clock())
+        .shard_layout_simple_v1(&["account2", "account4", "account6"])
+        .epoch_length(epoch_length)
+        // Select 6 block&chunk producers and 2 chunk validators.
+        .validators_desired_roles(block_and_chunk_producers, chunk_validators_only)
+        // Set up config to kick out only chunk validators for low performance.
+        .kickouts_for_chunk_validators_only()
+        // Target giving one mandate to each chunk validator, which results in
+        // every chunk validator validating only one shard in most cases.
+        .target_validator_mandates_per_shard(1);
+    for account in &accounts {
+        genesis_builder.add_user_account_simple(account.clone(), initial_balance);
+    }
+    let genesis = genesis_builder.build();
+
+    let TestLoopEnv { mut test_loop, datas: node_datas } = builder
+        .genesis(genesis)
+        .clients(clients)
+        // Drop only chunks validated by `account_id`.
+        // By how our endorsement stats are computed, this will count as this
+        // validator validating zero chunks.
+        .drop_chunks_validated_by(account_id)
+        .build();
+
+    // Run chain until our targeted chunk validator is (not) kicked out.
+    let client_handle = node_datas[0].client_sender.actor_handle();
+    let initial_validators = get_epoch_all_validators(&test_loop.data.get(&client_handle).client);
+    assert_eq!(initial_validators.len(), 8);
+    assert!(initial_validators.contains(&account_id.to_string()));
+    let success_condition = |test_loop_data: &mut TestLoopData| -> bool {
+        let client = &test_loop_data.get(&client_handle).client;
+        let validators = get_epoch_all_validators(client);
+        let tip = client.chain.head().unwrap();
+        let epoch_height =
+            client.epoch_manager.get_epoch_height_from_prev_block(&tip.prev_block_hash).unwrap();
+
+        if expect_kickout {
+            assert!(epoch_height < 4);
+            return if validators.len() == 7 {
+                assert!(!validators.contains(&account_id.to_string()));
+                true
+            } else {
+                false
+            };
+        } else {
+            assert_eq!(validators.len(), 8, "No kickouts are expected");
+            epoch_height >= 4
+        }
+    };
+
+    test_loop.run_until(
+        success_condition,
+        // Timeout at producing 5 epochs, approximately.
+        Duration::seconds((5 * epoch_length) as i64),
+    );
+
+    TestLoopEnv { test_loop, datas: node_datas }
+        .shutdown_and_drain_remaining_events(Duration::seconds(20));
+}
+
+/// Get all validator account names for the latest epoch.
+fn get_epoch_all_validators(client: &Client) -> Vec<String> {
+    let tip = client.chain.head().unwrap();
+    let epoch_id = tip.epoch_id;
+    let all_validators = client.epoch_manager.get_epoch_all_validators(&epoch_id).unwrap();
+    all_validators.into_iter().map(|vs| vs.account_id().to_string()).collect()
+}
+
+/// Checks that chunk validator with low endorsement stats is kicked out.
+#[test]
+fn test_chunk_validator_kicked_out() {
+    run_test_chunk_validator_kickout(true);
+}
+
+/// Checks that block producer with low chunk endorsement stats is not kicked out.
+#[test]
+fn test_block_producer_not_kicked_out() {
+    run_test_chunk_validator_kickout(false);
+}
diff --git a/integration-tests/src/test_loop/tests/mod.rs b/integration-tests/src/test_loop/tests/mod.rs
index 762ad3ea757..36fb080c385 100644
--- a/integration-tests/src/test_loop/tests/mod.rs
+++ b/integration-tests/src/test_loop/tests/mod.rs
@@ -1,3 +1,4 @@
+mod chunk_validator_kickout;
 pub mod in_memory_tries;
 pub mod multinode_stateless_validators;
 pub mod multinode_test_loop_example;
diff --git a/integration-tests/src/test_loop/utils.rs b/integration-tests/src/test_loop/utils.rs
index 1028cec0b32..5af89cf1f47 100644
--- a/integration-tests/src/test_loop/utils.rs
+++ b/integration-tests/src/test_loop/utils.rs
@@ -16,6 +16,8 @@ pub(crate) const ONE_NEAR: u128 = 1_000_000_000_000_000_000_000_000;
 /// Runs chain long enough for the transfers to be optimistically executed.
 /// Used to generate state changes and check that chain is able to update
 /// balances correctly.
+/// TODO: consider resending transactions which may be dropped because of
+/// missing chunks.
 pub(crate) fn execute_money_transfers(
     test_loop: &mut TestLoopV2,
     node_data: &[TestData],