Skip to content

Commit

Permalink
cargo fmt + fix deadlocking problem
Browse files Browse the repository at this point in the history
  • Loading branch information
mmtftr committed Feb 5, 2025
1 parent 07424ce commit a2ad207
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 25 deletions.
50 changes: 26 additions & 24 deletions core/src/rpc/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::future::Future;

use super::clementine::{
clementine_aggregator_server::ClementineAggregator, verifier_deposit_finalize_params,
DepositParams, Empty, RawSignedMoveTx, VerifierDepositFinalizeParams,
Expand All @@ -23,6 +25,7 @@ use bitcoin::hashes::Hash;
use bitcoin::secp256k1::schnorr::Signature;
use bitcoin::secp256k1::{Message, PublicKey};
use bitcoin::{Amount, TapSighash};
use futures::TryFutureExt;
use futures::{future::try_join_all, stream::BoxStream, FutureExt, Stream, StreamExt};
use secp256k1::musig::{MusigAggNonce, MusigPartialSignature, MusigPubNonce};
use tokio::sync::mpsc::{channel, Receiver, Sender};
Expand Down Expand Up @@ -175,7 +178,7 @@ async fn signature_aggregator(
async fn signature_distributor(
mut final_sig_receiver: Receiver<FinalSigQueueItem>,
deposit_finalize_sender: Vec<Sender<VerifierDepositFinalizeParams>>,
movetx_agg_nonce: MusigAggNonce,
movetx_agg_nonce: impl Future<Output = Result<MusigAggNonce, Status>>,
) -> Result<(), BridgeError> {
use verifier_deposit_finalize_params::Params;
while let Some(queue_item) = final_sig_receiver.recv().await {
Expand All @@ -190,6 +193,7 @@ async fn signature_distributor(
}
}

let movetx_agg_nonce = movetx_agg_nonce.await?;
// Send the movetx agg nonce to the verifiers.
for tx in &deposit_finalize_sender {
tx.send(VerifierDepositFinalizeParams {
Expand Down Expand Up @@ -604,9 +608,9 @@ impl ClementineAggregator for Aggregator {
));

// Create channels for pipeline communication
let (agg_nonce_sender, agg_nonce_receiver) = channel(3200);
let (partial_sig_sender, partial_sig_receiver) = channel(3200);
let (final_sig_sender, final_sig_receiver) = channel(3200);
let (agg_nonce_sender, agg_nonce_receiver) = channel(32);
let (partial_sig_sender, partial_sig_receiver) = channel(32);
let (final_sig_sender, final_sig_receiver) = channel(32);

// Start the nonce aggregation pipe.
let nonce_agg_handle = tokio::spawn(nonce_aggregator(
Expand All @@ -629,18 +633,6 @@ impl ClementineAggregator for Aggregator {
final_sig_sender,
));

// Join the nonce aggregation handle to get the movetx agg nonce.
let movetx_agg_nonce = nonce_agg_handle
.await
.map_err(|_| Status::internal("panic when aggregating nonces"))??;

// Start the deposit finalization pipe.
let sig_dist_handle = tokio::spawn(signature_distributor(
final_sig_receiver,
deposit_finalize_sender.clone(),
movetx_agg_nonce,
));

tracing::debug!("Getting signatures from operators");
// Get sigs from each operator in background
let operator_sigs_fut = tokio::spawn(Aggregator::get_operator_sigs(
Expand All @@ -649,19 +641,27 @@ impl ClementineAggregator for Aggregator {
deposit_sign_session,
));

// Join the nonce aggregation handle to get the movetx agg nonce.
let nonce_agg_handle = nonce_agg_handle
.map_err(|_| Status::internal("panic when aggregating nonces"))
.map(|res| -> Result<MusigAggNonce, Status> { res.and_then(|r| r.map_err(Into::into)) })
.shared();

// Start the deposit finalization pipe.
let sig_dist_handle = tokio::spawn(signature_distributor(
final_sig_receiver,
deposit_finalize_sender.clone(),
nonce_agg_handle.clone(),
));

tracing::debug!(
"Waiting for pipeline tasks to complete (nonce agg, sig agg, sig dist, operator sigs)"
);
// Wait for all pipeline tasks to complete
nonce_dist_handle
.await
.map_err(|_| Status::internal("panic when distributing nonces"))??;
sig_agg_handle
.await
.map_err(|_| Status::internal("panic when aggregating signatures"))??;
sig_dist_handle
try_join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle])
.await
.map_err(|_| Status::internal("panic when aggregating nonces"))??;
.map_err(|_| Status::internal("panic when pipelining"))?;

let operator_sigs = operator_sigs_fut
.await
.map_err(|_| Status::internal("panic when collecting operator signatures"))??;
Expand Down Expand Up @@ -702,7 +702,9 @@ impl ClementineAggregator for Aggregator {
.map_err(|e| Status::internal(format!("Failed to finalize deposit: {:?}", e)))?;

tracing::debug!("Received move tx partial sigs: {:?}", move_tx_partial_sigs);

// Create the final move transaction and check the signatures
let movetx_agg_nonce = nonce_agg_handle.await?;
let raw_signed_movetx =
self.create_movetx_check_sig(move_tx_partial_sigs, movetx_agg_nonce, deposit_params)?;

Expand Down
7 changes: 6 additions & 1 deletion core/src/rpc/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,12 @@ impl ClementineVerifier for Verifier {
})?;

nonce_idx += 1;
tracing::info!("Verifier {} signed sighash {} of {}", verifier.idx, nonce_idx, num_required_sigs);
tracing::info!(
"Verifier {} signed sighash {} of {}",
verifier.idx,
nonce_idx,
num_required_sigs
);
if nonce_idx == num_required_sigs {
break;
}
Expand Down

0 comments on commit a2ad207

Please sign in to comment.