Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add pagination to deposit fetching #1252

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docker/mainnet/sbtc-signer/signer-config.toml.in
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ endpoints = [
"https://[email protected]"
]

# The pagination timeout, in seconds, used to fetch deposits and withdrawals
# requests from Emily.
djordon marked this conversation as resolved.
Show resolved Hide resolved
# Required: true
# Environment: SIGNER_EMILY__PAGINATION_TIMEOUT
pagination_timeout = 30

# !! ==============================================================================
# !! Bitcoin Core Configuration
# !! ==============================================================================
Expand Down
6 changes: 6 additions & 0 deletions docker/sbtc/signer/signer-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ endpoints = [
"http://testApiKey@emily-server:3031",
]

# The pagination timeout, in seconds, used to fetch deposits and withdrawals
# requests from Emily.
# Required: true
# Environment: SIGNER_EMILY__PAGINATION_TIMEOUT
pagination_timeout = 30

# !! ==============================================================================
# !! Bitcoin Core Configuration
# !! ==============================================================================
Expand Down
1 change: 1 addition & 0 deletions envs/signer-1.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export SIGNER_BITCOIN__BLOCK_HASH_STREAM_ENDPOINTS=tcp://localhost:28332
export SIGNER_STACKS__ENDPOINTS=http://localhost:20443
export SIGNER_SIGNER__EVENT_OBSERVER__BIND=0.0.0.0:8801
export SIGNER_EMILY__ENDPOINTS=http://testApiKey@localhost:3031
export SIGNER_EMILY__PAGINATION_TIMEOUT=30
matteojug marked this conversation as resolved.
Show resolved Hide resolved
export SIGNER_SIGNER__DB_ENDPOINT=postgresql://postgres:postgres@localhost:5432/signer
export SIGNER_SIGNER__PRIVATE_KEY=41634762d89dfa09133a4a8e9c1378d0161d29cd0a9433b51f1e3d32947a73dc
export SIGNER_SIGNER__P2P__LISTEN_ON=tcp://127.0.0.1:41221
Expand Down
1 change: 1 addition & 0 deletions envs/signer-2.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export SIGNER_BITCOIN__BLOCK_HASH_STREAM_ENDPOINTS=tcp://localhost:28332
export SIGNER_STACKS__ENDPOINTS=http://localhost:20443
export SIGNER_SIGNER__EVENT_OBSERVER__BIND=0.0.0.0:8802
export SIGNER_EMILY__ENDPOINTS=http://testApiKey@localhost:3031
export SIGNER_EMILY__PAGINATION_TIMEOUT=30
export SIGNER_SIGNER__DB_ENDPOINT=postgresql://postgres:postgres@localhost:5433/signer
export SIGNER_SIGNER__PRIVATE_KEY=9bfecf16c9c12792589dd2b843f850d5b89b81a04f8ab91c083bdf6709fbefee01
export SIGNER_SIGNER__P2P__LISTEN_ON=tcp://127.0.0.1:41222
Expand Down
1 change: 1 addition & 0 deletions envs/signer-3.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export SIGNER_BITCOIN__BLOCK_HASH_STREAM_ENDPOINTS=tcp://localhost:28332
export SIGNER_STACKS__ENDPOINTS=http://localhost:20443
export SIGNER_SIGNER__EVENT_OBSERVER__BIND=0.0.0.0:8803
export SIGNER_EMILY__ENDPOINTS=http://testApiKey@localhost:3031
export SIGNER_EMILY__PAGINATION_TIMEOUT=30
export SIGNER_SIGNER__DB_ENDPOINT=postgresql://postgres:postgres@localhost:5434/signer
export SIGNER_SIGNER__PRIVATE_KEY=3ec0ca5770a356d6cd1a9bfcbf6cd151eb1bd85c388cc00648ec4ef5853fdb7401
export SIGNER_SIGNER__P2P__LISTEN_ON=tcp://127.0.0.1:41223
Expand Down
6 changes: 6 additions & 0 deletions signer/src/config/default.toml
djordon marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ endpoints = [
"http://testApiKey@localhost:3031",
]

# The pagination timeout, in seconds, used to fetch deposits and withdrawals
# requests from Emily.
# Required: true
# Environment: SIGNER_EMILY__PAGINATION_TIMEOUT
pagination_timeout = 30

# !! ==============================================================================
# !! Bitcoin Core Configuration
# !! ==============================================================================
Expand Down
7 changes: 7 additions & 0 deletions signer/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ pub struct EmilyClientConfig {
/// Emily API endpoints.
#[serde(deserialize_with = "url_deserializer_vec")]
pub endpoints: Vec<Url>,
/// Pagination timeout in seconds.
#[serde(deserialize_with = "duration_seconds_deserializer")]
pub pagination_timeout: std::time::Duration,
matteojug marked this conversation as resolved.
Show resolved Hide resolved
/// Maximum items returned per page. If `None`, the response can include up to 1 MB of items.
matteojug marked this conversation as resolved.
Show resolved Hide resolved
pub page_size: Option<NonZeroU16>,
}

impl Validatable for EmilyClientConfig {
Expand Down Expand Up @@ -611,6 +616,8 @@ mod tests {
NonZeroU32::new(1).unwrap()
);
assert_eq!(settings.signer.dkg_min_bitcoin_block_height, None);
assert_eq!(settings.emily.page_size, None);
assert_eq!(settings.emily.pagination_timeout, Duration::from_secs(30));
}

#[test]
Expand Down
85 changes: 61 additions & 24 deletions signer/src/emily_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Emily API client module

use std::str::FromStr;
use std::time::Duration;
use std::time::Instant;

use bitcoin::Amount;
use bitcoin::OutPoint;
Expand Down Expand Up @@ -132,6 +134,8 @@ pub trait EmilyInteract: Sync + Send {
#[derive(Clone)]
pub struct EmilyClient {
config: EmilyApiConfig,
pagination_timeout: Duration,
page_size: Option<i32>,
matteojug marked this conversation as resolved.
Show resolved Hide resolved
}

#[cfg(any(test, feature = "testing"))]
Expand All @@ -140,13 +144,13 @@ impl EmilyClient {
pub fn config(&self) -> &EmilyApiConfig {
&self.config
}
}

#[cfg(any(test, feature = "testing"))]
impl TryFrom<&Url> for EmilyClient {
type Error = Error;
/// Initialize a new Emily client from just a URL for testing scenarios.
fn try_from(url: &Url) -> Result<Self, Self::Error> {
/// Initialize a new Emily client and validate the url.
pub fn try_new(
url: &Url,
pagination_timeout: Duration,
page_size: Option<i32>,
) -> Result<Self, Error> {
let mut url = url.clone();
let api_key = if url.username().is_empty() {
None
Expand Down Expand Up @@ -178,7 +182,11 @@ impl TryFrom<&Url> for EmilyClient {
config.base_path = url.to_string().trim_end_matches("/").to_string();
config.api_key = api_key;

Ok(Self { config })
Ok(Self {
config,
pagination_timeout,
page_size,
})
}
}

Expand Down Expand Up @@ -214,18 +222,25 @@ impl EmilyInteract for EmilyClient {
.map_err(Error::DecodeHexScript)?,
}))
}

async fn get_deposits(&self) -> Result<Vec<CreateDepositRequest>, Error> {
// TODO: hanlde pagination -- if the queried data is over 1MB DynamoDB will
// paginate the results even if we pass `None` as page limit.
let resp = deposit_api::get_deposits(&self.config, Status::Pending, None, None)
let mut all_deposits = Vec::new();
let mut next_token: Option<String> = None;
let start_time = Instant::now();
loop {
let resp = deposit_api::get_deposits(
&self.config,
Status::Pending,
next_token.as_deref(),
self.page_size,
)
.await
.map_err(EmilyClientError::GetDeposits)
.map_err(Error::EmilyApi)?;
matteojug marked this conversation as resolved.
Show resolved Hide resolved

resp.deposits
.iter()
.map(|deposit| {
Ok(CreateDepositRequest {
// Convert each deposit to our CreateDepositRequest
for deposit in resp.deposits.iter() {
let req = CreateDepositRequest {
outpoint: OutPoint {
txid: Txid::from_str(&deposit.bitcoin_txid)
.map_err(Error::DecodeHexTxid)?,
Expand All @@ -235,9 +250,27 @@ impl EmilyInteract for EmilyClient {
.map_err(Error::DecodeHexScript)?,
deposit_script: ScriptBuf::from_hex(&deposit.deposit_script)
.map_err(Error::DecodeHexScript)?,
})
})
.collect()
};
all_deposits.push(req);
}

// If more pages exist, loop again; otherwise stop
match resp.next_token.flatten() {
Some(token) => next_token = Some(token),
None => break,
}

if start_time.elapsed() > self.pagination_timeout {
tracing::warn!(
"timeout fetching deposits, breaking at page {:?}, fetched {} deposits",
next_token,
all_deposits.len()
);
break;
}
}

Ok(all_deposits)
matteojug marked this conversation as resolved.
Show resolved Hide resolved
}

async fn update_deposits(
Expand Down Expand Up @@ -366,10 +399,8 @@ impl EmilyInteract for ApiFallbackClient<EmilyClient> {
.await
}

fn get_deposits(
&self,
) -> impl std::future::Future<Output = Result<Vec<CreateDepositRequest>, Error>> {
self.exec(|client, _| client.get_deposits())
async fn get_deposits(&self) -> Result<Vec<CreateDepositRequest>, Error> {
self.exec(|client, _| client.get_deposits()).await
}

async fn update_deposits(
Expand Down Expand Up @@ -426,7 +457,13 @@ impl TryFrom<&EmilyClientConfig> for ApiFallbackClient<EmilyClient> {
let clients = config
.endpoints
.iter()
.map(EmilyClient::try_from)
.map(|url| {
EmilyClient::try_new(
url,
config.pagination_timeout,
config.page_size.map(|size| size.get() as i32), // The autogenerated EmilyClient uses i32 for page_size
)
})
.collect::<Result<Vec<_>, _>>()?;

Self::new(clients).map_err(Into::into)
Expand All @@ -442,7 +479,7 @@ mod tests {
// Arrange.
let url = Url::parse("http://test_key@localhost:8080").unwrap();
// Act.
let client = EmilyClient::try_from(&url).unwrap();
let client = EmilyClient::try_new(&url, Duration::from_secs(1), None).unwrap();
// Assert.
assert_eq!(client.config.base_path, "http://localhost:8080");
assert_eq!(client.config.api_key.unwrap().key, "test_key");
Expand All @@ -453,7 +490,7 @@ mod tests {
// Arrange.
let url = Url::parse("http://localhost:8080").unwrap();
// Act.
let client = EmilyClient::try_from(&url).unwrap();
let client = EmilyClient::try_new(&url, Duration::from_secs(1), None).unwrap();
// Assert.
assert_eq!(client.config.base_path, "http://localhost:8080");
assert!(client.config.api_key.is_none());
Expand Down
8 changes: 6 additions & 2 deletions signer/tests/integration/block_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,12 @@ async fn block_observer_stores_donation_and_sbtc_utxos() {
let (rpc, faucet) = regtest::initialize_blockchain();

// We need to populate our databases, so let's fetch the data.
let emily_client =
EmilyClient::try_from(&Url::parse("http://testApiKey@localhost:3031").unwrap()).unwrap();
let emily_client = EmilyClient::try_new(
&Url::parse("http://testApiKey@localhost:3031").unwrap(),
Duration::from_secs(1),
None,
)
.unwrap();

testing_api::wipe_databases(emily_client.config())
.await
Expand Down
93 changes: 89 additions & 4 deletions signer/tests/integration/emily.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use clarity::types::chainstate::BurnchainHeaderHash;
use emily_client::apis::deposit_api;
use emily_client::apis::testing_api::wipe_databases;
use emily_client::models::CreateDepositRequestBody;
use futures::future::join_all;
use sbtc::testing::regtest::Recipient;
use signer::bitcoin::rpc::BitcoinTxInfo;
use signer::bitcoin::rpc::GetTxResponse;
Expand Down Expand Up @@ -147,8 +148,12 @@ async fn deposit_flow() {
let network = network::in_memory::InMemoryNetwork::new();
let signer_info = testing::wsts::generate_signer_info(&mut rng, num_signers);

let emily_client =
EmilyClient::try_from(&Url::parse("http://testApiKey@localhost:3031").unwrap()).unwrap();
let emily_client = EmilyClient::try_new(
&Url::parse("http://testApiKey@localhost:3031").unwrap(),
Duration::from_secs(1),
None,
)
.unwrap();
let stacks_client = WrappedMock::default();

// Wipe the Emily database to start fresh
Expand Down Expand Up @@ -564,8 +569,12 @@ async fn get_deposit_request_works() {
let amount_sats = 49_900_000;
let lock_time = 150;

let emily_client =
EmilyClient::try_from(&Url::parse("http://testApiKey@localhost:3031").unwrap()).unwrap();
let emily_client = EmilyClient::try_new(
&Url::parse("http://testApiKey@localhost:3031").unwrap(),
Duration::from_secs(1),
None,
)
.unwrap();

wipe_databases(&emily_client.config())
.await
Expand Down Expand Up @@ -596,3 +605,79 @@ async fn get_deposit_request_works() {
let request = emily_client.get_deposit(&txid, 50).await.unwrap();
assert!(request.is_none());
}

#[tokio::test]
async fn get_deposits_request_handles_paging() {
let max_fee: u64 = 15000;
let amount_sats = 49_900_000;
let lock_time = 150;

let num_deposits: usize = 3;
let emily_client = EmilyClient::try_new(
&Url::parse("http://testApiKey@localhost:3031").unwrap(),
Duration::from_secs(10),
Some(2), // Limits to 2 results per page
)
.unwrap();

wipe_databases(&emily_client.config())
.await
.expect("Wiping Emily database in test setup failed.");
let futures = (0..num_deposits).map(|_| {
let setup = sbtc::testing::deposits::tx_setup(lock_time, max_fee, amount_sats);

let create_deposit_request_body = CreateDepositRequestBody {
bitcoin_tx_output_index: 0,
bitcoin_txid: setup.tx.compute_txid().to_string(),
deposit_script: setup.deposit.deposit_script().to_hex_string(),
reclaim_script: setup.reclaim.reclaim_script().to_hex_string(),
};

deposit_api::create_deposit(emily_client.config(), create_deposit_request_body)
});
let results = join_all(futures).await;
matteojug marked this conversation as resolved.
Show resolved Hide resolved
for result in results {
result.expect("cannot create emily deposit");
}

let request = emily_client.get_deposits().await.unwrap();
assert_eq!(request.len(), num_deposits);
}

#[tokio::test]
async fn get_deposits_request_handles_timeout() {
matteojug marked this conversation as resolved.
Show resolved Hide resolved
let max_fee: u64 = 15000;
let amount_sats = 49_900_000;
let lock_time = 150;
let num_deposits = 3;

let emily_client = EmilyClient::try_new(
&Url::parse("http://testApiKey@localhost:3031").unwrap(),
Duration::ZERO, // Zero will cause a timeout right after the first response is received
Some(2), // Limits to 2 results per page
)
.unwrap();

wipe_databases(&emily_client.config())
.await
.expect("Wiping Emily database in test setup failed.");
let futures = (0..num_deposits).map(|_| {
let setup = sbtc::testing::deposits::tx_setup(lock_time, max_fee, amount_sats);

let create_deposit_request_body = CreateDepositRequestBody {
bitcoin_tx_output_index: 0,
bitcoin_txid: setup.tx.compute_txid().to_string(),
deposit_script: setup.deposit.deposit_script().to_hex_string(),
reclaim_script: setup.reclaim.reclaim_script().to_hex_string(),
};

deposit_api::create_deposit(emily_client.config(), create_deposit_request_body)
});
let results = join_all(futures).await;
for result in results {
result.expect("cannot create emily deposit");
}

let request = emily_client.get_deposits().await.unwrap();
assert_eq!(request.len(), 2);
}
10 changes: 8 additions & 2 deletions signer/tests/integration/request_decider.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use emily_client::apis::deposit_api;
use emily_client::apis::testing_api;
use emily_client::models::CreateDepositRequestBody;
Expand Down Expand Up @@ -293,8 +295,12 @@ async fn persist_received_deposit_decision_fetches_missing_deposit_requests() {

let mut rng = rand::rngs::StdRng::seed_from_u64(51);

let emily_client =
EmilyClient::try_from(&Url::parse("http://testApiKey@localhost:3031").unwrap()).unwrap();
let emily_client = EmilyClient::try_new(
&Url::parse("http://testApiKey@localhost:3031").unwrap(),
Duration::from_secs(1),
None,
)
.unwrap();

testing_api::wipe_databases(emily_client.config())
.await
Expand Down
Loading