From e1ed8635f9bb74aef6840fc40763f788abb884a6 Mon Sep 17 00:00:00 2001 From: Gregory Hill Date: Thu, 22 Jun 2023 21:20:52 +0000 Subject: [PATCH 1/2] refactor: better caching, optional auth check Signed-off-by: Gregory Hill --- .gitignore | 1 + Cargo.lock | 2 ++ faucet/Cargo.toml | 3 ++ faucet/src/error.rs | 21 ++++++++---- faucet/src/http.rs | 81 +++++++++++++++++++++++++++++--------------- faucet/src/lib.rs | 2 ++ faucet/src/main.rs | 2 ++ runtime/src/error.rs | 7 ++++ runtime/src/rpc.rs | 20 ++++++++--- vault/src/metrics.rs | 2 +- vault/src/replace.rs | 2 +- 11 files changed, 103 insertions(+), 40 deletions(-) diff --git a/.gitignore b/.gitignore index ba09d6b34..dd498d267 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ event-logs faucet/kv keyfile.json +config.json .deploy/monitoring/data .deploy/monitoring/prometheus .deploy/monitoring/alertmanager diff --git a/Cargo.lock b/Cargo.lock index bc2997256..e2d61a680 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3052,6 +3052,7 @@ dependencies = [ "kv", "log 0.4.18", "parity-scale-codec", + "reqwest", "runtime", "serde", "serde_json", @@ -3059,6 +3060,7 @@ dependencies = [ "sp-keyring 7.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "thiserror", "tokio", + "url 2.3.1", ] [[package]] diff --git a/faucet/Cargo.toml b/faucet/Cargo.toml index 009e235dd..479a866c9 100644 --- a/faucet/Cargo.toml +++ b/faucet/Cargo.toml @@ -32,6 +32,9 @@ async-trait = "0.1.40" futures = "0.3.5" git-version = "0.3.4" +reqwest = "0.11.11" +url = "2.2.2" + # Workspace dependencies runtime = { path = "../runtime" } service = { path = "../service" } diff --git a/faucet/src/error.rs b/faucet/src/error.rs index cc71154db..156728c70 100644 --- a/faucet/src/error.rs +++ b/faucet/src/error.rs @@ -1,13 +1,15 @@ #![allow(clippy::enum_variant_names)] -use chrono::ParseError; +use chrono::ParseError as ChronoParseError; use jsonrpc_http_server::jsonrpc_core::Error as JsonRpcError; use kv::Error as KvError; use parity_scale_codec::Error as CodecError; +use reqwest::Error as ReqwestError; use runtime::Error as RuntimeError; use serde_json::Error as SerdeJsonError; use std::{io::Error as IoError, net::AddrParseError}; use thiserror::Error; +use url::ParseError as UrlParseError; #[derive(Error, Debug)] pub enum Error { @@ -21,16 +23,23 @@ pub enum Error { AddrParseError(#[from] AddrParseError), #[error("Kv store error: {0}")] KvError(#[from] KvError), + #[error("ReqwestError: {0}")] + ReqwestError(#[from] ReqwestError), + #[error("UrlParseError: {0}")] + UrlParseError(#[from] UrlParseError), #[error("Error parsing datetime string: {0}")] - DatetimeParsingError(#[from] ParseError), + DatetimeParsingError(#[from] ChronoParseError), + #[error("IoError: {0}")] + IoError(#[from] IoError), + #[error("SerdeJsonError: {0}")] + SerdeJsonError(#[from] SerdeJsonError), + #[error("Requester balance already sufficient")] AccountBalanceExceedsMaximum, #[error("Requester was recently funded")] AccountAlreadyFunded, #[error("Mathematical operation error")] MathError, - #[error("IoError: {0}")] - IoError(#[from] IoError), - #[error("SerdeJsonError: {0}")] - SerdeJsonError(#[from] SerdeJsonError), + #[error("Terms and conditions not signed")] + SignatureMissing, } diff --git a/faucet/src/http.rs b/faucet/src/http.rs index 1c545fce1..522dec478 100644 --- a/faucet/src/http.rs +++ b/faucet/src/http.rs @@ -7,18 +7,19 @@ use jsonrpc_http_server::{ }; use kv::*; use parity_scale_codec::{Decode, Encode}; +use reqwest::Url; use runtime::{ AccountId, CollateralBalancesPallet, CurrencyId, Error as RuntimeError, InterBtcParachain, RuntimeCurrencyInfo, - TryFromSymbol, VaultRegistryPallet, + Ss58Codec, TryFromSymbol, VaultRegistryPallet, SS58_PREFIX, }; -use serde::{Deserialize, Deserializer}; +use serde::{Deserialize, Deserializer, Serialize}; use std::{net::SocketAddr, time::Duration}; use tokio::time::timeout; const HEALTH_DURATION: Duration = Duration::from_millis(5000); const KV_STORE_NAME: &str = "store"; -#[derive(serde::Serialize, serde::Deserialize, PartialEq)] +#[derive(Serialize, Deserialize, PartialEq)] struct FaucetRequest { datetime: String, account_type: FundingRequestAccountType, @@ -137,14 +138,14 @@ fn has_request_expired( async fn ensure_funding_allowed( parachain_rpc: &InterBtcParachain, - account_id: AccountId, - allowance_config: AllowanceConfig, + account_id: &AccountId, + allowance_config: &AllowanceConfig, last_request_json: Option>, account_type: FundingRequestAccountType, ) -> Result<(), Error> { let account_allowances = match account_type { - FundingRequestAccountType::User => allowance_config.user_allowances, - FundingRequestAccountType::Vault => allowance_config.vault_allowances, + FundingRequestAccountType::User => &allowance_config.user_allowances, + FundingRequestAccountType::Vault => &allowance_config.vault_allowances, }; let currency_ids: Result, _> = account_allowances .iter() @@ -195,6 +196,21 @@ async fn ensure_funding_allowed( } } +#[derive(Deserialize)] +struct GetSignatureData { + exists: bool, +} + +async fn ensure_signature_exists(auth_url: &str, account_id: &AccountId) -> Result<(), Error> { + reqwest::get(Url::parse(auth_url)?.join(&account_id.to_ss58check_with_version(SS58_PREFIX.into()))?) + .await? + .json::() + .await? + .exists + .then(|| ()) + .ok_or(Error::SignatureMissing) +} + async fn atomic_faucet_funding( parachain_rpc: &InterBtcParachain, kv: Bucket<'_, String, Json>, @@ -211,35 +227,46 @@ async fn atomic_faucet_funding( ensure_funding_allowed( parachain_rpc, - account_id.clone(), - allowance_config, + &account_id, + &allowance_config, last_request_json, account_type.clone(), ) .await?; - let mut transfers = vec![]; - for AllowanceAmount { symbol, amount } in amounts.iter() { - let currency_id = CurrencyId::try_from_symbol(symbol.clone())?; - log::info!( - "AccountId: {}, Currency: {:?} Type: {:?}, Amount: {}", - account_id, - currency_id.symbol().unwrap_or_default(), - account_type, - amount - ); - transfers.push(parachain_rpc.transfer_to(&account_id, *amount, currency_id)); + if let Some(auth_url) = allowance_config.auth_url { + ensure_signature_exists(&auth_url, &account_id).await?; } - let result = futures::future::join_all(transfers).await; + // replace the previous (expired) claim datetime with the datetime of the current claim + update_kv_store(&kv, account_str.clone(), Utc::now().to_rfc2822(), account_type.clone())?; + + let transfers = amounts + .into_iter() + .map(|AllowanceAmount { symbol, amount }| { + let currency_id = CurrencyId::try_from_symbol(symbol.clone())?; + log::info!( + "AccountId: {}, Currency: {:?} Type: {:?}, Amount: {}", + account_id, + currency_id.symbol().unwrap_or_default(), + account_type, + amount + ); + Ok((amount, currency_id)) + }) + .collect::, Error>>()?; + + if let Err(err) = parachain_rpc.transfer_to(&account_id, transfers).await { + log::error!("Failed to fund {}", account_str); + if err.is_any_module_err() || err.is_invalid_transaction().is_some() { + log::info!("Removing previous claim"); + // transfer failed, reset the db so this can be called again + kv.remove(account_str)?; + } - if let Some(err) = result.into_iter().find_map(|x| x.err()) { return Err(err.into()); } - // Replace the previous (expired) claim datetime with the datetime of the current claim, only update - // this after successfully transferring funds to ensure that this can be called again on error - update_kv_store(&kv, account_str, Utc::now().to_rfc2822(), account_type.clone())?; Ok(()) } @@ -262,7 +289,7 @@ pub async fn start_http( allowance_config: AllowanceConfig, ) -> jsonrpc_http_server::CloseHandle { let mut io = IoHandler::default(); - let store = Store::new(Config::new("./kv")).expect("Unable to open kv store"); + let store = Store::new(Config::new("./kv").flush_every_ms(100)).expect("Unable to open kv store"); let user_allowances_clone = allowance_config.user_allowances.clone(); let vault_allowances_clone = allowance_config.vault_allowances.clone(); io.add_sync_method("user_allowance", move |_| handle_resp(Ok(&user_allowances_clone))); @@ -397,7 +424,7 @@ mod tests { join_all(balance.iter().map(|(amount, currency)| { let leftover = leftover_units * 10u128.pow(currency.decimals().unwrap()); let amount_to_transfer = if *amount > leftover { amount - leftover } else { 0 }; - provider.transfer_to(&drain_account_id, amount_to_transfer, currency.clone()) + provider.transfer_to(&drain_account_id, vec![(amount_to_transfer, currency.clone())]) })) .await .into_iter() diff --git a/faucet/src/lib.rs b/faucet/src/lib.rs index f29d6db03..3ba279fe8 100644 --- a/faucet/src/lib.rs +++ b/faucet/src/lib.rs @@ -1,10 +1,12 @@ use parity_scale_codec::{Decode, Encode}; use serde::Deserialize; + #[derive(Deserialize, Debug, Clone, Encode, Decode)] pub struct AllowanceAmount { pub symbol: String, pub amount: u128, } + impl AllowanceAmount { pub fn new(symbol: String, amount: u128) -> Self { Self { symbol, amount } diff --git a/faucet/src/main.rs b/faucet/src/main.rs index c51dc19b5..cf20cbcd9 100644 --- a/faucet/src/main.rs +++ b/faucet/src/main.rs @@ -40,6 +40,7 @@ pub struct AllowanceConfig { pub faucet_cooldown_hours: i64, pub user_allowances: Allowance, pub vault_allowances: Allowance, + pub auth_url: Option, } impl AllowanceConfig { @@ -54,6 +55,7 @@ impl AllowanceConfig { faucet_cooldown_hours, user_allowances, vault_allowances, + auth_url: None, } } } diff --git a/runtime/src/error.rs b/runtime/src/error.rs index 6489b98df..790948348 100644 --- a/runtime/src/error.rs +++ b/runtime/src/error.rs @@ -108,6 +108,13 @@ impl From for Error { } impl Error { + pub fn is_any_module_err(&self) -> bool { + matches!( + self, + Error::SubxtRuntimeError(SubxtError::Runtime(DispatchError::Module(_))), + ) + } + fn is_module_err(&self, pallet_name: &str, error_name: &str) -> bool { matches!( self, diff --git a/runtime/src/rpc.rs b/runtime/src/rpc.rs index fb72f2496..bb08f2fd0 100644 --- a/runtime/src/rpc.rs +++ b/runtime/src/rpc.rs @@ -836,7 +836,7 @@ pub trait CollateralBalancesPallet { async fn get_reserved_balance_for_id(&self, id: AccountId, currency_id: CurrencyId) -> Result; - async fn transfer_to(&self, recipient: &AccountId, amount: u128, currency_id: CurrencyId) -> Result<(), Error>; + async fn transfer_to(&self, recipient: &AccountId, amounts: Vec<(u128, CurrencyId)>) -> Result<(), Error>; } #[async_trait] @@ -859,10 +859,20 @@ impl CollateralBalancesPallet for InterBtcParachain { Ok(self.query_finalized_or_default(storage_key).await?.reserved) } - async fn transfer_to(&self, recipient: &AccountId, amount: u128, currency_id: CurrencyId) -> Result<(), Error> { - self.with_unique_signer(metadata::tx().tokens().transfer(recipient.clone(), currency_id, amount)) - .await?; - Ok(()) + async fn transfer_to(&self, recipient: &AccountId, amounts: Vec<(u128, CurrencyId)>) -> Result<(), Error> { + self.batch( + amounts + .into_iter() + .map(|(amount, currency_id)| { + EncodedCall::Tokens(metadata::runtime_types::orml_tokens::module::Call::transfer { + dest: recipient.clone(), + currency_id, + amount, + }) + }) + .collect(), + ) + .await } } diff --git a/vault/src/metrics.rs b/vault/src/metrics.rs index ca90a0ebe..ca407687c 100644 --- a/vault/src/metrics.rs +++ b/vault/src/metrics.rs @@ -748,7 +748,7 @@ mod tests { async fn get_free_balance_for_id(&self, id: AccountId, currency_id: CurrencyId) -> Result; async fn get_reserved_balance(&self, currency_id: CurrencyId) -> Result; async fn get_reserved_balance_for_id(&self, id: AccountId, currency_id: CurrencyId) -> Result; - async fn transfer_to(&self, recipient: &AccountId, amount: u128, currency_id: CurrencyId) -> Result<(), RuntimeError>; + async fn transfer_to(&self, recipient: &AccountId, amounts: Vec<(u128, CurrencyId)>) -> Result<(), RuntimeError>; } #[async_trait] diff --git a/vault/src/replace.rs b/vault/src/replace.rs index 816d664ce..d36527f11 100644 --- a/vault/src/replace.rs +++ b/vault/src/replace.rs @@ -344,7 +344,7 @@ mod tests { async fn get_free_balance_for_id(&self, id: AccountId, currency_id: CurrencyId) -> Result; async fn get_reserved_balance(&self, currency_id: CurrencyId) -> Result; async fn get_reserved_balance_for_id(&self, id: AccountId, currency_id: CurrencyId) -> Result; - async fn transfer_to(&self, recipient: &AccountId, amount: u128, currency_id: CurrencyId) -> Result<(), RuntimeError>; } + async fn transfer_to(&self, recipient: &AccountId, amounts: Vec<(u128, CurrencyId)>) -> Result<(), RuntimeError>; } } impl Clone for MockProvider { From 612d4d2e62b7198df13e90741c650f6649f2ea32 Mon Sep 17 00:00:00 2001 From: Gregory Hill Date: Mon, 26 Jun 2023 17:29:56 +0000 Subject: [PATCH 2/2] refactor: global mutex for faucet store read / writes Signed-off-by: Gregory Hill --- Cargo.lock | 1 + faucet/Cargo.toml | 1 + faucet/src/http.rs | 88 +++++++++++++++++++++++++++------------------- 3 files changed, 54 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2d61a680..d7e3ea1bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3050,6 +3050,7 @@ dependencies = [ "hex", "jsonrpc-http-server", "kv", + "lazy_static", "log 0.4.18", "parity-scale-codec", "reqwest", diff --git a/faucet/Cargo.toml b/faucet/Cargo.toml index 479a866c9..49c6d5097 100644 --- a/faucet/Cargo.toml +++ b/faucet/Cargo.toml @@ -31,6 +31,7 @@ kv = { version = "0.22.0", features = ["json-value"] } async-trait = "0.1.40" futures = "0.3.5" git-version = "0.3.4" +lazy_static = "1.4.0" reqwest = "0.11.11" url = "2.2.2" diff --git a/faucet/src/http.rs b/faucet/src/http.rs index 522dec478..e5707ec91 100644 --- a/faucet/src/http.rs +++ b/faucet/src/http.rs @@ -6,6 +6,7 @@ use jsonrpc_http_server::{ DomainsValidation, ServerBuilder, }; use kv::*; +use lazy_static::lazy_static; use parity_scale_codec::{Decode, Encode}; use reqwest::Url; use runtime::{ @@ -14,11 +15,18 @@ use runtime::{ }; use serde::{Deserialize, Deserializer, Serialize}; use std::{net::SocketAddr, time::Duration}; -use tokio::time::timeout; +use tokio::{ + sync::{Mutex, MutexGuard}, + time::timeout, +}; const HEALTH_DURATION: Duration = Duration::from_millis(5000); const KV_STORE_NAME: &str = "store"; +lazy_static! { + static ref LOCK: Mutex<()> = Mutex::new(()); +} + #[derive(Serialize, Deserialize, PartialEq)] struct FaucetRequest { datetime: String, @@ -136,13 +144,17 @@ fn has_request_expired( ) } -async fn ensure_funding_allowed( +async fn ensure_funding_allowed<'a>( + kv: &'a Bucket<'a, String, Json>, parachain_rpc: &InterBtcParachain, account_id: &AccountId, - allowance_config: &AllowanceConfig, - last_request_json: Option>, + allowance_config: AllowanceConfig, account_type: FundingRequestAccountType, -) -> Result<(), Error> { +) -> Result, Error> { + if let Some(auth_url) = allowance_config.auth_url { + ensure_signature_exists(&auth_url, account_id).await?; + } + let account_allowances = match account_type { FundingRequestAccountType::User => &allowance_config.user_allowances, FundingRequestAccountType::Vault => &allowance_config.vault_allowances, @@ -151,6 +163,7 @@ async fn ensure_funding_allowed( .iter() .map(|x| CurrencyId::try_from_symbol(x.symbol.clone())) .collect(); + for currency_id in currency_ids?.iter() { let free_balance = parachain_rpc .get_free_balance_for_id(account_id.clone(), *currency_id) @@ -177,6 +190,11 @@ async fn ensure_funding_allowed( .checked_sub_signed(ISO8601::hours(allowance_config.faucet_cooldown_hours)) .ok_or(Error::MathError)?; + // aquire lock after auth and balance checks since they may be slow and we only + // want to guard writes to the local key store + let mutex_guard = LOCK.lock().await; + + let last_request_json = kv.get(account_id.to_string())?; match last_request_json { Some(last_request_json) => { let last_request_expired = has_request_expired( @@ -189,10 +207,10 @@ async fn ensure_funding_allowed( log::warn!("Already funded {} at {:?}", account_id, last_request_json.0.datetime); Err(Error::AccountAlreadyFunded) } else { - Ok(()) + Ok(mutex_guard) } } - None => Ok(()), + None => Ok(mutex_guard), } } @@ -213,33 +231,28 @@ async fn ensure_signature_exists(auth_url: &str, account_id: &AccountId) -> Resu async fn atomic_faucet_funding( parachain_rpc: &InterBtcParachain, - kv: Bucket<'_, String, Json>, + kv: &Bucket<'_, String, Json>, account_id: AccountId, allowance_config: AllowanceConfig, ) -> Result<(), Error> { - let account_str = account_id.to_string(); - let last_request_json = kv.get(account_str.clone())?; let account_type = get_account_type(parachain_rpc, account_id.clone()).await?; let amounts: Allowance = match account_type { FundingRequestAccountType::User => allowance_config.user_allowances.clone(), FundingRequestAccountType::Vault => allowance_config.vault_allowances.clone(), }; - ensure_funding_allowed( - parachain_rpc, - &account_id, - &allowance_config, - last_request_json, - account_type.clone(), - ) - .await?; - - if let Some(auth_url) = allowance_config.auth_url { - ensure_signature_exists(&auth_url, &account_id).await?; - } + let mutex_guard = + ensure_funding_allowed(kv, parachain_rpc, &account_id, allowance_config, account_type.clone()).await?; // replace the previous (expired) claim datetime with the datetime of the current claim - update_kv_store(&kv, account_str.clone(), Utc::now().to_rfc2822(), account_type.clone())?; + update_kv_store( + kv, + account_id.to_string(), + Utc::now().to_rfc2822(), + account_type.clone(), + )?; + // don't block other threads for transfer since we updated the store + drop(mutex_guard); let transfers = amounts .into_iter() @@ -256,17 +269,7 @@ async fn atomic_faucet_funding( }) .collect::, Error>>()?; - if let Err(err) = parachain_rpc.transfer_to(&account_id, transfers).await { - log::error!("Failed to fund {}", account_str); - if err.is_any_module_err() || err.is_invalid_transaction().is_some() { - log::info!("Removing previous claim"); - // transfer failed, reset the db so this can be called again - kv.remove(account_str)?; - } - - return Err(err.into()); - } - + parachain_rpc.transfer_to(&account_id, transfers).await?; Ok(()) } @@ -278,8 +281,21 @@ async fn fund_account( ) -> Result<(), Error> { let parachain_rpc = parachain_rpc.clone(); let kv = open_kv_store(store)?; - atomic_faucet_funding(¶chain_rpc, kv, req.account_id.clone(), allowance_config).await?; - Ok(()) + match atomic_faucet_funding(¶chain_rpc, &kv, req.account_id.clone(), allowance_config).await { + Err(Error::RuntimeError(err)) + if err.is_any_module_err() + || err.is_invalid_transaction().is_some() + || matches!(err, RuntimeError::AssetNotFound) => + { + let account_str = req.account_id.to_string(); + log::error!("Failed to fund {}", account_str); + // transfer failed, reset the db so this can be called again + kv.remove(account_str)?; + Err(Error::RuntimeError(err)) + } + Err(err) => Err(err), + Ok(_) => Ok(()), + } } pub async fn start_http(