feat: cache key-rotation data block observer (#1298)
* Add functions for updating the signer state with key information
* Update the signer state at the end of block observer duties
* Simplify the creation of a block observer in tests
djordon authored Feb 4, 2025
1 parent 6bca320 commit 138f911
Showing 12 changed files with 642 additions and 513 deletions.
1 change: 0 additions & 1 deletion signer/src/bitcoin/validation.rs
@@ -322,7 +322,6 @@ impl BitcoinPreSignRequest {
tx_fee: Amount::from_sat(tx.tx_fee),
reports,
chain_tip_height: btc_ctx.chain_tip_height,
// If the cap is None, then we assume that it is unlimited.
sbtc_limits: ctx.state().get_current_limits(),
};

7 changes: 2 additions & 5 deletions signer/src/bitcoin/zmq.rs
@@ -50,11 +50,8 @@ pub struct BitcoinCoreMessageStream {
}

impl BitcoinCoreMessageStream {
/// Create a new one using the endpoint(s) in the config.
pub async fn new_from_endpoint<T>(endpoint: &str, _subscriptions: &[T]) -> Result<Self, Error>
where
T: AsRef<str>,
{
/// Create a new one using the given endpoint.
pub async fn new_from_endpoint(endpoint: &str) -> Result<Self, Error> {
let inner_stream = tokio::time::timeout(Duration::from_secs(10), async {
bitcoincore_zmq::subscribe_async_monitor(&[endpoint])
})
102 changes: 100 additions & 2 deletions signer/src/block_observer.rs
@@ -17,6 +17,7 @@
//! - Update signer set transactions
//! - Set aggregate key transactions
use std::collections::BTreeSet;
use std::future::Future;
use std::time::Duration;

@@ -29,6 +30,7 @@ use crate::context::SbtcLimits;
use crate::context::SignerEvent;
use crate::emily_client::EmilyInteract;
use crate::error::Error;
use crate::keys::PublicKey;
use crate::metrics::Metrics;
use crate::metrics::BITCOIN_BLOCKCHAIN;
use crate::stacks::api::GetNakamotoStartHeight as _;
@@ -157,8 +159,9 @@ where
tracing::warn!(%error, "could not process stacks blocks");
}

if let Err(error) = self.update_sbtc_limits().await {
tracing::warn!(%error, "could not update sBTC limits");
tracing::debug!("updating the signer state");
if let Err(error) = self.update_signer_state(block_hash).await {
tracing::warn!(%error, "could not update the signer state");
continue;
}

@@ -577,6 +580,101 @@ impl<C: Context, B> BlockObserver<C, B> {
}
Ok(())
}

/// Update the `SignerState` object with current signer set and
/// aggregate key data.
///
/// # Notes
///
/// The queries used for fetching the cached information can take
/// quite a lot of time to complete on mainnet, so this function
/// updates the signer state once so that the other event loops do
/// not need to execute them. The cached information is:
///
/// * The current signer set. It gets this information from the last
/// successful key-rotation contract call if it exists. If such a
/// contract call does not exist this function uses the latest DKG
/// shares, and if that doesn't exist it uses the bootstrap signing
/// set from the configuration.
/// * The current aggregate key. It gets this information from the last
/// successful key-rotation contract call if it exists, and from the
/// latest DKG shares if no such contract call can be found.
async fn set_signer_set_and_aggregate_key(&self, chain_tip: BlockHash) -> Result<(), Error> {
let (aggregate_key, public_keys) =
get_signer_set_and_aggregate_key(&self.context, chain_tip).await?;

let state = self.context.state();
if let Some(aggregate_key) = aggregate_key {
state.set_current_aggregate_key(aggregate_key);
}

state.update_current_signer_set(public_keys);
Ok(())
}

/// Update the `SignerState` object with data that is unlikely to
/// change until the arrival of the next bitcoin block.
///
/// # Notes
///
/// The function updates the following:
/// * sBTC limits from Emily.
/// * The current signer set.
/// * The current aggregate key.
async fn update_signer_state(&self, chain_tip: BlockHash) -> Result<(), Error> {
tracing::info!("loading sbtc limits from Emily");
self.update_sbtc_limits().await?;

tracing::info!("updating the signer state with the current signer set");
self.set_signer_set_and_aggregate_key(chain_tip).await
}
}

/// Return the signing set that can make sBTC related contract calls along
/// with the current aggregate key to use for locking UTXOs on bitcoin.
///
/// The aggregate key fetched here is the one confirmed on the canonical
/// Stacks blockchain as part of a `rotate-keys` contract call. It will be
/// the public key that is the result of a DKG run. If there are no
rotate-keys transactions on the canonical Stacks blockchain, then we
/// fall back on the last known DKG shares row in our database, and return
/// None as the aggregate key if no DKG shares can be found, implying that
/// this signer has not participated in DKG.
#[tracing::instrument(skip_all)]
pub async fn get_signer_set_and_aggregate_key<C, B>(
context: &C,
chain_tip: B,
) -> Result<(Option<PublicKey>, BTreeSet<PublicKey>), Error>
where
C: Context,
B: Into<model::BitcoinBlockHash>,
{
let db = context.get_storage();
let chain_tip = chain_tip.into();

// We are supposed to submit a rotate-keys transaction after running
// DKG, but that transaction may not have been submitted yet (if we
// have just run DKG) or it may not have been confirmed on the
// canonical Stacks blockchain.
//
// If the signers have already run DKG, then we know that all
// participating signers should have the same view of the latest
// aggregate key, so we can fall back on the stored DKG shares for
// getting the current aggregate key and associated signing set.
match db.get_last_key_rotation(&chain_tip).await? {
Some(last_key) => {
let aggregate_key = last_key.aggregate_key;
let signer_set = last_key.signer_set.into_iter().collect();
Ok((Some(aggregate_key), signer_set))
}
None => match db.get_latest_encrypted_dkg_shares().await? {
Some(shares) => {
let signer_set = shares.signer_set_public_keys.into_iter().collect();
Ok((Some(shares.aggregate_key), signer_set))
}
None => Ok((None, context.config().signer.bootstrap_signing_set())),
},
}
}

#[cfg(test)]
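To make the fallback chain above concrete, here is a minimal usage sketch (not part of this commit). It assumes a `ctx` implementing `Context`, a bitcoin `chain_tip` hash as in the block observer, and that `PublicKey` implements `Display`:

use bitcoin::BlockHash;

use crate::block_observer::get_signer_set_and_aggregate_key;
use crate::context::Context;
use crate::error::Error;

// Sketch: log which source the signing data came from. When the
// aggregate key is None, the returned set is the bootstrap signing
// set from the configuration.
async fn log_signing_data<C: Context>(ctx: &C, chain_tip: BlockHash) -> Result<(), Error> {
    let (aggregate_key, signer_set) = get_signer_set_and_aggregate_key(ctx, chain_tip).await?;
    match aggregate_key {
        Some(key) => tracing::debug!(%key, signers = signer_set.len(), "found rotate-keys or DKG data"),
        None => tracing::debug!(signers = signer_set.len(), "no DKG data; using the bootstrap signing set"),
    }
    Ok(())
}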
88 changes: 81 additions & 7 deletions signer/src/context/signer_state.rs
@@ -1,5 +1,6 @@
//! Module for signer state
use std::collections::BTreeSet;
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
RwLock,
@@ -18,6 +19,7 @@ use crate::keys::PublicKey;
pub struct SignerState {
current_signer_set: SignerSet,
current_limits: RwLock<SbtcLimits>,
current_aggregate_key: RwLock<Option<PublicKey>>,
sbtc_contracts_deployed: AtomicBool,
sbtc_bitcoin_start_height: AtomicU64,
is_sbtc_bitcoin_start_height_set: AtomicBool,
@@ -29,6 +31,38 @@ impl SignerState {
&self.current_signer_set
}

/// Return the public keys of the current signer set.
pub fn current_signer_public_keys(&self) -> BTreeSet<PublicKey> {
self.current_signer_set
.get_signers()
.into_iter()
.map(|signer| signer.public_key)
.collect()
}

/// Replace the current signer set with the given set of public keys.
pub fn update_current_signer_set(&self, public_keys: BTreeSet<PublicKey>) {
self.current_signer_set.replace_signers(public_keys);
}

/// Return the current aggregate key from the cache.
#[allow(clippy::unwrap_in_result)]
pub fn current_aggregate_key(&self) -> Option<PublicKey> {
self.current_aggregate_key
.read()
.expect("BUG: Failed to acquire read lock")
.as_ref()
.copied()
}

/// Set the current aggregate key to the given public key.
pub fn set_current_aggregate_key(&self, aggregate_key: PublicKey) {
self.current_aggregate_key
.write()
.expect("BUG: Failed to acquire write lock")
.replace(aggregate_key);
}

/// Get the current sBTC limits.
pub fn get_current_limits(&self) -> SbtcLimits {
// We should never fail to acquire a lock from the RwLock, so we panic if we do.
@@ -82,6 +116,7 @@ impl Default for SignerState {
Self {
current_signer_set: Default::default(),
current_limits: RwLock::new(SbtcLimits::zero()),
current_aggregate_key: RwLock::new(None),
sbtc_contracts_deployed: Default::default(),
sbtc_bitcoin_start_height: Default::default(),
is_sbtc_bitcoin_start_height_set: Default::default(),
@@ -132,11 +167,6 @@ impl SbtcLimits {
}
}

/// Create a new `SbtcLimits` object without any limits
pub fn unlimited() -> Self {
Self::new(None, None, None, None, None)
}

/// Create a new `SbtcLimits` object with limits set to zero (fully constraining)
pub fn zero() -> Self {
Self::new(
@@ -177,9 +207,21 @@
pub fn max_mintable_cap(&self) -> Amount {
self.max_mintable_cap.unwrap_or(Amount::MAX_MONEY)
}
}

#[cfg(any(test, feature = "testing"))]
impl SbtcLimits {
/// Create a new `SbtcLimits` object without any limits
pub fn unlimited() -> Self {
Self {
total_cap: Some(Amount::MAX_MONEY),
per_deposit_minimum: Some(Amount::ZERO),
per_deposit_cap: Some(Amount::MAX_MONEY),
per_withdrawal_cap: Some(Amount::MAX_MONEY),
max_mintable_cap: Some(Amount::MAX_MONEY),
}
}

/// TODO: Document this
#[cfg(test)]
/// Create a new Self with only the given deposit minimum and maximums
/// set.
pub fn new_per_deposit(min: u64, max: u64) -> Self {
Expand Down Expand Up @@ -266,6 +308,38 @@ impl SignerSet {
.insert(signer);
}

/// Replace the current signer set with the given set of public keys.
pub fn replace_signers(&self, public_keys: BTreeSet<PublicKey>) {
let inner_signer_set = self.get_signers();

// Get a guard for the peer IDs.
#[allow(clippy::expect_used)]
let mut inner_peer_ids = self
.peer_ids
.write()
.expect("BUG: Failed to acquire write lock");

// Get a guard for the Signer objects in the set.
#[allow(clippy::expect_used)]
let mut inner_public_keys = self
.signers
.write()
.expect("BUG: Failed to acquire write lock");

// Remove the old signer set
for signer in inner_signer_set {
inner_peer_ids.remove(signer.peer_id());
inner_public_keys.remove(signer.public_key());
}

// Add the new signer set
for public_key in public_keys {
let signer = Signer::new(public_key);
inner_peer_ids.insert(signer.peer_id);
inner_public_keys.insert(signer);
}
}

/// Remove a signer (public key) from the known active signer set.
pub fn remove_signer(&self, signer: &PublicKey) {
if self.is_signer(signer) {
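The intended division of labor for this new cache: the block observer writes once per bitcoin block, and every other event loop reads. A short sketch of that round-trip (call sites assumed, not part of this commit):

use std::collections::BTreeSet;

use crate::context::Context;
use crate::keys::PublicKey;

// Writer side: the block observer caches the key-rotation data once
// per new bitcoin block.
fn cache_rotation_data<C: Context>(ctx: &C, key: PublicKey, signers: BTreeSet<PublicKey>) {
    let state = ctx.state();
    state.set_current_aggregate_key(key);
    state.update_current_signer_set(signers);
}

// Reader side: other event loops consult the cache instead of running
// the expensive database queries themselves.
fn cached_rotation_data<C: Context>(ctx: &C) -> (Option<PublicKey>, BTreeSet<PublicKey>) {
    let state = ctx.state();
    (state.current_aggregate_key(), state.current_signer_public_keys())
}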
10 changes: 4 additions & 6 deletions signer/src/main.rs
@@ -299,12 +299,10 @@ async fn run_block_observer(ctx: impl Context) -> Result<(), Error> {

// TODO: Need to handle multiple endpoints, so some sort of
// failover-stream-wrapper.
let stream = BitcoinCoreMessageStream::new_from_endpoint(
config.bitcoin.block_hash_stream_endpoints[0].as_str(),
&["hashblock"],
)
.await
.unwrap();
let endpoint = config.bitcoin.block_hash_stream_endpoints[0].as_str();
let stream = BitcoinCoreMessageStream::new_from_endpoint(endpoint)
.await
.unwrap();

// TODO: We should have a new() method that builds from the context
let block_observer = block_observer::BlockObserver {
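The failover wrapper mentioned in the TODO might take a shape like the sketch below; connect_first_available is a hypothetical helper, not something this commit adds:

use crate::bitcoin::zmq::BitcoinCoreMessageStream;

// Try each configured ZMQ endpoint in order and keep the first stream
// that connects, instead of hard-coding endpoint [0] as above.
async fn connect_first_available(endpoints: &[String]) -> Option<BitcoinCoreMessageStream> {
    for endpoint in endpoints {
        match BitcoinCoreMessageStream::new_from_endpoint(endpoint).await {
            Ok(stream) => return Some(stream),
            Err(error) => tracing::warn!(%error, %endpoint, "ZMQ endpoint unavailable; trying the next one"),
        }
    }
    None
}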
31 changes: 31 additions & 0 deletions signer/src/testing/btc.rs
@@ -1,6 +1,7 @@
//! Helper functions for the bitcoin module
//!
use bitcoin::Amount;
use bitcoin::BlockHash;
use bitcoin::OutPoint;
use bitcoin::ScriptBuf;
use bitcoin::Sequence;
@@ -10,8 +11,12 @@ use bitcoin::TxOut;
use bitcoin::Witness;

use emily_client::models::CreateDepositRequestBody;
use futures::StreamExt as _;
use tokio_stream::wrappers::ReceiverStream;

use crate::bitcoin::utxo;
use crate::bitcoin::zmq::BitcoinCoreMessageStream;
use crate::error::Error;

/// Return a transaction that is kinda like the signers' transaction,
/// but it does not service any requests, and it does not have any
@@ -56,3 +61,29 @@ impl utxo::DepositRequest {
}
}
}

/// Create a new BlockHash stream for messages from bitcoin core over the
/// ZMQ interface.
///
/// The returned object implements Stream + Send + Sync, which is sometimes
/// needed in our integration tests.
///
/// # Notes
///
/// This function panics if it cannot establish a connection to bitcoin
/// core within 10 seconds.
pub async fn new_zmq_block_hash_stream(endpoint: &str) -> ReceiverStream<Result<BlockHash, Error>> {
let zmq_stream = BitcoinCoreMessageStream::new_from_endpoint(endpoint)
.await
.unwrap();

let (sender, receiver) = tokio::sync::mpsc::channel(100);
tokio::spawn(async move {
let mut stream = zmq_stream.to_block_hash_stream();
while let Some(block) = stream.next().await {
sender.send(block).await.unwrap();
}
});

ReceiverStream::new(receiver)
}
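A sketch of how an integration test might drive this helper. The endpoint value is an assumption and must match bitcoind's -zmqpubhashblock setting; the test blocks until bitcoin core publishes a block hash:

use futures::StreamExt as _;

#[tokio::test]
async fn observes_new_block_hashes() {
    let mut stream = new_zmq_block_hash_stream("tcp://localhost:28332").await;
    // Wait for bitcoin core to publish the next block hash.
    if let Some(Ok(block_hash)) = stream.next().await {
        println!("observed bitcoin block: {block_hash}");
    }
}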