Skip to content

Commit

Permalink
fmt changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Shourya742 committed Jan 28, 2025
1 parent 4a54200 commit 077309c
Show file tree
Hide file tree
Showing 29 changed files with 228 additions and 121 deletions.
9 changes: 8 additions & 1 deletion roles/jd-client/src/lib/downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,14 @@ pub async fn listen_for_downstream_mining(
jd,
);

let mut incoming: StdFrame = node.receiver.subscribe().recv().await.unwrap().try_into().unwrap();
let mut incoming: StdFrame = node
.receiver
.subscribe()
.recv()
.await
.unwrap()
.try_into()
.unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let routing_logic = roles_logic_sv2::routing_logic::CommonRoutingLogic::None;
Expand Down
8 changes: 7 additions & 1 deletion roles/jd-client/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,13 @@ impl JobDeclarator {
tokio::task::spawn(async move {
let receiver = self_mutex.safe_lock(|d| d.receiver.clone()).unwrap();
loop {
let mut incoming: StdFrame = receiver.subscribe().recv().await.unwrap().try_into().unwrap();
let mut incoming: StdFrame = receiver
.subscribe()
.recv()
.await
.unwrap()
.try_into()
.unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let next_message_to_send =
Expand Down
8 changes: 7 additions & 1 deletion roles/jd-client/src/lib/job_declarator/setup_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ impl SetupConnectionHandler {

sender.send(sv2_frame).map_err(|_| ())?;

let mut incoming: StdFrame = receiver.subscribe().recv().await.unwrap().try_into().unwrap();
let mut incoming: StdFrame = receiver
.subscribe()
.recv()
.await
.unwrap()
.try_into()
.unwrap();

let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
Expand Down
7 changes: 3 additions & 4 deletions roles/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,9 @@ impl JobDeclaratorClient {
{
Ok(c) => c,
Err(e) => {
let _ = tx_status
.send(status::Status {
state: status::State::UpstreamShutdown(e),
});
let _ = tx_status.send(status::Status {
state: status::State::UpstreamShutdown(e),
});
return;
}
};
Expand Down
33 changes: 18 additions & 15 deletions roles/jd-client/src/lib/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@ pub enum Sender {
#[derive(Debug)]
pub enum ErrorS {
AsyncError(async_channel::SendError<Status<'static>>),
TokioError(tokio::sync::mpsc::error::SendError<Status<'static>>)
TokioError(tokio::sync::mpsc::error::SendError<Status<'static>>),
}

impl Sender {
pub async fn send(
&self,
status: Status<'static>,
) -> Result<(), ErrorS> {
pub async fn send(&self, status: Status<'static>) -> Result<(), ErrorS> {
match self {
Self::Downstream(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)),
Self::DownstreamListener(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)),
Self::DownstreamListener(inner) => {
inner.send(status).await.map_err(|e| ErrorS::AsyncError(e))
}
Self::Upstream(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)),
Self::TemplateReceiver(inner) => inner.send(status).await.map_err(|e| ErrorS::AsyncError(e)),
Self::TemplateReceiver(inner) => {
inner.send(status).await.map_err(|e| ErrorS::AsyncError(e))
}
Self::DownstreamTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)),
Self::TemplateReceiverTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)),
Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e))
Self::TemplateReceiverTokio(inner) => {
inner.send(status).map_err(|e| ErrorS::TokioError(e))
}
Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| ErrorS::TokioError(e)),
}
}
}
Expand All @@ -42,9 +45,9 @@ impl Clone for Sender {
Self::DownstreamListener(inner) => Self::DownstreamListener(inner.clone()),
Self::Upstream(inner) => Self::Upstream(inner.clone()),
Self::TemplateReceiver(inner) => Self::TemplateReceiver(inner.clone()),
Self::DownstreamTokio( inner) => Self::DownstreamTokio(inner.clone()),
Self::TemplateReceiverTokio( inner) => Self::TemplateReceiverTokio(inner.clone()),
Self::UpstreamTokio( inner) => Self::UpstreamTokio(inner.clone())
Self::DownstreamTokio(inner) => Self::DownstreamTokio(inner.clone()),
Self::TemplateReceiverTokio(inner) => Self::TemplateReceiverTokio(inner.clone()),
Self::UpstreamTokio(inner) => Self::UpstreamTokio(inner.clone()),
}
}
}
Expand Down Expand Up @@ -95,19 +98,19 @@ async fn send_status(
})
.await
.unwrap_or(());
},
}
Sender::DownstreamTokio(tx) => {
tx.send(Status {
state: State::Healthy(e.to_string()),
})
.unwrap_or(());
},
}
Sender::TemplateReceiverTokio(tx) => {
tx.send(Status {
state: State::UpstreamShutdown(e),
})
.unwrap_or(());
},
}
Sender::UpstreamTokio(tx) => {
tx.send(Status {
state: State::UpstreamShutdown(e),
Expand Down
8 changes: 6 additions & 2 deletions roles/jd-client/src/lib/template_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ impl TemplateRx {
.clone()
.safe_lock(|s| s.receiver.clone())
.unwrap();
let received = handle_result!(tx_status.clone(), receiver.subscribe().recv().await);
let received =
handle_result!(tx_status.clone(), receiver.subscribe().recv().await);
let mut frame: StdFrame =
handle_result!(tx_status.clone(), received.try_into());
let message_type = frame.get_header().unwrap().msg_type();
Expand Down Expand Up @@ -312,7 +313,10 @@ impl TemplateRx {
.unwrap();
}

async fn on_new_solution(self_: Arc<Mutex<Self>>,mut rx: tokio::sync::mpsc::Receiver<SubmitSolution<'static>>) {
async fn on_new_solution(
self_: Arc<Mutex<Self>>,
mut rx: tokio::sync::mpsc::Receiver<SubmitSolution<'static>>,
) {
while let Some(solution) = rx.recv().await {
if !self_
.safe_lock(|s| s.test_only_do_not_send_solution_to_tp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ impl SetupConnectionHandler {
let sv2_frame = sv2_frame.into();
sender.send(sv2_frame).map_err(|_| ())?;

let mut incoming: StdFrame = receiver.subscribe()
let mut incoming: StdFrame = receiver
.subscribe()
.recv()
.await
.expect("Connection to TP closed!")
Expand Down
3 changes: 1 addition & 2 deletions roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ impl JobDeclaratorDownstream {
.unwrap();
let sender_add_txs_to_mempool = add_txs_to_mempool.sender_add_txs_to_mempool;
let add_txs_to_mempool_inner = add_txs_to_mempool.add_txs_to_mempool_inner;
let _ = sender_add_txs_to_mempool
.send(add_txs_to_mempool_inner);
let _ = sender_add_txs_to_mempool.send(add_txs_to_mempool_inner);
// the trasnactions sent to the mempool can be freed
let _ = self_mutex.safe_lock(|a| {
a.add_txs_to_mempool.add_txs_to_mempool_inner = AddTrasactionsToMempoolInner {
Expand Down
13 changes: 9 additions & 4 deletions roles/jd-server/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ impl JobDeclaratorServer {
// broadcast can be used, as JDSMempool is clonable.
// let (new_block_sender, new_block_receiver): (Sender<String>, Receiver<String>) =
// bounded(10);
let (new_block_sender, _): (tokio::sync::broadcast::Sender<String>, tokio::sync::broadcast::Receiver<String>) = tokio::sync::broadcast::channel(10);
let (new_block_sender, _): (
tokio::sync::broadcast::Sender<String>,
tokio::sync::broadcast::Receiver<String>,
) = tokio::sync::broadcast::channel(10);
let mempool = Arc::new(Mutex::new(mempool::JDsMempool::new(
url.clone(),
username,
Expand All @@ -56,7 +59,7 @@ impl JobDeclaratorServer {
let mempool_cloned_ = mempool.clone();
// mpsc can be used.
// let (status_tx, status_rx) = unbounded();
let (status_tx,mut status_rx) = tokio::sync::mpsc::unbounded_channel();
let (status_tx, mut status_rx) = tokio::sync::mpsc::unbounded_channel();
let sender = status::Sender::DownstreamTokio(status_tx.clone());
let mut last_empty_mempool_warning =
std::time::Instant::now().sub(std::time::Duration::from_secs(60));
Expand Down Expand Up @@ -130,7 +133,8 @@ impl JobDeclaratorServer {
let mempool_cloned = mempool.clone();
// mpsc should work here
// let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded();
let (sender_add_txs_to_mempool,mut receiver_add_txs_to_mempool) = tokio::sync::mpsc::unbounded_channel();
let (sender_add_txs_to_mempool, mut receiver_add_txs_to_mempool) =
tokio::sync::mpsc::unbounded_channel();
task::spawn(async move {
JobDeclarator::start(
cloned,
Expand All @@ -143,7 +147,8 @@ impl JobDeclaratorServer {
});
task::spawn(async move {
loop {
if let Some(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await {
if let Some(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await
{
let mempool_cloned = mempool.clone();
task::spawn(async move {
match mempool::JDsMempool::add_tx_data_to_mempool(
Expand Down
4 changes: 2 additions & 2 deletions roles/jd-server/src/lib/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn send_status(
})
.await
.unwrap_or(());
},
}
Sender::DownstreamTokio(tx) => match e {
JdsError::Sv2ProtocolError((id, Mining::OpenMiningChannelError(_))) => {
tx.send(Status {
Expand Down Expand Up @@ -156,7 +156,7 @@ pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::Error
}
JdsError::NoLastDeclaredJob => {
send_status(sender, e, error_handling::ErrorBranch::Continue).await
},
}
JdsError::ChannelRecvTokio(_) => {
send_status(sender, e, error_handling::ErrorBranch::Break).await
}
Expand Down
6 changes: 5 additions & 1 deletion roles/mining-proxy/src/lib/downstream_mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ impl DownstreamMiningNode {
.open_channel_for_down_hom_up_extended(channel_id, group_id);
}

pub fn new(receiver: tokio::sync::broadcast::Sender<EitherFrame>, sender: tokio::sync::broadcast::Sender<EitherFrame>, id: u32) -> Self {
pub fn new(
receiver: tokio::sync::broadcast::Sender<EitherFrame>,
sender: tokio::sync::broadcast::Sender<EitherFrame>,
id: u32,
) -> Self {
Self {
receiver,
sender,
Expand Down
5 changes: 4 additions & 1 deletion roles/mining-proxy/src/lib/upstream_mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ struct UpstreamMiningConnection {
}

impl UpstreamMiningConnection {
async fn send(&mut self, sv2_frame: StdFrame) -> Result<(), tokio::sync::broadcast::error::SendError<EitherFrame>> {
async fn send(
&mut self,
sv2_frame: StdFrame,
) -> Result<(), tokio::sync::broadcast::error::SendError<EitherFrame>> {
info!("SEND");
let either_frame = sv2_frame.into();
match self.sender.send(either_frame) {
Expand Down
14 changes: 9 additions & 5 deletions roles/pool/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub enum PoolError {
Custom(String),
Sv2ProtocolError((u32, Mining<'static>)),
TokioChannelRecv(Box<dyn std::marker::Send + Debug>),
TokioBroadcastChannelRecv(tokio::sync::broadcast::error::RecvError)
TokioBroadcastChannelRecv(tokio::sync::broadcast::error::RecvError),
}

impl std::fmt::Display for PoolError {
Expand All @@ -41,9 +41,9 @@ impl std::fmt::Display for PoolError {
Custom(ref e) => write!(f, "Custom SV2 error: `{:?}`", e),
Sv2ProtocolError(ref e) => {
write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e)
},
}
TokioChannelRecv(ref e) => write!(f, "Channel recv failed: `{:?}`", e),
TokioBroadcastChannelRecv(ref e) => write!(f, "BroadCastChannel Recv failed: {:?}", e)
TokioBroadcastChannelRecv(ref e) => write!(f, "BroadCastChannel Recv failed: {:?}", e),
}
}
}
Expand Down Expand Up @@ -98,13 +98,17 @@ impl<'a, T: 'static + std::marker::Send + Debug> From<async_channel::SendError<T
}
}

impl<'a, T: 'static + std::marker::Send + Debug> From<tokio::sync::mpsc::error::SendError<T>> for PoolError {
impl<'a, T: 'static + std::marker::Send + Debug> From<tokio::sync::mpsc::error::SendError<T>>
for PoolError
{
fn from(e: tokio::sync::mpsc::error::SendError<T>) -> PoolError {
PoolError::TokioChannelRecv(Box::new(e))
}
}

impl<'a, T: 'static + std::marker::Send + Debug> From<tokio::sync::broadcast::error::SendError<T>> for PoolError {
impl<'a, T: 'static + std::marker::Send + Debug> From<tokio::sync::broadcast::error::SendError<T>>
for PoolError
{
fn from(e: tokio::sync::broadcast::error::SendError<T>) -> PoolError {
PoolError::TokioChannelRecv(Box::new(e))
}
Expand Down
4 changes: 2 additions & 2 deletions roles/pool/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl PoolSv2 {
// producers are clonable so no issue. but its unbounded.
// tokio also provide unbounded mpsc.
// let (status_tx, status_rx) = unbounded();
let (status_tx,mut status_rx) = tokio::sync::mpsc::unbounded_channel();
let (status_tx, mut status_rx) = tokio::sync::mpsc::unbounded_channel();
// r_new_t consumer is sent in pool::start, s_new_t is sent in templateRx::connect
// sender or producer I dont give a damn about. even the r_new_t is passed in only
// start then to on_new_template, so mpsc makes sense here as well.
Expand All @@ -39,7 +39,7 @@ impl PoolSv2 {
// sent to on_new_prevhash, so mpsc also works here.
// let (s_prev_hash, r_prev_hash) = bounded(10);
let (s_prev_hash, r_prev_hash) = tokio::sync::mpsc::channel(10);
// s_solution is sent to pool (no one give a damn about clonable), r_solution is sent
// s_solution is sent to pool (no one give a damn about clonable), r_solution is sent
// to templateRx and then to on_new_solution, so mpsc works.
let (s_solution, r_solution) = tokio::sync::mpsc::channel(10);
// This is spicy, as the r_message_recv_signal is cloning at few of the places, so, we can
Expand Down
21 changes: 12 additions & 9 deletions roles/pool/src/lib/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ pub enum Sender {
DownstreamTokio(tokio::sync::mpsc::UnboundedSender<Status>),
DownstreamListenerTokio(tokio::sync::mpsc::UnboundedSender<Status>),
UpstreamTokio(tokio::sync::mpsc::UnboundedSender<Status>),

}

#[derive(Debug)]
pub enum Error {
AsyncChannel(async_channel::SendError<Status>),
TokioChannel(tokio::sync::mpsc::error::SendError<Status>),
TokioChannelUnbounded(tokio::sync::mpsc::error::SendError<Status>)
TokioChannelUnbounded(tokio::sync::mpsc::error::SendError<Status>),
}

impl From<async_channel::SendError<Status>> for Error {
Expand Down Expand Up @@ -50,11 +49,15 @@ impl Sender {
pub async fn send(&self, status: Status) -> Result<(), Error> {
match self {
Self::Downstream(inner) => inner.send(status).await.map_err(|e| Error::AsyncChannel(e)),
Self::DownstreamListener(inner) => inner.send(status).await.map_err(|e| Error::AsyncChannel(e)),
Self::DownstreamListener(inner) => {
inner.send(status).await.map_err(|e| Error::AsyncChannel(e))
}
Self::Upstream(inner) => inner.send(status).await.map_err(|e| Error::AsyncChannel(e)),
Self::DownstreamListenerTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e)),
Self::DownstreamListenerTokio(inner) => {
inner.send(status).map_err(|e| Error::TokioChannel(e))
}
Self::DownstreamTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e)),
Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e))
Self::UpstreamTokio(inner) => inner.send(status).map_err(|e| Error::TokioChannel(e)),
}
}
}
Expand All @@ -67,7 +70,7 @@ impl Clone for Sender {
Self::Upstream(inner) => Self::Upstream(inner.clone()),
Self::DownstreamTokio(inner) => Self::DownstreamTokio(inner.clone()),
Self::DownstreamListenerTokio(inner) => Self::DownstreamListenerTokio(inner.clone()),
Self::UpstreamTokio(inner) => Self::UpstreamTokio(inner.clone())
Self::UpstreamTokio(inner) => Self::UpstreamTokio(inner.clone()),
}
}
}
Expand Down Expand Up @@ -134,7 +137,7 @@ async fn send_status(
})
.await
.unwrap_or(());
},
}
Sender::DownstreamTokio(tx) => match e {
PoolError::Sv2ProtocolError((id, Mining::OpenMiningChannelError(_))) => {
tx.send(Status {
Expand Down Expand Up @@ -210,10 +213,10 @@ pub async fn handle_error(sender: &Sender, e: PoolError) -> error_handling::Erro
}
PoolError::Sv2ProtocolError(_) => {
send_status(sender, e, error_handling::ErrorBranch::Break).await
},
}
PoolError::TokioChannelRecv(_) => {
send_status(sender, e, error_handling::ErrorBranch::Continue).await
},
}
PoolError::TokioBroadcastChannelRecv(_) => {
send_status(sender, e, error_handling::ErrorBranch::Continue).await
}
Expand Down
5 changes: 4 additions & 1 deletion roles/pool/src/lib/template_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,10 @@ impl TemplateRx {
Ok(())
}

async fn on_new_solution(self_: Arc<Mutex<Self>>,mut rx: tokio::sync::mpsc::Receiver<SubmitSolution<'static>>) {
async fn on_new_solution(
self_: Arc<Mutex<Self>>,
mut rx: tokio::sync::mpsc::Receiver<SubmitSolution<'static>>,
) {
let status_tx = self_.safe_lock(|s| s.status_tx.clone()).unwrap();
while let Some(solution) = rx.recv().await {
info!("Sending Solution to TP: {:?}", &solution);
Expand Down
Loading

0 comments on commit 077309c

Please sign in to comment.