diff --git a/signer/src/bitcoin/validation.rs b/signer/src/bitcoin/validation.rs index aabe356e3..ce782f74c 100644 --- a/signer/src/bitcoin/validation.rs +++ b/signer/src/bitcoin/validation.rs @@ -322,7 +322,6 @@ impl BitcoinPreSignRequest { tx_fee: Amount::from_sat(tx.tx_fee), reports, chain_tip_height: btc_ctx.chain_tip_height, - // If the cap is None, then we assume that it is unlimited. sbtc_limits: ctx.state().get_current_limits(), }; diff --git a/signer/src/bitcoin/zmq.rs b/signer/src/bitcoin/zmq.rs index b722efc27..ee16270e9 100644 --- a/signer/src/bitcoin/zmq.rs +++ b/signer/src/bitcoin/zmq.rs @@ -50,11 +50,8 @@ pub struct BitcoinCoreMessageStream { } impl BitcoinCoreMessageStream { - /// Create a new one using the endpoint(s) in the config. - pub async fn new_from_endpoint(endpoint: &str, _subscriptions: &[T]) -> Result - where - T: AsRef, - { + /// Create a new one using the given endpoint. + pub async fn new_from_endpoint(endpoint: &str) -> Result { let inner_stream = tokio::time::timeout(Duration::from_secs(10), async { bitcoincore_zmq::subscribe_async_monitor(&[endpoint]) }) diff --git a/signer/src/block_observer.rs b/signer/src/block_observer.rs index a019f9573..55030b7cd 100644 --- a/signer/src/block_observer.rs +++ b/signer/src/block_observer.rs @@ -17,6 +17,7 @@ //! - Update signer set transactions //! - Set aggregate key transactions +use std::collections::BTreeSet; use std::future::Future; use std::time::Duration; @@ -29,6 +30,7 @@ use crate::context::SbtcLimits; use crate::context::SignerEvent; use crate::emily_client::EmilyInteract; use crate::error::Error; +use crate::keys::PublicKey; use crate::metrics::Metrics; use crate::metrics::BITCOIN_BLOCKCHAIN; use crate::stacks::api::GetNakamotoStartHeight as _; @@ -157,8 +159,9 @@ where tracing::warn!(%error, "could not process stacks blocks"); } - if let Err(error) = self.update_sbtc_limits().await { - tracing::warn!(%error, "could not update sBTC limits"); + tracing::debug!("updating the signer state"); + if let Err(error) = self.update_signer_state(block_hash).await { + tracing::warn!(%error, "could not update the signer state"); continue; } @@ -577,6 +580,101 @@ impl BlockObserver { } Ok(()) } + + /// Update the `SignerState` object with current signer set and + /// aggregate key data. + /// + /// # Notes + /// + /// The query used for fetching the cached information can take quite a + /// lot of some time to complete on mainnet. So this function updates + /// the signers state once so that the other event loops do not need to + /// execute them. The cached information is: + /// + /// * The current signer set. It gets this information from the last + /// successful key-rotation contract call if it exists. If such a + /// contract call does not exist this function uses the latest DKG + /// shares, and if that doesn't exist it uses the bootstrap signing + /// set from the configuration. + /// * The current aggregate key. It gets this information from the last + /// successful key-rotation contract call if it exists, and from the + /// latest DKG shares if no such contract call can be found. + async fn set_signer_set_and_aggregate_key(&self, chain_tip: BlockHash) -> Result<(), Error> { + let (aggregate_key, public_keys) = + get_signer_set_and_aggregate_key(&self.context, chain_tip).await?; + + let state = self.context.state(); + if let Some(aggregate_key) = aggregate_key { + state.set_current_aggregate_key(aggregate_key); + } + + state.update_current_signer_set(public_keys); + Ok(()) + } + + /// Update the `SignerState` object with data that is unlikely to + /// change until the arrival of the next bitcoin block. + /// + /// # Notes + /// + /// The function updates the following: + /// * sBTC limits from Emily. + /// * The current signer set. + /// * The current aggregate key. + async fn update_signer_state(&self, chain_tip: BlockHash) -> Result<(), Error> { + tracing::info!("loading sbtc limits from Emily"); + self.update_sbtc_limits().await?; + + tracing::info!("updating the signer state with the current signer set"); + self.set_signer_set_and_aggregate_key(chain_tip).await + } +} + +/// Return the signing set that can make sBTC related contract calls along +/// with the current aggregate key to use for locking UTXOs on bitcoin. +/// +/// The aggregate key fetched here is the one confirmed on the canonical +/// Stacks blockchain as part of a `rotate-keys` contract call. It will be +/// the public key that is the result of a DKG run. If there are no +/// rotate-keys transactions on the canonical stacks blockchain, then we +/// fall back on the last known DKG shares row in our database, and return +/// None as the aggregate key if no DKG shares can be found, implying that +/// this signer has not participated in DKG. +#[tracing::instrument(skip_all)] +pub async fn get_signer_set_and_aggregate_key( + context: &C, + chain_tip: B, +) -> Result<(Option, BTreeSet), Error> +where + C: Context, + B: Into, +{ + let db = context.get_storage(); + let chain_tip = chain_tip.into(); + + // We are supposed to submit a rotate-keys transaction after running + // DKG, but that transaction may not have been submitted yet (if we + // have just run DKG) or it may not have been confirmed on the + // canonical Stacks blockchain. + // + // If the signers have already run DKG, then we know that all + // participating signers should have the same view of the latest + // aggregate key, so we can fall back on the stored DKG shares for + // getting the current aggregate key and associated signing set. + match db.get_last_key_rotation(&chain_tip).await? { + Some(last_key) => { + let aggregate_key = last_key.aggregate_key; + let signer_set = last_key.signer_set.into_iter().collect(); + Ok((Some(aggregate_key), signer_set)) + } + None => match db.get_latest_encrypted_dkg_shares().await? { + Some(shares) => { + let signer_set = shares.signer_set_public_keys.into_iter().collect(); + Ok((Some(shares.aggregate_key), signer_set)) + } + None => Ok((None, context.config().signer.bootstrap_signing_set())), + }, + } } #[cfg(test)] diff --git a/signer/src/context/signer_state.rs b/signer/src/context/signer_state.rs index aefdd53a5..f1d87a656 100644 --- a/signer/src/context/signer_state.rs +++ b/signer/src/context/signer_state.rs @@ -1,5 +1,6 @@ //! Module for signer state +use std::collections::BTreeSet; use std::sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, RwLock, @@ -18,6 +19,7 @@ use crate::keys::PublicKey; pub struct SignerState { current_signer_set: SignerSet, current_limits: RwLock, + current_aggregate_key: RwLock>, sbtc_contracts_deployed: AtomicBool, sbtc_bitcoin_start_height: AtomicU64, is_sbtc_bitcoin_start_height_set: AtomicBool, @@ -29,6 +31,38 @@ impl SignerState { &self.current_signer_set } + /// Return the public keys of the current signer set. + pub fn current_signer_public_keys(&self) -> BTreeSet { + self.current_signer_set + .get_signers() + .into_iter() + .map(|signer| signer.public_key) + .collect() + } + + /// Replace the current signer set with the given set of public keys. + pub fn update_current_signer_set(&self, public_keys: BTreeSet) { + self.current_signer_set.replace_signers(public_keys); + } + + /// Return the current aggregate key from the cache. + #[allow(clippy::unwrap_in_result)] + pub fn current_aggregate_key(&self) -> Option { + self.current_aggregate_key + .read() + .expect("BUG: Failed to acquire read lock") + .as_ref() + .copied() + } + + /// Set the current aggregate key to the given public key. + pub fn set_current_aggregate_key(&self, aggregate_key: PublicKey) { + self.current_aggregate_key + .write() + .expect("BUG: Failed to acquire write lock") + .replace(aggregate_key); + } + /// Get the current sBTC limits. pub fn get_current_limits(&self) -> SbtcLimits { // We should never fail to acquire a lock from the RwLock so that it panics. @@ -82,6 +116,7 @@ impl Default for SignerState { Self { current_signer_set: Default::default(), current_limits: RwLock::new(SbtcLimits::zero()), + current_aggregate_key: RwLock::new(None), sbtc_contracts_deployed: Default::default(), sbtc_bitcoin_start_height: Default::default(), is_sbtc_bitcoin_start_height_set: Default::default(), @@ -132,11 +167,6 @@ impl SbtcLimits { } } - /// Create a new `SbtcLimits` object without any limits - pub fn unlimited() -> Self { - Self::new(None, None, None, None, None) - } - /// Create a new `SbtcLimits` object with limits set to zero (fully constraining) pub fn zero() -> Self { Self::new( @@ -177,9 +207,21 @@ impl SbtcLimits { pub fn max_mintable_cap(&self) -> Amount { self.max_mintable_cap.unwrap_or(Amount::MAX_MONEY) } +} + +#[cfg(any(test, feature = "testing"))] +impl SbtcLimits { + /// Create a new `SbtcLimits` object without any limits + pub fn unlimited() -> Self { + Self { + total_cap: Some(Amount::MAX_MONEY), + per_deposit_minimum: Some(Amount::ZERO), + per_deposit_cap: Some(Amount::MAX_MONEY), + per_withdrawal_cap: Some(Amount::MAX_MONEY), + max_mintable_cap: Some(Amount::MAX_MONEY), + } + } - /// TODO: Document this - #[cfg(test)] /// Create a new Self with only the given deposit minimum and maximums /// set. pub fn new_per_deposit(min: u64, max: u64) -> Self { @@ -266,6 +308,38 @@ impl SignerSet { .insert(signer); } + /// Replace the current signer set with the given set of public keys. + pub fn replace_signers(&self, public_keys: BTreeSet) { + let inner_signer_set = self.get_signers(); + + // Get a guard for the peer IDs. + #[allow(clippy::expect_used)] + let mut inner_peer_ids = self + .peer_ids + .write() + .expect("BUG: Failed to acquire write lock"); + + // Get a guard for the Signer objects the signer into the set. + #[allow(clippy::expect_used)] + let mut inner_public_keys = self + .signers + .write() + .expect("BUG: Failed to acquire write lock"); + + // Remove the old signer set + for signer in inner_signer_set { + inner_peer_ids.remove(signer.peer_id()); + inner_public_keys.remove(signer.public_key()); + } + + // Add the new signer set + for public_key in public_keys { + let signer = Signer::new(public_key); + inner_peer_ids.insert(signer.peer_id); + inner_public_keys.insert(signer); + } + } + /// Remove a signer (public key) from the known active signer set. pub fn remove_signer(&self, signer: &PublicKey) { if self.is_signer(signer) { diff --git a/signer/src/main.rs b/signer/src/main.rs index 47446e2f7..c43ccc667 100644 --- a/signer/src/main.rs +++ b/signer/src/main.rs @@ -299,12 +299,10 @@ async fn run_block_observer(ctx: impl Context) -> Result<(), Error> { // TODO: Need to handle multiple endpoints, so some sort of // failover-stream-wrapper. - let stream = BitcoinCoreMessageStream::new_from_endpoint( - config.bitcoin.block_hash_stream_endpoints[0].as_str(), - &["hashblock"], - ) - .await - .unwrap(); + let endpoint = config.bitcoin.block_hash_stream_endpoints[0].as_str(); + let stream = BitcoinCoreMessageStream::new_from_endpoint(endpoint) + .await + .unwrap(); // TODO: We should have a new() method that builds from the context let block_observer = block_observer::BlockObserver { diff --git a/signer/src/testing/btc.rs b/signer/src/testing/btc.rs index 5d00ed3e8..1d0cc8486 100644 --- a/signer/src/testing/btc.rs +++ b/signer/src/testing/btc.rs @@ -1,6 +1,7 @@ //! Helper functions for the bitcoin module //! use bitcoin::Amount; +use bitcoin::BlockHash; use bitcoin::OutPoint; use bitcoin::ScriptBuf; use bitcoin::Sequence; @@ -10,8 +11,12 @@ use bitcoin::TxOut; use bitcoin::Witness; use emily_client::models::CreateDepositRequestBody; +use futures::StreamExt as _; +use tokio_stream::wrappers::ReceiverStream; use crate::bitcoin::utxo; +use crate::bitcoin::zmq::BitcoinCoreMessageStream; +use crate::error::Error; /// Return a transaction that is kinda like the signers' transaction, /// but it does not service any requests, and it does not have any @@ -56,3 +61,29 @@ impl utxo::DepositRequest { } } } + +/// Create a new BlockHash stream for messages from bitcoin core over the +/// ZMQ interface. +/// +/// The returned object implements Stream + Send + Sync, which is sometimes +/// needed in our integration tests. +/// +/// # Notes +/// +/// This function panics if it cannot establish a connection the bitcoin +/// core in 10 seconds. +pub async fn new_zmq_block_hash_stream(endpoint: &str) -> ReceiverStream> { + let zmq_stream = BitcoinCoreMessageStream::new_from_endpoint(endpoint) + .await + .unwrap(); + + let (sender, receiver) = tokio::sync::mpsc::channel(100); + tokio::spawn(async move { + let mut stream = zmq_stream.to_block_hash_stream(); + while let Some(block) = stream.next().await { + sender.send(block).await.unwrap(); + } + }); + + ReceiverStream::new(receiver) +} diff --git a/signer/src/transaction_coordinator.rs b/signer/src/transaction_coordinator.rs index 473ab2b4c..374667b7c 100644 --- a/signer/src/transaction_coordinator.rs +++ b/signer/src/transaction_coordinator.rs @@ -286,9 +286,8 @@ where // we need to know the aggregate key for constructing bitcoin // transactions. We need to know the current signing set and the // current aggregate key. - let (maybe_aggregate_key, signer_public_keys) = self - .get_signer_set_and_aggregate_key(&bitcoin_chain_tip) - .await?; + let maybe_aggregate_key = self.context.state().current_aggregate_key(); + let signer_public_keys = self.context.state().current_signer_public_keys(); // If we are not the coordinator, then we have no business // coordinating DKG or constructing bitcoin and stacks @@ -310,15 +309,7 @@ where let should_coordinate_dkg = should_coordinate_dkg(&self.context, &bitcoin_chain_tip).await?; let aggregate_key = if should_coordinate_dkg { - let dkg_result = self.coordinate_dkg(&bitcoin_chain_tip).await?; - // TODO: in `run_dkg_from_scratch` test, `dkg_result` differs from - // value fetched from the db. Adding a temporary fix for the (probably) - // race condition, but we should address this properly. - self.get_signer_set_and_aggregate_key(&bitcoin_chain_tip) - .await - .ok() - .and_then(|res| res.0) - .unwrap_or(dkg_result) + self.coordinate_dkg(&bitcoin_chain_tip).await? } else { maybe_aggregate_key.ok_or(Error::MissingAggregateKey(*bitcoin_chain_tip))? }; @@ -1093,7 +1084,7 @@ where // set of the last DKG (either through the last rotate-keys // contract call or from the `dkg_shares` table) so we wind up // never changing the signing set. - let (_, signer_set) = self.get_signer_set_and_aggregate_key(chain_tip).await?; + let signer_set = self.context.state().current_signer_public_keys(); let mut state_machine = CoordinatorStateMachine::new(signer_set, self.threshold, self.private_key); @@ -1157,10 +1148,7 @@ where // this assumes that the signer set doesn't change for the duration of this call, // but we're already assuming that the bitcoin chain tip doesn't change // alternately we could hit the DB every time we get a new message - let (_, signer_set) = self - .get_signer_set_and_aggregate_key(bitcoin_chain_tip) - .await?; - + let signer_set = self.context.state().current_signer_public_keys(); tokio::pin!(signal_stream); coordinator_state_machine.save(); @@ -1397,50 +1385,6 @@ where })) } - /// Return the signing set that can make sBTC related contract calls - /// along with the current aggregate key to use for locking UTXOs on - /// bitcoin. - /// - /// The aggregate key fetched here is the one confirmed on the - /// canonical Stacks blockchain as part of a `rotate-keys` contract - /// call. It will be the public key that is the result of a DKG run. If - /// there are no rotate-keys transactions on the canonical stacks - /// blockchain, then we fall back on the last known DKG shares row in - /// our database, and return None as the aggregate key if no DKG shares - /// can be found, implying that this signer has not participated in - /// DKG. - #[tracing::instrument(skip_all)] - pub async fn get_signer_set_and_aggregate_key( - &self, - bitcoin_chain_tip: &model::BitcoinBlockHash, - ) -> Result<(Option, BTreeSet), Error> { - let db = self.context.get_storage(); - - // We are supposed to submit a rotate-keys transaction after - // running DKG, but that transaction may not have been submitted - // yet (if we have just run DKG) or it may not have been confirmed - // on the canonical Stacks blockchain. - // - // If the signers have already run DKG, then we know that all - // participating signers should have the same view of the latest - // aggregate key, so we can fall back on the stored DKG shares for - // getting the current aggregate key and associated signing set. - match db.get_last_key_rotation(bitcoin_chain_tip).await? { - Some(last_key) => { - let aggregate_key = last_key.aggregate_key; - let signer_set = last_key.signer_set.into_iter().collect(); - Ok((Some(aggregate_key), signer_set)) - } - None => match db.get_latest_encrypted_dkg_shares().await? { - Some(shares) => { - let signer_set = shares.signer_set_public_keys.into_iter().collect(); - Ok((Some(shares.aggregate_key), signer_set)) - } - None => Ok((None, self.context.config().signer.bootstrap_signing_set())), - }, - } - } - fn pub_key(&self) -> PublicKey { PublicKey::from_private_key(&self.private_key) } diff --git a/signer/src/transaction_signer.rs b/signer/src/transaction_signer.rs index 5ffcb63b6..829445e03 100644 --- a/signer/src/transaction_signer.rs +++ b/signer/src/transaction_signer.rs @@ -5,7 +5,6 @@ //! //! For more details, see the [`TxSignerEventLoop`] documentation. -use std::collections::BTreeSet; use std::time::Duration; use crate::bitcoin::validation::BitcoinTxContext; @@ -318,7 +317,7 @@ where .is_some(); let is_canonical = msg_bitcoin_chain_tip == &chain_tip; - let signer_set = self.get_signer_public_keys(&chain_tip).await?; + let signer_set = self.context.state().current_signer_public_keys(); let sender_is_coordinator = crate::transaction_coordinator::given_key_is_coordinator( msg_sender, &chain_tip, @@ -357,9 +356,7 @@ where .map_err(|_| Error::NoChainTip)? .ok_or_else(|| Error::NoChainTip)?; - let (maybe_aggregate_key, _signer_set) = self - .get_signer_set_and_aggregate_key(bitcoin_chain_tip) - .await?; + let maybe_aggregate_key = self.context.state().current_aggregate_key(); let btc_ctx = BitcoinTxContext { chain_tip: *bitcoin_chain_tip, @@ -524,7 +521,7 @@ where // and configuration. assert_allow_dkg_begin(&self.context, bitcoin_chain_tip).await?; - let signer_public_keys = self.get_signer_public_keys(bitcoin_chain_tip).await?; + let signer_public_keys = self.context.state().current_signer_public_keys(); let state_machine = SignerStateMachine::new( signer_public_keys, @@ -815,73 +812,6 @@ where Ok(()) } - /// Return the signing set that can make sBTC related contract calls - /// along with the current aggregate key to use for locking UTXOs on - /// bitcoin. - /// - /// The aggregate key fetched here is the one confirmed on the - /// canonical Stacks blockchain as part of a `rotate-keys` contract - /// call. It will be the public key that is the result of a DKG run. If - /// there are no rotate-keys transactions on the canonical stacks - /// blockchain, then we fall back on the last known DKG shares row in - /// our database, and return None as the aggregate key if no DKG shares - /// can be found, implying that this signer has not participated in - /// DKG. - #[tracing::instrument(skip_all)] - pub async fn get_signer_set_and_aggregate_key( - &self, - bitcoin_chain_tip: &model::BitcoinBlockHash, - ) -> Result<(Option, BTreeSet), Error> { - let db = self.context.get_storage(); - - // We are supposed to submit a rotate-keys transaction after - // running DKG, but that transaction may not have been submitted - // yet (if we have just run DKG) or it may not have been confirmed - // on the canonical Stacks blockchain. - // - // If the signers have already run DKG, then we know that all - // participating signers should have the same view of the latest - // aggregate key, so we can fall back on the stored DKG shares for - // getting the current aggregate key and associated signing set. - match db.get_last_key_rotation(bitcoin_chain_tip).await? { - Some(last_key) => { - let aggregate_key = last_key.aggregate_key; - let signer_set = last_key.signer_set.into_iter().collect(); - Ok((Some(aggregate_key), signer_set)) - } - None => match db.get_latest_encrypted_dkg_shares().await? { - Some(shares) => { - let signer_set = shares.signer_set_public_keys.into_iter().collect(); - Ok((Some(shares.aggregate_key), signer_set)) - } - None => Ok((None, self.context.config().signer.bootstrap_signing_set())), - }, - } - } - - /// Get the set of public keys for the current signing set. - /// - /// If there is a successful `rotate-keys` transaction in the database - /// then we should use that as the source of truth for the current - /// signing set, otherwise we fall back to the bootstrap keys in our - /// config. - #[tracing::instrument(skip_all)] - pub async fn get_signer_public_keys( - &self, - chain_tip: &model::BitcoinBlockHash, - ) -> Result, Error> { - let db = self.context.get_storage(); - - // Get the last rotate-keys transaction from the database on the - // canonical Stacks blockchain (which we identify using the - // canonical bitcoin blockchain). If we don't have such a - // transaction then get the bootstrap keys from our config. - match db.get_last_key_rotation(chain_tip).await? { - Some(last_key) => Ok(last_key.signer_set.into_iter().collect()), - None => Ok(self.context.config().signer.bootstrap_signing_set()), - } - } - fn signer_public_key(&self) -> PublicKey { PublicKey::from_private_key(&self.signer_private_key) } diff --git a/signer/tests/integration/block_observer.rs b/signer/tests/integration/block_observer.rs index 2e47e6f0d..a8208f198 100644 --- a/signer/tests/integration/block_observer.rs +++ b/signer/tests/integration/block_observer.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::collections::HashSet; use std::ops::Deref; use std::sync::atomic::AtomicBool; @@ -18,20 +19,23 @@ use emily_client::apis::testing_api; use emily_client::models::CreateDepositRequestBody; use fake::Fake as _; use fake::Faker; -use futures::StreamExt; use rand::SeedableRng as _; use sbtc::testing::regtest; use sbtc::testing::regtest::Recipient; use signer::bitcoin::utxo::SbtcRequests; use signer::bitcoin::utxo::SignerBtcState; +use signer::block_observer::get_signer_set_and_aggregate_key; use signer::context::SbtcLimits; use signer::emily_client::EmilyClient; use signer::error::Error; +use signer::keys::PublicKey; use signer::keys::SignerScriptPubKey as _; use signer::stacks::api::TenureBlocks; use signer::storage::model; use signer::storage::model::BitcoinBlockHash; use signer::storage::model::EncryptedDkgShares; +use signer::storage::model::RotateKeysTransaction; +use signer::storage::model::StacksBlock; use signer::storage::model::TxOutput; use signer::storage::model::TxOutputType; use signer::storage::model::TxPrevout; @@ -41,7 +45,6 @@ use signer::storage::DbWrite; use signer::testing::stacks::DUMMY_SORTITION_INFO; use signer::testing::stacks::DUMMY_TENURE_INFO; -use signer::bitcoin::zmq::BitcoinCoreMessageStream; use signer::block_observer::BlockObserver; use signer::context::Context as _; use signer::context::SignerEvent; @@ -51,7 +54,7 @@ use signer::storage::DbRead as _; use signer::testing; use signer::testing::context::TestContext; use signer::testing::context::*; -use tokio_stream::wrappers::ReceiverStream; +use signer::testing::storage::model::TestData; use url::Url; use crate::setup::TestSweepSetup; @@ -146,24 +149,9 @@ async fn load_latest_deposit_requests_persists_requests_from_past(blocks_ago: u6 let start_flag = Arc::new(AtomicBool::new(false)); let flag = start_flag.clone(); - // We jump through all of these hoops to make sure that the block - // stream object is Send + Sync. - let zmq_stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["hashblock"]) - .await - .unwrap(); - let (sender, receiver) = tokio::sync::mpsc::channel(100); - - tokio::spawn(async move { - let mut stream = zmq_stream.to_block_hash_stream(); - while let Some(block) = stream.next().await { - sender.send(block).await.unwrap(); - } - }); - let block_observer = BlockObserver { context: ctx.clone(), - bitcoin_blocks: ReceiverStream::new(receiver), + bitcoin_blocks: testing::btc::new_zmq_block_hash_stream(BITCOIN_CORE_ZMQ_ENDPOINT).await, }; // We need at least one receiver @@ -264,22 +252,9 @@ async fn link_blocks() { }) .await; - let zmq_stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["hashblock"]) - .await - .unwrap(); - let (sender, receiver) = tokio::sync::mpsc::channel(100); - - tokio::spawn(async move { - let mut stream = zmq_stream.to_block_hash_stream(); - while let Some(block) = stream.next().await { - sender.send(block).await.unwrap(); - } - }); - let block_observer = BlockObserver { context: ctx.clone(), - bitcoin_blocks: ReceiverStream::new(receiver), + bitcoin_blocks: testing::btc::new_zmq_block_hash_stream(BITCOIN_CORE_ZMQ_ENDPOINT).await, }; let mut signal_rx = ctx.get_signal_receiver(); @@ -430,24 +405,9 @@ async fn block_observer_stores_donation_and_sbtc_utxos() { let start_flag = Arc::new(AtomicBool::new(false)); let flag = start_flag.clone(); - // We jump through all of these hoops to make sure that the block - // stream object is Send + Sync. - let zmq_stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["hashblock"]) - .await - .unwrap(); - let (sender, receiver) = tokio::sync::mpsc::channel(1000); - - tokio::spawn(async move { - let mut stream = zmq_stream.to_block_hash_stream(); - while let Some(block) = stream.next().await { - sender.send(block).await.unwrap(); - } - }); - let block_observer = BlockObserver { context: ctx.clone(), - bitcoin_blocks: ReceiverStream::new(receiver), + bitcoin_blocks: testing::btc::new_zmq_block_hash_stream(BITCOIN_CORE_ZMQ_ENDPOINT).await, }; tokio::spawn(async move { @@ -747,24 +707,9 @@ async fn block_observer_handles_update_limits(deployed: bool, sbtc_limits: SbtcL let start_flag = Arc::new(AtomicBool::new(false)); let flag = start_flag.clone(); - // We jump through all of these hoops to make sure that the block - // stream object is Send + Sync. - let zmq_stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["hashblock"]) - .await - .unwrap(); - let (sender, receiver) = tokio::sync::mpsc::channel(100); - - tokio::spawn(async move { - let mut stream = zmq_stream.to_block_hash_stream(); - while let Some(block) = stream.next().await { - sender.send(block).await.unwrap(); - } - }); - let block_observer = BlockObserver { context: ctx.clone(), - bitcoin_blocks: ReceiverStream::new(receiver), + bitcoin_blocks: testing::btc::new_zmq_block_hash_stream(BITCOIN_CORE_ZMQ_ENDPOINT).await, }; let mut signal_receiver = ctx.get_signal_receiver(); @@ -906,3 +851,323 @@ async fn next_headers_to_process_ignores_known_headers() { testing::storage::drop_db(db).await; } + +/// The [`get_signer_set_and_aggregate_key`] function is supposed to fetch +/// the "current" signing set and the aggregate key to use for bitcoin +/// transactions. It attempts to get the latest rotate-keys contract call +/// transaction confirmed on the canonical Stacks blockchain and falls back +/// to the DKG shares table if no such transaction can be found. +/// +/// This tests that we prefer rotate keys transactions if it's available +/// but will use the DKG shares behavior is indeed the case. +#[tokio::test] +async fn get_signer_public_keys_and_aggregate_key_falls_back() { + let db = testing::storage::new_test_database().await; + + let mut rng = rand::rngs::StdRng::seed_from_u64(51); + + let ctx = TestContext::builder() + .with_storage(db.clone()) + .with_mocked_clients() + .build(); + + // We need stacks blocks for the rotate-keys transactions. + let test_params = testing::storage::model::Params { + num_bitcoin_blocks: 10, + num_stacks_blocks_per_bitcoin_block: 1, + num_deposit_requests_per_block: 0, + num_withdraw_requests_per_block: 0, + num_signers_per_request: 0, + consecutive_blocks: false, + }; + let test_data = TestData::generate(&mut rng, &[], &test_params); + test_data.write_to(&db).await; + + // We always need the chain tip. + let chain_tip = db.get_bitcoin_canonical_chain_tip().await.unwrap().unwrap(); + + // We have no rows in the DKG shares table and no rotate-keys + // transactions, so there should be no aggregate key, since that only + // happens after DKG, but we should always know the current signer set. + let (maybe_aggregate_key, signer_set) = get_signer_set_and_aggregate_key(&ctx, chain_tip) + .await + .unwrap(); + assert!(maybe_aggregate_key.is_none()); + assert!(!signer_set.is_empty()); + + // Alright, lets write some DKG shares into the database. When we do + // that the signer set should be considered whatever the signer set is + // from our DKG shares. + let shares: EncryptedDkgShares = Faker.fake_with_rng(&mut rng); + db.write_encrypted_dkg_shares(&shares).await.unwrap(); + + let (aggregate_key, signer_set) = get_signer_set_and_aggregate_key(&ctx, chain_tip) + .await + .unwrap(); + + let shares_signer_set: BTreeSet = + shares.signer_set_public_keys.iter().copied().collect(); + + assert_eq!(shares.aggregate_key, aggregate_key.unwrap()); + assert_eq!(shares_signer_set, signer_set); + + // Okay now we write a rotate-keys transaction into the database. To do + // that we need the stacks chain tip, and a something in 3 different + // tables... + let stacks_chain_tip = db.get_stacks_chain_tip(&chain_tip).await.unwrap().unwrap(); + + let rotate_keys: RotateKeysTransaction = Faker.fake_with_rng(&mut rng); + let transaction = model::Transaction { + txid: rotate_keys.txid.into_bytes(), + tx: Vec::new(), + tx_type: model::TransactionType::RotateKeys, + block_hash: stacks_chain_tip.block_hash.into_bytes(), + }; + let tx = model::StacksTransaction { + txid: rotate_keys.txid, + block_hash: stacks_chain_tip.block_hash, + }; + + db.write_transaction(&transaction).await.unwrap(); + db.write_stacks_transaction(&tx).await.unwrap(); + db.write_rotate_keys_transaction(&rotate_keys) + .await + .unwrap(); + + // Alright, now that we have a rotate-keys transaction, we can check if + // it is preferred over the DKG shares table. + let (aggregate_key, signer_set) = get_signer_set_and_aggregate_key(&ctx, chain_tip) + .await + .unwrap(); + + let rotate_keys_signer_set: BTreeSet = + rotate_keys.signer_set.iter().copied().collect(); + + assert_eq!(rotate_keys.aggregate_key, aggregate_key.unwrap()); + assert_eq!(rotate_keys_signer_set, signer_set); + + testing::storage::drop_db(db).await; +} + +/// This test checks that the signer state is updated with the latest the +/// sbtc limits, current signer set, and current aggregate key after the +/// block observer processes a bitcoin block. +#[tokio::test] +async fn block_observer_updates_state_after_observing_bitcoin_block() { + let mut rng = rand::rngs::StdRng::seed_from_u64(512); + // We start with the typical setup with a fresh database and context + // with a real bitcoin core client and a real connection to our + // database. + let (_, faucet) = regtest::initialize_blockchain(); + let db = testing::storage::new_test_database().await; + let mut ctx = TestContext::builder() + .with_storage(db.clone()) + .with_first_bitcoin_core_client() + .with_mocked_emily_client() + .with_mocked_stacks_client() + .build(); + + // We need to set up the stacks client as well. We use it to fetch + // information about the Stacks blockchain, so we need to prep it, even + // though it isn't necessary for our test. + ctx.with_stacks_client(|client| { + client + .expect_get_tenure_info() + .returning(|| Box::pin(std::future::ready(Ok(DUMMY_TENURE_INFO.clone())))); + client.expect_get_block().returning(|_| { + let response = Ok(NakamotoBlock { + header: NakamotoBlockHeader::empty(), + txs: Vec::new(), + }); + Box::pin(std::future::ready(response)) + }); + client + .expect_get_tenure() + .returning(|_| Box::pin(std::future::ready(TenureBlocks::nearly_empty()))); + client.expect_get_pox_info().returning(|| { + let response = serde_json::from_str::(GET_POX_INFO_JSON) + .map_err(Error::JsonSerialize); + Box::pin(std::future::ready(response)) + }); + client + .expect_get_sortition_info() + .returning(|_| Box::pin(std::future::ready(Ok(DUMMY_SORTITION_INFO.clone())))); + }) + .await; + + ctx.with_emily_client(|client| { + client + .expect_get_deposits() + .returning(|| Box::pin(std::future::ready(Ok(vec![])))); + + client + .expect_get_limits() + .returning(|| Box::pin(std::future::ready(Ok(SbtcLimits::unlimited())))); + }) + .await; + + // We only proceed with the test after the BlockObserver "process" has + // started, and we use this counter to notify us when that happens. + let start_flag = Arc::new(AtomicBool::new(false)); + let flag = start_flag.clone(); + + let block_observer = BlockObserver { + context: ctx.clone(), + bitcoin_blocks: testing::btc::new_zmq_block_hash_stream(BITCOIN_CORE_ZMQ_ENDPOINT).await, + }; + + // In this test the signer set public keys start empty. When running + // the signer binary the signer starts as the bootstrap signing set. + // Also, the sbtc limits start off as "zero" and then get updated by + // the block observer. + let state = ctx.state(); + assert_eq!(state.get_current_limits(), SbtcLimits::zero()); + assert!(state.current_signer_public_keys().is_empty()); + assert!(state.current_aggregate_key().is_none()); + + tokio::spawn(async move { + flag.store(true, Ordering::Relaxed); + block_observer.run().await + }); + + // Wait for the task to start. + while !start_flag.load(Ordering::SeqCst) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + + // Let's generate a new block and wait for our block observer to send a + // BitcoinBlockObserved signal. + let chain_tip = faucet.generate_blocks(1).pop().unwrap().into(); + + ctx.wait_for_signal(Duration::from_secs(3), |signal| { + matches!( + signal, + SignerSignal::Event(SignerEvent::BitcoinBlockObserved) + ) + }) + .await + .unwrap(); + + // If we pass the above without panicking it should be fine, this is just a + // sanity check. + let db_chain_tip = db + .get_bitcoin_canonical_chain_tip() + .await + .expect("cannot get chain tip"); + assert_eq!(db_chain_tip, Some(chain_tip)); + + // There is no aggregate key since there aren't any key rotation + // contract calls and no DKG shares. But the current signer set should + // be the bootstrap signing set now. + let bootstrap_signing_set = ctx.config().signer.bootstrap_signing_set(); + assert_eq!(state.get_current_limits(), SbtcLimits::unlimited()); + assert!(state.current_aggregate_key().is_none()); + assert_eq!(state.current_signer_public_keys(), bootstrap_signing_set); + + // Okay now let's add in some DKG shares into the database. This should + // take precedence over what is configured as the bootstrap signing + // set. + let mut dkg_shares: EncryptedDkgShares = Faker.fake_with_rng(&mut rng); + let mut public_keys: Vec = std::iter::repeat_with(|| Faker.fake_with_rng(&mut rng)) + .take(12) + .collect(); + public_keys.sort(); + dkg_shares.signer_set_public_keys = public_keys; + db.write_encrypted_dkg_shares(&dkg_shares).await.unwrap(); + + // Sanity check that the signing set in the DKG shares are different + // from the bootstrap signing set. + let dkg_public_keys = dkg_shares.signer_set_public_keys.iter().copied().collect(); + assert_ne!(dkg_public_keys, bootstrap_signing_set); + + // Let's generate a new block and wait for our block observer to send a + // BitcoinBlockObserved signal. Then after we received the signal that + // a bitcoin block has been observed we check the signer state. + let chain_tip = faucet.generate_blocks(1).pop().unwrap().into(); + + ctx.wait_for_signal(Duration::from_secs(3), |signal| { + matches!( + signal, + SignerSignal::Event(SignerEvent::BitcoinBlockObserved) + ) + }) + .await + .unwrap(); + + // Check that the chain tip has been updated. + let db_chain_tip = db + .get_bitcoin_canonical_chain_tip() + .await + .expect("cannot get chain tip"); + assert_eq!(db_chain_tip, Some(chain_tip)); + + let dkg_aggregate_key = Some(dkg_shares.aggregate_key); + assert_eq!(state.get_current_limits(), SbtcLimits::unlimited()); + assert_eq!(state.current_aggregate_key(), dkg_aggregate_key); + assert_eq!(state.current_signer_public_keys(), dkg_public_keys); + + // Okay now we're going to show what happens if we have received a key + // rotation event. Such events take priority over DKG shares, even if + // the DKG shares are newer. So let's add such an event to the + // database. First we need a stacks block for the join. + let stacks_block = StacksBlock { + bitcoin_anchor: chain_tip, + ..Faker.fake_with_rng(&mut rng) + }; + + db.write_stacks_block(&stacks_block).await.unwrap(); + + let rotate_keys: RotateKeysTransaction = Faker.fake_with_rng(&mut rng); + let transaction = model::Transaction { + txid: rotate_keys.txid.into_bytes(), + tx: Vec::new(), + tx_type: model::TransactionType::RotateKeys, + block_hash: stacks_block.block_hash.into_bytes(), + }; + let tx = model::StacksTransaction { + txid: rotate_keys.txid, + block_hash: stacks_block.block_hash, + }; + + db.write_transaction(&transaction).await.unwrap(); + db.write_stacks_transaction(&tx).await.unwrap(); + db.write_rotate_keys_transaction(&rotate_keys) + .await + .unwrap(); + + // Let's add some DKG shares after the insertion of the rotate keys + // transaction. + let dkg_shares: EncryptedDkgShares = Faker.fake_with_rng(&mut rng); + db.write_encrypted_dkg_shares(&dkg_shares).await.unwrap(); + + // Let's generate a new block and wait for our block observer to send a + // BitcoinBlockObserved signal. + let chain_tip = faucet.generate_blocks(1).pop().unwrap().into(); + + ctx.wait_for_signal(Duration::from_secs(3), |signal| { + matches!( + signal, + SignerSignal::Event(SignerEvent::BitcoinBlockObserved) + ) + }) + .await + .unwrap(); + + let db_chain_tip = db + .get_bitcoin_canonical_chain_tip() + .await + .expect("cannot get chain tip"); + assert_eq!(db_chain_tip, Some(chain_tip)); + + // We expect the signer state to be the same as what is in the rotate + // keys event in the database. + let rotate_keys_aggregate_key = Some(rotate_keys.aggregate_key); + let rotate_keys_public_keys = rotate_keys.signer_set.iter().copied().collect(); + + assert_eq!(state.current_aggregate_key(), rotate_keys_aggregate_key); + assert_eq!(state.current_signer_public_keys(), rotate_keys_public_keys); + assert_ne!(rotate_keys_public_keys, dkg_public_keys); + assert_ne!(rotate_keys_aggregate_key, dkg_aggregate_key); + + testing::storage::drop_db(db).await; +} diff --git a/signer/tests/integration/transaction_coordinator.rs b/signer/tests/integration/transaction_coordinator.rs index 3d061bea2..415fd5d23 100644 --- a/signer/tests/integration/transaction_coordinator.rs +++ b/signer/tests/integration/transaction_coordinator.rs @@ -31,7 +31,7 @@ use emily_client::apis::deposit_api; use emily_client::apis::testing_api; use fake::Fake as _; use fake::Faker; -use futures::StreamExt; +use futures::StreamExt as _; use lru::LruCache; use mockito; use rand::rngs::OsRng; @@ -72,10 +72,8 @@ use stacks_common::types::chainstate::SortitionId; use test_case::test_case; use test_log::test; use tokio_stream::wrappers::BroadcastStream; -use tokio_stream::wrappers::ReceiverStream; use url::Url; -use signer::bitcoin::zmq::BitcoinCoreMessageStream; use signer::block_observer::BlockObserver; use signer::context::Context; use signer::emily_client::EmilyClient; @@ -92,7 +90,6 @@ use signer::stacks::contracts::CompleteDepositV1; use signer::stacks::contracts::SMART_CONTRACTS; use signer::storage::model; use signer::storage::model::EncryptedDkgShares; -use signer::storage::model::RotateKeysTransaction; use signer::storage::DbRead as _; use signer::storage::DbWrite as _; use signer::testing; @@ -472,11 +469,6 @@ async fn process_complete_deposit() { .expect_estimate_fees() .once() .returning(move |_, _, _| Box::pin(async move { Ok(25505) })); - - client - .expect_get_sbtc_total_supply() - .once() - .returning(move |_| Box::pin(async move { Ok(Amount::ZERO) })); }) .await; @@ -493,6 +485,17 @@ async fn process_complete_deposit() { let (aggregate_key, bitcoin_chain_tip) = run_dkg(&context, &mut rng, &mut testing_signer_set).await; + // When the signer binary starts up in main(), it sets the current + // signer set public keys in the context state using the values in the + // bootstrap_signing_set configuration parameter. Later, the aggregate + // key gets set in the block observer. We're not running a block + // observer in this test, nor are we going through main, so we manually + // update the state here. + let signer_set_public_keys = testing_signer_set.signer_keys().into_iter().collect(); + let state = context.state(); + state.update_current_signer_set(signer_set_public_keys); + state.set_current_aggregate_key(aggregate_key); + // Ensure we have a signers UTXO (as a donation, to not mess with the current // temporary `get_swept_deposit_requests` implementation) push_utxo_donation(&context, &aggregate_key, &setup.sweep_block_hash).await; @@ -776,122 +779,6 @@ async fn deploy_smart_contracts_coordinator( testing::storage::drop_db(db).await; } -/// The [`TxCoordinatorEventLoop::get_signer_set_and_aggregate_key`] -/// function is supposed to fetch the "current" signing set and the -/// aggregate key to use for bitcoin transactions. It attempts to get the -/// latest rotate-keys contract call transaction confirmed on the canonical -/// Stacks blockchain and falls back to the DKG shares table if no such -/// transaction can be found. -/// -/// This tests that we prefer rotate keys transactions if it's available -/// but will use the DKG shares behavior is indeed the case. -#[tokio::test] -async fn get_signer_public_keys_and_aggregate_key_falls_back() { - let db = testing::storage::new_test_database().await; - - let mut rng = rand::rngs::StdRng::seed_from_u64(51); - - let ctx = TestContext::builder() - .with_storage(db.clone()) - .with_mocked_clients() - .build(); - - let network = InMemoryNetwork::new(); - - ctx.state().set_sbtc_contracts_deployed(); // Skip contract deployment - let coord = TxCoordinatorEventLoop { - network: network.connect(), - context: ctx.clone(), - context_window: 10000, - private_key: ctx.config().signer.private_key, - signing_round_max_duration: Duration::from_secs(10), - bitcoin_presign_request_max_duration: Duration::from_secs(10), - threshold: 2, - dkg_max_duration: Duration::from_secs(10), - is_epoch3: true, - }; - - // We need stacks blocks for the rotate-keys transactions. - let test_params = testing::storage::model::Params { - num_bitcoin_blocks: 10, - num_stacks_blocks_per_bitcoin_block: 1, - num_deposit_requests_per_block: 0, - num_withdraw_requests_per_block: 0, - num_signers_per_request: 0, - consecutive_blocks: false, - }; - let test_data = TestData::generate(&mut rng, &[], &test_params); - test_data.write_to(&db).await; - - // We always need the chain tip. - let chain_tip = db.get_bitcoin_canonical_chain_tip().await.unwrap().unwrap(); - - // We have no rows in the DKG shares table and no rotate-keys - // transactions, so there should be no aggregate key, since that only - // happens after DKG, but we should always know the current signer set. - let (maybe_aggregate_key, signer_set) = coord - .get_signer_set_and_aggregate_key(&chain_tip) - .await - .unwrap(); - assert!(maybe_aggregate_key.is_none()); - assert!(!signer_set.is_empty()); - - // Alright, lets write some DKG shares into the database. When we do - // that the signer set should be considered whatever the signer set is - // from our DKG shares. - let shares: EncryptedDkgShares = Faker.fake_with_rng(&mut rng); - db.write_encrypted_dkg_shares(&shares).await.unwrap(); - - let (aggregate_key, signer_set) = coord - .get_signer_set_and_aggregate_key(&chain_tip) - .await - .unwrap(); - - let shares_signer_set: BTreeSet = - shares.signer_set_public_keys.iter().copied().collect(); - - assert_eq!(shares.aggregate_key, aggregate_key.unwrap()); - assert_eq!(shares_signer_set, signer_set); - - // Okay not we write a rotate-keys transaction into the database. To do - // that we need the stacks chain tip, and a something in 3 different - // tables... - let stacks_chain_tip = db.get_stacks_chain_tip(&chain_tip).await.unwrap().unwrap(); - - let rotate_keys: RotateKeysTransaction = Faker.fake_with_rng(&mut rng); - let transaction = model::Transaction { - txid: rotate_keys.txid.into_bytes(), - tx: Vec::new(), - tx_type: model::TransactionType::RotateKeys, - block_hash: stacks_chain_tip.block_hash.into_bytes(), - }; - let tx = model::StacksTransaction { - txid: rotate_keys.txid, - block_hash: stacks_chain_tip.block_hash, - }; - - db.write_transaction(&transaction).await.unwrap(); - db.write_stacks_transaction(&tx).await.unwrap(); - db.write_rotate_keys_transaction(&rotate_keys) - .await - .unwrap(); - - // Alright, now that we have a rotate-keys transaction, we can check if - // it is preferred over the DKG shares table. - let (aggregate_key, signer_set) = coord - .get_signer_set_and_aggregate_key(&chain_tip) - .await - .unwrap(); - - let rotate_keys_signer_set: BTreeSet = - rotate_keys.signer_set.iter().copied().collect(); - - assert_eq!(rotate_keys.aggregate_key, aggregate_key.unwrap()); - assert_eq!(rotate_keys_signer_set, signer_set); - - testing::storage::drop_db(db).await; -} - /// Test that we run DKG if the coordinator notices that DKG has not been /// run yet. /// @@ -947,6 +834,11 @@ async fn run_dkg_from_scratch() { let network = WanNetwork::default(); let mut signers: Vec<_> = Vec::new(); + let signer_set_public_keys: BTreeSet = signer_key_pairs + .iter() + .map(|kp| kp.public_key().into()) + .collect(); + for (kp, data) in iter { let broadcast_stacks_tx = broadcast_stacks_tx.clone(); let db = testing::storage::new_test_database().await; @@ -955,6 +847,15 @@ async fn run_dkg_from_scratch() { .with_mocked_clients() .build(); + // When the signer binary starts up in main(), it sets the current + // signer set public keys in the context state using the values in + // the bootstrap_signing_set configuration parameter. Later, the + // state gets updated in the block observer. We're not running a + // block observer in this test, nor are we going through main, so + // we manually update the state here. + ctx.state() + .update_current_signer_set(signer_set_public_keys.clone()); + ctx.with_stacks_client(|client| { client .expect_estimate_fees() @@ -1167,6 +1068,10 @@ async fn run_subsequent_dkg() { // The aggregate key we will use for the first DKG shares entry. let aggregate_key_1: PublicKey = Faker.fake_with_rng(&mut rng); + let signer_set_public_keys: BTreeSet = signer_key_pairs + .iter() + .map(|kp| kp.public_key().into()) + .collect(); for (kp, data) in iter { let broadcast_stacks_tx = broadcast_stacks_tx.clone(); @@ -1180,14 +1085,20 @@ async fn run_subsequent_dkg() { }) .build(); + // When the signer binary starts up in main(), it sets the current + // signer set public keys in the context state using the values in + // the bootstrap_signing_set configuration parameter. Later, this + // state gets updated in the block observer. We're not running a + // block observer in this test, nor are we going through main, so + // we manually update the necessary state here. + ctx.state() + .update_current_signer_set(signer_set_public_keys.clone()); + // Write one DKG shares entry to the signer's database simulating that // DKG has been successfully run once. db.write_encrypted_dkg_shares(&EncryptedDkgShares { aggregate_key: aggregate_key_1, - signer_set_public_keys: signer_key_pairs - .iter() - .map(|kp| kp.public_key().into()) - .collect(), + signer_set_public_keys: signer_set_public_keys.iter().copied().collect(), ..Faker.fake() }) .await @@ -1601,22 +1512,10 @@ async fn sign_bitcoin_transaction() { ev.run().await }); - let zmq_stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["hashblock"]) - .await - .unwrap(); - let (sender, receiver) = tokio::sync::mpsc::channel(100); - - tokio::spawn(async move { - let mut stream = zmq_stream.to_block_hash_stream(); - while let Some(block) = stream.next().await { - sender.send(block).await.unwrap(); - } - }); - let block_observer = BlockObserver { context: ctx.clone(), - bitcoin_blocks: ReceiverStream::new(receiver), + bitcoin_blocks: testing::btc::new_zmq_block_hash_stream(BITCOIN_CORE_ZMQ_ENDPOINT) + .await, }; let counter = start_count.clone(); tokio::spawn(async move { @@ -2042,22 +1941,10 @@ async fn sign_bitcoin_transaction_multiple_locking_keys() { ev.run().await }); - let zmq_stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["hashblock"]) - .await - .unwrap(); - let (sender, receiver) = tokio::sync::mpsc::channel(100); - - tokio::spawn(async move { - let mut stream = zmq_stream.to_block_hash_stream(); - while let Some(block) = stream.next().await { - sender.send(block).await.unwrap(); - } - }); - let block_observer = BlockObserver { context: ctx.clone(), - bitcoin_blocks: ReceiverStream::new(receiver), + bitcoin_blocks: testing::btc::new_zmq_block_hash_stream(BITCOIN_CORE_ZMQ_ENDPOINT) + .await, }; let counter = start_count.clone(); tokio::spawn(async move { @@ -2632,22 +2519,10 @@ async fn skip_smart_contract_deployment_and_key_rotation_if_up_to_date() { ev.run().await }); - let zmq_stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["hashblock"]) - .await - .unwrap(); - let (sender, receiver) = tokio::sync::mpsc::channel(100); - - tokio::spawn(async move { - let mut stream = zmq_stream.to_block_hash_stream(); - while let Some(block) = stream.next().await { - sender.send(block).await.unwrap(); - } - }); - let block_observer = BlockObserver { context: ctx.clone(), - bitcoin_blocks: ReceiverStream::new(receiver), + bitcoin_blocks: testing::btc::new_zmq_block_hash_stream(BITCOIN_CORE_ZMQ_ENDPOINT) + .await, }; let counter = start_count.clone(); tokio::spawn(async move { @@ -3185,6 +3060,10 @@ async fn test_conservative_initial_sbtc_limits() { // - We load the database with a bitcoin blocks going back to some // genesis block. // ========================================================================= + let signer_set_public_keys: BTreeSet = signer_key_pairs + .iter() + .map(|kp| kp.public_key().into()) + .collect(); let mut signers = Vec::new(); for kp in signer_key_pairs.iter() { let db = testing::storage::new_test_database().await; @@ -3202,6 +3081,15 @@ async fn test_conservative_initial_sbtc_limits() { .with_mocked_emily_client() .build(); + // When the signer binary starts up in main(), it sets the current + // signer set public keys in the context state using the values in + // the bootstrap_signing_set configuration parameter. Later, this + // state gets updated in the block observer. We're not running a + // block observer in this test, nor are we going through main, so + // we manually update the necessary state here. + ctx.state() + .update_current_signer_set(signer_set_public_keys.clone()); + let network = network.connect(&ctx); signers.push((ctx, db, kp, network)); @@ -3418,22 +3306,10 @@ async fn test_conservative_initial_sbtc_limits() { ev.run().await }); - let zmq_stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["hashblock"]) - .await - .unwrap(); - let (sender, receiver) = tokio::sync::mpsc::channel(100); - - tokio::spawn(async move { - let mut stream = zmq_stream.to_block_hash_stream(); - while let Some(block) = stream.next().await { - sender.send(block).await.unwrap(); - } - }); - let block_observer = BlockObserver { context: ctx.clone(), - bitcoin_blocks: ReceiverStream::new(receiver), + bitcoin_blocks: testing::btc::new_zmq_block_hash_stream(BITCOIN_CORE_ZMQ_ENDPOINT) + .await, }; let counter = start_count.clone(); tokio::spawn(async move { diff --git a/signer/tests/integration/transaction_signer.rs b/signer/tests/integration/transaction_signer.rs index f395961bd..94824effe 100644 --- a/signer/tests/integration/transaction_signer.rs +++ b/signer/tests/integration/transaction_signer.rs @@ -12,6 +12,7 @@ use signer::bitcoin::utxo::RequestRef; use signer::bitcoin::utxo::Requests; use signer::bitcoin::utxo::UnsignedTransaction; use signer::bitcoin::validation::TxRequestIds; +use signer::block_observer::get_signer_set_and_aggregate_key; use signer::context::Context; use signer::context::SbtcLimits; use signer::error::Error; @@ -28,14 +29,12 @@ use signer::storage::model; use signer::storage::model::BitcoinBlockHash; use signer::storage::model::BitcoinTxId; use signer::storage::model::BitcoinTxSigHash; -use signer::storage::model::RotateKeysTransaction; use signer::storage::model::SigHash; use signer::storage::model::StacksTxId; use signer::storage::DbRead as _; use signer::storage::DbWrite as _; use signer::testing; use signer::testing::context::*; -use signer::testing::storage::model::TestData; use signer::transaction_signer::ChainTipStatus; use signer::transaction_signer::MsgChainTipReport; use signer::transaction_signer::TxSignerEventLoop; @@ -49,100 +48,6 @@ use crate::setup::TestSignerSet; use crate::setup::TestSweepSetup; use crate::setup::TestSweepSetup2; -/// Test that [`TxSignerEventLoop::get_signer_public_keys`] falls back to -/// the bootstrap config if there is no rotate-keys transaction in the -/// database. -#[tokio::test] -async fn get_signer_public_keys_and_aggregate_key_falls_back() { - let db = testing::storage::new_test_database().await; - - let mut rng = rand::rngs::StdRng::seed_from_u64(51); - - let ctx = TestContext::builder() - .with_storage(db.clone()) - .with_mocked_clients() - .build(); - - let network = InMemoryNetwork::new(); - - let coord = TxSignerEventLoop { - network: network.connect(), - context: ctx.clone(), - context_window: 10000, - wsts_state_machines: LruCache::new(NonZeroUsize::new(100).unwrap()), - signer_private_key: ctx.config().signer.private_key, - threshold: 2, - rng: rand::rngs::StdRng::seed_from_u64(51), - dkg_begin_pause: None, - }; - - // We need stacks blocks for the rotate-keys transactions. - let test_params = testing::storage::model::Params { - num_bitcoin_blocks: 10, - num_stacks_blocks_per_bitcoin_block: 1, - num_deposit_requests_per_block: 0, - num_withdraw_requests_per_block: 0, - num_signers_per_request: 0, - consecutive_blocks: false, - }; - let test_data = TestData::generate(&mut rng, &[], &test_params); - test_data.write_to(&db).await; - - // We always need the chain tip. - let chain_tip = db.get_bitcoin_canonical_chain_tip().await.unwrap().unwrap(); - - // We have no transactions in the database, just blocks header hashes - // and block heights. The `get_signer_public_keys` function falls back - // to the config for keys if no rotate-keys transaction can be found. - // So this function almost never errors. - let bootstrap_signer_set = coord.get_signer_public_keys(&chain_tip).await.unwrap(); - // We check that the signer set can form a valid wallet when we load - // the config. In particular, the signing set should not be empty. - assert!(!bootstrap_signer_set.is_empty()); - - let config_signer_set = ctx.config().signer.bootstrap_signing_set(); - assert_eq!(bootstrap_signer_set, config_signer_set); - - // Okay now we write a rotate-keys transaction into the database. To do - // that we need the stacks chain tip, and a something in 3 different - // tables... - let stacks_chain_tip = db.get_stacks_chain_tip(&chain_tip).await.unwrap().unwrap(); - - let rotate_keys: RotateKeysTransaction = Faker.fake_with_rng(&mut rng); - let transaction = model::Transaction { - txid: rotate_keys.txid.into_bytes(), - tx: Vec::new(), - tx_type: model::TransactionType::RotateKeys, - block_hash: stacks_chain_tip.block_hash.into_bytes(), - }; - let tx = model::StacksTransaction { - txid: rotate_keys.txid, - block_hash: stacks_chain_tip.block_hash, - }; - - db.write_transaction(&transaction).await.unwrap(); - db.write_stacks_transaction(&tx).await.unwrap(); - db.write_rotate_keys_transaction(&rotate_keys) - .await - .unwrap(); - - // Alright, now that we have a rotate-keys transaction, we can check if - // it is preferred over the config. - let signer_set: Vec = coord - .get_signer_public_keys(&chain_tip) - .await - .unwrap() - .into_iter() - .collect(); - - let mut rotate_keys_signer_set = rotate_keys.signer_set.clone(); - rotate_keys_signer_set.sort(); - - assert_eq!(rotate_keys_signer_set, signer_set); - - testing::storage::drop_db(db).await; -} - /// Test that [`TxSignerEventLoop::assert_valid_stacks_tx_sign_request`] /// errors when the signer is not in the signer set. #[tokio::test] @@ -257,6 +162,14 @@ pub async fn assert_should_be_able_to_handle_sbtc_requests() { setup.store_deposit_request(&db).await; setup.store_deposit_decisions(&db).await; + let (aggregate_key, signer_set_public_keys) = get_signer_set_and_aggregate_key(&ctx, chain_tip) + .await + .unwrap(); + + let state = ctx.state(); + state.set_current_aggregate_key(aggregate_key.unwrap()); + state.update_current_signer_set(signer_set_public_keys); + // Initialize the transaction signer event loop let network = WanNetwork::default(); @@ -313,9 +226,10 @@ pub async fn assert_should_be_able_to_handle_sbtc_requests() { let mut handle = network.connect(&ctx).spawn(); - let result = tx_signer + tx_signer .handle_bitcoin_pre_sign_request(&sbtc_context, &chain_tip) - .await; + .await + .unwrap(); // check if we are receving an Ack from the signer tokio::time::timeout(Duration::from_secs(2), async move { @@ -324,8 +238,6 @@ pub async fn assert_should_be_able_to_handle_sbtc_requests() { .await .unwrap(); - assert!(result.is_ok()); - // Check that the intentions to sign the requests sighashes // are stored in the database let (will_sign, _) = db @@ -493,6 +405,13 @@ async fn max_one_state_machine_per_bitcoin_block_hash_for_dkg() { let chain_tip: BitcoinBlockHash = rpc.get_best_block_hash().unwrap().into(); backfill_bitcoin_blocks(&db, rpc, &chain_tip).await; + let (_, signer_set_public_keys) = get_signer_set_and_aggregate_key(&ctx, chain_tip) + .await + .unwrap(); + + ctx.state() + .update_current_signer_set(signer_set_public_keys); + // Initialize the transaction signer event loop let network = WanNetwork::default(); let net = network.connect(&ctx); diff --git a/signer/tests/integration/zmq.rs b/signer/tests/integration/zmq.rs index 977230c5d..40f05f5c7 100644 --- a/signer/tests/integration/zmq.rs +++ b/signer/tests/integration/zmq.rs @@ -13,10 +13,9 @@ pub const BITCOIN_CORE_ZMQ_ENDPOINT: &str = "tcp://localhost:28332"; async fn block_stream_streams_blocks() { let (_, faucet) = regtest::initialize_blockchain(); - let stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["rawblock"]) - .await - .unwrap(); + let stream = BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT) + .await + .unwrap(); let mut block_stream = stream.to_block_stream(); @@ -66,10 +65,9 @@ async fn block_stream_streams_blocks() { async fn block_hash_stream_streams_block_hashes() { let (_, faucet) = regtest::initialize_blockchain(); - let stream = - BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT, &["hashblock"]) - .await - .unwrap(); + let stream = BitcoinCoreMessageStream::new_from_endpoint(BITCOIN_CORE_ZMQ_ENDPOINT) + .await + .unwrap(); let mut block_hash_stream = stream.to_block_hash_stream();