Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chainbound): updated EchoExecutor #62

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/clients/chainbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ readme = "README.md"
[dependencies]
artemis-core = { path = "../../artemis-core" }
ethers = { version = "2", features = ["ws", "rustls"] }
fiber = { version = "0.3.3", git = "https://github.com/chainbound/fiber-rs" }
fiber = { git = "https://github.com/chainbound/fiber-rs" }
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
tokio = { version = "1.18", features = ["full"] }
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
async-trait = "0.1.64"
serde = "1.0.152"
anyhow = "1.0.70"
Expand Down
8 changes: 4 additions & 4 deletions crates/clients/chainbound/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ pub async fn main() -> anyhow::Result<()> {
let provider = Arc::new(Provider::connect("wss://eth.llamarpc.com").await.unwrap());
let tx_signer = LocalWallet::new(&mut rand::thread_rng()); // or any other signer
let auth_signer = LocalWallet::new(&mut rand::thread_rng()); // or any other signer
let echo_executor = Box::new(EchoExecutor::new(provider, tx_signer, auth_signer, api_key));
let echo_exec = Box::new(EchoExecutor::new(provider, tx_signer, auth_signer, api_key).await);

let executor_map = ExecutorMap::new(echo_executor, |action| match action {
Action::SendBundle(bundle) => Some(bundle),
});
// We can simply map all Action types in an Option
// since `EchoExecutor` implements `Executor<Action>`.
let executor_map = ExecutorMap::new(echo_exec, Some);

// And add these components to your Artemis engine
let mut engine: Engine<Event, Action> = Engine::default();
Expand Down
198 changes: 143 additions & 55 deletions crates/clients/chainbound/src/echo.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,45 @@
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use ethers::{providers::Middleware, signers::Signer};
use reqwest::{
header::{HeaderMap, HeaderValue},
Client,
};
use futures::{SinkExt, StreamExt, TryStreamExt};
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error};

use artemis_core::types::Executor;

use crate::SendBundleArgs;
use crate::{
utils::{generate_fb_signature, generate_jsonrpc_request},
SendBundleArgs, SendPrivateTransactionArgs,
};

/// Possible actions that can be executed by the Echo executor
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
#[allow(missing_docs)]
pub enum Action {
SendBundle(SendBundleArgs),
SendPrivateTransaction(SendPrivateTransactionArgs),
}

const ECHO_RPC_URL: &str = "https://echo-rpc.chainbound.io";
const ECHO_RPC_URL_WS: &str = "wss://echo-rpc.chainbound.io/ws";

/// An Echo executor that sends transactions to the specified block builders
pub struct EchoExecutor<M, S> {
/// The Echo RPC endpoint
echo_endpoint: String,
/// The HTTP client to send requests to the Echo RPC
echo_client: Client,
/// The native ethers middleware
inner: Arc<M>,
/// The signer to sign transactions before sending to the builders
tx_signer: S,
/// the signer to compute the `X-Flashbots-Signature` of the bundle payload
auth_signer: S,
/// Channel to send websocket messages
api_requests_tx: broadcast::Sender<String>,
/// Channel to receive websocket messages
api_responses_rx: broadcast::Receiver<String>,
}

impl<M: Middleware, S: Signer> EchoExecutor<M, S> {
Expand All @@ -45,23 +50,70 @@ impl<M: Middleware, S: Signer> EchoExecutor<M, S> {
/// - `tx_signer`: The actual signer of the bundle transactions
/// - `auth_signer`: The signer to compute the `X-Flashbots-Signature` of the bundle payload
/// - `api_key`: The Echo API key to use
pub fn new(inner: Arc<M>, tx_signer: S, auth_signer: S, api_key: impl Into<String>) -> Self {
let mut headers = HeaderMap::new();
headers.insert("Content-Type", "application/json".parse().unwrap());
headers.insert("X-Api-Key", api_key.into().parse().expect("Broken API key"));
pub async fn new(
inner: Arc<M>,
tx_signer: S,
auth_signer: S,
api_key: impl Into<String>,
) -> Self {
let request = tokio_tungstenite::tungstenite::http::Request::builder()
.uri(ECHO_RPC_URL_WS)
.header("x-api-key", api_key.into())
.header("host", "echo-artemis-client")
.header("sec-websocket-key", "dGhlIHNhbXBsZSBub25jZQ==")
.header("sec-websocket-version", "13")
.header("upgrade", "websocket")
.header("connection", "upgrade")
.body(())
.unwrap();

let (ws_client, _) = tokio_tungstenite::connect_async(request)
.await
.expect("Failed to connect to Echo via Websocket.");
debug!("Echo websocket handshake succeeded.");

let (api_requests_tx, mut api_requests_rx) = broadcast::channel(1024);
let (api_responses_tx, api_responses_rx) = broadcast::channel(1024);

// Spawn a task to manage the websocket connection with request and response channels
tokio::spawn(async move {
let (mut outgoing, incoming) = ws_client.split();

// send all incoming messages to the responses channel in a separate task
tokio::spawn(async move {
let responses_tx = api_responses_tx.clone();
incoming.try_for_each(|msg| async {
let text = msg.into_text().unwrap_or_else(|e| {
error!(error = ?e, "Error converting Echo API response to text");
Default::default()
});

responses_tx.send(text).unwrap_or_else(|e| {
error!(error = ?e, "Error sending Echo API response to the responses channel");
Default::default()
});

Ok(())
}).await.ok();
});

// send all messages from the intake channel into the websocket
while let Ok(msg) = api_requests_rx.recv().await {
if let Err(e) = outgoing.send(Message::Text(msg)).await {
error!(error = ?e, "Error sending Echo API request to the websocket")
}
}

let echo_client = Client::builder()
.timeout(Duration::from_secs(300))
.default_headers(headers)
.build()
.expect("Could not instantiate HTTP client");
error!("Echo API request channel has stopped sending messages");
});

Self {
echo_endpoint: ECHO_RPC_URL.into(),
echo_client,
echo_endpoint: ECHO_RPC_URL_WS.into(),
inner,
tx_signer,
auth_signer,
api_requests_tx,
api_responses_rx,
}
}

Expand All @@ -72,7 +124,12 @@ impl<M: Middleware, S: Signer> EchoExecutor<M, S> {

/// Returns a reference to the native ethers middleware
pub fn provider(&self) -> Arc<M> {
self.inner.clone()
Arc::clone(&self.inner)
}

/// Returns a reference to the API receipts channel
pub fn receipts_channel(&self) -> broadcast::Receiver<String> {
self.api_responses_rx.resubscribe()
}
}

Expand Down Expand Up @@ -102,53 +159,84 @@ where
// Set block number to the next block if not specified
if action.standard_features.block_number.is_none() {
let block_number = self.inner.get_block_number().await?;
let next_block_number_hex = format!("0x{:#x}", block_number.as_u64() + 1);
let next_block_number_hex = format!("{:#x}", block_number.as_u64() + 1);
action.standard_features.block_number = Some(next_block_number_hex);
}

// TODO: Simulate bundle

// Sign bundle payload (without the Echo-specific features)
let signable_payload = serde_json::to_string(&action.standard_features)?;
let flashbots_signature = self.auth_signer.sign_message(&signable_payload).await?;
let method = "eth_sendBundle";
let fb_payload = generate_jsonrpc_request(action.id, method, &action.standard_features);
let fb_signature = generate_fb_signature(&self.auth_signer, &fb_payload).await;

// Websocket usage format:
// https://echo.chainbound.io/docs/usage/api-interface#flashbots-authentication
let request_body = format!(
r#"{{"x-flashbots-signature":"{}","payload":{}}}"#,
fb_signature,
generate_jsonrpc_request(action.id, method, action)
);

// Create the `X-Flashbots-Signature` header
let flashbots_signature_header: HeaderValue =
format!("{:#x}:{}", self.auth_signer.address(), flashbots_signature).parse()?;
// Send bundle request
self.api_requests_tx.send(request_body)?;
debug!("bundle sent to Echo.");

// Prepare the full JSON-RPC request body
let bundle_json = serde_json::to_string(&action)?;
Ok(())
}
}

#[async_trait]
impl<M, S> Executor<SendPrivateTransactionArgs> for EchoExecutor<M, S>
where
M: Middleware + 'static,
M::Error: 'static,
S: Signer + 'static,
{
/// Send a transaction to the specified builders
async fn execute(&self, mut action: SendPrivateTransactionArgs) -> Result<()> {
let tx = &action.unsigned_tx;

// Sign the transaction
let signature = self.tx_signer.sign_transaction(&tx.clone().into()).await?;
let signed = tx.rlp_signed(&signature).to_string();
action.standard_features.tx = signed;

// TODO: Simulate transaction

// Sign payload (without the Echo-specific features)
let method = "eth_sendPrivateRawTransaction";
let fb_payload = generate_jsonrpc_request(action.id, method, &action.standard_features);
let fb_signature = generate_fb_signature(&self.auth_signer, &fb_payload).await;

// Websocket usage format:
// https://echo.chainbound.io/docs/usage/api-interface#flashbots-authentication
let request_body = format!(
r#"{{"id":1,"jsonrpc":"2.0","method":"eth_sendBundle","params":[{}]}}"#,
bundle_json
r#"{{"x-flashbots-signature":"{}","payload":{}}}"#,
fb_signature,
generate_jsonrpc_request(action.id, method, action)
);

// Send bundle
let echo_response = self
.echo_client
.post(&self.echo_endpoint)
.body(request_body)
.header("X-Flashbots-Signature", flashbots_signature_header)
.send()
.await;

match echo_response {
Ok(send_response) => {
let status = send_response.status();
let body = send_response.text().await?;

dbg!(body.clone());

if status.is_success() {
debug!("Echo bundle response: {:?}", body);
} else {
error!("Error in Echo bundle response: {:?}", body);
}
}
Err(send_error) => error!("Error while sending bundle to Echo: {:?}", send_error),
}
// Send transaction request
self.api_requests_tx.send(request_body)?;
debug!("transaction sent to Echo.");

Ok(())
}
}

#[async_trait]
impl<M, S> Executor<Action> for EchoExecutor<M, S>
where
M: Middleware + 'static,
M::Error: 'static,
S: Signer + 'static,
{
/// Send a transaction or bundle to the specified builders
async fn execute(&self, action: Action) -> Result<()> {
match action {
Action::SendBundle(bundle) => self.execute(bundle).await,
Action::SendPrivateTransaction(tx) => self.execute(tx).await,
}
}
}
8 changes: 4 additions & 4 deletions crates/clients/chainbound/src/fiber.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::Result;
use async_trait::async_trait;
use ethers::types::Transaction;
use fiber::{
use ::fiber::{
eth::{CompactBeaconBlock, ExecutionPayload, ExecutionPayloadHeader},
Client,
};
use anyhow::Result;
use async_trait::async_trait;
use ethers::types::Transaction;
use futures::StreamExt;

use artemis_core::types::{Collector, CollectorStream};
Expand Down
Loading
Loading