Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support tx subscriptions #218

Merged
merged 28 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,599 changes: 1,007 additions & 592 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
members = [
"chains/astar/config",
"chains/astar/server",
"chains/ethereum/backend",
"chains/ethereum/config",
"chains/ethereum/rpc-client",
"chains/ethereum/server",
"chains/ethereum/tx",
"chains/ethereum/types",
"chains/polkadot/config",
"chains/polkadot/server",
"chains/polkadot/tx",
Expand All @@ -16,6 +18,7 @@ members = [
"rosetta-server",
"rosetta-types",
"chains/arbitrum/testing/rosetta-testing-arbitrum",
"rosetta-utils",
]
resolver = "2"

Expand All @@ -25,10 +28,12 @@ resolver = "2"
[workspace.dependencies]
rosetta-config-astar = { path = "chains/astar/config", default-features = false }
rosetta-server-astar = { path = "chains/astar/server" }
rosetta-ethereum-backend = { path = "chains/ethereum/backend" }
rosetta-config-ethereum = { path = "chains/ethereum/config" }
rosetta-server-ethereum = { path = "chains/ethereum/server" }
rosetta-ethereum-rpc-client = { path = "chains/ethereum/rpc-client" }
rosetta-tx-ethereum = { path = "chains/ethereum/tx" }
rosetta-ethereum-types = { path = "chains/ethereum/types", default-features = false }
rosetta-config-polkadot = { path = "chains/polkadot/config" }
rosetta-server-polkadot = { path = "chains/polkadot/server" }
rosetta-tx-polkadot = { path = "chains/polkadot/tx" }
Expand All @@ -38,12 +43,17 @@ rosetta-crypto = { path = "rosetta-crypto" }
rosetta-docker = { path = "rosetta-docker" }
rosetta-server = { path = "rosetta-server", default-features = false }
rosetta-types = { path = "rosetta-types" }
rosetta-utils = { path = "rosetta-utils", default-features = false }

## Crates we want all members to use the same version
jsonrpsee = { version = "0.21", default-features = false }
jsonrpsee = { version = "0.22", default-features = false }
parity-scale-codec = { version = "3.6" }
tokio = { version = "1.32" }
subxt = { version = "0.33", default-features = false }
subxt = { version = "0.34", default-features = false }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
scale-info = { version = "2.3" }

# Used to sign substrate transactions, must be the same version used by subxt
# https://github.com/paritytech/subxt/blob/v0.34.0/Cargo.toml#L125
sp-keyring = { version = "31.0" }
2 changes: 2 additions & 0 deletions chains/arbitrum/testing/rosetta-testing-arbitrum/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::large_futures)]

//! # Arbitrum Nitro Testnet Rosetta Server
//!
//! This module contains the production test for an Arbitrum Rosetta server implementation
Expand Down
4 changes: 2 additions & 2 deletions chains/astar/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ anyhow = "1.0"
async-trait = "0.1"
ethers = "2.0"
futures = { version = "0.3", default-features = false, features = ["std"] }
futures-util = "0.3"
hex = "0.4"
log = "0.4"
parity-scale-codec = { workspace = true, features = ["derive"] }
Expand All @@ -21,8 +22,7 @@ rosetta-server = { workspace = true, features = ["ws", "webpki-tls"] }
rosetta-server-ethereum.workspace = true
serde.workspace = true
serde_json.workspace = true
# sp-core = { version = "27.0", default-features = false, features = ["blake2", "std"] }
sp-keyring = "29.0"
sp-keyring.workspace = true
subxt = { workspace = true, features = ["substrate-compat"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

Expand Down
72 changes: 62 additions & 10 deletions chains/astar/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,19 @@ impl AstarClient {
// If a hash if provided, we don't know if it's a ethereum block hash or substrate
// block hash. We try to fetch the block using ethereum first, and
// if it fails, we try to fetch it using substrate.
let ethereum_block =
self.client.call(&EthQuery::GetBlockByHash(H256(*block_hash))).await.map(
|result| match result {
EthQueryResult::GetBlockByHash(block) => block,
_ => unreachable!(),
},
);
let ethereum_block = self
.client
.call(&EthQuery::GetBlockByHash(H256(*block_hash).into()))
.await
.map(|result| match result {
EthQueryResult::GetBlockByHash(block) => block,
_ => unreachable!(),
});

if let Ok(Some(ethereum_block)) = ethereum_block {
// Convert ethereum block to substrate block by fetching the block by number.
let substrate_block_number = BlockNumber::Number(ethereum_block.header.number);
let substrate_block_number =
BlockNumber::Number(ethereum_block.header().number());
let substrate_block_hash = self
.rpc_methods
.chain_get_block_hash(Some(substrate_block_number))
Expand Down Expand Up @@ -132,12 +134,12 @@ impl AstarClient {
// Verify if the ethereum block hash matches the provided ethereum block hash.
// TODO: compute the block hash
if U256(actual_eth_block.header.number.0) !=
U256::from(ethereum_block.header.number)
U256::from(ethereum_block.header().number())
{
anyhow::bail!("ethereum block hash mismatch");
}
if actual_eth_block.header.parent_hash.as_fixed_bytes() !=
&ethereum_block.header.parent_hash.0
&ethereum_block.header().header().parent_hash.0
{
anyhow::bail!("ethereum block hash mismatch");
}
Expand Down Expand Up @@ -206,6 +208,8 @@ impl BlockchainClient for AstarClient {

type Query = rosetta_config_ethereum::Query;
type Transaction = rosetta_config_ethereum::SignedTransaction;
type Subscription = <MaybeWsEthereumClient as BlockchainClient>::Subscription;
type Event = <MaybeWsEthereumClient as BlockchainClient>::Event;

async fn query(
&self,
Expand Down Expand Up @@ -297,6 +301,9 @@ impl BlockchainClient for AstarClient {
async fn listen<'a>(&'a self) -> Result<Option<Self::EventStream<'a>>> {
self.client.listen().await
}
async fn subscribe(&self, _sub: &Self::Subscription) -> Result<u32> {
anyhow::bail!("not implemented");
}
}

#[allow(clippy::ignored_unit_patterns)]
Expand Down Expand Up @@ -450,4 +457,49 @@ mod tests {
.await;
Ok(())
}

#[tokio::test]
async fn test_subscription() -> Result<()> {
use futures_util::StreamExt;
use rosetta_client::client::GenericBlockIdentifier;
use rosetta_core::{BlockOrIdentifier, ClientEvent};
let config = rosetta_config_astar::config("dev").unwrap();
let env = Env::new("astar-subscription", config.clone(), client_from_config)
.await
.unwrap();

run_test(env, |env| async move {
let wallet = env.ephemeral_wallet().await.unwrap();
let mut stream = wallet.listen().await.unwrap().unwrap();

let mut last_head: Option<u64> = None;
let mut last_finalized: Option<u64> = None;
for _ in 0..10 {
let event = stream.next().await.unwrap();
match event {
ClientEvent::NewHead(BlockOrIdentifier::Identifier(
GenericBlockIdentifier::Ethereum(head),
)) => {
if let Some(block_number) = last_head {
assert!(head.index > block_number);
}
last_head = Some(head.index);
},
ClientEvent::NewFinalized(BlockOrIdentifier::Identifier(
GenericBlockIdentifier::Ethereum(finalized),
)) => {
if let Some(block_number) = last_finalized {
assert!(finalized.index > block_number);
}
last_finalized = Some(finalized.index);
},
event => panic!("unexpected event: {event:?}"),
}
}
assert!(last_head.is_some());
assert!(last_finalized.is_some());
})
.await;
Ok(())
}
}
34 changes: 34 additions & 0 deletions chains/ethereum/backend/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "rosetta-ethereum-backend"
version = "0.1.0"
edition = "2021"
license = "MIT"
repository = "https://github.com/analog-labs/chain-connectors"
description = "Ethereum RPC method."

[dependencies]
async-trait = "0.1"
auto_impl = "1.1"
futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
jsonrpsee-core = { version = "0.22", default-features = false, features = ["client"], optional = true }
parity-scale-codec = { workspace = true, features = ["derive"], optional = true }
rosetta-ethereum-types = { workspace = true, features = ["with-rlp", "with-crypto"] }
scale-info = { version = "2.9", default-features = false, features = ["derive"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }

[dev-dependencies]
hex-literal = "0.4"
serde_json = { version = "1.0", default-features = false }

[features]
default = ["std", "jsonrpsee"]
with-codec = ["dep:parity-scale-codec", "dep:scale-info", "rosetta-ethereum-types/with-codec"]
serde = ["dep:serde", "rosetta-ethereum-types/serde"]
std = [
"futures-core/std",
"rosetta-ethereum-types/std",
"parity-scale-codec?/std",
"scale-info?/std",
"serde?/std",
]
jsonrpsee = ["dep:jsonrpsee-core", "serde"]
121 changes: 121 additions & 0 deletions chains/ethereum/backend/src/block_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use rosetta_ethereum_types::{Address, AtBlock, H256};

#[cfg(feature = "serde")]
use crate::serde_util::opt_value_or_array;

#[derive(Clone, PartialEq, Eq, Debug)]
#[cfg_attr(
feature = "with-codec",
derive(parity_scale_codec::Encode, parity_scale_codec::Decode, scale_info::TypeInfo)
)]
#[cfg_attr(
feature = "serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "camelCase")
)]
pub struct BlockRange {
/// A list of addresses from which logs should originate.
#[cfg_attr(
feature = "serde",
serde(with = "opt_value_or_array", skip_serializing_if = "Vec::is_empty")
)]
pub address: Vec<Address>,
/// Array of topics. topics are order-dependent.
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Vec::is_empty"))]
pub topics: Vec<H256>,
/// Array of topics. topics are order-dependent.
#[cfg_attr(
feature = "serde",
serde(default, rename = "fromBlock", skip_serializing_if = "Option::is_none")
)]
pub from: Option<AtBlock>,
/// A hexadecimal block number, or the string latest, earliest or pending
#[cfg_attr(
feature = "serde",
serde(default, rename = "toBlock", skip_serializing_if = "Option::is_none")
)]
pub to: Option<AtBlock>,
#[cfg_attr(
feature = "serde",
serde(default, rename = "blockHash", skip_serializing_if = "Option::is_none")
)]
pub blockhash: Option<AtBlock>,
}

impl Default for BlockRange {
fn default() -> Self {
Self {
address: Vec::new(),
from: Some(AtBlock::Latest),
to: Some(AtBlock::Latest),
topics: Vec::new(),
blockhash: None,
}
}
}

#[cfg(all(test, feature = "serde"))]
mod tests {
use super::*;
use hex_literal::hex;
use rosetta_ethereum_types::BlockIdentifier;
use serde_json::json;

#[test]
fn block_range_with_one_address_works() {
let expected = BlockRange {
address: vec![Address::from(hex!("1a94fce7ef36bc90959e206ba569a12afbc91ca1"))],
from: None,
to: None,
topics: vec![H256(hex!(
"241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80"
))],
blockhash: Some(AtBlock::At(BlockIdentifier::Hash(H256(hex!(
"7c5a35e9cb3e8ae0e221ab470abae9d446c3a5626ce6689fc777dcffcab52c70"
))))),
};
let json = json!({
"address": "0x1a94fce7ef36bc90959e206ba569a12afbc91ca1",
"topics":["0x241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80"],
"blockHash": "0x7c5a35e9cb3e8ae0e221ab470abae9d446c3a5626ce6689fc777dcffcab52c70",
});
// Decode works
let actual = serde_json::from_value::<BlockRange>(json.clone()).unwrap();
assert_eq!(expected, actual);

// Encode works
let encoded = serde_json::to_value(expected).unwrap();
assert_eq!(json, encoded);
}

#[test]
fn block_range_with_many_addresses_works() {
let expected = BlockRange {
address: vec![
Address::from(hex!("1a94fce7ef36bc90959e206ba569a12afbc91ca1")),
Address::from(hex!("86e4dc95c7fbdbf52e33d563bbdb00823894c287")),
],
from: None,
to: None,
topics: vec![H256(hex!(
"241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80"
))],
blockhash: Some(AtBlock::At(BlockIdentifier::Hash(H256(hex!(
"7c5a35e9cb3e8ae0e221ab470abae9d446c3a5626ce6689fc777dcffcab52c70"
))))),
};
let json = json!({
"address": ["0x1a94fce7ef36bc90959e206ba569a12afbc91ca1", "0x86e4dc95c7fbdbf52e33d563bbdb00823894c287"],
"topics":["0x241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80"],
"blockHash": "0x7c5a35e9cb3e8ae0e221ab470abae9d446c3a5626ce6689fc777dcffcab52c70",
});

// Decode works
let actual = serde_json::from_value::<BlockRange>(json.clone()).unwrap();
assert_eq!(expected, actual);

// Encode works
let encoded = serde_json::to_value(actual).unwrap();
assert_eq!(json, encoded);
}
}
Loading
Loading