Skip to content

Commit

Permalink
Merge pull request #1154 from subspace/async-provider-announcement-ac…
Browse files Browse the repository at this point in the history
…knowledgement

Upgrade libp2p to latest master with two upstream PRs on top that all…
  • Loading branch information
nazar-pc authored Feb 20, 2023
2 parents a5ead62 + eab5080 commit a941651
Show file tree
Hide file tree
Showing 31 changed files with 322 additions and 246 deletions.
428 changes: 256 additions & 172 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
async-oneshot = "0.5.0"
futures = "0.3.25"
futures = "0.3.26"
futures-timer = "3.0.2"
jsonrpsee = { version = "0.16.2", features = ["server", "macros"] }
parity-scale-codec = "3.2.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/sc-consensus-subspace/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"]
async-trait = "0.1.58"
codec = { package = "parity-scale-codec", version = "3.2.1", features = ["derive"] }
fork-tree = { version = "3.0.0", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
futures = "0.3.25"
futures = "0.3.26"
futures-timer = "3.0.2"
log = "0.4.17"
lru = "0.8.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/sp-lightclient/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ subspace-verification = { version = "0.1.0", path = "../subspace-verification",
[dev-dependencies]
async-trait = "0.1.58"
frame-support = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
futures = "0.3.25"
futures = "0.3.26"
rand = { version = "0.8.5", features = ["min_const_gen"] }
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving"}
subspace-farmer-components = { version = "0.1.0", path = "../subspace-farmer-components" }
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bench = false
[dependencies]
async-trait = "0.1.58"
fs2 = "0.4.3"
futures = "0.3.25"
futures = "0.3.26"
libc = "0.2.131"
parity-scale-codec = "3.2.1"
rand = "0.8.5"
Expand All @@ -33,7 +33,7 @@ tracing = "0.1.37"

[dev-dependencies]
criterion = "0.4.0"
futures = "0.3.25"
futures = "0.3.26"
memmap2 = "0.5.8"
rayon = "1.6.0"
subspace-archiving = { version = "0.1.0", path = "../subspace-archiving" }
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ derive_more = "0.99.17"
dirs = "4.0.0"
event-listener-primitives = "2.0.1"
fdlimit = "0.2"
futures = "0.3.25"
futures = "0.3.26"
hex = { version = "0.4.3", features = ["serde"] }
jsonrpsee = { version = "0.16.2", features = ["client", "macros", "server"] }
lru = "0.8.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,16 @@ pub(crate) fn start_announcements_processor(
let handler_id = node.on_announcement(Arc::new({
let provider_records_sender = Mutex::new(provider_records_sender);

move |record| {
if let Err(error) = provider_records_sender.lock().try_send(record.clone()) {
move |record, guard| {
if let Err(error) = provider_records_sender
.lock()
.try_send((record.clone(), Arc::clone(guard)))
{
if error.is_disconnected() {
// Receiver exited, nothing left to be done
return;
}
let record = error.into_inner();
let (record, _guard) = error.into_inner();
// TODO: This should be made a warning, but due to
// https://github.com/libp2p/rust-libp2p/discussions/3411 it'll take us some time
// to resolve
Expand Down Expand Up @@ -218,7 +221,7 @@ pub(crate) fn start_announcements_processor(
.name("ann-processor".to_string())
.spawn(move || {
let processor_fut = async {
while let Some(provider_record) = provider_records_receiver.next().await {
while let Some((provider_record, _guard)) = provider_records_receiver.next().await {
if weak_readers_and_pieces.upgrade().is_none() {
// `ReadersAndPieces` was dropped, nothing left to be done
return;
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-fraud-proof/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ tracing = "0.1.37"
domain-block-builder = { version = "0.1.0", path = "../../domains/client/block-builder" }
domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" }
domain-test-service = { version = "0.1.0", path = "../../domains/test/service" }
futures = "0.3.25"
futures = "0.3.26"
pallet-balances = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
sc-cli = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34", default-features = false }
sc-consensus = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
Expand Down
8 changes: 4 additions & 4 deletions crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ actix-web = "4.2.1"
anyhow = "1.0.66"
async-trait = "0.1.58"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
bytes = "1.2.1"
bytes = "1.4.0"
bytesize = "1.1.0"
chrono = {version = "0.4.21", features = ["clock", "serde", "std",]}
clap = { version = "4.0.26", features = ["color", "derive"] }
derive_more = "0.99.17"
either = "1.8.0"
event-listener-primitives = "2.0.1"
futures = "0.3.25"
futures = "0.3.26"
hex = "0.4.3"
lru = "0.8.1"
nohash-hasher = "0.2.0"
parity-db = "0.4.2"
parity-scale-codec = "3.2.1"
parking_lot = "0.12.1"
pin-project = "1.0.11"
prometheus-client = "0.18.1"
prometheus-client = "0.19.0"
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.83"
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
Expand All @@ -49,7 +49,7 @@ unsigned-varint = { version = "0.7.1", features = ["futures", "asynchronous_code
[dependencies.libp2p]
# TODO: change to upstream release when https://github.com/libp2p/rust-libp2p/pull/3287 is released
git = "https://github.com/subspace/rust-libp2p"
rev = "b700d0c9a12f984936b44f634e79c9f3ee5e342d"
rev = "2de61da642888e3c4deac9925be90d56cdef1475"
default-features = false
features = [
"dns",
Expand Down
4 changes: 3 additions & 1 deletion crates/subspace-networking/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use crate::request_responses::{
Event as RequestResponseEvent, RequestHandler, RequestResponsesBehaviour,
};
use derive_more::From;
use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, MessageAuthenticity};
use libp2p::gossipsub::{
Behaviour as Gossipsub, Config as GossipsubConfig, Event as GossipsubEvent, MessageAuthenticity,
};
use libp2p::identify::{Behaviour as Identify, Config as IdentifyConfig, Event as IdentifyEvent};
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent};
use libp2p::ping::{Behaviour as Ping, Event as PingEvent};
Expand Down
3 changes: 2 additions & 1 deletion crates/subspace-networking/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use crate::shared::Shared;
use crate::utils::{convert_multiaddresses, ResizableSemaphore};
use futures::channel::mpsc;
use libp2p::gossipsub::{
GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId, ValidationMode,
Config as GossipsubConfig, ConfigBuilder as GossipsubConfigBuilder,
Message as GossipsubMessage, MessageId, ValidationMode,
};
use libp2p::identify::Config as IdentifyConfig;
use libp2p::kad::record::Key;
Expand Down
8 changes: 6 additions & 2 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::request_handlers::generic_request_handler::GenericRequest;
use crate::request_responses;
use crate::shared::{Command, CreatedSubscription, HandlerFn, Shared};
use crate::shared::{Command, CreatedSubscription, HandlerFn, HandlerFn2, Shared};
use crate::utils::ResizableSemaphorePermit;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
Expand All @@ -10,6 +10,7 @@ use futures::{SinkExt, Stream};
use libp2p::core::multihash::Multihash;
use libp2p::gossipsub::error::SubscriptionError;
use libp2p::gossipsub::Sha256Topic;
use libp2p::kad::handler::InboundStreamEventGuard;
use libp2p::kad::record::Key;
use libp2p::kad::{PeerRecord, ProviderRecord};
use libp2p::{Multiaddr, PeerId};
Expand Down Expand Up @@ -535,7 +536,10 @@ impl Node {
}

/// Callback is called when node starts listening on new address.
pub fn on_announcement(&self, callback: HandlerFn<ProviderRecord>) -> HandlerId {
pub fn on_announcement(
&self,
callback: HandlerFn2<ProviderRecord, Arc<InboundStreamEventGuard>>,
) -> HandlerId {
self.shared.handlers.announcement.add(callback)
}
}
8 changes: 4 additions & 4 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::channel::mpsc;
use futures::future::Fuse;
use futures::{FutureExt, StreamExt};
use libp2p::core::ConnectedPoint;
use libp2p::gossipsub::{GossipsubEvent, TopicHash};
use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
use libp2p::identify::Event as IdentifyEvent;
use libp2p::kad::store::RecordStore;
use libp2p::kad::{
Expand Down Expand Up @@ -561,10 +561,10 @@ where

match event {
KademliaEvent::InboundRequest {
request: InboundRequest::AddProvider { record },
request: InboundRequest::AddProvider { record, guard },
} => {
trace!("Add provider request received: {:?}", record);
if let Some(record) = record {
if let (Some(record), Some(guard)) = (record, guard) {
if let Err(err) = self
.swarm
.behaviour_mut()
Expand All @@ -582,7 +582,7 @@ where
}
};

shared.handlers.announcement.call_simple(&record);
shared.handlers.announcement.call_simple(&record, &guard);
}
}
KademliaEvent::OutboundQueryProgressed {
Expand Down
45 changes: 12 additions & 33 deletions crates/subspace-networking/src/request_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ mod tests;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;
use libp2p::core::connection::ConnectionId;
use libp2p::core::{Multiaddr, PeerId};
use libp2p::request_response::{
Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
Expand All @@ -50,7 +49,10 @@ use libp2p::request_response::{
pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId};
use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
use libp2p::swarm::handler::multi::MultiHandler;
use libp2p::swarm::{ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::swarm::{
ConnectionHandler, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
THandlerInEvent,
};
use std::borrow::Cow;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
Expand Down Expand Up @@ -467,24 +469,21 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
}
}
FromSwarm::DialFailure(inner) => {
for ((protocol, _), (_, handler)) in
self.protocols.values_mut().zip(inner.handler.into_iter())
{
for (protocol, _) in self.protocols.values_mut() {
protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id: inner.peer_id,
handler,
error: inner.error,
connection_id: inner.connection_id,
}));
}
}
FromSwarm::ListenFailure(inner) => {
for ((protocol, _), (_, handler)) in
self.protocols.values_mut().zip(inner.handler.into_iter())
{
for (protocol, _) in self.protocols.values_mut() {
protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr: inner.local_addr,
send_back_addr: inner.send_back_addr,
handler,
error: inner.error,
connection_id: inner.connection_id,
}));
}
}
Expand Down Expand Up @@ -550,7 +549,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> {
'poll_all: loop {
if let Some(message_request) = self.message_request.take() {
let MessageRequest {
Expand Down Expand Up @@ -654,34 +653,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour {

// Other events generated by the underlying behaviour are transparently
// passed through.
NetworkBehaviourAction::Dial { opts, handler } => {
NetworkBehaviourAction::Dial { opts } => {
if opts.get_peer_id().is_none() {
error!(
"The request-response isn't supposed to start dialing \
addresses"
);
}
let protocol = protocol.to_string();
let handler = {
let mut handlers: HashMap<_, _> = self
.protocols
.iter_mut()
.map(|(p, (r, _))| {
(p.to_string(), NetworkBehaviour::new_handler(r))
})
.collect();

if let Some(h) = handlers.get_mut(&protocol) {
*h = handler
}

MultiHandler::try_from_iter(handlers).expect(
"Protocols are in a HashMap and there can be at most one \
handler per protocol name, which is the only possible error; \
qed",
)
};
return Poll::Ready(NetworkBehaviourAction::Dial { opts, handler });
return Poll::Ready(NetworkBehaviourAction::Dial { opts });
}
NetworkBehaviourAction::NotifyHandler {
peer_id,
Expand Down
5 changes: 4 additions & 1 deletion crates/subspace-networking/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::channel::{mpsc, oneshot};
use libp2p::core::multihash::Multihash;
use libp2p::gossipsub::error::{PublishError, SubscriptionError};
use libp2p::gossipsub::Sha256Topic;
use libp2p::kad::handler::InboundStreamEventGuard;
use libp2p::kad::record::Key;
use libp2p::kad::{PeerRecord, ProviderRecord};
use libp2p::{Multiaddr, PeerId};
Expand Down Expand Up @@ -87,12 +88,14 @@ pub(crate) enum Command {
}

pub(crate) type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
pub(crate) type HandlerFn2<A, B> = Arc<dyn Fn(&A, &B) + Send + Sync + 'static>;
type Handler<A> = Bag<HandlerFn<A>, A>;
type Handler2<A, B> = Bag<HandlerFn2<A, B>, A, B>;

#[derive(Default, Debug)]
pub(crate) struct Handlers {
pub(crate) new_listener: Handler<Multiaddr>,
pub(crate) announcement: Handler<ProviderRecord>,
pub(crate) announcement: Handler2<ProviderRecord, Arc<InboundStreamEventGuard>>,
}

#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/utils/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type SharedRegistry = Arc<Mutex<Registry>>;

#[get("/metrics")]
async fn metrics(registry: Data<SharedRegistry>) -> Result<HttpResponse, Box<dyn Error>> {
let mut encoded: Vec<u8> = Vec::new();
let mut encoded = String::new();
encode(&mut encoded, &registry.lock())?;

let resp = HttpResponse::build(StatusCode::OK).body(encoded);
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitive
frame-benchmarking = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34", default-features = false }
frame-benchmarking-cli = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34", default-features = false }
frame-support = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
futures = "0.3.25"
futures = "0.3.26"
hex-literal = "0.3.4"
log = "0.4.17"
once_cell = "1.16.0"
once_cell = "1.17.1"
parity-scale-codec = "3.2.1"
sc-cli = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34", default-features = false }
sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ derive_more = "0.99.17"
domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" }
either = "1.8.0"
frame-support = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
futures = "0.3.25"
futures = "0.3.26"
jsonrpsee = { version = "0.16.2", features = ["server"] }
pallet-transaction-payment-rpc = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
parity-scale-codec = "3.2.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description = "Subspace transaction pool"

[dependencies]
domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" }
futures = "0.3.25"
futures = "0.3.26"
jsonrpsee = { version = "0.16.2", features = ["server"] }
sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
sc-service = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion domains/client/cross-domain-message-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ include = [
]

[dependencies]
futures = "0.3.25"
futures = "0.3.26"
parity-scale-codec = { version = "3.2.1", features = ["derive"] }
parking_lot = "0.12.1"
sc-network = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
Expand Down
2 changes: 1 addition & 1 deletion domains/client/domain-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ crossbeam = "0.8.2"
domain-block-builder = { version = "0.1.0", path = "../block-builder" }
domain-client-executor-gossip = { version = "0.1.0", path = "../executor-gossip" }
domain-runtime-primitives = { version = "0.1.0", path = "../../primitives/runtime" }
futures = { version = "0.3.25", features = ["compat"] }
futures = { version = "0.3.26", features = ["compat"] }
futures-timer = "3.0.1"
rand = "0.8.5"
rand_chacha = "0.3.1"
Expand Down
2 changes: 1 addition & 1 deletion domains/client/executor-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Liu-Cheng Xu <[email protected]>"]
edition = "2021"

[dependencies]
futures = "0.3.25"
futures = "0.3.26"
parity-scale-codec = { version = "3.2.1", features = ["derive"] }
parking_lot = "0.12.1"
sc-network = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
Expand Down
Loading

0 comments on commit a941651

Please sign in to comment.