diff --git a/orchestrator/Cargo.lock b/orchestrator/Cargo.lock index e10114b64..ec4ed4714 100644 --- a/orchestrator/Cargo.lock +++ b/orchestrator/Cargo.lock @@ -3241,6 +3241,7 @@ dependencies = [ "openssl-probe", "serde", "serde_derive", + "serde_json", "tokio 1.12.0", "tonic", "web30", diff --git a/orchestrator/relayer/Cargo.toml b/orchestrator/relayer/Cargo.toml index 12286a168..598949433 100644 --- a/orchestrator/relayer/Cargo.toml +++ b/orchestrator/relayer/Cargo.toml @@ -32,6 +32,7 @@ env_logger = "0.8" tokio = "1.4" tonic = "0.4" openssl-probe = "0.1" +serde_json = "1.0.68" [dev-dependencies] actix = "0.11" diff --git a/orchestrator/relayer/src/batch_relaying.rs b/orchestrator/relayer/src/batch_relaying.rs index 2335dc26b..eb5483ae4 100644 --- a/orchestrator/relayer/src/batch_relaying.rs +++ b/orchestrator/relayer/src/batch_relaying.rs @@ -4,17 +4,21 @@ use clarity::Uint256; use cosmos_gravity::query::get_latest_transaction_batches; use cosmos_gravity::query::get_transaction_batch_signatures; +use ethereum_gravity::utils::GasCost; use ethereum_gravity::utils::{downcast_to_u128, get_tx_batch_nonce}; use ethereum_gravity::{one_eth, submit_batch::send_eth_transaction_batch}; use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient; use gravity_utils::message_signatures::encode_tx_batch_confirm_hashed; +use gravity_utils::types::Erc20Token; use gravity_utils::types::Valset; use gravity_utils::types::{BatchConfirmResponse, TransactionBatch}; -use web30::types::SendTxOption; use std::collections::HashMap; +use std::str::FromStr; use std::time::Duration; +use std::time::Instant; use tonic::transport::Channel; use web30::client::Web3; +use web30::types::SendTxOption; #[derive(Debug, Clone)] struct SubmittableBatch { @@ -39,13 +43,13 @@ pub async fn relay_batches( gravity_id: String, timeout: Duration, gas_multiplier: f32, + next_batch_send_time: &mut HashMap, ) { let possible_batches = get_batches_and_signatures(current_valset.clone(), grpc_client, gravity_id.clone()).await; trace!("possible batches {:?}", possible_batches); - submit_batches( current_valset, ethereum_key, @@ -55,11 +59,11 @@ pub async fn relay_batches( timeout, gas_multiplier, possible_batches, + next_batch_send_time, ) .await; } - /// This function retrieves the latest batches from the Cosmos module and then /// iterates through the signatures for each batch, determining if they are ready /// to submit. It is possible for a batch to not have valid signatures for two reasons @@ -117,7 +121,6 @@ async fn get_batches_and_signatures( return possible_batches; } - /// Attempts to submit batches with valid signatures, checking the state /// of the Ethereum chain to ensure that it is valid to submit a given batch /// more specifically that the correctly signed batch has not timed out or already @@ -139,6 +142,7 @@ async fn submit_batches( timeout: Duration, gas_multiplier: f32, possible_batches: HashMap>, + next_batch_send_time: &mut HashMap, ) { let our_ethereum_address = ethereum_key.to_public_key().unwrap(); let ethereum_block_height = if let Ok(bn) = web3.eth_block_number().await { @@ -200,33 +204,137 @@ async fn submit_batches( continue; } let cost = cost.unwrap(); - info!( - "We have detected latest batch {} but latest on Ethereum is {} This batch is estimated to cost {} Gas / {:.4} ETH to submit", - latest_cosmos_batch_nonce, - latest_ethereum_batch, - cost.gas_price.clone(), - downcast_to_u128(cost.get_total()).unwrap() as f32 - / downcast_to_u128(one_eth()).unwrap() as f32 - ); - let tx_options = vec![SendTxOption::GasPriceMultiplier(gas_multiplier)]; - - let res = send_eth_transaction_batch( - current_valset.clone(), - oldest_signed_batch, - &oldest_signatures, - web3, - timeout, - gravity_contract_address, - gravity_id.clone(), - ethereum_key, - tx_options, + if can_send_batch( + &cost, + &oldest_signed_batch.total_fee, + &oldest_signed_batch.token_contract, + next_batch_send_time, ) - .await; - if res.is_err() { - info!("Batch submission failed with {:?}", res); + .await + { + let token_contract = oldest_signed_batch.token_contract; + + info!( + "We have detected latest batch {} but latest on Ethereum is {} This batch is estimated to cost {} Gas / {:.4} ETH to submit", + latest_cosmos_batch_nonce, + latest_ethereum_batch, + cost.gas_price.clone(), + downcast_to_u128(cost.get_total()).unwrap() as f32 + / downcast_to_u128(one_eth()).unwrap() as f32 + ); + + let tx_options = vec![SendTxOption::GasPriceMultiplier(gas_multiplier)]; + + let res = send_eth_transaction_batch( + current_valset.clone(), + oldest_signed_batch, + &oldest_signatures, + web3, + timeout, + gravity_contract_address, + gravity_id.clone(), + ethereum_key, + tx_options, + ) + .await; + + if res.is_err() { + info!("Batch submission failed with {:?}", res); + } else { + update_next_batch_send_time(next_batch_send_time, token_contract) + } } } } } -} \ No newline at end of file +} + +async fn can_send_batch( + estimated_cost: &GasCost, + batch_fee: &Erc20Token, + contract_address: &EthAddress, + next_batch_send_time: &mut HashMap, +) -> bool { + match next_batch_send_time.get(contract_address) { + Some(time) => { + if *time < Instant::now() { + return true; + } + } + None => update_next_batch_send_time(next_batch_send_time, *contract_address), + } + + let token_price = match get_token_price(&batch_fee.token_contract_address).await { + Ok(token_price) => token_price, + Err(_) => return false, + }; + + let estimated_fee = estimated_cost.get_total(); + let batch_value = batch_fee.amount.clone() * token_price; + + batch_value >= estimated_fee +} + +fn update_next_batch_send_time( + next_batch_send_time: &mut HashMap, + contract_address: EthAddress, +) { + let timeout_duration = std::env::var("GRAVITY_BATCH_SENDING_SECS") + .map(|value| Duration::from_secs(value.parse().unwrap())) + .unwrap_or_else(|_| Duration::from_secs(3600)); + + next_batch_send_time.insert(contract_address, Instant::now() + timeout_duration); +} + +async fn get_token_price(contract_address: &EthAddress) -> Result { + // TODO: Use API for fetching price instead of json config file + + let config_file_path = + std::env::var("GRAVITY_TOKEN_PRICES").unwrap_or_else(|_| "token_prices.json".to_owned()); + + let config_str = match tokio::fs::read_to_string(config_file_path).await { + Err(err) => { + log::error!("Error while fetching token price: {}", err); + return Err(()); + } + Ok(value) => value, + }; + + let config: serde_json::Map = match serde_json::from_str(&config_str) + { + Err(err) => { + log::error!( + "Error while parsing token prices json configuration: {}", + err + ); + return Err(()); + } + Ok(config) => config, + }; + + let token_price = config + .get(&contract_address.to_string()) + .ok_or_else(|| ())?; + + if !token_price.is_string() { + log::error!("Expected token price in string format"); + return Err(()); + } + + match token_price.as_str() { + None => { + log::error!("Expected token price in string format"); + Err(()) + } + Some(token_price_str) => { + let token_price = Uint256::from_str(token_price_str); + + if token_price.is_err() { + log::error!("Unable to parse token price"); + } + + token_price.map_err(|_| ()) + } + } +} diff --git a/orchestrator/relayer/src/main_loop.rs b/orchestrator/relayer/src/main_loop.rs index 976ddd636..30dc913dc 100644 --- a/orchestrator/relayer/src/main_loop.rs +++ b/orchestrator/relayer/src/main_loop.rs @@ -6,7 +6,10 @@ use clarity::address::Address as EthAddress; use clarity::PrivateKey as EthPrivateKey; use ethereum_gravity::utils::get_gravity_id; use gravity_proto::gravity::query_client::QueryClient as GravityQueryClient; -use std::time::{Duration, Instant}; +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; use tokio::time::sleep as delay_for; use tonic::transport::Channel; use web30::client::Web3; @@ -23,6 +26,8 @@ pub async fn relayer_main_loop( gas_multiplier: f32, ) { let mut grpc_client = grpc_client; + let mut next_batch_send_time: HashMap = HashMap::new(); + loop { let loop_start = Instant::now(); @@ -63,6 +68,7 @@ pub async fn relayer_main_loop( gravity_id.clone(), LOOP_SPEED, gas_multiplier, + &mut next_batch_send_time, ) .await; @@ -74,7 +80,7 @@ pub async fn relayer_main_loop( gravity_contract_address, gravity_id.clone(), LOOP_SPEED, - gas_multiplier + gas_multiplier, ) .await;