Skip to content

Commit

Permalink
add shutdown signalling in pool start method along with task_handler …
Browse files Browse the repository at this point in the history
…for graceful shutdown of role
  • Loading branch information
Shourya742 committed Jan 22, 2025
1 parent 4c8a56a commit 44520d8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 8 deletions.
57 changes: 49 additions & 8 deletions roles/pool/src/lib/mining_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ use stratum_common::{
bitcoin::{Script, TxOut},
secp256k1,
};
use tokio::{net::TcpListener, task};
use tokio::{
net::TcpListener,
sync::Notify,
task::{self, JoinHandle},
};
use tracing::{debug, error, info, warn};

pub mod setup_connection;
Expand Down Expand Up @@ -382,6 +386,35 @@ impl IsDownstream for Downstream {

impl IsMiningDownstream for Downstream {}

struct TaskHandler {
handles: Vec<JoinHandle<()>>,
}

impl TaskHandler {
fn new() -> Self {
TaskHandler {
handles: Vec::new(),
}
}

fn add(&mut self, handle: JoinHandle<()>) {
self.handles.push(handle);
}

fn abort_all(&mut self) {
for handle in self.handles.drain(..) {
handle.abort();
}
}
}

impl Drop for TaskHandler {
fn drop(&mut self) {
info!("Aborting all child task.");
self.abort_all();
}
}

impl Pool {
#[cfg(feature = "test_only_allow_unencrypted")]
async fn accept_incoming_plain_connection(
Expand Down Expand Up @@ -596,6 +629,7 @@ impl Pool {
solution_sender: Sender<SubmitSolution<'static>>,
sender_message_received_signal: Sender<()>,
status_tx: status::Sender,
shutdown: Arc<Notify>,
) -> Arc<Mutex<Self>> {
let extranonce_len = 32;
let range_0 = std::ops::Range { start: 0, end: 0 };
Expand Down Expand Up @@ -633,13 +667,15 @@ impl Pool {
let cloned2 = pool.clone();
let cloned3 = pool.clone();

let mut task_handler = TaskHandler::new();

#[cfg(feature = "test_only_allow_unencrypted")]
{
let cloned4 = pool.clone();
let status_tx_clone_unenc = status_tx.clone();
let config_unenc = config.clone();

task::spawn(async move {
task_handler.add(task::spawn(async move {
if let Err(e) = Self::accept_incoming_plain_connection(cloned4, config_unenc).await
{
error!("{}", e);
Expand All @@ -655,12 +691,12 @@ impl Pool {
{
error!("Downstream shutdown and Status Channel dropped");
}
});
}));
}

info!("Starting up pool listener");
let status_tx_clone = status_tx.clone();
task::spawn(async move {
task_handler.add(task::spawn(async move {
if let Err(e) = Self::accept_incoming_connection(cloned, config).await {
error!("{}", e);
}
Expand All @@ -675,11 +711,11 @@ impl Pool {
{
error!("Downstream shutdown and Status Channel dropped");
}
});
}));

let cloned = sender_message_received_signal.clone();
let status_tx_clone = status_tx.clone();
task::spawn(async move {
task_handler.add(task::spawn(async move {
if let Err(e) = Self::on_new_prev_hash(cloned2, new_prev_hash_rx, cloned).await {
error!("{}", e);
}
Expand All @@ -695,10 +731,10 @@ impl Pool {
{
error!("Downstream shutdown and Status Channel dropped");
}
});
}));

let status_tx_clone = status_tx;
task::spawn(async move {
task_handler.add(task::spawn(async move {
if let Err(e) =
Self::on_new_template(pool, new_template_rx, sender_message_received_signal).await
{
Expand All @@ -716,6 +752,11 @@ impl Pool {
{
error!("Downstream shutdown and Status Channel dropped");
}
}));

task::spawn(async move {
shutdown.notified().await;
drop(task_handler);
});
cloned3
}
Expand Down
1 change: 1 addition & 0 deletions roles/pool/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl PoolSv2 {
s_solution,
s_message_recv_signal,
status::Sender::DownstreamListener(status_tx),
self.shutdown.clone(),
);

task::spawn({
Expand Down

0 comments on commit 44520d8

Please sign in to comment.