Skip to content

Commit

Permalink
refactor: global mutex for faucet store read / writes
Browse files Browse the repository at this point in the history
Signed-off-by: Gregory Hill <[email protected]>
  • Loading branch information
gregdhill committed Jun 26, 2023
1 parent e1ed863 commit 612d4d2
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions faucet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ kv = { version = "0.22.0", features = ["json-value"] }
async-trait = "0.1.40"
futures = "0.3.5"
git-version = "0.3.4"
lazy_static = "1.4.0"

reqwest = "0.11.11"
url = "2.2.2"
Expand Down
88 changes: 52 additions & 36 deletions faucet/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use jsonrpc_http_server::{
DomainsValidation, ServerBuilder,
};
use kv::*;
use lazy_static::lazy_static;
use parity_scale_codec::{Decode, Encode};
use reqwest::Url;
use runtime::{
Expand All @@ -14,11 +15,18 @@ use runtime::{
};
use serde::{Deserialize, Deserializer, Serialize};
use std::{net::SocketAddr, time::Duration};
use tokio::time::timeout;
use tokio::{
sync::{Mutex, MutexGuard},
time::timeout,
};

const HEALTH_DURATION: Duration = Duration::from_millis(5000);
const KV_STORE_NAME: &str = "store";

lazy_static! {
static ref LOCK: Mutex<()> = Mutex::new(());
}

#[derive(Serialize, Deserialize, PartialEq)]
struct FaucetRequest {
datetime: String,
Expand Down Expand Up @@ -136,13 +144,17 @@ fn has_request_expired(
)
}

async fn ensure_funding_allowed(
async fn ensure_funding_allowed<'a>(
kv: &'a Bucket<'a, String, Json<FaucetRequest>>,
parachain_rpc: &InterBtcParachain,
account_id: &AccountId,
allowance_config: &AllowanceConfig,
last_request_json: Option<Json<FaucetRequest>>,
allowance_config: AllowanceConfig,
account_type: FundingRequestAccountType,
) -> Result<(), Error> {
) -> Result<MutexGuard<'static, ()>, Error> {
if let Some(auth_url) = allowance_config.auth_url {
ensure_signature_exists(&auth_url, account_id).await?;
}

let account_allowances = match account_type {
FundingRequestAccountType::User => &allowance_config.user_allowances,
FundingRequestAccountType::Vault => &allowance_config.vault_allowances,
Expand All @@ -151,6 +163,7 @@ async fn ensure_funding_allowed(
.iter()
.map(|x| CurrencyId::try_from_symbol(x.symbol.clone()))
.collect();

for currency_id in currency_ids?.iter() {
let free_balance = parachain_rpc
.get_free_balance_for_id(account_id.clone(), *currency_id)
Expand All @@ -177,6 +190,11 @@ async fn ensure_funding_allowed(
.checked_sub_signed(ISO8601::hours(allowance_config.faucet_cooldown_hours))
.ok_or(Error::MathError)?;

// aquire lock after auth and balance checks since they may be slow and we only
// want to guard writes to the local key store
let mutex_guard = LOCK.lock().await;

let last_request_json = kv.get(account_id.to_string())?;
match last_request_json {
Some(last_request_json) => {
let last_request_expired = has_request_expired(
Expand All @@ -189,10 +207,10 @@ async fn ensure_funding_allowed(
log::warn!("Already funded {} at {:?}", account_id, last_request_json.0.datetime);
Err(Error::AccountAlreadyFunded)
} else {
Ok(())
Ok(mutex_guard)
}
}
None => Ok(()),
None => Ok(mutex_guard),
}
}

Expand All @@ -213,33 +231,28 @@ async fn ensure_signature_exists(auth_url: &str, account_id: &AccountId) -> Resu

async fn atomic_faucet_funding(
parachain_rpc: &InterBtcParachain,
kv: Bucket<'_, String, Json<FaucetRequest>>,
kv: &Bucket<'_, String, Json<FaucetRequest>>,
account_id: AccountId,
allowance_config: AllowanceConfig,
) -> Result<(), Error> {
let account_str = account_id.to_string();
let last_request_json = kv.get(account_str.clone())?;
let account_type = get_account_type(parachain_rpc, account_id.clone()).await?;
let amounts: Allowance = match account_type {
FundingRequestAccountType::User => allowance_config.user_allowances.clone(),
FundingRequestAccountType::Vault => allowance_config.vault_allowances.clone(),
};

ensure_funding_allowed(
parachain_rpc,
&account_id,
&allowance_config,
last_request_json,
account_type.clone(),
)
.await?;

if let Some(auth_url) = allowance_config.auth_url {
ensure_signature_exists(&auth_url, &account_id).await?;
}
let mutex_guard =
ensure_funding_allowed(kv, parachain_rpc, &account_id, allowance_config, account_type.clone()).await?;

// replace the previous (expired) claim datetime with the datetime of the current claim
update_kv_store(&kv, account_str.clone(), Utc::now().to_rfc2822(), account_type.clone())?;
update_kv_store(
kv,
account_id.to_string(),
Utc::now().to_rfc2822(),
account_type.clone(),
)?;
// don't block other threads for transfer since we updated the store
drop(mutex_guard);

let transfers = amounts
.into_iter()
Expand All @@ -256,17 +269,7 @@ async fn atomic_faucet_funding(
})
.collect::<Result<Vec<_>, Error>>()?;

if let Err(err) = parachain_rpc.transfer_to(&account_id, transfers).await {
log::error!("Failed to fund {}", account_str);
if err.is_any_module_err() || err.is_invalid_transaction().is_some() {
log::info!("Removing previous claim");
// transfer failed, reset the db so this can be called again
kv.remove(account_str)?;
}

return Err(err.into());
}

parachain_rpc.transfer_to(&account_id, transfers).await?;
Ok(())
}

Expand All @@ -278,8 +281,21 @@ async fn fund_account(
) -> Result<(), Error> {
let parachain_rpc = parachain_rpc.clone();
let kv = open_kv_store(store)?;
atomic_faucet_funding(&parachain_rpc, kv, req.account_id.clone(), allowance_config).await?;
Ok(())
match atomic_faucet_funding(&parachain_rpc, &kv, req.account_id.clone(), allowance_config).await {
Err(Error::RuntimeError(err))
if err.is_any_module_err()
|| err.is_invalid_transaction().is_some()
|| matches!(err, RuntimeError::AssetNotFound) =>
{
let account_str = req.account_id.to_string();
log::error!("Failed to fund {}", account_str);
// transfer failed, reset the db so this can be called again
kv.remove(account_str)?;
Err(Error::RuntimeError(err))
}
Err(err) => Err(err),
Ok(_) => Ok(()),
}
}

pub async fn start_http(
Expand Down

0 comments on commit 612d4d2

Please sign in to comment.