diff --git a/Cargo.lock b/Cargo.lock index 59daff28a..97c194700 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -947,6 +947,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.12" @@ -956,6 +969,34 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -1936,9 +1977,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] name = "memoffset" @@ -2247,9 +2288,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.101" +version = "0.9.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dda2b0f344e78efc2facf7d195d098df0dd72151b26ab98da807afc26c198dff" +checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" dependencies = [ "cc", "libc", @@ -2847,6 +2888,7 @@ dependencies = [ "actix-web", "actix-web-httpauth", "althea_kernel_interface", + "althea_proto", "althea_types", "arrayvec", "auto-bridge", @@ -2858,6 +2900,7 @@ dependencies = [ "clarity", "compressed_log", "cosmos-sdk-proto-althea", + "crossbeam", "deep_space", "env_logger 0.11.3", "flate2", @@ -3560,9 +3603,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 8bf21573d..e9fc1b640 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,3 +42,4 @@ deep_space = {version = "2", features = ["althea"], default-features=false} web30 = "1.2" clarity = "1.3" awc = {version = "3.2", default-features = false, features=["openssl", "compress-gzip", "compress-zstd"]} +althea_proto = "0.6" \ No newline at end of file diff --git a/althea_types/src/interop.rs b/althea_types/src/interop.rs index 4c8ee148b..06ff70d01 100644 --- a/althea_types/src/interop.rs +++ b/althea_types/src/interop.rs @@ -16,7 +16,6 @@ use std::fmt; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::net::IpAddr; -use std::net::Ipv4Addr; use std::str::FromStr; use std::time::{Duration, SystemTime}; @@ -389,21 +388,6 @@ pub struct LocalIdentity { pub global: Identity, } -/// This is all the data a light client needs to open a light client tunnel -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Copy)] -pub struct LightClientLocalIdentity { - pub wg_port: u16, - /// If we have an existing tunnel, None if we don't know - pub have_tunnel: Option, - pub global: Identity, - /// we have to replicate dhcp ourselves due to the android vpn api - pub tunnel_address: Ipv4Addr, - /// the local_fee of the node passing light client traffic, much bigger - /// than the actual babel price field for ergonomics around downcasting - /// the number after upcasting when we compute it. - pub price: u128, -} - /// This represents a generic payment that may be to or from us /// it contains a txid from a published transaction /// that should be validated against the blockchain diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 541fa8f41..c0b821846 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -30,7 +30,7 @@ awc = {workspace = true} actix-rt = "2.8" deep_space = {workspace = true} clarity = {workspace = true} -althea_proto = "0.6.0" +althea_proto = {workspace = true} futures = { version = "0.3", features = ["compat"] } num256 = "0.5" num-traits="0.2" diff --git a/rita_common/Cargo.toml b/rita_common/Cargo.toml index 3007532da..c2c9fc50c 100644 --- a/rita_common/Cargo.toml +++ b/rita_common/Cargo.toml @@ -39,6 +39,8 @@ althea_types = { path = "../althea_types" } deep_space = {workspace = true} prost-types ="0.12" cosmos-sdk-proto-althea = {package = "cosmos-sdk-proto-althea", version = "0.16", features = ["ethermint"]} +althea_proto = {workspace = true} +crossbeam = "0.8" [dependencies.regex] version = "1.6" diff --git a/rita_common/src/network_endpoints/mod.rs b/rita_common/src/network_endpoints/mod.rs index 2a176d5d8..f8d6ea1bd 100644 --- a/rita_common/src/network_endpoints/mod.rs +++ b/rita_common/src/network_endpoints/mod.rs @@ -1,9 +1,9 @@ //! Network endptoints for common Rita functionality (such as exchanging hello messages) -use crate::payment_validator::{validate_later, ToValidate}; +use crate::payment_validator::{add_to_incoming_transaction_queue, ToValidate}; use crate::peer_listener::structs::Peer; +use crate::tm_identity_callback; use crate::tunnel_manager::id_callback::IdentityCallback; -use crate::{tm_identity_callback, RitaCommonError}; use actix_web_async::http::StatusCode; use actix_web_async::web::Json; @@ -20,40 +20,29 @@ pub async fn make_payments(item: Json) -> HttpResponse { let ts = ToValidate { payment: pmt, received: Instant::now(), - checked: false, }; + add_to_incoming_transaction_queue(ts); - match validate_later(ts) { - Ok(()) | Err(RitaCommonError::DuplicatePayment) => { - HttpResponse::Ok().json("Payment Received!") - } - Err(e) => HttpResponse::build(StatusCode::from_u16(400u16).unwrap()).json(&format!("{e}")), - } + // we can't actually check validity here so we simply return Ok + // in any case it's not possible to return if the payment was invalid + // as we may find a validation condition only after making many other checks + HttpResponse::Ok().json("Payment Received!") } /// The recieve side of the make payments v2 call. This processes a list of payments instead of a single payment pub async fn make_payments_v2(item: Json>) -> HttpResponse { let pmt_list = item.into_inner(); - let mut build_err = String::new(); for pmt in pmt_list { let ts = ToValidate { payment: pmt, received: Instant::now(), - checked: false, }; - - // Duplicates will be removed here - match validate_later(ts) { - Ok(()) | Err(RitaCommonError::DuplicatePayment) => {} - Err(e) => { - build_err.push_str(&format!("{e}\n")); - } - } + add_to_incoming_transaction_queue(ts); } - if !build_err.is_empty() { - return HttpResponse::build(StatusCode::from_u16(400u16).unwrap()).json(&build_err); - } + // we can't actually check validity here so we simply return Ok + // in any case it's not possible to return if the payment was invalid + // as we may find a validation condition only after making many other checks HttpResponse::Ok().json("Payment Received!") } diff --git a/rita_common/src/payment_controller/mod.rs b/rita_common/src/payment_controller/mod.rs index 5fa79f2b6..3fa93147b 100644 --- a/rita_common/src/payment_controller/mod.rs +++ b/rita_common/src/payment_controller/mod.rs @@ -8,13 +8,13 @@ use crate::blockchain_oracle::{ }; use crate::debt_keeper::normalize_payment_amount; use crate::debt_keeper::payment_failed; -use crate::payment_validator::{get_payment_txids, validate_later, ToValidate}; +use crate::payment_validator::ToValidate; use crate::payment_validator::{ALTHEA_CHAIN_PREFIX, ALTHEA_CONTACT_TIMEOUT}; use crate::rita_loop::get_web3_server; use crate::KI; use althea_types::interop::UnpublishedPaymentTx; -use althea_types::SystemChain; use althea_types::{Denom, PaymentTx}; +use althea_types::{Identity, SystemChain}; use awc; use deep_space::{Coin, Contact, EthermintPrivateKey}; use futures::future::{join, join_all}; @@ -23,7 +23,7 @@ use num_traits::Num; use settings::network::NetworkSettings; use settings::payment::PaymentSettings; use settings::{DEBT_KEEPER_DENOM, DEBT_KEEPER_DENOM_DECIMAL}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::error::Error; use std::fmt::Result as DisplayResult; use std::fmt::{Display, Formatter}; @@ -136,7 +136,9 @@ impl Error for PaymentControllerError {} /// This function is called by the async loop in order to perform payment /// controller actions -pub async fn tick_payment_controller() { +pub async fn tick_payment_controller( + previously_sent_payments: HashMap>, +) -> Vec { let outgoing_payments: Vec; let resend_queue: Vec; @@ -161,13 +163,18 @@ pub async fn tick_payment_controller() { data.resend_queue = Vec::new(); } + let mut payments_sent_this_round = Vec::new(); + // this creates a series of futures that we can use to perform // retires in parallel, this is helpful because retries may take // a long time to timeout, payments are done in series to reduce // nonce races let mut retry_futures = Vec::new(); for pmt in outgoing_payments { - let _ = make_payment(pmt).await; + match make_payment(pmt, &previously_sent_payments).await { + Ok(pmt) => payments_sent_this_round.push(pmt), + Err(e) => warn!("Failed to send payment with {:?}!", e), + } } for resend in resend_queue { let fut = resend_txid(resend); @@ -176,26 +183,39 @@ pub async fn tick_payment_controller() { // we log all errors in the functions themselves, we could print errors here // instead, but right now no action is needed either way. let _ = join_all(retry_futures).await; + + payments_sent_this_round } /// This is called by debt_keeper to make payments. It sends a -/// PaymentTx to the `mesh_ip` in its `to` field. -async fn make_payment(pmt: UnpublishedPaymentTx) -> Result<(), PaymentControllerError> { +/// PaymentTx to the `mesh_ip` in its `to` field. It returns a payment to validate +async fn make_payment( + pmt: UnpublishedPaymentTx, + previously_sent_payments: &HashMap>, +) -> Result { let common = settings::get_rita_common(); let network_settings = common.network; let payment_settings = common.payment; let system_chain = payment_settings.system_chain; match system_chain { - SystemChain::Althea => make_althea_payment(pmt, payment_settings, network_settings).await, - SystemChain::Xdai => make_xdai_payment(pmt, payment_settings, network_settings).await, - SystemChain::Rinkeby => { - warn!("Payments on Rinkeby not currently supported!"); - Ok(()) + SystemChain::Althea => { + make_althea_payment( + pmt, + payment_settings, + network_settings, + previously_sent_payments, + ) + .await } - SystemChain::Ethereum => { - warn!("Payments on Ethereum not currently supported!"); - Ok(()) + SystemChain::Xdai | SystemChain::Rinkeby | SystemChain::Ethereum => { + make_xdai_payment( + pmt, + payment_settings, + network_settings, + previously_sent_payments, + ) + .await } } } @@ -204,7 +224,8 @@ async fn make_althea_payment( mut pmt: UnpublishedPaymentTx, payment_settings: PaymentSettings, network_settings: NetworkSettings, -) -> Result<(), PaymentControllerError> { + previously_sent_payments: &HashMap>, +) -> Result { // On althea chain, we default to paying with usdc, config must specify this as an accepted denom let usdc_denom = match payment_settings .accepted_denoms @@ -310,27 +331,30 @@ async fn make_althea_payment( // setup tx hash let pmt = pmt.publish(Uint256::from_str_radix(&transaction.txhash, 16).unwrap()); - send_make_payment_endpoints(pmt, network_settings, None, Some(cosmos_node_grpc)).await; + send_make_payment_endpoints( + pmt, + network_settings, + None, + Some(cosmos_node_grpc), + previously_sent_payments, + ) + .await; // place this payment in the validation queue to handle later. let ts = ToValidate { payment: pmt, received: Instant::now(), - checked: false, }; - if let Err(e) = validate_later(ts.clone()) { - error!("Received error trying to validate {:?} Error: {:?}", ts, e); - } - - Ok(()) + Ok(ts) } async fn make_xdai_payment( pmt: UnpublishedPaymentTx, payment_settings: PaymentSettings, network_settings: NetworkSettings, -) -> Result<(), PaymentControllerError> { + previously_sent_payments: &HashMap>, +) -> Result { let balance = get_oracle_balance(); let nonce = get_oracle_nonce(); let gas_price = get_oracle_latest_gas_price(); @@ -400,20 +424,22 @@ async fn make_xdai_payment( // add published txid to submission let pmt = pmt.publish(tx_id); - send_make_payment_endpoints(pmt, network_settings, Some(full_node), None).await; + send_make_payment_endpoints( + pmt, + network_settings, + Some(full_node), + None, + previously_sent_payments, + ) + .await; // place this payment in the validation queue to handle later. let ts = ToValidate { payment: pmt, received: Instant::now(), - checked: false, }; - if let Err(e) = validate_later(ts.clone()) { - error!("Received error trying to validate {:?} Error: {:?}", ts, e); - } - - Ok(()) + Ok(ts) } Err(e) => { error!( @@ -468,6 +494,7 @@ async fn send_make_payment_endpoints( network_settings: NetworkSettings, full_node: Option, cosmos_node_grpc: Option, + previously_sent_payments: &HashMap>, ) { // testing hack let neighbor_url = if cfg!(not(test)) { @@ -489,9 +516,11 @@ async fn send_make_payment_endpoints( String::from("http://127.0.0.1:1234/make_payment_v2") }; - // Get all txids to this client. Temporary add new payment to a copy of a list to send up to endpoint - // this pmt is actually recorded in memory after validator confirms it - let mut txid_history = get_payment_txids(pmt.to); + let mut txid_history = previously_sent_payments + .get(&pmt.to) + .cloned() + .unwrap_or_default(); + txid_history.insert(pmt); let actix_client = awc::Client::new(); diff --git a/rita_common/src/payment_validator/mod.rs b/rita_common/src/payment_validator/mod.rs index 073028923..9cd89505d 100644 --- a/rita_common/src/payment_validator/mod.rs +++ b/rita_common/src/payment_validator/mod.rs @@ -14,20 +14,23 @@ use crate::rita_loop::get_web3_server; use crate::usage_tracker::update_payments; use crate::RitaCommonError; use crate::KI; +use althea_proto::althea::microtx::v1::MsgMicrotx; use althea_types::Denom; use althea_types::Identity; use althea_types::PaymentTx; +use althea_types::SystemChain; use clarity::Address; -use cosmos_sdk_proto_althea::cosmos::bank::v1beta1::MsgSend; +use cosmos_sdk_proto_althea::cosmos::tx::v1beta1::GetTxResponse; use cosmos_sdk_proto_althea::cosmos::tx::v1beta1::{TxBody, TxRaw}; +use crossbeam::queue::SegQueue; +use deep_space::client::type_urls::MSG_MICROTX_TYPE_URL; use deep_space::client::ChainStatus; use deep_space::utils::decode_any; use deep_space::Address as AltheaAddress; +use deep_space::Coin; use deep_space::Contact; -use futures::future::join; use futures::future::join_all; use num256::Uint256; -use num_traits::Num; use settings::get_rita_common; use settings::DEBT_KEEPER_DENOM; use settings::DEBT_KEEPER_DENOM_DECIMAL; @@ -38,7 +41,6 @@ use std::fmt::Write as _; use std::hash::Hash; use std::hash::Hasher; use std::sync::Arc; -use std::sync::RwLock; use std::time::{Duration, Instant}; use web30::client::Web3; use web30::types::TransactionResponse; @@ -65,32 +67,39 @@ const BLOCKS_TO_OLD: u32 = 1440; pub const ALTHEA_CHAIN_PREFIX: &str = "althea"; pub const ALTHEA_CONTACT_TIMEOUT: Duration = FAST_LOOP_TIMEOUT; +// This is a global queue of incoming transactions that need to be validated +// the u32 here is the net namespace used to allow multiple instances to be run +// in the same process without interfering with each other. The SegQueue is a +// lock free queue that allows us to push transactions on from the make_payment_v1 and v2 +// endpoints, potentially several in parallel and then pop them off in the tick_payment_validator lazy_static! { - static ref HISTORY: Arc>> = - Arc::new(RwLock::new(HashMap::new())); + static ref INCOMING_TRANSACTIONS: Arc> = Arc::new(SegQueue::new()); } -/// Gets Payment validator copy from the static ref, or default if no value has been set -pub fn get_payment_validator() -> PaymentValidator { +/// Adds an incoming transaction to the global incoming transaction queue abstracts netns handling +/// to ensure that multiple instances can run in the same process without interfering with each other +pub fn add_to_incoming_transaction_queue(tx: ToValidate) { let netns = KI.check_integration_test_netns(); - HISTORY - .read() - .unwrap() - .clone() - .get(&netns) - .cloned() - .unwrap_or_default() -} - -/// Gets a write ref for the payment validator lock, since this is a mutable reference -/// the lock will be held until you drop the return value, this lets the caller abstract the namespace handling -/// but still hold the lock in the local thread to prevent parallel modification -pub fn get_payment_validator_write_ref( - input: &mut HashMap, -) -> &mut PaymentValidator { - let netns = KI.check_integration_test_netns(); - input.entry(netns).or_default(); - input.get_mut(&netns).unwrap() + INCOMING_TRANSACTIONS.push((netns, tx)); +} + +/// Returns the global incoming transaction queue for the current netns, consuming +/// the queue while doing so. This abstracts netns handling to ensure that multiple instances +/// can run in the same process without interfering with each other +pub fn get_incoming_transaction_queue() -> Vec { + let our_netns = KI.check_integration_test_netns(); + // this is a hack, in order to avoid any locks at all we iterate + // over the entire queue and only take the items that are for our netns + // it helps that this overhead will only ever occur in the integration tests + let mut ret = Vec::new(); + while let Some((tx_netns, ts)) = INCOMING_TRANSACTIONS.pop() { + if tx_netns == our_netns { + ret.push(ts); + } else { + INCOMING_TRANSACTIONS.push((tx_netns, ts)); + } + } + ret } /// Details we pass into handle_tx_handling while validating a transaction @@ -103,21 +112,12 @@ pub struct TransactionDetails { pub denom: String, } -#[derive(PartialEq, Eq, Hash, Clone, Debug)] -pub enum PaymentAddress { - Xdai(Address), - Althea(AltheaAddress), -} - #[derive(PartialEq, Eq, Clone, Debug)] pub struct ToValidate { /// details of the payment from the user in the format they where sent pub payment: PaymentTx, /// When we got this tx pub received: Instant, - /// if we have managed to talk to a full node about this - /// transaction ever - pub checked: bool, } // Ensure that duplicate txid are always treated as the same object @@ -138,59 +138,23 @@ impl fmt::Display for ToValidate { } } +/// This struct stores the state of the payment validator module and is used to keep track of all payments +/// that are in the process of being validated. It also stores all successful transactions that have been sent +/// or received by this router. #[derive(Clone)] pub struct PaymentValidator { unvalidated_transactions: HashSet, /// All successful transactions sent FROM this router, mapped To Address-> list of PaymentTx - successful_transactions_sent: HashMap>, - /// All successful txids this router has verified, used to check for duplicate payments + previously_sent_payments: HashMap>, + /// All successful txids TO this router that have been verified, used to check for duplicate payments successful_transactions: HashSet, } -// Setters and getters HISTORY lazy static -pub fn add_unvalidated_transaction(tx: ToValidate) { - let writer = &mut *HISTORY.write().unwrap(); - get_payment_validator_write_ref(writer) - .unvalidated_transactions - .insert(tx); -} - -pub fn remove_unvalidated_transaction(tx: ToValidate) -> bool { - let writer = &mut *HISTORY.write().unwrap(); - get_payment_validator_write_ref(writer) - .unvalidated_transactions - .remove(&tx) -} - -pub fn get_unvalidated_transactions() -> HashSet { - get_payment_validator().unvalidated_transactions -} - -pub fn get_successful_tx_sent() -> HashMap> { - get_payment_validator().successful_transactions_sent -} - -pub fn set_successful_tx_sent(v: HashMap>) { - let writer = &mut *HISTORY.write().unwrap(); - get_payment_validator_write_ref(writer).successful_transactions_sent = v; -} - -pub fn get_all_successful_tx() -> HashSet { - get_payment_validator().successful_transactions -} - -pub fn add_successful_tx(v: PaymentTx) { - let writer = &mut *HISTORY.write().unwrap(); - get_payment_validator_write_ref(writer) - .successful_transactions - .insert(v); -} - impl PaymentValidator { pub fn new() -> Self { PaymentValidator { unvalidated_transactions: HashSet::new(), - successful_transactions_sent: HashMap::new(), + previously_sent_payments: HashMap::new(), successful_transactions: HashSet::new(), } } @@ -202,215 +166,493 @@ impl Default for PaymentValidator { } } -/// This stores payments of all tx that we sent to different nodes. -pub fn store_payment(pmt: PaymentTx) { - let mut data = get_successful_tx_sent(); - let neighbor = pmt.to; +impl PaymentValidator { + /// Performs a sanity check of the Payment validator struct + /// this checks that we do not have duplicate data anywhere in the struct + /// returns true if the struct contains no duplicate data, false otherwise + fn is_consistent(&self) -> bool { + let mut txids = HashSet::new(); + for tx in self.unvalidated_transactions.iter() { + if txids.contains(&tx.payment.txid) { + return false; + } + txids.insert(tx.payment.txid); + } - if let Some(e) = data.get_mut(&neighbor) { - e.insert(pmt); - } else { - let mut set = HashSet::new(); - set.insert(pmt); - data.insert(neighbor, set); - } + for (_, txs) in self.previously_sent_payments.iter() { + for tx in txs.iter() { + if txids.contains(&tx.txid) { + return false; + } + txids.insert(tx.txid); + } + } - set_successful_tx_sent(data); -} + for tx in self.successful_transactions.iter() { + if txids.contains(&tx.txid) { + return false; + } + txids.insert(tx.txid); + } -/// Given an id, get all payments made to that id -pub fn get_payment_txids(id: Identity) -> HashSet { - let data: HashSet = HashSet::new(); - get_payment_validator() - .successful_transactions_sent - .get(&id) - .unwrap_or(&data) - .clone() -} + true + } -/// Function to compute the total amount of all unverified payments -/// Input: takes in an identity which represents the router we are -/// going to exclude from the total amount of all unverified payments. -pub fn calculate_unverified_payments(router: Identity) -> Uint256 { - let payments_to_process = get_unvalidated_transactions(); - let mut total_unverified_payment: Uint256 = Uint256::from(0u32); - for iterate in payments_to_process.iter() { - if iterate.payment.from == router && iterate.payment.to != router { - total_unverified_payment += iterate.payment.amount; + /// Removes a transaction from the pending validation queue, it may either + /// have been discovered to be invalid or have been successfully accepted + fn remove(&mut self, tx: ToValidate, success: bool) { + let was_present = self.unvalidated_transactions.remove(&tx); + // store successful transactions so that they can't be played back to us, at least + // during this session + if success { + self.successful_transactions.insert(tx.payment); + } + if was_present { + info!("Transaction {} {} was removed", tx, success); + } else { + warn!("Transaction {} was double removed", tx); + // in a development env we want to draw attention to this case + if cfg!(feature = "development") || cfg!(feature = "integration_test") { + panic!("Transaction double removed!"); + } } } - total_unverified_payment -} -/// Checks if we already have a given txid in our to_validate list -/// true if we have it false if we do not -fn check_for_unvalidated_tx(ts: &ToValidate, payment_validator: &mut PaymentValidator) -> bool { - for tx in &payment_validator.unvalidated_transactions { - if tx.payment.txid == ts.payment.txid { - return true; + /// Checks if we already have a given txid in our to_validate list + /// true if we have it false if we do not + fn check_for_unvalidated_tx(&self, ts: &ToValidate) -> bool { + for tx in self.unvalidated_transactions.iter() { + if tx.payment.txid == ts.payment.txid { + return true; + } } + false } - false -} -/// Message to insert transactions into payment validator, once inserted they will remain -/// until they are validated, dropped for validity issues, or time out without being inserted -/// into the blockchain. Transactions that are too old are prevented from being played back -/// by using a history of successful transactions. -/// This endpoint specifically (and only this one) is fully idempotent so that we can retry -/// txid transmissions -pub fn validate_later(ts: ToValidate) -> Result<(), RitaCommonError> { - // We hold the lock to prevent race condition between make_payment_v1 and make_payment_v2 - let successful_txs = get_all_successful_tx(); - let lock = &mut *HISTORY.write().unwrap(); - let payment_validator = get_payment_validator_write_ref(lock); - if !successful_txs.contains(&ts.payment) && !check_for_unvalidated_tx(&ts, payment_validator) { - // insert is safe to run multiple times just so long as we check successful tx's for duplicates - payment_validator.unvalidated_transactions.insert(ts); - Ok(()) - } else { - Err(RitaCommonError::DuplicatePayment) + /// Message to insert transactions into payment validator, once inserted they will remain + /// until they are validated, dropped for validity issues, or time out without being inserted + /// into the blockchain. Transactions that are too old are prevented from being played back + /// by using a history of successful transactions. + /// This endpoint specifically (and only this one) is fully idempotent so that we can retry + /// txid transmissions + fn add_to_validation_queue(&mut self, ts: ToValidate) -> Result<(), RitaCommonError> { + if !self.successful_transactions.contains(&ts.payment) + && !self.check_for_unvalidated_tx(&ts) + { + // insert is safe to run multiple times just so long as we check successful tx's for duplicates + self.unvalidated_transactions.insert(ts); + Ok(()) + } else { + Err(RitaCommonError::DuplicatePayment) + } } -} -#[derive(Clone)] -struct Remove { - tx: ToValidate, - success: bool, -} + /// Iterates the payment validator state, checking transactions for validity. If a transaction to this router + /// is found to be valid it is removed from the unvalidated_transactions list and the debt keeper is updated + /// if a transaction from this router is found to be valid it is removed from the unvalidated_transactions list + /// if a transaction from this router is found to be invalid it is removed from the unvalidated_transactions list + /// and a retry is scheduled with payment_sender + pub async fn tick_payment_validator( + &mut self, + // outgoing payments coming in from payment_controller + outgoing_payments: Vec, + chain: SystemChain, + ) -> HashMap> { + // we panic on a failed receive so it should always be longer than the minimum + // time we expect payments to take to enter the blockchain (the send timeout) + assert!(PAYMENT_RECEIVE_TIMEOUT > PAYMENT_SEND_TIMEOUT); + if !self.is_consistent() { + warn!("Inconsistent payment validator!"); + // in a development env we want to draw attention to this case + if cfg!(feature = "development") || cfg!(feature = "integration_test") { + panic!("Inconsistent payment validator!"); + } + } -/// Removes a transaction from the pending validation queue, it may either -/// have been discovered to be invalid or have been successfully accepted -fn remove(msg: Remove) { - // Try removing both check and uncheked versions of txs - let mut msg_checked = msg.clone(); - msg_checked.tx.checked = true; - let was_present = remove_unvalidated_transaction(msg.tx.clone()) - | remove_unvalidated_transaction(msg_checked.tx); - // store successful transactions so that they can't be played back to us, at least - // during this session - if msg.success { - add_successful_tx(msg.tx.payment); - } - if was_present { - info!("Transaction {} was removed", msg.tx); - } else { - warn!("Transaction {} was double removed", msg.tx); - // in a development env we want to draw attention to this case - #[cfg(feature = "development")] - panic!("Transaction double removed!"); + let mut payments = Vec::new(); + payments.extend(outgoing_payments.into_iter()); + payments.extend(get_incoming_transaction_queue().into_iter()); + for pmt in payments { + let _ = self.add_to_validation_queue(pmt); + } + + let our_address = settings::get_rita_common().payment.eth_address.unwrap(); + let mut to_delete = Vec::new(); + + info!( + "Attempting to validate {} transactions {}", + self.unvalidated_transactions.len(), + print_txids(&self.unvalidated_transactions) + ); + + // Payment validation logic, broken up into three parts first we handle the basic timeouts + // for sending and recieving payments, this can be easily handles entierly within this scope + // then we handle the more complex validation logic which requires a network request by creating + // a future for each transaction and then executing them in parallel. + let mut futs = Vec::new(); + for item in self.unvalidated_transactions.iter() { + let elapsed = Instant::now().checked_duration_since(item.received); + let from_us = item.payment.from.eth_address == our_address; + + if elapsed.is_some() && elapsed.unwrap() > PAYMENT_RECEIVE_TIMEOUT { + error!( + "Incoming transaction {} has timed out, payment failed!", + format!("{:#066x}", item.payment.txid) + ); + + to_delete.push((item.clone(), false)); + } + // no penalties for failure here, we expect to overpay one out of every few hundred + // transactions + else if elapsed.is_some() && from_us && elapsed.unwrap() > PAYMENT_SEND_TIMEOUT { + error!( + "Outgoing transaction {:#066x} has timed out, payment failed!", + item.payment.txid + ); + to_delete.push((item.clone(), false)); + } else { + // we take all these futures and put them onto an array that we will execute + // in parallel, this is essential on the exit where in the worst case scenario + // we could have a thousand or more payments in the queue + let fut = validate_transaction(item.clone(), chain); + futs.push(fut); + } + } + + // Run all parallel validation tasks, there may be many hundreds of tx here + // becuase make_payments_v2 plays back the entire payment history of a node + // in order to resync. This is batched to avoid issues with making too many + // requests at once + const VALIDATE_BATCH_SIZE: usize = 10; + let mut validation_results = Vec::new(); + let mut buf = Vec::new(); + for f in futs.into_iter() { + if buf.len() < VALIDATE_BATCH_SIZE { + buf.push(f) + } else { + // execute all of the above verification operations in parallel + validation_results.extend(join_all(buf).await); + buf = Vec::new(); + } + } + // check the last leftover futures in the array + validation_results.extend(join_all(buf).await); + + // take all validation results and add them to the to_delete list from the + // timeout checking, so that we can process everything in one go + for (tx, success) in validation_results.into_iter().flatten() { + // transactions that have finished being procssed return a Some() + // value and are removed from the queue. + to_delete.push((tx, success)); + } + + // This is the final stage of payment validation, we remove all transactions + // that have been processed from the unvalidated_transactions list and update + // the list of previously sent payments list so that we can play back + // payments we have sent in the future to resync nodes + // Messaging to debt keeper and usage tracker is done within the validate + // functions themselves + for (tx, success) in to_delete.iter() { + let from_us = tx.payment.from.eth_address == our_address; + if from_us && *success { + let txs = self + .previously_sent_payments + .entry(tx.payment.to) + .or_default(); + txs.insert(tx.payment); + } + + self.remove(tx.clone(), *success) + } + + // we return our list of sent payments this is passed to payment_controller + // so that it can be played back to other nodes as part of make_payment_v2 + // where we replay payment history to resync with other nodes and form a sort + // of distributed memory of payments that survives reboots + self.previously_sent_payments.clone() } } -/// Marks a transaction as 'checked' in that we have talked to a full node about it -/// if we fail to talk to a full node about a transaction for the full duration of -/// the timeout we attempt to restart our node. -fn checked(msg: ToValidate) { - if remove_unvalidated_transaction(msg.clone()) { - let mut checked_tx = msg; - checked_tx.checked = true; - info!("We successfully checked tx {:?}", checked_tx); - add_unvalidated_transaction(checked_tx); - } else { - error!("Tried to mark a tx {:?} we don't have as checked!", msg); - #[cfg(feature = "development")] - panic!("Tried to mark a tx {:?} we don't have as checked!", msg); +/// This wrapper function handles validating a transaction on either Althea or Xdai based on the system chain +async fn validate_transaction(ts: ToValidate, chain: SystemChain) -> Option<(ToValidate, bool)> { + match chain { + SystemChain::Althea => handle_althea_tx_checking(ts.clone()).await, + SystemChain::Xdai | SystemChain::Ethereum | SystemChain::Rinkeby => { + handle_xdai_tx_checking(ts.clone()).await + } } } -pub async fn validate() { - // we panic on a failed receive so it should always be longer than the minimum - // time we expect payments to take to enter the blockchain (the send timeout) - assert!(PAYMENT_RECEIVE_TIMEOUT > PAYMENT_SEND_TIMEOUT); - - let our_address = settings::get_rita_common().payment.eth_address.unwrap(); - let mut to_delete = Vec::new(); - - let unvalidated_transactions = get_unvalidated_transactions(); - info!( - "Attempting to validate {} transactions {}", - unvalidated_transactions.len(), - print_txids(&unvalidated_transactions) - ); +async fn handle_althea_tx_checking(ts: ToValidate) -> Option<(ToValidate, bool)> { + let cosmos_node_grpc = get_rita_common().payment.althea_grpc_list[0].clone(); + let althea_contact = Contact::new( + &cosmos_node_grpc, + ALTHEA_CONTACT_TIMEOUT, + ALTHEA_CHAIN_PREFIX, + ) + .unwrap(); + // convert to hex string + let txhash = ts.payment.txid.to_str_radix(16); - let mut futs = Vec::new(); - for item in unvalidated_transactions { - let elapsed = Instant::now().checked_duration_since(item.received); - let from_us = item.payment.from.eth_address == our_address; + let althea_status = althea_contact.get_chain_status().await; + let althea_transaction = althea_contact.get_tx_by_hash(txhash.clone()).await; - if elapsed.is_some() && elapsed.unwrap() > PAYMENT_RECEIVE_TIMEOUT { + match (althea_transaction, althea_status) { + (Ok(transaction), Ok(ChainStatus::Moving { block_height })) => { + let txs = decode_althea_microtx(transaction); + handle_tx_messaging_althea(txs, ts.clone(), block_height) + } + (Ok(_), Ok(status)) => { error!( - "Incoming transaction {} has timed out, payment failed!", - format!("{:#066x}", item.payment.txid) + "Failed to check transaction due to chain status! {:?} {:?}", + txhash, status ); + if cfg!(feature = "development") || cfg!(feature = "integration_test") { + panic!( + "Failed to check transaction due to chain status! {:?} {:?}", + txhash, status + ); + } + None + } + _ => { + trace!("Failed to check transaction {:?}", txhash); + None + } + } +} - // if we fail to so much as get a block height for the full duration of a payment timeout, we have problems and probably we are not counting payments correctly potentially leading to wallet - // drain and other bad outcomes. So we should restart with the hope that the system will be restored to a working state by this last resort action - if !item.checked { - let msg = format!("We failed to check txid {:#066x} against full nodes for the full duration of it's timeout period, please check full nodes", item.payment.txid); - error!("{}", msg); - - let sys = actix_async::System::current(); - sys.stop_with_code(121); - // this satisfies the borrow checker to let us drop history so that if we - // fail to get the current actix system we don't poison the lock fatally - return; +/// This function is used to validate transactions both incoming and outgoing, it must reject any payment +/// that is not correct and returns the payment and a boolean indicating if it was successful, if we do not +/// yet know if the payment was successful we return None +/// This function must handle the unique case of multiple message MicroTx being part of a single message +/// in this case only the first will be checked and the rest will be ignored. We could try to handle this +/// more gracefully but the only reason this would happen is in some sort of attack scenario otherwise +/// there's no reason not to aggregate the payments into a single message anyway +fn handle_tx_messaging_althea( + transactions: Vec, + ts: ToValidate, + _current_block: u64, +) -> Option<(ToValidate, bool)> { + if transactions.is_empty() { + error!("Microtx payment with no transactions!"); + if cfg!(feature = "development") || cfg!(feature = "integration_test") { + panic!("Microtx payment with no transactions!"); + } + return Some((ts, false)); + } + let transaction = transactions[0].clone(); + + let amount: Coin = if let Some(amount) = transaction.amount { + amount.into() + } else { + error!("Transaction with no amount!"); + if cfg!(feature = "development") || cfg!(feature = "integration_test") { + panic!("Transaction with no amount!"); + } + return Some((ts, false)); + }; + + let reciver_address: AltheaAddress = match transaction.receiver.parse() { + Ok(a) => a, + Err(e) => { + error!("Invalid receiver address! {}", e); + if cfg!(feature = "development") || cfg!(feature = "integration_test") { + panic!("Invalid reciever address!"); } + return Some((ts, false)); + } + }; - to_delete.push(item.clone()); + let sender_address: AltheaAddress = match transaction.sender.parse() { + Ok(a) => a, + Err(e) => { + error!("Invalid sender address! {}", e); + if cfg!(feature = "development") || cfg!(feature = "integration_test") { + panic!("Invalid sender address!"); + } + return Some((ts, false)); } - // no penalties for failure here, we expect to overpay one out of every few hundred - // transactions - else if elapsed.is_some() && from_us && elapsed.unwrap() > PAYMENT_SEND_TIMEOUT { - error!( - "Outgoing transaction {:#066x} has timed out, payment failed!", - item.payment.txid - ); - to_delete.push(item.clone()); - } else { - // we take all these futures and put them onto an array that we will execute - // in parallel, this is essential on the exit where in the worst case scenario - // we could have a thousand or more payments in the queue - let fut = validate_transaction(item); - futs.push(fut); + }; + + // Verify that denom is valid + let mut denom: Option = None; + for d in get_rita_common() + .payment + .accepted_denoms + .unwrap_or_default() + { + if amount.denom == d.1.denom { + denom = Some(d.1); } } + if denom.is_none() { + error!( + "Invalid Denom! We do not currently support {}!", + amount.denom + ); + return Some((ts, false)); + } - /// This is the number of tx we validate in a single join operation - /// doing too many at once can cause system problems by opening many connections - /// and spamming full nodes. - const VALIDATE_BATCH_SIZE: usize = 10; - let mut buf = Vec::new(); - for f in futs.into_iter() { - if buf.len() < VALIDATE_BATCH_SIZE { - buf.push(f) - } else { - // execute all of the above verification operations in parallel - join_all(buf).await; - buf = Vec::new(); - } + let our_id = settings::get_rita_common().get_identity().unwrap(); + let our_address_althea = our_id.get_althea_address(); + + // notice we get these values from the blockchain using 'transaction' not ts which may be a lie since we don't + // actually cryptographically validate the txhash locally. Instead we just compare the value we get from the full + // node + let to_us = reciver_address == our_address_althea; + let from_us = sender_address == our_address_althea; + let value_correct = amount.amount == ts.payment.amount; + + if !value_correct { + error!("Transaction with invalid amount!"); + return Some((ts, false)); } - // check the last leftover futures in the array - join_all(buf).await; - for item in to_delete.iter() { - remove_unvalidated_transaction(item.clone()); + match (to_us, from_us) { + // we were successfully paid + (true, false) => { + info!( + "payment {:#066x} from {} for {} {} successfully validated!", + ts.payment.txid, + reciver_address, + amount, + denom.clone().expect("Already verified existance").denom + ); + + // update debt keeper with the details of this payment + let _ = payment_received( + ts.payment.from, + ts.payment.amount, + denom.expect("How did this happen when we already verified existence"), + ); + // update the usage tracker with the details of this payment + update_payments(ts.payment); + Some((ts, false)) + } + // we successfully paid someone + (false, true) => { + info!( + "payment {:#066x} from {} for {} {} successfully sent!", + ts.payment.txid, + reciver_address, + amount, + denom.clone().expect("Already verified existance").denom + ); + + // update debt keeper with the details of this payment + payment_succeeded( + ts.payment.to, + ts.payment.amount, + denom.expect("How did this happen when we already verified existence"), + ) + .unwrap(); + // update the usage tracker with the details of this payment + update_payments(ts.payment); + + Some((ts, true)) + } + (true, true) => { + error!("Transaction to ourselves!"); + Some((ts, false)) + } + (false, false) => { + error!("Transaction has nothing to do with us?"); + Some((ts, true)) + } } } -/// Attempt to validate that a given transaction has been accepted into the blockchain and -/// is at least some configurable number of blocks behind the head. -pub async fn validate_transaction(ts: ToValidate) { - trace!("validating transaction"); - // check both in parallel since we don't know what chain this is on - join( - handle_althea_tx_checking(ts.clone()), - handle_xdai_tx_checking(ts), - ) - .await; +/// Handles decoding of an Althea MicroTx type from the transaction response query +/// since CommosSdk allows for multiple messages in a single transaction, we need to +/// handle that possibility. Any messages that are not of type MsgMicroTx are ignored +fn decode_althea_microtx(response: GetTxResponse) -> Vec { + if let Some(tx_resp) = response.tx_response { + let tx = match tx_resp.tx { + Some(a) => a.value, + None => { + // this exists to handle a pointer in go, it should never happen + // unless the server has a go error where this value is nil on return + error!("Althea chain tx {:?} has no tx field?", tx_resp); + if cfg!(feature = "development") + || cfg!(feature = "integration_test") + || cfg!(feature = "test") + { + panic!("Althea chain tx {:?} has no tx field?", tx_resp); + } + return Vec::new(); + } + }; + + // Decode TxRaw + let raw_tx_any = prost_types::Any { + type_url: "/cosmos.tx.v1beta1.Tx".to_string(), + value: tx, + }; + let tx_raw: TxRaw = match decode_any(raw_tx_any) { + Ok(a) => a, + Err(e) => { + error!("Unable to decode raw_tx with {}", e); + if cfg!(feature = "development") + || cfg!(feature = "integration_test") + || cfg!(feature = "test") + { + panic!("Unable to decode raw_tx with {}", e); + } + return Vec::new(); + } + }; + + // Decode TxBody + let body_any = prost_types::Any { + type_url: "/cosmos.tx.v1beta1.TxBody".to_string(), + value: tx_raw.body_bytes, + }; + let tx_body: TxBody = match decode_any(body_any) { + Ok(a) => a, + Err(e) => { + error!("Unable to decode body_any with {}", e); + if cfg!(feature = "development") + || cfg!(feature = "integration_test") + || cfg!(feature = "test") + { + panic!("Unable to decode body_any with {}", e); + } + return Vec::new(); + } + }; + + let mut ret = Vec::new(); + + // Decode MsgMicroTx and send each one to validator + for message in tx_body.messages { + let msg_send = prost_types::Any { + type_url: MSG_MICROTX_TYPE_URL.to_owned(), + value: message.value.clone(), + }; + let msg_send: Result = decode_any(msg_send); + if let Ok(msg) = msg_send { + ret.push(msg); + } + } + ret + } else { + error!("Althea chain tx {:?} has no tx_response field?", response); + if cfg!(feature = "development") + || cfg!(feature = "integration_test") + || cfg!(feature = "test") + { + panic!("Althea chain tx {:?} has no tx_response field?", response); + } + Vec::new() + } } -async fn handle_xdai_tx_checking(ts: ToValidate) { +/// This function validates transactions on the xDai chain, making a series of requests +/// and then checking the results to determine if the transaction is valid. If the transaction +/// is valid or invalid Some(true) or Some(false) respectively is returned. If the transaction +/// is still pending None is returned. +async fn handle_xdai_tx_checking(ts: ToValidate) -> Option<(ToValidate, bool)> { let full_node = get_web3_server(); let web3 = Web3::new(&full_node, TRANSACTION_VERIFICATION_TIMEOUT); @@ -420,27 +662,12 @@ async fn handle_xdai_tx_checking(ts: ToValidate) { let eth_transaction = web3.eth_get_transaction_by_hash(txid).await; match (eth_transaction, eth_block_num) { (Ok(Some(transaction)), Ok(block_num)) => { - if !ts.checked { - checked(ts.clone()); - } - handle_tx_messaging_xdai(ts.payment.txid, transaction, ts.clone(), block_num); + handle_tx_messaging_xdai(ts.payment.txid, transaction, ts.clone(), block_num) } - (Ok(None), _) => { - // we have a response back from the full node that this tx is not in the mempool this - // satisfies our checked requirement - if !ts.checked { - checked(ts.clone()); - } + (_, _) => { + trace!("Failed to check transaction {:#066x}", txid); + None } - (Err(_), Ok(_)) => { - // we get an error from the full node but a successful block request, clearly we can contact - // the full node so the transaction check has been attempted - if !ts.checked { - checked(ts.clone()); - } - } - (Ok(Some(_)), Err(_)) => trace!("Failed to check transaction {:#066x}", txid), - (Err(_), Err(_)) => trace!("Failed to check transaction {:#066x}", txid), } } @@ -472,147 +699,15 @@ fn get_xdai_transaction_details( } } -async fn handle_althea_tx_checking(ts: ToValidate) { - let cosmos_node_grpc = get_rita_common().payment.althea_grpc_list[0].clone(); - let althea_contact = Contact::new( - &cosmos_node_grpc, - ALTHEA_CONTACT_TIMEOUT, - ALTHEA_CHAIN_PREFIX, - ) - .unwrap(); - // convert to hex string - let txhash = ts.payment.txid.to_str_radix(16); - - let althea_transaction = althea_contact.get_tx_by_hash(txhash.clone()).await; - let althea_chain_status = althea_contact.get_chain_status().await; - - match (althea_transaction, althea_chain_status) { - (Ok(transaction), Ok(chain_status)) => { - if let Some(tx_resp) = transaction.tx_response { - if !ts.checked { - checked(ts.clone()); - } - - let tx = match tx_resp.tx { - Some(a) => a.value, - None => { - error!("Althea chain tx {:?} has no tx field?", tx_resp); - return; - } - }; - - if let ChainStatus::Moving { block_height: _ } = chain_status { - // Decode TxRaw - let raw_tx_any = prost_types::Any { - type_url: "/cosmos.tx.v1beta1.Tx".to_string(), - value: tx, - }; - let tx_raw: TxRaw = match decode_any(raw_tx_any) { - Ok(a) => a, - Err(e) => { - error!("Unable to decode raw_tx with {}", e); - return; - } - }; - - // Decode TxBody - let body_any = prost_types::Any { - type_url: "/cosmos.tx.v1beta1.TxBody".to_string(), - value: tx_raw.body_bytes, - }; - let tx_body: TxBody = match decode_any(body_any) { - Ok(a) => a, - Err(e) => { - error!("Unable to decode body_any with {}", e); - return; - } - }; - - // Decode MsgSend and send each one to validator - for message in tx_body.messages { - let msg_send = prost_types::Any { - type_url: "/cosmos.bank.v1beta1.MsgSend".to_string(), - value: message.value.clone(), - }; - let msg_send: Result = decode_any(msg_send); - if let Ok(msg) = msg_send { - for coin_tx in msg.amount { - let transaction_details = TransactionDetails { - to: match msg.to_address.parse() { - Ok(a) => a, - Err(e) => { - error!( - "Unable to parse send address {}f for tx {} with {}", - msg.to_address, - ts.clone(), - e - ); - continue; - } - }, - from: match msg.from_address.parse() { - Ok(a) => a, - Err(e) => { - error!( - "Unable to parse send address {} for tx {} with {}", - msg.to_address, - ts.clone(), - e - ); - continue; - } - }, - amount: match Uint256::from_str_radix(&coin_tx.amount, 10) { - Ok(a) => a, - Err(e) => { - error!( - "Unable to parse amount : {:?} for tx {:?} with {}", - coin_tx.amount, - ts.clone(), - e - ); - continue; - } - }, - denom: coin_tx.denom, - }; - handle_tx_messaging_althea(transaction_details, ts.clone()); - } - } - } - } else { - error!( - "Unable to check transaction id {} because of chain status {:?}", - txhash.clone(), - chain_status - ) - } - } - } - (Ok(transaction), _) => { - // we have a response back from the full node that this tx is not in the mempool this - // satisfies our checked requirement - if transaction.tx_response.is_some() && !ts.checked { - checked(ts.clone()); - } - } - (Err(_), Ok(_)) => { - // we get an error from the full node but a successful block request, clearly we can contact - // the full node so the transaction check has been attempted - if !ts.checked { - checked(ts.clone()); - } - } - _ => trace!("Failed to check transaction {:?}", txhash), - } -} - +/// This function is used to validate transactions both incoming and outgoing, it must reject any payment +/// that is not correct and returns the payment and a boolean indicating if it was successful, if we do not +/// yet know if the payment was successful we return None fn handle_tx_messaging_xdai( txid: Uint256, transaction: TransactionResponse, ts: ToValidate, current_block: Uint256, -) { +) -> Option<(ToValidate, bool)> { let from_address = ts.payment.from.eth_address; let amount = ts.payment.amount; let pmt = ts.payment; @@ -627,11 +722,7 @@ fn handle_tx_messaging_xdai( Some(val) => val, None => { error!("Invalid TX! No destination!"); - remove(Remove { - tx: ts, - success: false, - }); - return; + return Some((ts, false)); } }; @@ -641,35 +732,22 @@ fn handle_tx_messaging_xdai( let to_us = to == our_address; let from_us = tx_from == our_address; let value_correct = tx_value == amount; - let is_in_chain = payment_in_chain(current_block, tx_block_number); + let is_in_chain = payment_in_chain_xdai(current_block, tx_block_number); let is_old = payment_is_old(current_block, tx_block_number); if !value_correct { error!("Transaction with invalid amount!"); - remove(Remove { - tx: ts, - success: false, - }); - return; + return Some((ts, false)); } if is_old { error!("Transaction is more than 6 hours old! {:#066x}", txid); - remove(Remove { - tx: ts, - success: false, - }); - return; + return Some((ts, false)); } match (to_us, from_us, is_in_chain) { // we were successfully paid (true, false, true) => { - // remove this transaction from our storage - remove(Remove { - tx: ts, - success: true, - }); info!( "payment {:#066x} from {} for {} wei successfully validated!", txid, from_address, amount @@ -686,6 +764,8 @@ fn handle_tx_messaging_xdai( // update the usage tracker with the details of this payment update_payments(pmt); + + Some((ts, true)) } // we successfully paid someone (false, true, true) => { @@ -693,11 +773,6 @@ fn handle_tx_messaging_xdai( "payment {:#066x} from {} for {} wei successfully sent!", txid, from_address, amount ); - // remove this transaction from our storage - remove(Remove { - tx: ts, - success: true, - }); // update debt keeper with the details of this payment let _ = payment_succeeded( pmt.to, @@ -711,157 +786,26 @@ fn handle_tx_messaging_xdai( // update the usage tracker with the details of this payment update_payments(pmt); - // Store this payment as a receipt to send in the future if this receiver doesnt see the payment - store_payment(pmt); + Some((ts, true)) } (true, true, _) => { error!("Transaction to ourselves!"); - remove(Remove { - tx: ts, - success: false, - }); + Some((ts, false)) } (false, false, _) => { error!("Transaction has nothing to do with us?"); - remove(Remove { - tx: ts, - success: false, - }); + Some((ts, false)) } (_, _, false) => { - //transaction waiting for validation, do nothing - } - } -} - -/// Handles the tx response from the full node and it's various cases -/// pulled out of validate_transaction purely for cosmetic reasons -fn handle_tx_messaging_althea(transaction: TransactionDetails, ts: ToValidate) { - let pmt = ts.payment; - - // txid is for eth chain and txhash is for althea chain, only one of these should be - // Some(..). This was verified before - let txid = ts.payment.txid; - - // Verify that denom is valid - let mut denom: Option = None; - for d in get_rita_common() - .payment - .accepted_denoms - .unwrap_or_default() - { - if transaction.denom == d.1.denom { - denom = Some(d.1); - } - } - if denom.is_none() { - error!( - "Invalid Denom! We do not currently support {}!", - transaction.denom - ); - remove(Remove { - tx: ts, - success: false, - }); - return; - } - - let amount = ts.payment.amount; - - let our_id = settings::get_rita_common().get_identity().unwrap(); - let our_address_althea = our_id.get_althea_address(); - - let to = transaction.to; - let from = transaction.from; - - // notice we get these values from the blockchain using 'transaction' not ts which may be a lie since we don't - // actually cryptographically validate the txhash locally. Instead we just compare the value we get from the full - // node - let to_us = to == our_address_althea; - let from_us = from == our_address_althea; - let value_correct = transaction.amount == amount; - - if !value_correct { - error!("Transaction with invalid amount!"); - remove(Remove { - tx: ts, - success: false, - }); - return; - } - - match (to_us, from_us) { - // we were successfully paid - (true, false) => { - // remove this transaction from our storage - remove(Remove { - tx: ts, - success: true, - }); - info!( - "payment {:#066x} from {} for {} {} successfully validated!", - txid, - from, - amount, - denom.clone().expect("Already verified existance").denom - ); - - // update debt keeper with the details of this payment - let _ = payment_received( - pmt.from, - pmt.amount, - denom.expect("How did this happen when we already verified existence"), - ); - // update the usage tracker with the details of this payment - update_payments(pmt); - } - // we successfully paid someone - (false, true) => { - info!( - "payment {:#066x} from {} for {} {} successfully sent!", - txid, - from, - amount, - denom.clone().expect("Already verified existance").denom - ); - - // remove this transaction from our storage - remove(Remove { - tx: ts, - success: true, - }); - - // update debt keeper with the details of this payment - let _ = payment_succeeded( - pmt.to, - pmt.amount, - denom.expect("How did this happen when we already verified existence"), - ); - // update the usage tracker with the details of this payment - update_payments(pmt); - - // Store this payment as a receipt to send in the future if this receiver doesnt see the payment - store_payment(pmt); - } - (true, true) => { - error!("Transaction to ourselves!"); - remove(Remove { - tx: ts, - success: false, - }); - } - (false, false) => { - error!("Transaction has nothing to do with us?"); - remove(Remove { - tx: ts, - success: false, - }); + //transaction waiting for validation, do nothingi + None } } } /// Determine if a given payment satisfies our criteria for being in the blockchain -fn payment_in_chain(chain_height: Uint256, tx_height: Option) -> bool { +/// this is not required or valid for althea L1 as payments there have instant finality +fn payment_in_chain_xdai(chain_height: Uint256, tx_height: Option) -> bool { match tx_height { Some(tx_block) => { // somehow the block is newer than our block height request, wait until later @@ -900,13 +844,9 @@ fn print_txids(list: &HashSet) -> String { #[cfg(test)] mod tests { - use actix_async::System; - use cosmos_sdk_proto_althea::cosmos::bank::v1beta1::MsgSend; - use deep_space::utils::decode_any; - - use crate::usage_tracker::tests::test::random_identity; - use super::*; + use crate::usage_tracker::tests::test::random_identity; + use actix_async::System; fn generate_fake_payment() -> ToValidate { let amount: u128 = rand::random(); @@ -920,81 +860,71 @@ mod tests { ToValidate { payment: tx, received: Instant::now(), - checked: false, } } #[test] /// Attempts to insert a duplicate tx into the to_validate list fn test_duplicate_tx() { - // check that we can't put duplicates in to_validate + // check that we can't put duplicates in add_to_validation_queue + let mut validator = PaymentValidator::new(); let payment = generate_fake_payment(); - assert!(validate_later(payment.clone()).is_ok()); - assert!(validate_later(payment).is_err()); + assert!(validator.add_to_validation_queue(payment.clone()).is_ok()); + assert!(validator.add_to_validation_queue(payment).is_err()); // check that we can't put dupliates in that we have already validated let payment = generate_fake_payment(); - add_successful_tx(payment.clone().payment); - assert!(validate_later(payment).is_err()); + validator.successful_transactions.insert(payment.payment); + assert!(validator.add_to_validation_queue(payment).is_err()); } + // ensures that payment validator crashes when presented with an invalid state #[test] - fn test_payment_txid_datastore() { - let client_id = Identity { - mesh_ip: "fd00::1".parse().unwrap(), - eth_address: "0xE39bDB2e345ACf7B0C7B1A28dFA26288C3094A6A" - .parse() - .unwrap(), - wg_public_key: "NZnbEv9w5lC3JG3hacwh5cq8C5NnsAUJLrNKYL91fS0=" - .parse() - .unwrap(), - nickname: None, - }; - - let exit_id = Identity { - mesh_ip: "fd00::1337".parse().unwrap(), - eth_address: "0xE39bDB2e345ACf7B0C7B1A28dFA26288C3094A6A" - .parse() - .unwrap(), - wg_public_key: "PiMD6fCsgyNKwz9AVqP/GRT3+o6h6e9Y0KPEdFct/yw=" - .parse() - .unwrap(), - nickname: None, - }; - - let mut sent_hashset = HashSet::new(); - - let pmt1 = PaymentTx { - to: exit_id, - from: client_id, - amount: 10u8.into(), - txid: 1u8.into(), - }; + fn test_invalid_payment_validator_state() { + // duplicate between unvalidated and previously sent + let mut validator = PaymentValidator::new(); + let payment = generate_fake_payment(); + let mut set = HashSet::new(); + set.insert(payment.clone().payment); + validator.unvalidated_transactions.insert(payment.clone()); + validator + .previously_sent_payments + .insert(payment.payment.to, set); - store_payment(pmt1); - sent_hashset.insert(pmt1); - assert_eq!(get_payment_txids(pmt1.to), sent_hashset); + assert!(!validator.is_consistent()); - let pmt2 = PaymentTx { - to: exit_id, - from: client_id, - amount: 100u8.into(), - txid: 2u8.into(), - }; - store_payment(pmt2); + // duplicate between unvalidated and successful + let mut validator = PaymentValidator::new(); + let payment = generate_fake_payment(); + validator.unvalidated_transactions.insert(payment.clone()); + validator.successful_transactions.insert(payment.payment); - sent_hashset.insert(pmt2); - assert_eq!(get_payment_txids(pmt2.to), sent_hashset); + assert!(!validator.is_consistent()); - let pmt3 = PaymentTx { - to: exit_id, - from: client_id, - amount: 100u8.into(), - txid: 2u8.into(), - }; + // duplicate between sent and recieved + let mut validator = PaymentValidator::new(); + let payment = generate_fake_payment(); + let mut set = HashSet::new(); + set.insert(payment.clone().payment); + validator.successful_transactions.insert(payment.payment); + validator + .previously_sent_payments + .insert(payment.payment.to, set); - store_payment(pmt3); + assert!(!validator.is_consistent()); - assert_eq!(get_payment_txids(pmt3.to), sent_hashset); + // Consistent with 3 different payments, happy path + let mut validator = PaymentValidator::new(); + let mut set = HashSet::new(); + set.insert(generate_fake_payment().payment); + validator.unvalidated_transactions.insert(payment.clone()); + validator + .successful_transactions + .insert(generate_fake_payment().payment); + validator + .previously_sent_payments + .insert(payment.payment.to, set); + + assert!(validator.is_consistent()); } #[ignore] @@ -1002,8 +932,12 @@ mod tests { fn test_althea_chain_response() { let runner = System::new(); runner.block_on(async move { - let contact = - Contact::new("http://althea.zone:9090", ALTHEA_CONTACT_TIMEOUT, "althea").unwrap(); + let contact = Contact::new( + "http://rpc.althea.zone:9090", + ALTHEA_CONTACT_TIMEOUT, + "althea", + ) + .unwrap(); let tx = contact .get_tx_by_hash( @@ -1011,33 +945,7 @@ mod tests { ) .await .expect("Unable to get tx by hash"); - println!("{:?}", tx.tx_response.clone().unwrap().tx); - - let raw_tx_any = prost_types::Any { - type_url: "/cosmos.tx.v1beta1.Tx".to_string(), - value: tx.tx_response.unwrap().tx.unwrap().value, - }; - let tx_raw: TxRaw = decode_any(raw_tx_any).unwrap(); - - println!("{:?}", tx_raw); - - let body_any = prost_types::Any { - type_url: "/cosmos.tx.v1beta1.TxBody".to_string(), - value: tx_raw.body_bytes, - }; - let tx_body: TxBody = decode_any(body_any).unwrap(); - - println!("{:?}", tx_body); - - for message in tx_body.messages { - let msg_send = prost_types::Any { - type_url: "/cosmos.bank.v1beta1.MsgSend".to_string(), - value: message.value.clone(), - }; - let msg_send: Result = decode_any(msg_send); - - println!("\n\n{:?}", msg_send); - } + let _tx = decode_althea_microtx(tx); }); } } diff --git a/rita_common/src/rita_loop/fast_loop.rs b/rita_common/src/rita_loop/fast_loop.rs index 870406a77..d5ed01d3a 100644 --- a/rita_common/src/rita_loop/fast_loop.rs +++ b/rita_common/src/rita_loop/fast_loop.rs @@ -3,7 +3,7 @@ use crate::debt_keeper::send_debt_update; use crate::network_monitor::update_network_info; use crate::network_monitor::NetworkInfo as NetworkMonitorTick; use crate::payment_controller::tick_payment_controller; -use crate::payment_validator::validate; +use crate::payment_validator::PaymentValidator; use crate::peer_listener::peerlistener_tick; use crate::peer_listener::structs::PeerListener; use crate::traffic_watcher::watch; @@ -14,7 +14,6 @@ use actix_async::System as AsyncSystem; use babel_monitor::open_babel_stream; use babel_monitor::parse_neighs; use babel_monitor::parse_routes; - use std::thread; use std::time::{Duration, Instant}; @@ -39,64 +38,70 @@ pub fn start_rita_fast_loop() { // this will always be an error, so it's really just a loop statement // with some fancy destructuring while let Err(e) = { - thread::spawn(move || loop { + thread::spawn(move || { trace!("Common Fast tick!"); let start = Instant::now(); - let runner = AsyncSystem::new(); + let babel_port = settings::get_rita_common().network.babel_port; + let system_chain = settings::get_rita_common().payment.system_chain; runner.block_on(async move { - let babel_port = settings::get_rita_common().network.babel_port; - trace!("Common tick!"); - - let res = tm_get_neighbors(); - trace!("Currently open tunnels: {:?}", res); - let neighbors = res; - let neigh = Instant::now(); - - if let Ok(mut stream) = open_babel_stream(babel_port, FAST_LOOP_TIMEOUT) { - if let Ok(babel_routes) = parse_routes(&mut stream) { - if let Err(e) = watch(babel_routes.clone(), &neighbors) { - error!("Error for Rita common traffic watcher {}", e); + let mut payment_validator_state = PaymentValidator::new(); + let mut outgoing_payments = Vec::new(); + loop { + trace!("Common tick!"); + + let res = tm_get_neighbors(); + trace!("Currently open tunnels: {:?}", res); + let neighbors = res; + let neigh = Instant::now(); + + if let Ok(mut stream) = open_babel_stream(babel_port, FAST_LOOP_TIMEOUT) { + if let Ok(babel_routes) = parse_routes(&mut stream) { + if let Err(e) = watch(babel_routes.clone(), &neighbors) { + error!("Error for Rita common traffic watcher {}", e); + } + info!( + "TrafficWatcher completed in {}s {}ms", + neigh.elapsed().as_secs(), + neigh.elapsed().subsec_millis() + ); + + // Observe the dataplane for status and problems. + if let Ok(babel_neighbors) = parse_neighs(&mut stream) { + let rita_neighbors = tm_get_neighbors(); + trace!("Sending network monitor tick"); + update_network_info(NetworkMonitorTick { + babel_neighbors, + babel_routes, + rita_neighbors, + }); + } } - info!( - "TrafficWatcher completed in {}s {}ms", - neigh.elapsed().as_secs(), - neigh.elapsed().subsec_millis() - ); + } - // Observe the dataplane for status and problems. - if let Ok(babel_neighbors) = parse_neighs(&mut stream) { - let rita_neighbors = tm_get_neighbors(); - trace!("Sending network monitor tick"); - update_network_info(NetworkMonitorTick { - babel_neighbors, - babel_routes, - rita_neighbors, - }); - } + // Update debts + if let Err(e) = send_debt_update() { + warn!("Debt keeper update failed! {:?}", e); } - } - // Update debts - if let Err(e) = send_debt_update() { - warn!("Debt keeper update failed! {:?}", e); + // updating blockchain info often is easier than dealing with edge cases + // like out of date nonces or balances, also users really really want fast + // balance updates, think very long and very hard before running this more slowly + BlockchainOracleUpdate().await; + info!("Finished oracle update!"); + // Check on payments, only really needs to be run this quickly + // on large nodes where very high variation in throughput can result + // in blowing through the entire grace in less than a minute + let previously_sent_payments = payment_validator_state + .tick_payment_validator(outgoing_payments, system_chain) + .await; + info!("Finished validated!"); + // Process payments queued for sending, needs to be run often for + // the same reason as the validate code, during high throughput periods + // payments must be sent quickly to avoid enforcement + outgoing_payments = tick_payment_controller(previously_sent_payments).await; + info!("Finished tick payment controller!"); } - - // updating blockchain info often is easier than dealing with edge cases - // like out of date nonces or balances, also users really really want fast - // balance updates, think very long and very hard before running this more slowly - BlockchainOracleUpdate().await; - info!("Finished oracle update!"); - // Check on payments, only really needs to be run this quickly - // on large nodes where very high variation in throughput can result - // in blowing through the entire grace in less than a minute - validate().await; - info!("Finished validated!"); - // Process payments queued for sending, needs to be run often for - // the same reason as the validate code, during high throughput periods - // payments must be sent quickly to avoid enforcement - tick_payment_controller().await; - info!("Finished tick payment controller!"); }); info!( "Common Fast tick completed in {}s {}ms", diff --git a/rita_exit/src/network_endpoints/mod.rs b/rita_exit/src/network_endpoints/mod.rs index f143b37e5..39f893bd0 100644 --- a/rita_exit/src/network_endpoints/mod.rs +++ b/rita_exit/src/network_endpoints/mod.rs @@ -25,7 +25,6 @@ use num256::Int256; use rita_client_registration::client_db::get_exits_list; use rita_common::blockchain_oracle::potential_payment_issues_detected; use rita_common::debt_keeper::get_debts_list; -use rita_common::payment_validator::calculate_unverified_payments; use rita_common::rita_loop::get_web3_server; use settings::get_rita_exit; use sodiumoxide::crypto::box_; @@ -444,12 +443,6 @@ pub async fn get_client_debt(client: Json) -> HttpResponse { return HttpResponse::Ok().json(zero); } - // these are payments to us, remember debt is positive when we owe and negative when we are owed - // this value is being presented to the client router who's debt is positive (they owe the exit) so we - // want to make it negative - let unverified_payments_uint = calculate_unverified_payments(client); - let unverified_payments = unverified_payments_uint.to_int256().unwrap(); - let debts = get_debts_list(); for debt in debts { if debt.identity == client { @@ -457,11 +450,6 @@ pub async fn get_client_debt(client: Json) -> HttpResponse { let incoming_payments = debt.payment_details.incoming_payments; let we_owe_them = client_debt > zero; - let they_owe_more_than_in_queue = if we_owe_them { - false - } else { - (neg_one * client_debt).to_uint256().unwrap() > unverified_payments_uint - }; // they have more credit than they owe, wait for this to unwind // we apply credit right before enforcing or on payment. @@ -469,18 +457,16 @@ pub async fn get_client_debt(client: Json) -> HttpResponse { return HttpResponse::Ok().json(zero); } - match (we_owe_them, they_owe_more_than_in_queue) { + match we_owe_them { // in this case we owe them, return zero - (true, _) => return HttpResponse::Ok().json(zero), + true => return HttpResponse::Ok().json(zero), // they owe us more than is in the queue - (false, true) => { + false => { // client debt is negative, they owe us, so we make it positive and subtract // the unverified payments, which we're sure are less than or equal to the debt - let ret = (client_debt * neg_one) - unverified_payments; + let ret = client_debt * neg_one; return HttpResponse::Ok().json(ret); } - // they owe us less than what is in the queue, return zero - (false, false) => return HttpResponse::Ok().json(zero), } } }