From a7ddcd04f1bbfbfaf65e3d77deda3e9ee5dceaaa Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Thu, 2 Nov 2023 13:49:29 -0400 Subject: [PATCH] implement add and send fns for snapshot host info --- chain/network/src/config.rs | 4 ++ chain/network/src/network_protocol/mod.rs | 2 +- .../proto_conv/peer_message.rs | 10 +++- .../src/network_protocol/state_sync.rs | 3 +- chain/network/src/peer/peer_actor.rs | 3 ++ .../src/peer_manager/connection/mod.rs | 47 ++++++++++++++++++- .../src/peer_manager/network_state/mod.rs | 33 ++++++++++++- chain/network/src/snapshot_hosts/mod.rs | 9 ++-- 8 files changed, 101 insertions(+), 10 deletions(-) diff --git a/chain/network/src/config.rs b/chain/network/src/config.rs index 8cad9c8701c..354943b60ae 100644 --- a/chain/network/src/config.rs +++ b/chain/network/src/config.rs @@ -149,6 +149,8 @@ pub struct NetworkConfig { pub archive: bool, /// Maximal rate at which SyncAccountsData can be broadcasted. pub accounts_data_broadcast_rate_limit: rate::Limit, + /// Maximal rate at which SyncSnapshotHosts can be broadcasted. + pub snapshot_hosts_broadcast_rate_limit: rate::Limit, /// Maximal rate at which RoutingTable can be recomputed. pub routing_table_update_rate_limit: rate::Limit, /// Config of the TIER1 network. @@ -318,6 +320,7 @@ impl NetworkConfig { outbound_disabled: false, archive, accounts_data_broadcast_rate_limit: rate::Limit { qps: 0.1, burst: 1 }, + snapshot_hosts_broadcast_rate_limit: rate::Limit { qps: 0.1, burst: 1 }, routing_table_update_rate_limit: rate::Limit { qps: 1., burst: 1 }, tier1: Some(Tier1 { connect_interval: cfg.experimental.tier1_connect_interval.try_into()?, @@ -386,6 +389,7 @@ impl NetworkConfig { inbound_disabled: false, archive: false, accounts_data_broadcast_rate_limit: rate::Limit { qps: 100., burst: 1000000 }, + snapshot_hosts_broadcast_rate_limit: rate::Limit { qps: 100., burst: 1000000 }, routing_table_update_rate_limit: rate::Limit { qps: 10., burst: 1 }, tier1: Some(Tier1 { // Interval is very large, so that it doesn't happen spontaneously in tests. diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 34e37fa5f2e..b586602fe01 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -5,7 +5,7 @@ mod borsh_conv; mod edge; mod peer; mod proto_conv; -mod state_sync; +pub mod state_sync; pub use edge::*; pub use peer::*; pub use state_sync::*; diff --git a/chain/network/src/network_protocol/proto_conv/peer_message.rs b/chain/network/src/network_protocol/proto_conv/peer_message.rs index 9afe877fd12..38b4250b15a 100644 --- a/chain/network/src/network_protocol/proto_conv/peer_message.rs +++ b/chain/network/src/network_protocol/proto_conv/peer_message.rs @@ -209,14 +209,20 @@ pub enum ParseSyncSnapshotHostsError { impl From<&SyncSnapshotHosts> for proto::SyncSnapshotHosts { fn from(x: &SyncSnapshotHosts) -> Self { - Self { hosts: x.hosts.iter().map(Into::into).collect(), ..Default::default() } + Self { hosts: x.hosts.iter().map(|d| d.as_ref().into()).collect(), ..Default::default() } } } impl TryFrom<&proto::SyncSnapshotHosts> for SyncSnapshotHosts { type Error = ParseSyncSnapshotHostsError; fn try_from(x: &proto::SyncSnapshotHosts) -> Result { - Ok(Self { hosts: try_from_slice(&x.hosts).map_err(Self::Error::Hosts)? }) + Ok(Self { + hosts: try_from_slice(&x.hosts) + .map_err(Self::Error::Hosts)? + .into_iter() + .map(Arc::new) + .collect(), + }) } } diff --git a/chain/network/src/network_protocol/state_sync.rs b/chain/network/src/network_protocol/state_sync.rs index 4e9033277fd..4aec3685e81 100644 --- a/chain/network/src/network_protocol/state_sync.rs +++ b/chain/network/src/network_protocol/state_sync.rs @@ -1,3 +1,4 @@ +use crate::network_protocol::Arc; use near_crypto::SecretKey; use near_crypto::Signature; use near_primitives::hash::CryptoHash; @@ -63,5 +64,5 @@ impl SnapshotHostInfo { // SyncAccountsData either so it's worth revisiting. #[derive(Clone, Debug, Eq, PartialEq, borsh::BorshSerialize, borsh::BorshDeserialize)] pub struct SyncSnapshotHosts { - pub hosts: Vec, + pub hosts: Vec>, } diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index ed40d9541cd..2679b7f4cc5 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -635,6 +635,9 @@ impl PeerActor { send_accounts_data_demux: demux::Demux::new( self.network_state.config.accounts_data_broadcast_rate_limit, ), + send_snapshot_hosts_demux: demux::Demux::new( + self.network_state.config.snapshot_hosts_broadcast_rate_limit, + ), }); let tracker = self.tracker.clone(); diff --git a/chain/network/src/peer_manager/connection/mod.rs b/chain/network/src/peer_manager/connection/mod.rs index c6d3620cdcb..c27b556619b 100644 --- a/chain/network/src/peer_manager/connection/mod.rs +++ b/chain/network/src/peer_manager/connection/mod.rs @@ -1,9 +1,10 @@ use crate::concurrency::arc_mutex::ArcMutex; use crate::concurrency::atomic_cell::AtomicCell; use crate::concurrency::demux; +use crate::network_protocol::state_sync::SyncSnapshotHosts; use crate::network_protocol::{ PeerInfo, PeerMessage, RoutedMessageBody, SignedAccountData, SignedOwnedAccount, - SyncAccountsData, + SnapshotHostInfo, SyncAccountsData, }; use crate::peer::peer_actor; use crate::peer::peer_actor::PeerActor; @@ -104,6 +105,8 @@ pub(crate) struct Connection { /// Demultiplexer for the calls to send_accounts_data(). pub send_accounts_data_demux: demux::Demux>, ()>, + /// Demultiplexer for the calls to send_snapshot_hosts(). + pub send_snapshot_hosts_demux: demux::Demux>, ()>, } impl fmt::Debug for Connection { @@ -182,6 +185,48 @@ impl Connection { } } } + + pub fn send_snapshot_hosts( + self: &Arc, + data: Vec>, + ) -> impl Future { + let this = self.clone(); + async move { + let res = this + .send_snapshot_hosts_demux + .call(data, { + let this = this.clone(); + |ds: Vec>>| async move { + let res = ds.iter().map(|_| ()).collect(); + let mut sum = HashMap::<_, Arc>::new(); + for d in ds.into_iter().flatten() { + match sum.entry(d.peer_id.clone()) { + Entry::Occupied(mut x) => { + if x.get().epoch_height < d.epoch_height { + x.insert(d); + } + } + Entry::Vacant(x) => { + x.insert(d); + } + } + } + let msg = Arc::new(PeerMessage::SyncSnapshotHosts(SyncSnapshotHosts { + hosts: sum.into_values().collect(), + })); + this.send_message(msg); + res + } + }) + .await; + if res.is_err() { + tracing::info!( + "peer {} disconnected, while sending SyncSnapshotHosts", + this.peer_info.id + ); + } + } + } } #[derive(Clone)] diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index e9015133e63..c3fee5ce478 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -6,7 +6,7 @@ use crate::concurrency::runtime::Runtime; use crate::config; use crate::network_protocol::{ Edge, EdgeState, PartialEdgeInfo, PeerIdOrHash, PeerInfo, PeerMessage, RawRoutedMessage, - RoutedMessageBody, RoutedMessageV2, SignedAccountData, + RoutedMessageBody, RoutedMessageV2, SignedAccountData, SnapshotHostInfo, }; use crate::peer::peer_actor::PeerActor; use crate::peer::peer_actor::{ClosingReason, ConnectionClosedEvent}; @@ -18,6 +18,7 @@ use crate::private_actix::RegisterPeerError; use crate::routing::route_back_cache::RouteBackCache; use crate::routing::NetworkTopologyChange; use crate::shards_manager::ShardsManagerRequestFromNetwork; +use crate::snapshot_hosts::{SnapshotHostInfoError, SnapshotHostsCache}; use crate::stats::metrics; use crate::store; use crate::tcp; @@ -113,6 +114,8 @@ pub(crate) struct NetworkState { pub inbound_handshake_permits: Arc, /// Peer store that provides read/write access to peers. pub peer_store: peer_store::PeerStore, + /// Information about state snapshots hosted by network peers. + pub snapshot_hosts: Arc, /// Connection store that provides read/write access to stored connections. pub connection_store: connection_store::ConnectionStore, /// List of peers to which we should re-establish a connection @@ -189,6 +192,7 @@ impl NetworkState { tier1: connection::Pool::new(config.node_id()), inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)), peer_store, + snapshot_hosts: Arc::new(SnapshotHostsCache::new()), connection_store: connection_store::ConnectionStore::new(store.clone()).unwrap(), pending_reconnect: Mutex::new(Vec::::new()), accounts_data: Arc::new(AccountDataCache::new()), @@ -657,6 +661,33 @@ impl NetworkState { .unwrap() } + pub async fn add_snapshot_hosts( + self: &Arc, + hosts: Vec>, + ) -> Option { + let this = self.clone(); + self.spawn(async move { + // Verify and add the new data to the internal state. + let (new_data, err) = this.snapshot_hosts.clone().insert(hosts).await; + // Broadcast any valid new data, even if an err was returned. + // The presence of one invalid entry doesn't invalidate the remaining ones. + if !new_data.is_empty() { + let tier2 = this.tier2.load(); + let tasks: Vec<_> = tier2 + .ready + .values() + .map(|p| this.spawn(p.send_snapshot_hosts(new_data.clone()))) + .collect(); + for t in tasks { + t.await.unwrap(); + } + } + err + }) + .await + .unwrap() + } + /// a) there is a peer we should be connected to, but we aren't /// b) there is an edge indicating that we should be disconnected from a peer, but we are connected. /// Try to resolve the inconsistency. diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs index cf5b4d313be..ee3664d89d4 100644 --- a/chain/network/src/snapshot_hosts/mod.rs +++ b/chain/network/src/snapshot_hosts/mod.rs @@ -1,8 +1,9 @@ //! Cache of SnapshotHostInfos. //! //! Each node in the network which is willing to generate and serve state snapshots -//! publishes a SnapshotHostInfo once per epoch. The info is flooded to nodes in the -//! network and stored in this cache. +//! publishes a SnapshotHostInfo once per epoch. The info is flooded to all nodes in +//! the network and stored locally in this cache. + use crate::concurrency; use crate::concurrency::arc_mutex::ArcMutex; use crate::network_protocol::SnapshotHostInfo; @@ -46,9 +47,9 @@ impl Inner { } } -pub(crate) struct SnapshotHosts(ArcMutex); +pub(crate) struct SnapshotHostsCache(ArcMutex); -impl SnapshotHosts { +impl SnapshotHostsCache { pub fn new() -> Self { Self(ArcMutex::new(Inner { hosts: im::HashMap::new() })) }