From a2ad207a0f4c89940bc40abb493a8620edd28d9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Efe=20Ak=C3=A7a?= Date: Wed, 5 Feb 2025 15:23:08 +0300 Subject: [PATCH] cargo fmt + fix deadlocking problem --- core/src/rpc/aggregator.rs | 50 ++++++++++++++++++++------------------ core/src/rpc/verifier.rs | 7 +++++- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index 311dbee7..af3e2328 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use super::clementine::{ clementine_aggregator_server::ClementineAggregator, verifier_deposit_finalize_params, DepositParams, Empty, RawSignedMoveTx, VerifierDepositFinalizeParams, @@ -23,6 +25,7 @@ use bitcoin::hashes::Hash; use bitcoin::secp256k1::schnorr::Signature; use bitcoin::secp256k1::{Message, PublicKey}; use bitcoin::{Amount, TapSighash}; +use futures::TryFutureExt; use futures::{future::try_join_all, stream::BoxStream, FutureExt, Stream, StreamExt}; use secp256k1::musig::{MusigAggNonce, MusigPartialSignature, MusigPubNonce}; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -175,7 +178,7 @@ async fn signature_aggregator( async fn signature_distributor( mut final_sig_receiver: Receiver, deposit_finalize_sender: Vec>, - movetx_agg_nonce: MusigAggNonce, + movetx_agg_nonce: impl Future>, ) -> Result<(), BridgeError> { use verifier_deposit_finalize_params::Params; while let Some(queue_item) = final_sig_receiver.recv().await { @@ -190,6 +193,7 @@ async fn signature_distributor( } } + let movetx_agg_nonce = movetx_agg_nonce.await?; // Send the movetx agg nonce to the verifiers. for tx in &deposit_finalize_sender { tx.send(VerifierDepositFinalizeParams { @@ -604,9 +608,9 @@ impl ClementineAggregator for Aggregator { )); // Create channels for pipeline communication - let (agg_nonce_sender, agg_nonce_receiver) = channel(3200); - let (partial_sig_sender, partial_sig_receiver) = channel(3200); - let (final_sig_sender, final_sig_receiver) = channel(3200); + let (agg_nonce_sender, agg_nonce_receiver) = channel(32); + let (partial_sig_sender, partial_sig_receiver) = channel(32); + let (final_sig_sender, final_sig_receiver) = channel(32); // Start the nonce aggregation pipe. let nonce_agg_handle = tokio::spawn(nonce_aggregator( @@ -629,18 +633,6 @@ impl ClementineAggregator for Aggregator { final_sig_sender, )); - // Join the nonce aggregation handle to get the movetx agg nonce. - let movetx_agg_nonce = nonce_agg_handle - .await - .map_err(|_| Status::internal("panic when aggregating nonces"))??; - - // Start the deposit finalization pipe. - let sig_dist_handle = tokio::spawn(signature_distributor( - final_sig_receiver, - deposit_finalize_sender.clone(), - movetx_agg_nonce, - )); - tracing::debug!("Getting signatures from operators"); // Get sigs from each operator in background let operator_sigs_fut = tokio::spawn(Aggregator::get_operator_sigs( @@ -649,19 +641,27 @@ impl ClementineAggregator for Aggregator { deposit_sign_session, )); + // Join the nonce aggregation handle to get the movetx agg nonce. + let nonce_agg_handle = nonce_agg_handle + .map_err(|_| Status::internal("panic when aggregating nonces")) + .map(|res| -> Result { res.and_then(|r| r.map_err(Into::into)) }) + .shared(); + + // Start the deposit finalization pipe. + let sig_dist_handle = tokio::spawn(signature_distributor( + final_sig_receiver, + deposit_finalize_sender.clone(), + nonce_agg_handle.clone(), + )); + tracing::debug!( "Waiting for pipeline tasks to complete (nonce agg, sig agg, sig dist, operator sigs)" ); // Wait for all pipeline tasks to complete - nonce_dist_handle - .await - .map_err(|_| Status::internal("panic when distributing nonces"))??; - sig_agg_handle - .await - .map_err(|_| Status::internal("panic when aggregating signatures"))??; - sig_dist_handle + try_join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle]) .await - .map_err(|_| Status::internal("panic when aggregating nonces"))??; + .map_err(|_| Status::internal("panic when pipelining"))?; + let operator_sigs = operator_sigs_fut .await .map_err(|_| Status::internal("panic when collecting operator signatures"))??; @@ -702,7 +702,9 @@ impl ClementineAggregator for Aggregator { .map_err(|e| Status::internal(format!("Failed to finalize deposit: {:?}", e)))?; tracing::debug!("Received move tx partial sigs: {:?}", move_tx_partial_sigs); + // Create the final move transaction and check the signatures + let movetx_agg_nonce = nonce_agg_handle.await?; let raw_signed_movetx = self.create_movetx_check_sig(move_tx_partial_sigs, movetx_agg_nonce, deposit_params)?; diff --git a/core/src/rpc/verifier.rs b/core/src/rpc/verifier.rs index cd801bec..28281528 100644 --- a/core/src/rpc/verifier.rs +++ b/core/src/rpc/verifier.rs @@ -487,7 +487,12 @@ impl ClementineVerifier for Verifier { })?; nonce_idx += 1; - tracing::info!("Verifier {} signed sighash {} of {}", verifier.idx, nonce_idx, num_required_sigs); + tracing::info!( + "Verifier {} signed sighash {} of {}", + verifier.idx, + nonce_idx, + num_required_sigs + ); if nonce_idx == num_required_sigs { break; }