implement add and send fns for snapshot host info
saketh-are committed Nov 2, 2023
1 parent bd4fb4b commit a7ddcd0
Showing 8 changed files with 101 additions and 10 deletions.
4 changes: 4 additions & 0 deletions chain/network/src/config.rs
@@ -149,6 +149,8 @@ pub struct NetworkConfig {
     pub archive: bool,
     /// Maximal rate at which SyncAccountsData can be broadcasted.
     pub accounts_data_broadcast_rate_limit: rate::Limit,
+    /// Maximal rate at which SyncSnapshotHosts can be broadcasted.
+    pub snapshot_hosts_broadcast_rate_limit: rate::Limit,
     /// Maximal rate at which RoutingTable can be recomputed.
     pub routing_table_update_rate_limit: rate::Limit,
     /// Config of the TIER1 network.
@@ -318,6 +320,7 @@ impl NetworkConfig {
             outbound_disabled: false,
             archive,
             accounts_data_broadcast_rate_limit: rate::Limit { qps: 0.1, burst: 1 },
+            snapshot_hosts_broadcast_rate_limit: rate::Limit { qps: 0.1, burst: 1 },
             routing_table_update_rate_limit: rate::Limit { qps: 1., burst: 1 },
             tier1: Some(Tier1 {
                 connect_interval: cfg.experimental.tier1_connect_interval.try_into()?,
@@ -386,6 +389,7 @@ impl NetworkConfig {
             inbound_disabled: false,
             archive: false,
             accounts_data_broadcast_rate_limit: rate::Limit { qps: 100., burst: 1000000 },
+            snapshot_hosts_broadcast_rate_limit: rate::Limit { qps: 100., burst: 1000000 },
             routing_table_update_rate_limit: rate::Limit { qps: 10., burst: 1 },
             tier1: Some(Tier1 {
                 // Interval is very large, so that it doesn't happen spontaneously in tests.
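
Note: the new field reuses the rate::Limit { qps, burst } shape already used for accounts data. As a rough mental model (a hypothetical sketch, not nearcore's actual rate module), such a limit behaves like a token bucket: burst caps how many sends can happen back-to-back, while qps is the long-run refill rate, so qps: 0.1, burst: 1 allows at most one broadcast every 10 seconds on average.

use std::time::Instant;

// Hypothetical token bucket illustrating qps/burst semantics;
// not the actual `rate::Limit` implementation from nearcore.
struct TokenBucket {
    qps: f64,             // long-run refill rate, tokens per second
    burst: f64,           // bucket capacity, caps back-to-back sends
    tokens: f64,          // currently available tokens
    last_refill: Instant, // when tokens were last topped up
}

impl TokenBucket {
    fn new(qps: f64, burst: f64) -> Self {
        Self { qps, burst, tokens: burst, last_refill: Instant::now() }
    }

    // Returns true if one event may proceed now, consuming a token.
    fn allow(&mut self) -> bool {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.qps).min(self.burst);
        self.last_refill = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

Under this model, the test config's qps: 100., burst: 1000000 effectively disables throttling, which is presumably why those values are used in tests.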
2 changes: 1 addition & 1 deletion chain/network/src/network_protocol/mod.rs
@@ -5,7 +5,7 @@ mod borsh_conv;
 mod edge;
 mod peer;
 mod proto_conv;
-mod state_sync;
+pub mod state_sync;
 pub use edge::*;
 pub use peer::*;
 pub use state_sync::*;
10 changes: 8 additions & 2 deletions chain/network/src/network_protocol/proto_conv/peer_message.rs
@@ -209,14 +209,20 @@ pub enum ParseSyncSnapshotHostsError {
 
 impl From<&SyncSnapshotHosts> for proto::SyncSnapshotHosts {
     fn from(x: &SyncSnapshotHosts) -> Self {
-        Self { hosts: x.hosts.iter().map(Into::into).collect(), ..Default::default() }
+        Self { hosts: x.hosts.iter().map(|d| d.as_ref().into()).collect(), ..Default::default() }
     }
 }
 
 impl TryFrom<&proto::SyncSnapshotHosts> for SyncSnapshotHosts {
     type Error = ParseSyncSnapshotHostsError;
     fn try_from(x: &proto::SyncSnapshotHosts) -> Result<Self, Self::Error> {
-        Ok(Self { hosts: try_from_slice(&x.hosts).map_err(Self::Error::Hosts)? })
+        Ok(Self {
+            hosts: try_from_slice(&x.hosts)
+                .map_err(Self::Error::Hosts)?
+                .into_iter()
+                .map(Arc::new)
+                .collect(),
+        })
     }
 }
 
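
Note: the net effect of this change is that host infos cross the wire as plain values but live in memory behind Arc, so they can be fanned out to many peer connections without deep copies. A minimal sketch of the same shape, with a hypothetical HostInfo standing in for SnapshotHostInfo:

use std::sync::Arc;

// Hypothetical stand-in for SnapshotHostInfo.
#[derive(Clone, Debug)]
struct HostInfo {
    peer: String,
    epoch_height: u64,
}

// Outbound: borrow through the Arc so the wire encoder sees `&HostInfo`,
// mirroring `x.hosts.iter().map(|d| d.as_ref().into())` above.
fn encode(hosts: &[Arc<HostInfo>]) -> Vec<HostInfo> {
    hosts.iter().map(|d| d.as_ref().clone()).collect()
}

// Inbound: wrap each decoded value exactly once, mirroring
// `.into_iter().map(Arc::new).collect()`; later clones copy only a pointer.
fn decode(hosts: Vec<HostInfo>) -> Vec<Arc<HostInfo>> {
    hosts.into_iter().map(Arc::new).collect()
}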
3 changes: 2 additions & 1 deletion chain/network/src/network_protocol/state_sync.rs
@@ -1,3 +1,4 @@
+use crate::network_protocol::Arc;
 use near_crypto::SecretKey;
 use near_crypto::Signature;
 use near_primitives::hash::CryptoHash;
@@ -63,5 +64,5 @@ impl SnapshotHostInfo {
 // SyncAccountsData either so it's worth revisiting.
 #[derive(Clone, Debug, Eq, PartialEq, borsh::BorshSerialize, borsh::BorshDeserialize)]
 pub struct SyncSnapshotHosts {
-    pub hosts: Vec<SnapshotHostInfo>,
+    pub hosts: Vec<Arc<SnapshotHostInfo>>,
 }
3 changes: 3 additions & 0 deletions chain/network/src/peer/peer_actor.rs
@@ -635,6 +635,9 @@ impl PeerActor {
             send_accounts_data_demux: demux::Demux::new(
                 self.network_state.config.accounts_data_broadcast_rate_limit,
             ),
+            send_snapshot_hosts_demux: demux::Demux::new(
+                self.network_state.config.snapshot_hosts_broadcast_rate_limit,
+            ),
         });
 
         let tracker = self.tracker.clone();
47 changes: 46 additions & 1 deletion chain/network/src/peer_manager/connection/mod.rs
@@ -1,9 +1,10 @@
 use crate::concurrency::arc_mutex::ArcMutex;
 use crate::concurrency::atomic_cell::AtomicCell;
 use crate::concurrency::demux;
+use crate::network_protocol::state_sync::SyncSnapshotHosts;
 use crate::network_protocol::{
     PeerInfo, PeerMessage, RoutedMessageBody, SignedAccountData, SignedOwnedAccount,
-    SyncAccountsData,
+    SnapshotHostInfo, SyncAccountsData,
 };
 use crate::peer::peer_actor;
 use crate::peer::peer_actor::PeerActor;
@@ -104,6 +105,8 @@ pub(crate) struct Connection {
 
     /// Demultiplexer for the calls to send_accounts_data().
     pub send_accounts_data_demux: demux::Demux<Vec<Arc<SignedAccountData>>, ()>,
+    /// Demultiplexer for the calls to send_snapshot_hosts().
+    pub send_snapshot_hosts_demux: demux::Demux<Vec<Arc<SnapshotHostInfo>>, ()>,
 }
 
 impl fmt::Debug for Connection {
@@ -182,6 +185,48 @@ impl Connection {
             }
         }
     }
+
+    pub fn send_snapshot_hosts(
+        self: &Arc<Self>,
+        data: Vec<Arc<SnapshotHostInfo>>,
+    ) -> impl Future<Output = ()> {
+        let this = self.clone();
+        async move {
+            let res = this
+                .send_snapshot_hosts_demux
+                .call(data, {
+                    let this = this.clone();
+                    |ds: Vec<Vec<Arc<SnapshotHostInfo>>>| async move {
+                        let res = ds.iter().map(|_| ()).collect();
+                        let mut sum = HashMap::<_, Arc<SnapshotHostInfo>>::new();
+                        for d in ds.into_iter().flatten() {
+                            match sum.entry(d.peer_id.clone()) {
+                                Entry::Occupied(mut x) => {
+                                    if x.get().epoch_height < d.epoch_height {
+                                        x.insert(d);
+                                    }
+                                }
+                                Entry::Vacant(x) => {
+                                    x.insert(d);
+                                }
+                            }
+                        }
+                        let msg = Arc::new(PeerMessage::SyncSnapshotHosts(SyncSnapshotHosts {
+                            hosts: sum.into_values().collect(),
+                        }));
+                        this.send_message(msg);
+                        res
+                    }
+                })
+                .await;
+            if res.is_err() {
+                tracing::info!(
+                    "peer {} disconnected, while sending SyncSnapshotHosts",
+                    this.peer_info.id
+                );
+            }
+        }
+    }
 }
 
 #[derive(Clone)]
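
Note: because the demux batches several concurrent send_snapshot_hosts calls into a single SyncSnapshotHosts message, the closure deduplicates by peer id, keeping only the info with the highest epoch height per peer. The core of that aggregation, extracted into a self-contained sketch (Info is a hypothetical stand-in modeling only the fields the logic touches):

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for SnapshotHostInfo.
#[derive(Debug, PartialEq)]
struct Info {
    peer_id: String,
    epoch_height: u64,
}

// Collapse batched calls into one host list: per peer, the newest epoch wins.
fn aggregate(batches: Vec<Vec<Arc<Info>>>) -> Vec<Arc<Info>> {
    let mut sum = HashMap::<String, Arc<Info>>::new();
    for d in batches.into_iter().flatten() {
        match sum.entry(d.peer_id.clone()) {
            Entry::Occupied(mut x) => {
                // Replace only if the incoming info is strictly newer.
                if x.get().epoch_height < d.epoch_height {
                    x.insert(d);
                }
            }
            Entry::Vacant(x) => {
                x.insert(d);
            }
        }
    }
    sum.into_values().collect()
}

For example, aggregating the batches [{A, 5}] and [{A, 7}, {B, 3}] yields {A, 7} and {B, 3}: the stale info for A is dropped before anything is sent.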
33 changes: 32 additions & 1 deletion chain/network/src/peer_manager/network_state/mod.rs
@@ -6,7 +6,7 @@ use crate::concurrency::runtime::Runtime;
 use crate::config;
 use crate::network_protocol::{
     Edge, EdgeState, PartialEdgeInfo, PeerIdOrHash, PeerInfo, PeerMessage, RawRoutedMessage,
-    RoutedMessageBody, RoutedMessageV2, SignedAccountData,
+    RoutedMessageBody, RoutedMessageV2, SignedAccountData, SnapshotHostInfo,
 };
 use crate::peer::peer_actor::PeerActor;
 use crate::peer::peer_actor::{ClosingReason, ConnectionClosedEvent};
@@ -18,6 +18,7 @@ use crate::private_actix::RegisterPeerError;
 use crate::routing::route_back_cache::RouteBackCache;
 use crate::routing::NetworkTopologyChange;
 use crate::shards_manager::ShardsManagerRequestFromNetwork;
+use crate::snapshot_hosts::{SnapshotHostInfoError, SnapshotHostsCache};
 use crate::stats::metrics;
 use crate::store;
 use crate::tcp;
@@ -113,6 +114,8 @@ pub(crate) struct NetworkState {
     pub inbound_handshake_permits: Arc<tokio::sync::Semaphore>,
     /// Peer store that provides read/write access to peers.
     pub peer_store: peer_store::PeerStore,
+    /// Information about state snapshots hosted by network peers.
+    pub snapshot_hosts: Arc<SnapshotHostsCache>,
     /// Connection store that provides read/write access to stored connections.
     pub connection_store: connection_store::ConnectionStore,
     /// List of peers to which we should re-establish a connection
@@ -189,6 +192,7 @@ impl NetworkState {
             tier1: connection::Pool::new(config.node_id()),
             inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)),
             peer_store,
+            snapshot_hosts: Arc::new(SnapshotHostsCache::new()),
             connection_store: connection_store::ConnectionStore::new(store.clone()).unwrap(),
             pending_reconnect: Mutex::new(Vec::<PeerInfo>::new()),
             accounts_data: Arc::new(AccountDataCache::new()),
@@ -657,6 +661,33 @@ impl NetworkState {
         .unwrap()
     }
 
+    pub async fn add_snapshot_hosts(
+        self: &Arc<Self>,
+        hosts: Vec<Arc<SnapshotHostInfo>>,
+    ) -> Option<SnapshotHostInfoError> {
+        let this = self.clone();
+        self.spawn(async move {
+            // Verify and add the new data to the internal state.
+            let (new_data, err) = this.snapshot_hosts.clone().insert(hosts).await;
+            // Broadcast any valid new data, even if an err was returned.
+            // The presence of one invalid entry doesn't invalidate the remaining ones.
+            if !new_data.is_empty() {
+                let tier2 = this.tier2.load();
+                let tasks: Vec<_> = tier2
+                    .ready
+                    .values()
+                    .map(|p| this.spawn(p.send_snapshot_hosts(new_data.clone())))
+                    .collect();
+                for t in tasks {
+                    t.await.unwrap();
+                }
+            }
+            err
+        })
+        .await
+        .unwrap()
+    }
+
     /// a) there is a peer we should be connected to, but we aren't
     /// b) there is an edge indicating that we should be disconnected from a peer, but we are connected.
     /// Try to resolve the inconsistency.
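
Note: add_snapshot_hosts broadcasts each verified batch by spawning one send task per ready TIER2 connection and awaiting them all. A simplified model of that fan-out step, assuming tokio for task spawning (Conn and its send method are hypothetical stand-ins for Connection::send_snapshot_hosts):

use std::sync::Arc;

// Hypothetical stand-in for a ready TIER2 connection.
struct Conn {
    id: usize,
}

impl Conn {
    async fn send(self: Arc<Self>, data: Vec<Arc<String>>) {
        println!("conn {}: sending {} host infos", self.id, data.len());
    }
}

// Spawn one task per connection so a slow peer doesn't serialize the
// broadcast, then await them all before reporting completion. Cloning
// `data` copies only Arc pointers, not the host infos themselves.
async fn broadcast(conns: Vec<Arc<Conn>>, data: Vec<Arc<String>>) {
    let tasks: Vec<_> = conns
        .into_iter()
        .map(|c| tokio::spawn(c.send(data.clone())))
        .collect();
    for t in tasks {
        t.await.unwrap();
    }
}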
9 changes: 5 additions & 4 deletions chain/network/src/snapshot_hosts/mod.rs
@@ -1,8 +1,9 @@
 //! Cache of SnapshotHostInfos.
 //!
 //! Each node in the network which is willing to generate and serve state snapshots
-//! publishes a SnapshotHostInfo once per epoch. The info is flooded to nodes in the
-//! network and stored in this cache.
+//! publishes a SnapshotHostInfo once per epoch. The info is flooded to all nodes in
+//! the network and stored locally in this cache.
+
 use crate::concurrency;
 use crate::concurrency::arc_mutex::ArcMutex;
 use crate::network_protocol::SnapshotHostInfo;
@@ -46,9 +47,9 @@ impl Inner {
     }
 }
 
-pub(crate) struct SnapshotHosts(ArcMutex<Inner>);
+pub(crate) struct SnapshotHostsCache(ArcMutex<Inner>);
 
-impl SnapshotHosts {
+impl SnapshotHostsCache {
     pub fn new() -> Self {
         Self(ArcMutex::new(Inner { hosts: im::HashMap::new() }))
     }
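
Note: the renamed SnapshotHostsCache keeps Inner behind an ArcMutex holding an im::HashMap (a persistent map, visible in the constructor above), which suggests a copy-on-write pattern: writers mutate under the lock while readers take a cheap structural-sharing snapshot. A hypothetical sketch of that pattern, with a plain std Mutex standing in for nearcore's ArcMutex and a bare epoch height standing in for the full host info:

use std::sync::{Arc, Mutex};

// Hypothetical sketch; not the actual nearcore Inner.
#[derive(Clone, Default)]
struct Inner {
    // peer_id -> epoch height; im::HashMap clones are O(1) thanks to
    // structural sharing, so handing out snapshots is cheap.
    hosts: im::HashMap<String, Arc<u64>>,
}

struct Cache(Mutex<Inner>);

impl Cache {
    fn insert(&self, peer_id: String, epoch_height: u64) {
        let mut inner = self.0.lock().unwrap();
        inner.hosts.insert(peer_id, Arc::new(epoch_height));
    }

    // Readers get a point-in-time snapshot and iterate it without
    // holding the lock, so lookups never block writers for long.
    fn snapshot(&self) -> im::HashMap<String, Arc<u64>> {
        self.0.lock().unwrap().hosts.clone()
    }
}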
