Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove async-channel #1408

Draft
wants to merge 36 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
69f4fb7
changed status channel and new_template channel to tokio channels
Shourya742 Jan 27, 2025
f373642
add prev_hash mpsc
Shourya742 Jan 27, 2025
da5bbc5
change submit solution channel to tokio
Shourya742 Jan 27, 2025
a86ba91
add broadcast for message recv channel
Shourya742 Jan 27, 2025
d497aeb
add tokio channel for status
Shourya742 Jan 27, 2025
9459369
translator submit share channel to tokio
Shourya742 Jan 28, 2025
024fef3
translator: new Extended mining job channel to tokio
Shourya742 Jan 28, 2025
4d079e3
translator: extranonce channel
Shourya742 Jan 28, 2025
5179618
translator: new_prev_hash_channel
Shourya742 Jan 28, 2025
ab7899e
translator/downstream: outgoing channel
Shourya742 Jan 28, 2025
39a1037
translator/downstream: shutdown channel
Shourya742 Jan 28, 2025
dd5ac57
translator/lib tx_sv1_bridge channel
Shourya742 Jan 28, 2025
b85a513
jd-client/mod tx_status channel
Shourya742 Jan 28, 2025
488ca1a
jd-client/lib/mod solution channel
Shourya742 Jan 28, 2025
f1244ec
comment out all test
Shourya742 Jan 28, 2025
d7bbf85
jd-server/lib/mod status channel
Shourya742 Jan 28, 2025
6a30386
jd-server/lib/mod add_txes_to_mempool channel
Shourya742 Jan 28, 2025
dd845b7
jd-server: new_block channel
Shourya742 Jan 28, 2025
d681e1e
mining-device: notify_changes channel
Shourya742 Jan 28, 2025
e601511
mining-device: share channel
Shourya742 Jan 28, 2025
58230c2
mining-device-sv1: channels
Shourya742 Jan 28, 2025
32137b9
remove async_channel from tokio_connection
Shourya742 Jan 28, 2025
e1885ea
changes to jd-server for network-helper changes
Shourya742 Jan 28, 2025
3dc21a3
changes to jd-client for network-helper changes
Shourya742 Jan 28, 2025
bbaad64
changes to pool for network-helper changes
Shourya742 Jan 28, 2025
20c67cc
changes to test-utils for network-helper changes
Shourya742 Jan 28, 2025
eb2607e
changes to mining-proxy for network-helper changes
Shourya742 Jan 28, 2025
452529c
changes to translator for network-helper changes
Shourya742 Jan 28, 2025
4a54200
changes to tests-integration for network-helper changes
Shourya742 Jan 28, 2025
077309c
fmt changes
Shourya742 Jan 28, 2025
f04184f
remove async-channel from pool
Shourya742 Jan 28, 2025
bb4bbd2
remove async-channel from jd-client
Shourya742 Jan 28, 2025
a375671
remove async-channel from jds
Shourya742 Jan 28, 2025
0702c50
remove async-channel from tests-integration
Shourya742 Jan 28, 2025
cb23676
remove async-channel from translator
Shourya742 Jan 28, 2025
a23993e
add cargo.lock file
Shourya742 Jan 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions protocols/v2/framing-sv2/src/framing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Slice = buffer_sv2::Slice;
/// A wrapper used when generic reference to a frame is needed, but the kind of frame ([`Sv2Frame`]
/// or [`HandShakeFrame`]) does not matter. Note that after the initial handshake is complete
/// between two Sv2 roles, all further messages are framed with [`Sv2Frame`].
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Frame<T, B> {
HandShake(HandShakeFrame),
Sv2(Sv2Frame<T, B>),
Expand Down Expand Up @@ -239,7 +239,7 @@ impl<T, B> TryFrom<Frame<T, B>> for Sv2Frame<T, B> {
/// Contains only the serialized payload with a fixed length and is only used during Noise
/// handshake process. Once the handshake is complete, regular Sv2 communication switches to
/// [`Sv2Frame`] for ongoing communication.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct HandShakeFrame {
payload: Slice,
}
Expand Down
16 changes: 6 additions & 10 deletions roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion roles/jd-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ path = "src/lib/mod.rs"

[dependencies]
stratum-common = { path = "../../common" }
async-channel = "1.5.1"
async-recursion = "0.3.2"
binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" }
buffer_sv2 = { path = "../../utils/buffer" }
Expand Down
34 changes: 21 additions & 13 deletions roles/jd-client/src/lib/downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
status::{self, State},
upstream_sv2::Upstream as UpstreamMiningNode,
};
use async_channel::{Receiver, SendError, Sender};

use roles_logic_sv2::{
channel_logic::channel_factory::{OnNewShare, PoolChannelFactory, Share},
common_messages_sv2::{SetupConnection, SetupConnectionSuccess},
Expand Down Expand Up @@ -36,12 +36,12 @@ pub type EitherFrame = StandardEitherFrame<Message>;
/// downstream do not make much sense.
#[derive(Debug)]
pub struct DownstreamMiningNode {
receiver: Receiver<EitherFrame>,
sender: Sender<EitherFrame>,
receiver: tokio::sync::broadcast::Sender<EitherFrame>,
sender: tokio::sync::broadcast::Sender<EitherFrame>,
pub status: DownstreamMiningNodeStatus,
#[allow(dead_code)]
pub prev_job_id: Option<u32>,
solution_sender: Sender<SubmitSolution<'static>>,
solution_sender: tokio::sync::mpsc::Sender<SubmitSolution<'static>>,
withhold: bool,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
tx_status: status::Sender,
Expand Down Expand Up @@ -154,10 +154,10 @@ use std::sync::Arc;
impl DownstreamMiningNode {
#[allow(clippy::too_many_arguments)]
pub fn new(
receiver: Receiver<EitherFrame>,
sender: Sender<EitherFrame>,
receiver: tokio::sync::broadcast::Sender<EitherFrame>,
sender: tokio::sync::broadcast::Sender<EitherFrame>,
upstream: Option<Arc<Mutex<UpstreamMiningNode>>>,
solution_sender: Sender<SubmitSolution<'static>>,
solution_sender: tokio::sync::mpsc::Sender<SubmitSolution<'static>>,
withhold: bool,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
tx_status: status::Sender,
Expand Down Expand Up @@ -207,7 +207,7 @@ impl DownstreamMiningNode {
.unwrap();
Self::set_channel_factory(self_mutex.clone());

while let Ok(message) = receiver.recv().await {
while let Ok(message) = receiver.subscribe().recv().await {
let incoming: StdFrame = message.try_into().unwrap();
Self::next(self_mutex, incoming).await;
}
Expand Down Expand Up @@ -341,10 +341,11 @@ impl DownstreamMiningNode {
pub async fn send(
self_mutex: &Arc<Mutex<Self>>,
sv2_frame: StdFrame,
) -> Result<(), SendError<StdFrame>> {
// Error needs to be taken care
) -> Result<(), ()> {
let either_frame = sv2_frame.into();
let sender = self_mutex.safe_lock(|self_| self_.sender.clone()).unwrap();
match sender.send(either_frame).await {
match sender.send(either_frame) {
Ok(_) => Ok(()),
Err(_) => {
todo!()
Expand Down Expand Up @@ -592,7 +593,7 @@ impl
coinbase_tx: coinbase.try_into()?,
};
// The below channel should never be full is ok to block
solution_sender.send_blocking(solution).unwrap();
solution_sender.blocking_send(solution).unwrap();
if !self.status.is_solo_miner() {
{
let jd = self.jd.clone();
Expand Down Expand Up @@ -671,7 +672,7 @@ use tokio::{
pub async fn listen_for_downstream_mining(
address: SocketAddr,
upstream: Option<Arc<Mutex<UpstreamMiningNode>>>,
solution_sender: Sender<SubmitSolution<'static>>,
solution_sender: tokio::sync::mpsc::Sender<SubmitSolution<'static>>,
withhold: bool,
authority_public_key: Secp256k1PublicKey,
authority_secret_key: Secp256k1SecretKey,
Expand Down Expand Up @@ -707,7 +708,14 @@ pub async fn listen_for_downstream_mining(
jd,
);

let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap();
let mut incoming: StdFrame = node
.receiver
.subscribe()
.recv()
.await
.unwrap()
.try_into()
.unwrap();
let message_type = incoming.get_header().unwrap().msg_type();
let payload = incoming.payload();
let routing_logic = roles_logic_sv2::routing_logic::CommonRoutingLogic::None;
Expand Down
Loading
Loading