Skip to content

Commit

Permalink
chore: avoid .unwrap() calls
Browse files Browse the repository at this point in the history
  • Loading branch information
doscortados committed Jan 29, 2025
1 parent d66338f commit 63439bb
Showing 1 changed file with 9 additions and 21 deletions.
30 changes: 9 additions & 21 deletions core/src/rpc/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use bitcoin::{Amount, TapSighash};
use futures::{future::try_join_all, stream::BoxStream, FutureExt, Stream, StreamExt};
use secp256k1::musig::{MusigAggNonce, MusigPartialSignature, MusigPubNonce};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{async_trait, Request, Response, Status, Streaming};

struct AggNonceQueueItem {
Expand Down Expand Up @@ -308,21 +309,8 @@ async fn pull<T>(mut stream: Streaming<T>) -> Result<Vec<T>, Status> {
Ok(ret)
}

/*
vp <- verifiers
op <- operators
wp <- watchtovers
op -> verifiers
wp -> verifiers
*/

#[async_trait]
impl ClementineAggregator for Aggregator {
// TODO: HERE #464
#[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
async fn setup(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
tracing::info!("Collecting verifier details...");
Expand Down Expand Up @@ -375,11 +363,11 @@ impl ClementineAggregator for Aggregator {
let operator_params = operator_params.clone();
async move {
for params in operator_params {
let (tx, rx) = tokio::sync::mpsc::channel(params.len());
let future =
client.set_operator(tokio_stream::wrappers::ReceiverStream::new(rx));
let (tx, rx) = channel(params.len());
let future = client.set_operator(ReceiverStream::new(rx));
for param in params {
tx.send(param).await.unwrap();
tx.send(param).await
.map_err(|e| Status::aborted(e.to_string()))?
}
future.await?;
}
Expand All @@ -394,11 +382,11 @@ impl ClementineAggregator for Aggregator {
let watchtower_params = watchtower_params.clone();
async move {
for params in watchtower_params {
let (tx, rx) = tokio::sync::mpsc::channel(params.len());
let future =
client.set_watchtower(tokio_stream::wrappers::ReceiverStream::new(rx));
let (tx, rx) = channel(params.len());
let future = client.set_watchtower(ReceiverStream::new(rx));
for param in params {
tx.send(param).await.unwrap();
tx.send(param).await
.map_err(|e| Status::aborted(e.to_string()))?
}
future.await?;
}
Expand Down

0 comments on commit 63439bb

Please sign in to comment.