Skip to content

Commit

Permalink
Add support for Web3Call adapter (graphprotocol#4351)
Browse files Browse the repository at this point in the history
- Always prefer web3call adapter for runtime adapter on ethereum if available
- Prevent non call functions on call_only adapters
- Add web3call type configuration
- Require Web3Call endpoint to be archive
  • Loading branch information
mangas authored Feb 9, 2023
1 parent e4cb769 commit 0737908
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chain/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ graph-runtime-derive = { path = "../../runtime/derive" }
[dev-dependencies]
test-store = { path = "../../store/test-store" }
base64 = "0.20.0"
graph-mock = { path = "../../mock" }

[build-dependencies]
tonic-build = { workspace = true }
3 changes: 2 additions & 1 deletion chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ impl TriggersAdapterSelector<Chain> for EthereumAdapterSelector {
traces: false,
};

self.adapters.cheapest_with(&adjusted_capabilities)?
self.adapters
.call_or_cheapest(Some(&adjusted_capabilities))?
} else {
self.adapters.cheapest_with(capabilities)?
};
Expand Down
12 changes: 12 additions & 0 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct EthereumAdapter {
web3: Arc<Web3<Transport>>,
metrics: Arc<ProviderEthRpcMetrics>,
supports_eip_1898: bool,
call_only: bool,
}

/// Gas limit for `eth_call`. The value of 50_000_000 is a protocol-wide parameter so this
Expand All @@ -84,18 +85,24 @@ impl CheapClone for EthereumAdapter {
web3: self.web3.cheap_clone(),
metrics: self.metrics.cheap_clone(),
supports_eip_1898: self.supports_eip_1898,
call_only: self.call_only,
}
}
}

impl EthereumAdapter {
pub fn is_call_only(&self) -> bool {
self.call_only
}

pub async fn new(
logger: Logger,
provider: String,
url: &str,
transport: Transport,
provider_metrics: Arc<ProviderEthRpcMetrics>,
supports_eip_1898: bool,
call_only: bool,
) -> Self {
// Unwrap: The transport was constructed with this url, so it is valid and has a host.
let hostname = graph::url::Url::parse(url)
Expand All @@ -122,6 +129,7 @@ impl EthereumAdapter {
web3,
metrics: provider_metrics,
supports_eip_1898: supports_eip_1898 && !is_ganache,
call_only,
}
}

Expand All @@ -133,6 +141,8 @@ impl EthereumAdapter {
to: BlockNumber,
addresses: Vec<H160>,
) -> Result<Vec<Trace>, Error> {
assert!(!self.call_only);

let eth = self.clone();
let retry_log_message =
format!("trace_filter RPC call for block range: [{}..{}]", from, to);
Expand Down Expand Up @@ -225,6 +235,8 @@ impl EthereumAdapter {
filter: Arc<EthGetLogsFilter>,
too_many_logs_fingerprints: &'static [&'static str],
) -> Result<Vec<Log>, TimeoutError<web3::error::Error>> {
assert!(!self.call_only);

let eth_adapter = self.clone();
let retry_log_message = format!("eth_getLogs RPC call for block range: [{}..{}]", from, to);
retry(retry_log_message, &logger)
Expand Down
167 changes: 163 additions & 4 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, Context};
use anyhow::{anyhow, bail, Context};
use graph::cheap_clone::CheapClone;
use graph::prelude::rand::{self, seq::IteratorRandom};
use std::cmp::Ordering;
Expand All @@ -23,12 +23,26 @@ pub struct EthereumNetworkAdapter {
limit: usize,
}

#[derive(Clone)]
impl EthereumNetworkAdapter {
fn is_call_only(&self) -> bool {
self.adapter.is_call_only()
}
}

#[derive(Clone, Default)]
pub struct EthereumNetworkAdapters {
pub adapters: Vec<EthereumNetworkAdapter>,
pub call_only_adapters: Vec<EthereumNetworkAdapter>,
}

impl EthereumNetworkAdapters {
pub fn push_adapter(&mut self, adapter: EthereumNetworkAdapter) {
if adapter.is_call_only() {
self.call_only_adapters.push(adapter);
} else {
self.adapters.push(adapter);
}
}
pub fn all_cheapest_with(
&self,
required_capabilities: &NodeCapabilities,
Expand Down Expand Up @@ -73,6 +87,42 @@ impl EthereumNetworkAdapters {
self.adapters
.retain(|adapter| adapter.adapter.provider() != provider);
}

pub fn call_or_cheapest(
&self,
capabilities: Option<&NodeCapabilities>,
) -> anyhow::Result<Arc<EthereumAdapter>> {
match self.call_only_adapter()? {
Some(adapter) => Ok(adapter),
None => self.cheapest_with(capabilities.unwrap_or(&NodeCapabilities {
// Archive is required for call_only
archive: true,
traces: false,
})),
}
}

pub fn call_only_adapter(&self) -> anyhow::Result<Option<Arc<EthereumAdapter>>> {
if self.call_only_adapters.is_empty() {
return Ok(None);
}

let adapters = self
.call_only_adapters
.iter()
.min_by_key(|x| Arc::strong_count(&x.adapter))
.ok_or(anyhow!("no available call only endpoints"))?;

// TODO: This will probably blow up a lot sooner than [limit] amount of
// subgraphs, since we probably use a few instances.
if Arc::strong_count(&adapters.adapter) >= adapters.limit {
bail!("call only adapter has reached the concurrency limit");
}

// Cloning here ensure we have the correct count at any given time, if we return a reference it can be cloned later
// which could cause a high number of endpoints to be given away before accounting for them.
Ok(Some(adapters.adapter.clone()))
}
}

#[derive(Clone)]
Expand All @@ -97,8 +147,9 @@ impl EthereumNetworks {
let network_adapters = self
.networks
.entry(name)
.or_insert(EthereumNetworkAdapters { adapters: vec![] });
network_adapters.adapters.push(EthereumNetworkAdapter {
.or_insert(EthereumNetworkAdapters::default());

network_adapters.push_adapter(EthereumNetworkAdapter {
capabilities,
adapter,
limit,
Expand Down Expand Up @@ -160,6 +211,14 @@ impl EthereumNetworks {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use graph::{prelude::MetricsRegistry, tokio, url::Url};
use graph_mock::MockMetricsRegistry;
use http::HeaderMap;

use crate::{EthereumAdapter, EthereumNetworks, ProviderEthRpcMetrics, Transport};

use super::NodeCapabilities;

#[test]
Expand Down Expand Up @@ -216,4 +275,104 @@ mod tests {
assert_eq!(true, &full_traces >= &full);
assert_eq!(true, &full_traces >= &full_traces);
}

#[tokio::test]
async fn adapter_selector_selects_eth_call() {
let chain = "mainnet".to_string();
let logger = graph::log::logger(true);
let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
let transport =
Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new());
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));

let eth_call_adapter = Arc::new(
EthereumAdapter::new(
logger.clone(),
String::new(),
"http://127.0.0.1",
transport.clone(),
provider_metrics.clone(),
true,
true,
)
.await,
);

let eth_adapter = Arc::new(
EthereumAdapter::new(
logger.clone(),
String::new(),
"http://127.0.0.1",
transport.clone(),
provider_metrics.clone(),
true,
false,
)
.await,
);

let mut adapters = {
let mut ethereum_networks = EthereumNetworks::new();
ethereum_networks.insert(
chain.clone(),
NodeCapabilities {
archive: true,
traces: false,
},
eth_call_adapter.clone(),
3,
);
ethereum_networks.insert(
chain.clone(),
NodeCapabilities {
archive: true,
traces: false,
},
eth_adapter.clone(),
3,
);
ethereum_networks.networks.get(&chain).unwrap().clone()
};
// one reference above and one inside adapters struct
assert_eq!(Arc::strong_count(&eth_call_adapter), 2);
assert_eq!(Arc::strong_count(&eth_adapter), 2);

{
// Not Found
assert!(adapters
.cheapest_with(&NodeCapabilities {
archive: false,
traces: true,
})
.is_err());

// Check cheapest is not call only
let adapter = adapters
.cheapest_with(&NodeCapabilities {
archive: true,
traces: false,
})
.unwrap();
assert_eq!(adapter.is_call_only(), false);
}

// Check limits
{
let adapter = adapters.call_or_cheapest(None).unwrap();
assert!(adapter.is_call_only());
assert!(adapters.call_or_cheapest(None).is_err());
}

// Check empty falls back to call only
{
adapters.call_only_adapters = vec![];
let adapter = adapters
.call_or_cheapest(Some(&NodeCapabilities {
archive: true,
traces: false,
}))
.unwrap();
assert_eq!(adapter.is_call_only(), false);
}
}
}
11 changes: 4 additions & 7 deletions chain/ethereum/src/runtime/runtime_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,10 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
fn host_fns(&self, ds: &DataSource) -> Result<Vec<HostFn>, Error> {
let abis = ds.mapping.abis.clone();
let call_cache = self.call_cache.cheap_clone();
let eth_adapter = self
.eth_adapters
.cheapest_with(&NodeCapabilities {
archive: ds.mapping.requires_archive()?,
traces: false,
})?
.cheap_clone();
let eth_adapter = self.eth_adapters.call_or_cheapest(Some(&NodeCapabilities {
archive: ds.mapping.requires_archive()?,
traces: false,
}))?;

let ethereum_call = HostFn {
name: "ethereum.call",
Expand Down
1 change: 1 addition & 0 deletions node/resources/tests/full_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ingestor = "index_0"
shard = "primary"
provider = [
{ label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] },
{ label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }},
{ label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }},
{ label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }},
]
Expand Down
Loading

0 comments on commit 0737908

Please sign in to comment.