Skip to content

Commit

Permalink
push
Browse files Browse the repository at this point in the history
  • Loading branch information
pete-eiger committed Feb 20, 2024
1 parent da3d5d4 commit d3db0b0
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 83 deletions.
1 change: 0 additions & 1 deletion examples/ping-pong/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ async fn main() {
None,
config.waku_port,
None,
Some(false),
None,
config.discv5_port,
)
Expand Down
26 changes: 4 additions & 22 deletions src/graphcast_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
//! - GraphcastID wallet: resolve Graph Account identity
//! - Ethereum node provider endpoint: provider access
//! - Waku Node Instance: interact with the Graphcast network
//! - Pubsub and Content filter topics: interaction configurations
//! - Pubsub topics: interaction configurations
//!
//! Graphcast agent shall be able to construct, send, receive, validate, and attest
//! Graphcast messages regardless of specific radio use cases
//!
use self::message_typing::{GraphcastMessage, IdentityValidation, MessageError, RadioPayload};
use self::waku_handling::{
build_content_topics, filter_peer_subscriptions, handle_signal, pubsub_topic,
setup_node_handle, WakuHandlingError,
build_content_topics, handle_signal, pubsub_topic, setup_node_handle, WakuHandlingError,
};
use ethers::signers::WalletError;

Expand Down Expand Up @@ -74,7 +73,6 @@ pub struct GraphcastAgentConfig {
pub waku_host: Option<String>,
pub waku_port: Option<String>,
pub waku_addr: Option<String>,
pub filter_protocol: Option<bool>,
pub discv5_enrs: Vec<String>,
pub discv5_port: Option<u16>,
}
Expand All @@ -96,7 +94,6 @@ impl GraphcastAgentConfig {
waku_host: Option<String>,
waku_port: Option<String>,
waku_addr: Option<String>,
filter_protocol: Option<bool>,
discv5_enrs: Option<Vec<String>>,
discv5_port: Option<u16>,
) -> Result<Self, GraphcastAgentError> {
Expand All @@ -120,8 +117,6 @@ impl GraphcastAgentConfig {
waku_host,
waku_port,
waku_addr,
// Extra handling here to make sure the default behavior is filter protocol disabled
filter_protocol: Some(filter_protocol.unwrap_or(false)),
discv5_enrs,
discv5_port,
};
Expand Down Expand Up @@ -213,9 +208,6 @@ pub struct GraphcastAgent {
pub seen_msg_ids: Arc<SyncMutex<HashSet<String>>>,
/// Sender identity validation mechanism used by the Graphcast agent
pub id_validation: IdentityValidation,
//TODO: Consider deprecating this field as it isn't utilized in network_check anymore
/// Keeps track of whether Filter protocol is enabled, if false -> we're using Relay protocol
pub filter_protocol_enabled: bool,
}

impl GraphcastAgent {
Expand Down Expand Up @@ -284,7 +276,6 @@ impl GraphcastAgent {
waku_host,
waku_port,
waku_addr,
filter_protocol,
discv5_enrs,
discv5_port,
id_validation,
Expand All @@ -308,7 +299,6 @@ impl GraphcastAgent {
port,
advertised_addr,
node_key,
filter_protocol,
discv5_enrs,
discv5_port,
)
Expand All @@ -318,15 +308,8 @@ impl GraphcastAgent {
let content_topics = build_content_topics(&radio_name, 0.to_string(), &subtopics);
let content_filter = ContentFilter::new(Some(pubsub_topic.clone()), content_topics.clone());

if filter_protocol.is_some() && !filter_protocol.unwrap() {
debug!("Filter protocol disabled, subscribe to pubsub topic on the relay protocol");
relay_subscribe(&node_handle, &content_filter)
.expect("Could not subscribe to the pubsub topic");
} else {
debug!("Filter protocol enabled, filter subscriptions with peers");
let _ = filter_peer_subscriptions(&node_handle, &pubsub_topic, &content_topics)
.expect("Could not connect and subscribe to the subtopics");
}
relay_subscribe(&node_handle, &content_filter)
.expect("Could not subscribe to the pubsub topic");

let callbook = CallBook::new(registry_subgraph, network_subgraph, graph_node_endpoint);

Expand All @@ -345,7 +328,6 @@ impl GraphcastAgent {
callbook,
seen_msg_ids,
id_validation,
filter_protocol_enabled: filter_protocol.is_some(),
})
}

Expand Down
66 changes: 6 additions & 60 deletions src/graphcast_agent/waku_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use url::ParseError;
use waku::{
waku_dns_discovery, waku_new, ContentFilter, DnsInfo, Encoding, GossipSubParams, Multiaddr,
ProtocolId, Running, SecretKey, Signal, WakuContentTopic, WakuLogLevel, WakuMessage,
WakuNodeConfig, WakuNodeHandle, WakuPeerData, WakuPubSubTopic,
WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic,
};

use crate::{app_name, cf_nameserver, discovery_url, graphql::QueryError};
Expand Down Expand Up @@ -60,45 +60,6 @@ pub fn relay_subscribe(
.map_err(WakuHandlingError::CreateNodeError)
}

/// Make filter subscription requests to all peers except for ourselves
/// Return subscription results for each peer
pub fn filter_peer_subscriptions(
node_handle: &WakuNodeHandle<Running>,
graphcast_topic: &WakuPubSubTopic,
content_topics: &[WakuContentTopic],
) -> Result<Vec<String>, WakuHandlingError> {
let subscription: ContentFilter = content_filter(graphcast_topic, content_topics);
debug!(
peers = tracing::field::debug(&subscription),
"Subscribe to topics"
);
let filter_subscribe_result: Vec<String> = node_handle
.peers()
.map_err(WakuHandlingError::RetrievePeersError)?
.iter()
.map(|peer: &WakuPeerData| {
// subscribe to all other peers
let filter_res = node_handle.filter_subscribe(
&subscription,
Some(peer.peer_id().clone()),
Some(Duration::new(6000, 0)),
);
match filter_res {
Ok(_) => format!(
"Success filter subcription request made to peer {}",
peer.peer_id(),
),
Err(e) => format!("Filter subcription request failed for peer {e}"),
}
})
.collect();
info!(
peers = tracing::field::debug(&filter_subscribe_result),
"Subscription connections established",
);
Ok(filter_subscribe_result)
}

/// Make filter subscription requests to all peers except for ourselves
/// Return subscription results for each peer
pub fn unsubscribe_peer(
Expand All @@ -121,16 +82,13 @@ pub fn unsubscribe_peer(
})
}

/// For boot nodes, configure a Waku Relay Node with filter protocol enabled (Waiting on filterFullNode waku-bindings impl). These node route all messages on the subscribed pubsub topic
/// For boot nodes, configure a Waku Relay Node, they will route all messages on the subscribed pubsub topic.
/// Preferrably also provide advertise_addr and Secp256k1 private key in Hex format (0x123...abc).
///
/// For light nodes, config with relay disabled and filter enabled. These node will route all messages but only pull data for messages matching the subscribed content topics.
fn node_config(
host: Option<&str>,
port: usize,
ad_addr: Option<Multiaddr>,
key: Option<SecretKey>,
filter_protocol: Option<bool>,
discv5_nodes: Vec<String>,
discv5_port: Option<u16>,
) -> Option<WakuNodeConfig> {
Expand All @@ -153,12 +111,9 @@ fn node_config(
..Default::default()
};

let relay = filter_protocol.map(|b| !b);
debug!(
relay_protocol = tracing::field::debug(&relay),
filter_protocol = tracing::field::debug(&filter_protocol),
discv5_nodes = tracing::field::debug(&discv5_nodes),
"Protocol setup",
"Discv5 nodes",
);

Some(WakuNodeConfig {
Expand All @@ -167,7 +122,7 @@ fn node_config(
advertise_addr: ad_addr, // Fill this for boot nodes
node_key: key,
keep_alive_interval: None,
relay, // Default true - will receive all msg on relay
relay: Some(true),
min_peers_to_publish: Some(0), // Default 0
log_level: Some(log_level),
relay_topics: [].to_vec(),
Expand Down Expand Up @@ -285,7 +240,6 @@ pub fn setup_node_handle(
port: Option<&str>,
advertised_addr: Option<Multiaddr>,
node_key: Option<SecretKey>,
filter_protocol: Option<bool>,
discv5_enrs: Vec<String>,
discv5_port: Option<u16>,
) -> Result<WakuNodeHandle<Running>, WakuHandlingError> {
Expand All @@ -307,7 +261,6 @@ pub fn setup_node_handle(
port,
advertised_addr,
node_key,
filter_protocol,
discv5_enrs,
discv5_port,
),
Expand All @@ -318,7 +271,6 @@ pub fn setup_node_handle(
port,
advertised_addr,
node_key,
filter_protocol,
discv5_nodes,
discv5_port,
);
Expand All @@ -328,12 +280,8 @@ pub fn setup_node_handle(
.start()
.map_err(WakuHandlingError::CreateNodeError)?;
let nodes = gather_nodes(boot_node_addresses, pubsub_topic);
// Connect to peers on the filter protocol or relay protocol
if let Some(false) = filter_protocol {
connect_multiaddresses(nodes, &node_handle, ProtocolId::Relay);
} else {
connect_multiaddresses(nodes, &node_handle, ProtocolId::Filter);
}

connect_multiaddresses(nodes, &node_handle, ProtocolId::Relay);

info!(
id = tracing::field::debug(node_handle.peer_id()),
Expand All @@ -352,7 +300,6 @@ pub fn boot_node_handle(
port: usize,
advertised_addr: Option<Multiaddr>,
node_key: Option<SecretKey>,
filter: Option<bool>,
discv5_enrs: Vec<String>,
discv5_port: Option<u16>,
) -> Result<WakuNodeHandle<Running>, WakuHandlingError> {
Expand All @@ -361,7 +308,6 @@ pub fn boot_node_handle(
port,
advertised_addr,
node_key,
filter,
discv5_enrs,
discv5_port,
);
Expand Down

0 comments on commit d3db0b0

Please sign in to comment.