From 1d4d57656c594bc883b341dc01fd1011113a3be3 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 13 Mar 2024 12:16:35 +0200 Subject: [PATCH 1/6] Rename `PieceReader` into `DiskPieceReader` --- crates/subspace-farmer/src/single_disk_farm.rs | 8 ++++---- .../subspace-farmer/src/single_disk_farm/piece_reader.rs | 4 ++-- crates/subspace-farmer/src/utils/plotted_pieces.rs | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 83c3670ec6..6009463404 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -14,7 +14,7 @@ use crate::single_disk_farm::farming::{ farming, slot_notification_forwarder, FarmingNotification, FarmingOptions, PlotAudit, }; use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError}; -use crate::single_disk_farm::piece_reader::PieceReader; +use crate::single_disk_farm::piece_reader::DiskPieceReader; use crate::single_disk_farm::plot_cache::DiskPlotCache; use crate::single_disk_farm::plotting::{ plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, @@ -613,7 +613,7 @@ pub struct SingleDiskFarm { handlers: Arc, piece_cache: DiskPieceCache, plot_cache: DiskPlotCache, - piece_reader: PieceReader, + piece_reader: DiskPieceReader, /// Sender that will be used to signal to background threads that they should start start_sender: Option>, /// Sender that will be used to signal to background threads that they must stop @@ -975,7 +975,7 @@ impl SingleDiskFarm { }) })); - let (piece_reader, reading_fut) = PieceReader::new::( + let (piece_reader, reading_fut) = DiskPieceReader::new::( public_key, pieces_in_sector, plot_file, @@ -1517,7 +1517,7 @@ impl SingleDiskFarm { } /// Get piece reader to read plotted pieces later - pub fn piece_reader(&self) -> PieceReader { + pub fn piece_reader(&self) -> DiskPieceReader { self.piece_reader.clone() } diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index e294d2c2d6..c8038d66c1 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -24,11 +24,11 @@ struct ReadPieceRequest { /// Wrapper data structure that can be used to read pieces from single disk farm #[derive(Debug, Clone)] -pub struct PieceReader { +pub struct DiskPieceReader { read_piece_sender: mpsc::Sender, } -impl PieceReader { +impl DiskPieceReader { /// Creates new piece reader instance and background future that handles reads internally. /// /// NOTE: Background future is async, but does blocking operations and should be running in diff --git a/crates/subspace-farmer/src/utils/plotted_pieces.rs b/crates/subspace-farmer/src/utils/plotted_pieces.rs index 809aebfed4..c0f71a2a90 100644 --- a/crates/subspace-farmer/src/utils/plotted_pieces.rs +++ b/crates/subspace-farmer/src/utils/plotted_pieces.rs @@ -1,4 +1,4 @@ -use crate::single_disk_farm::piece_reader::PieceReader; +use crate::single_disk_farm::piece_reader::DiskPieceReader; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::future::Future; @@ -16,13 +16,13 @@ struct PieceDetails { /// Wrapper data structure for pieces plotted under multiple plots. 
#[derive(Debug)] pub struct PlottedPieces { - readers: Vec, + readers: Vec, pieces: HashMap>, } impl PlottedPieces { /// Initialize with readers for each farm - pub fn new(readers: Vec) -> Self { + pub fn new(readers: Vec) -> Self { Self { readers, pieces: HashMap::new(), From 9510b140fef9b51794ff6bbb165a59ca0e048dc4 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 13 Mar 2024 14:27:46 +0200 Subject: [PATCH 2/6] Choose plotted pieces from farms at random --- crates/subspace-farmer/src/utils/plotted_pieces.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/subspace-farmer/src/utils/plotted_pieces.rs b/crates/subspace-farmer/src/utils/plotted_pieces.rs index c0f71a2a90..9f843bed6e 100644 --- a/crates/subspace-farmer/src/utils/plotted_pieces.rs +++ b/crates/subspace-farmer/src/utils/plotted_pieces.rs @@ -1,4 +1,5 @@ use crate::single_disk_farm::piece_reader::DiskPieceReader; +use rand::prelude::*; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::future::Future; @@ -34,7 +35,7 @@ impl PlottedPieces { self.pieces.contains_key(piece_index) } - /// Read plotted piece from oneof the farms. + /// Read plotted piece from one of the farms. /// /// If piece doesn't exist `None` is returned, if by the time future is polled piece is no /// longer in the plot, future will resolve with `None`. @@ -44,7 +45,7 @@ impl PlottedPieces { ) -> Option> + 'static> { let piece_details = match self.pieces.get(piece_index) { Some(piece_details) => piece_details - .first() + .choose(&mut thread_rng()) .copied() .expect("Empty lists are not stored in the map; qed"), None => { From a95c3e0553dcd856cc2e709cdf0620576cba8406 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 14 Mar 2024 13:57:50 +0200 Subject: [PATCH 3/6] Introduce a few async generic traits for farm components that can be implemented for local and remote farms, make some of the existing APIs async --- .../src/bin/subspace-farmer/commands/farm.rs | 6 +- .../bin/subspace-farmer/commands/farm/dsn.rs | 2 +- crates/subspace-farmer/src/farm.rs | 96 +++++ crates/subspace-farmer/src/farmer_cache.rs | 350 +++++++++--------- .../subspace-farmer/src/farmer_cache/tests.rs | 13 +- crates/subspace-farmer/src/lib.rs | 1 + crates/subspace-farmer/src/node_client.rs | 2 +- .../src/single_disk_farm/piece_cache.rs | 98 ++++- .../src/single_disk_farm/piece_cache/tests.rs | 10 +- .../src/single_disk_farm/piece_reader.rs | 16 +- .../src/single_disk_farm/plot_cache.rs | 93 +++-- .../src/single_disk_farm/plot_cache/tests.rs | 22 +- .../src/utils/farmer_piece_getter.rs | 2 +- .../src/utils/plotted_pieces.rs | 23 +- 14 files changed, 490 insertions(+), 244 deletions(-) create mode 100644 crates/subspace-farmer/src/farm.rs diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 89f209ac78..ad0cb0a231 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -807,12 +807,12 @@ where .replace_backing_caches( single_disk_farms .iter() - .map(|single_disk_farm| single_disk_farm.piece_cache()) + .map(|single_disk_farm| Arc::new(single_disk_farm.piece_cache()) as Arc<_>) .collect(), if plot_cache { single_disk_farms .iter() - .map(|single_disk_farm| single_disk_farm.plot_cache()) + .map(|single_disk_farm| Arc::new(single_disk_farm.plot_cache()) as Arc<_>) .collect() } else { Vec::new() @@ -824,7 +824,7 @@ where // Store piece 
readers so we can reference them later let piece_readers = single_disk_farms .iter() - .map(|single_disk_farm| single_disk_farm.piece_reader()) + .map(|single_disk_farm| Arc::new(single_disk_farm.piece_reader()) as Arc<_>) .collect::>(); info!("Collecting already plotted pieces (this will take some time)..."); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index eb94cbe32a..e0037a665f 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -109,7 +109,7 @@ pub(super) fn configure_dsn( } }; - plotted_pieces.read_piece(&piece_index)?.in_current_span() + plotted_pieces.read_piece(piece_index)?.in_current_span() }; let piece = read_piece_fut.await; diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs new file mode 100644 index 0000000000..8055fd4f55 --- /dev/null +++ b/crates/subspace-farmer/src/farm.rs @@ -0,0 +1,96 @@ +use crate::single_disk_farm::plot_cache::MaybePieceStoredResult; +use async_trait::async_trait; +use derive_more::Display; +use futures::Stream; +use parity_scale_codec::{Decode, Encode}; +use std::fmt; +use subspace_core_primitives::{Piece, PieceIndex, PieceOffset, SectorIndex}; +use subspace_networking::libp2p::kad::RecordKey; + +/// Erased error type +pub type FarmError = Box; + +/// Offset wrapper for pieces in [`PieceCache`] +#[derive(Debug, Display, Copy, Clone, Encode, Decode)] +#[repr(transparent)] +pub struct PieceCacheOffset(pub(crate) u32); + +/// Abstract piece cache implementation +#[async_trait] +pub trait PieceCache: Send + Sync + fmt::Debug { + /// Max number of elements in this cache + fn max_num_elements(&self) -> usize; + + /// Contents of this piece cache. + /// + /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this + /// doesn't happen for the same piece being accessed! + async fn contents( + &self, + ) -> Box)> + Unpin + '_>; + + /// Store piece in cache at specified offset, replacing existing piece if there is any. + /// + /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this + /// doesn't happen for the same piece being accessed! + async fn write_piece( + &self, + offset: PieceCacheOffset, + piece_index: PieceIndex, + piece: &Piece, + ) -> Result<(), FarmError>; + + /// Read piece index from cache at specified offset. + /// + /// Returns `None` if offset is out of range. + /// + /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this + /// doesn't happen for the same piece being accessed! + async fn read_piece_index( + &self, + offset: PieceCacheOffset, + ) -> Result, FarmError>; + + /// Read piece from cache at specified offset. + /// + /// Returns `None` if offset is out of range. + /// + /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this + /// doesn't happen for the same piece being accessed! 
+ async fn read_piece(&self, offset: PieceCacheOffset) -> Result, FarmError>; +} + +/// Abstract plot cache implementation +#[async_trait] +pub trait PlotCache: Send + Sync + fmt::Debug { + /// Check if piece is potentially stored in this cache (not guaranteed to be because it might be + /// overridden with sector any time) + async fn is_piece_maybe_stored( + &self, + key: &RecordKey, + ) -> Result; + + /// Store piece in cache if there is free space, otherwise `Ok(false)` is returned + async fn try_store_piece( + &self, + piece_index: PieceIndex, + piece: &Piece, + ) -> Result; + + /// Read piece from cache. + /// + /// Returns `None` if not cached. + async fn read_piece(&self, key: &RecordKey) -> Result, FarmError>; +} + +/// Abstract piece reader implementation +#[async_trait] +pub trait PieceReader: Send + Sync + fmt::Debug { + /// Read piece from sector by offset, `None` means input parameters are incorrect or piece + /// reader was shut down + async fn read_piece( + &self, + sector_index: SectorIndex, + piece_offset: PieceOffset, + ) -> Result, FarmError>; +} diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 102a7bfb5a..60e2799bc6 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -1,14 +1,14 @@ #[cfg(test)] mod tests; +use crate::farm::{PieceCache, PieceCacheOffset, PlotCache}; use crate::node_client::NodeClient; -use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset}; -use crate::single_disk_farm::plot_cache::{DiskPlotCache, MaybePieceStoredResult}; -use crate::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop}; +use crate::single_disk_farm::plot_cache::MaybePieceStoredResult; +use crate::utils::run_future_in_dedicated_thread; +use async_lock::RwLock as AsyncRwLock; use event_listener_primitives::{Bag, HandlerId}; use futures::stream::{FuturesOrdered, FuturesUnordered}; -use futures::{select, FutureExt, StreamExt}; -use parking_lot::RwLock; +use futures::{select, stream, FutureExt, StreamExt}; use rayon::prelude::*; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -21,8 +21,9 @@ use subspace_networking::libp2p::kad::{ProviderRecord, RecordKey}; use subspace_networking::libp2p::PeerId; use subspace_networking::utils::multihash::ToMultihash; use subspace_networking::{KeyWrapper, LocalRecordProvider, UniqueRecordBinaryHeap}; +use tokio::runtime::Handle; use tokio::sync::mpsc; -use tokio::task::yield_now; +use tokio::task::{block_in_place, yield_now}; use tracing::{debug, error, info, trace, warn}; const WORKER_CHANNEL_CAPACITY: usize = 100; @@ -31,6 +32,9 @@ const CONCURRENT_PIECES_TO_DOWNLOAD: usize = 1_000; /// this number defines an interval in pieces after which cache is updated const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100; const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1); +/// How long to wait for `is_piece_maybe_stored` response from plot cache before timing out in order +/// to prevent blocking of executor for too long +const IS_PIECE_MAYBE_STORED_TIMEOUT: Duration = Duration::from_millis(100); type HandlerFn = Arc; type Handler = Bag, A>; @@ -41,16 +45,16 @@ struct Handlers { } #[derive(Debug, Clone)] -struct DiskPieceCacheState { - stored_pieces: HashMap, - free_offsets: VecDeque, - backend: DiskPieceCache, +struct PieceCacheState { + stored_pieces: HashMap, + free_offsets: VecDeque, + backend: Arc, } #[derive(Debug)] enum WorkerCommand { ReplaceBackingCaches { - 
new_piece_caches: Vec, + new_piece_caches: Vec>, }, ForgetKey { key: RecordKey, @@ -72,7 +76,7 @@ where { peer_id: PeerId, node_client: NC, - caches: Arc>>, + caches: Arc>>, handlers: Arc, worker_receiver: Option>, } @@ -162,7 +166,7 @@ where } // TODO: Consider implementing optional re-sync of the piece instead of just forgetting WorkerCommand::ForgetKey { key } => { - let mut caches = self.caches.write(); + let mut caches = self.caches.write().await; for (disk_farm_index, cache) in caches.iter_mut().enumerate() { let Some(offset) = cache.stored_pieces.remove(&key) else { @@ -172,7 +176,7 @@ where // Making offset as unoccupied and remove corresponding key from heap cache.free_offsets.push_front(offset); - match cache.backend.read_piece_index(offset) { + match cache.backend.read_piece_index(offset).await { Ok(Some(piece_index)) => { worker_state.heap.remove(KeyWrapper(piece_index)); } @@ -204,13 +208,13 @@ where &self, piece_getter: &PG, worker_state: &mut CacheWorkerState, - new_piece_caches: Vec, + new_piece_caches: Vec>, ) where PG: PieceGetter, { info!("Initializing piece cache"); // Pull old cache state since it will be replaced with a new one and reuse its allocations - let cache_state = mem::take(&mut *self.caches.write()); + let cache_state = mem::take(&mut *self.caches.write().await); let mut stored_pieces = Vec::with_capacity(new_piece_caches.len()); let mut free_offsets = Vec::with_capacity(new_piece_caches.len()); for mut state in cache_state { @@ -234,10 +238,10 @@ where |(index, ((mut stored_pieces, mut free_offsets), new_cache))| { run_future_in_dedicated_thread( move || async { - let contents = new_cache.contents(); - stored_pieces.reserve(contents.len()); + let mut contents = new_cache.contents().await; + stored_pieces.reserve(new_cache.max_num_elements()); - for (offset, maybe_piece_index) in contents { + while let Some((offset, maybe_piece_index)) = contents.next().await { match maybe_piece_index { Some(piece_index) => { stored_pieces.insert( @@ -254,7 +258,9 @@ where yield_now().await; } - DiskPieceCacheState { + drop(contents); + + PieceCacheState { stored_pieces, free_offsets, backend: new_cache, @@ -320,7 +326,7 @@ where ); // Not the latest, but at least something - *self.caches.write() = caches; + *self.caches.write().await = caches; return; } } @@ -369,7 +375,7 @@ where }); // Store whatever correct pieces are immediately available after restart - *self.caches.write() = caches.clone(); + *self.caches.write().await = caches.clone(); debug!( count = %piece_indices_to_store.len(), @@ -419,7 +425,7 @@ where downloading_pieces.push(download_piece(piece_index_to_download)); } - let Some((piece_index, piece)) = maybe_piece else { + let Some((piece_index, piece)) = &maybe_piece else { continue; }; @@ -428,26 +434,30 @@ where // Sort piece caches by number of stored pieces to fill those that are less // populated first sorted_caches.sort_by_key(|(_, cache)| cache.stored_pieces.len()); - if !sorted_caches.into_iter().any(|(disk_farm_index, cache)| { - let Some(offset) = cache.free_offsets.pop_front() else { - return false; - }; + if !stream::iter(sorted_caches) + .any(|(disk_farm_index, cache)| async move { + let Some(offset) = cache.free_offsets.pop_front() else { + return false; + }; - if let Err(error) = cache.backend.write_piece(offset, piece_index, &piece) { - error!( - %error, - %disk_farm_index, - %piece_index, - %offset, - "Failed to write piece into cache" - ); - return false; - } - cache - .stored_pieces - .insert(RecordKey::from(piece_index.to_multihash()), 
offset); - true - }) { + if let Err(error) = cache.backend.write_piece(offset, *piece_index, piece).await + { + error!( + %error, + %disk_farm_index, + %piece_index, + %offset, + "Failed to write piece into cache" + ); + return false; + } + cache + .stored_pieces + .insert(RecordKey::from(piece_index.to_multihash()), offset); + true + }) + .await + { error!( %piece_index, "Failed to store piece in cache, there was no space" @@ -457,14 +467,14 @@ where downloaded_pieces_count += 1; let progress = downloaded_pieces_count as f32 / pieces_to_download_total as f32 * 100.0; if downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL == 0 { - *self.caches.write() = caches.clone(); + *self.caches.write().await = caches.clone(); info!("Piece cache sync {progress:.2}% complete"); } self.handlers.progress.call_simple(&progress); } - *self.caches.write() = caches; + *self.caches.write().await = caches; self.handlers.progress.call_simple(&100.0); worker_state.last_segment_index = last_segment_index; @@ -551,7 +561,8 @@ where trace!(%piece_index, "Piece needs to be cached #1"); - self.persist_piece_in_cache(piece_index, piece, worker_state); + self.persist_piece_in_cache(piece_index, piece, worker_state) + .await; } worker_state.last_segment_index = segment_index; @@ -639,7 +650,8 @@ where } }; - self.persist_piece_in_cache(piece_index, piece, worker_state); + self.persist_piece_in_cache(piece_index, piece, worker_state) + .await; } info!("Finished syncing piece cache to the latest history size"); @@ -649,7 +661,7 @@ where /// This assumes it was already checked that piece needs to be stored, no verification for this /// is done internally and invariants will break if this assumption doesn't hold true - fn persist_piece_in_cache( + async fn persist_piece_in_cache( &self, piece_index: PieceIndex, piece: Piece, @@ -658,7 +670,7 @@ where let record_key = RecordKey::from(piece_index.to_multihash()); let heap_key = KeyWrapper(piece_index); - let mut caches = self.caches.write(); + let mut caches = self.caches.write().await; match worker_state.heap.insert(heap_key) { // Entry is already occupied, we need to find and replace old piece with new one Some(KeyWrapper(old_piece_index)) => { @@ -669,7 +681,8 @@ where continue; }; - if let Err(error) = cache.backend.write_piece(offset, piece_index, &piece) { + if let Err(error) = cache.backend.write_piece(offset, piece_index, &piece).await + { error!( %error, %disk_farm_index, @@ -709,7 +722,8 @@ where continue; }; - if let Err(error) = cache.backend.write_piece(offset, piece_index, &piece) { + if let Err(error) = cache.backend.write_piece(offset, piece_index, &piece).await + { error!( %error, %disk_farm_index, @@ -744,9 +758,9 @@ where pub struct FarmerCache { peer_id: PeerId, /// Individual dedicated piece caches - piece_caches: Arc>>, + piece_caches: Arc>>, /// Additional piece caches - plot_caches: Arc>>, + plot_caches: Arc>>>, /// Next plot cache to use for storing pieces next_plot_cache: Arc, handlers: Arc, @@ -788,84 +802,78 @@ impl FarmerCache { /// Get piece from cache pub async fn get_piece(&self, key: RecordKey) -> Option { - let maybe_piece_fut = tokio::task::spawn_blocking({ - let key = key.clone(); - let piece_caches = Arc::clone(&self.piece_caches); - let plot_caches = Arc::clone(&self.plot_caches); - let worker_sender = Arc::clone(&self.worker_sender); - - move || { - { - let piece_caches = piece_caches.read(); - for (disk_farm_index, cache) in piece_caches.iter().enumerate() { - let Some(&offset) = cache.stored_pieces.get(&key) else { - continue; 
- }; - match cache.backend.read_piece(offset) { - Ok(maybe_piece) => { - return maybe_piece; - } - Err(error) => { - error!( - %error, - %disk_farm_index, - ?key, - %offset, - "Error while reading piece from cache, might be a disk corruption" - ); - - if let Err(error) = - worker_sender.blocking_send(WorkerCommand::ForgetKey { key }) - { - trace!(%error, "Failed to send ForgetKey command to worker"); - } - - return None; - } - } - } + for (disk_farm_index, cache) in self.piece_caches.read().await.iter().enumerate() { + let Some(&offset) = cache.stored_pieces.get(&key) else { + continue; + }; + match cache.backend.read_piece(offset).await { + Ok(maybe_piece) => { + return maybe_piece; } + Err(error) => { + error!( + %error, + %disk_farm_index, + ?key, + %offset, + "Error while reading piece from cache, might be a disk corruption" + ); - { - let plot_caches = plot_caches.read(); - for cache in plot_caches.iter() { - if let Some(piece) = cache.read_piece(&key) { - return Some(piece); - } + if let Err(error) = self + .worker_sender + .send(WorkerCommand::ForgetKey { key }) + .await + { + trace!(%error, "Failed to send ForgetKey command to worker"); } - } - None + return None; + } } - }); + } - match AsyncJoinOnDrop::new(maybe_piece_fut, false).await { - Ok(maybe_piece) => maybe_piece, - Err(error) => { - error!(%error, ?key, "Piece reading task failed"); - None + for cache in self.plot_caches.read().await.iter() { + if let Ok(Some(piece)) = cache.read_piece(&key).await { + return Some(piece); } } + + None } /// Try to store a piece in additional downloaded pieces, if there is space for them pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) { let key = RecordKey::from(piece_index.to_multihash()); + for cache in self.piece_caches.read().await.iter() { + if cache.stored_pieces.contains_key(&key) { + // Already stored in normal piece cache, no need to store it again + return; + } + } + let mut should_store = false; - for cache in self.plot_caches.read().iter() { - match cache.is_piece_maybe_stored(&key) { - MaybePieceStoredResult::No => { + for (disk_farm_index, cache) in self.plot_caches.read().await.iter().enumerate() { + match cache.is_piece_maybe_stored(&key).await { + Ok(MaybePieceStoredResult::No) => { // Try another one if there is any } - MaybePieceStoredResult::Vacant => { + Ok(MaybePieceStoredResult::Vacant) => { should_store = true; break; } - MaybePieceStoredResult::Yes => { + Ok(MaybePieceStoredResult::Yes) => { // Already stored, nothing else left to do return; } + Err(error) => { + warn!( + %disk_farm_index, + %piece_index, + %error, + "Failed to check piece stored in cache" + ); + } } } @@ -873,59 +881,42 @@ impl FarmerCache { return; } - let should_store_fut = tokio::task::spawn_blocking({ - let plot_caches = Arc::clone(&self.plot_caches); - let piece_caches = Arc::clone(&self.piece_caches); - let next_plot_cache = Arc::clone(&self.next_plot_cache); - let piece = piece.clone(); + let plot_caches = self.plot_caches.read().await; + let plot_caches_len = plot_caches.len(); - move || { - for cache in piece_caches.read().iter() { - if cache.stored_pieces.contains_key(&key) { - // Already stored in normal piece cache, no need to store it again - return; - } - } - - let plot_caches = plot_caches.read(); - let plot_caches_len = plot_caches.len(); + // Store pieces in plots using round-robin distribution + for _ in 0..plot_caches_len { + let plot_cache_index = + self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len; - // Store pieces 
in plots using round-robin distribution - for _ in 0..plot_caches_len { - let plot_cache_index = - next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len; - - match plot_caches[plot_cache_index].try_store_piece(piece_index, &piece) { - Ok(true) => { - return; - } - Ok(false) => { - continue; - } - Err(error) => { - error!( - %error, - %piece_index, - %plot_cache_index, - "Failed to store additional piece in cache" - ); - continue; - } - } + match plot_caches[plot_cache_index] + .try_store_piece(piece_index, piece) + .await + { + Ok(true) => { + return; + } + Ok(false) => { + continue; + } + Err(error) => { + error!( + %error, + %piece_index, + %plot_cache_index, + "Failed to store additional piece in cache" + ); + continue; } } - }); - - if let Err(error) = AsyncJoinOnDrop::new(should_store_fut, true).await { - error!(%error, %piece_index, "Failed to store additional piece in cache"); } } /// Initialize replacement of backing caches pub async fn replace_backing_caches( &self, - new_piece_caches: Vec, - new_plot_caches: Vec, + new_piece_caches: Vec>, + new_plot_caches: Vec>, ) { if let Err(error) = self .worker_sender @@ -935,7 +926,7 @@ impl FarmerCache { warn!(%error, "Failed to replace backing caches, worker exited"); } - *self.plot_caches.write() = new_plot_caches; + *self.plot_caches.write().await = new_plot_caches; } /// Subscribe to cache sync notifications @@ -946,8 +937,7 @@ impl FarmerCache { impl LocalRecordProvider for FarmerCache { fn record(&self, key: &RecordKey) -> Option { - // It is okay to take read lock here, writes locks are very infrequent and very short - for piece_cache in self.piece_caches.read().iter() { + for piece_cache in self.piece_caches.try_read()?.iter() { if piece_cache.stored_pieces.contains_key(key) { // Note: We store our own provider records locally without local addresses // to avoid redundant storage and outdated addresses. Instead, these are @@ -960,24 +950,42 @@ impl LocalRecordProvider for FarmerCache { }); }; } - // It is okay to take read lock here, writes locks almost never happen - for plot_cache in self.plot_caches.read().iter() { - if matches!( - plot_cache.is_piece_maybe_stored(key), - MaybePieceStoredResult::Yes - ) { - // Note: We store our own provider records locally without local addresses - // to avoid redundant storage and outdated addresses. Instead, these are - // acquired on demand when returning a `ProviderRecord` for the local node. - return Some(ProviderRecord { - key: key.clone(), - provider: self.peer_id, - expires: None, - addresses: Vec::new(), - }); - }; - } - None + let found_fut = self + .plot_caches + .try_read()? + .iter() + .map(|plot_cache| { + let plot_cache = Arc::clone(plot_cache); + + async move { + matches!( + plot_cache.is_piece_maybe_stored(key).await, + Ok(MaybePieceStoredResult::Yes) + ) + } + }) + .collect::>() + .any(|found| async move { found }); + + // TODO: Ideally libp2p would have an async API record store API, + let found = block_in_place(|| { + Handle::current() + .block_on(tokio::time::timeout( + IS_PIECE_MAYBE_STORED_TIMEOUT, + found_fut, + )) + .unwrap_or_default() + }); + + // Note: We store our own provider records locally without local addresses + // to avoid redundant storage and outdated addresses. Instead, these are + // acquired on demand when returning a `ProviderRecord` for the local node. 
+ found.then_some(ProviderRecord { + key: key.clone(), + provider: self.peer_id, + expires: None, + addresses: Vec::new(), + }) } } diff --git a/crates/subspace-farmer/src/farmer_cache/tests.rs b/crates/subspace-farmer/src/farmer_cache/tests.rs index 5b6a5f8ebf..ef3837170a 100644 --- a/crates/subspace-farmer/src/farmer_cache/tests.rs +++ b/crates/subspace-farmer/src/farmer_cache/tests.rs @@ -2,6 +2,7 @@ use crate::farmer_cache::FarmerCache; use crate::node_client::Error; use crate::single_disk_farm::piece_cache::DiskPieceCache; use crate::NodeClient; +use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use futures::{SinkExt, Stream, StreamExt}; use parking_lot::Mutex; @@ -33,7 +34,7 @@ struct MockNodeClient { acknowledge_archived_segment_header_sender: mpsc::Sender, } -#[async_trait::async_trait] +#[async_trait] impl NodeClient for MockNodeClient { async fn farmer_app_info(&self) -> Result { // Most of these values make no sense, but they are not used by piece cache anyway @@ -136,7 +137,7 @@ struct MockPieceGetter { pieces: Arc>>, } -#[async_trait::async_trait] +#[async_trait] impl PieceGetter for MockPieceGetter { async fn get_piece( &self, @@ -207,8 +208,8 @@ async fn basic() { farmer_cache .replace_backing_caches( vec![ - DiskPieceCache::open(path1.as_ref(), 1).unwrap(), - DiskPieceCache::open(path2.as_ref(), 1).unwrap(), + Arc::new(DiskPieceCache::open(path1.as_ref(), 1).unwrap()), + Arc::new(DiskPieceCache::open(path2.as_ref(), 1).unwrap()), ], vec![], ) @@ -407,8 +408,8 @@ async fn basic() { farmer_cache .replace_backing_caches( vec![ - DiskPieceCache::open(path1.as_ref(), 1).unwrap(), - DiskPieceCache::open(path2.as_ref(), 1).unwrap(), + Arc::new(DiskPieceCache::open(path1.as_ref(), 1).unwrap()), + Arc::new(DiskPieceCache::open(path2.as_ref(), 1).unwrap()), ], vec![], ) diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index c352fd5e75..c3571bce6f 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -37,6 +37,7 @@ //! are `target ± ½ * solution range` (while also handing overflow/underflow) when interpreted as //! 64-bit unsigned integers. 
+pub mod farm; pub mod farmer_cache; pub(crate) mod identity; pub mod node_client; diff --git a/crates/subspace-farmer/src/node_client.rs b/crates/subspace-farmer/src/node_client.rs index f0cf926cf5..1a96f12da0 100644 --- a/crates/subspace-farmer/src/node_client.rs +++ b/crates/subspace-farmer/src/node_client.rs @@ -9,7 +9,7 @@ use subspace_rpc_primitives::{ FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; -/// To become error type agnostic +/// Erased error type pub type Error = Box; /// Abstraction of the Node Client diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs index 2c69629283..a2c612341a 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs @@ -1,14 +1,20 @@ #[cfg(test)] mod tests; +use crate::farm::{FarmError, PieceCache, PieceCacheOffset}; #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE; -use derive_more::Display; +use crate::utils::AsyncJoinOnDrop; +use async_trait::async_trait; +use futures::channel::mpsc; +use futures::{stream, SinkExt, Stream, StreamExt}; +use parking_lot::Mutex; #[cfg(not(windows))] use std::fs::{File, OpenOptions}; use std::path::Path; use std::sync::Arc; +use std::task::Poll; use std::{fs, io, mem}; use subspace_core_primitives::crypto::blake3_hash_list; use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex}; @@ -16,6 +22,8 @@ use subspace_farmer_components::file_ext::FileExt; #[cfg(not(windows))] use subspace_farmer_components::file_ext::OpenOptionsExt; use thiserror::Error; +use tokio::runtime::Handle; +use tokio::task; use tracing::{debug, info, warn}; /// Disk piece cache open error @@ -43,11 +51,6 @@ pub enum DiskPieceCacheError { ChecksumMismatch, } -/// Offset wrapper for pieces in [`DiskPieceCache`] -#[derive(Debug, Display, Copy, Clone)] -#[repr(transparent)] -pub struct Offset(u32); - #[derive(Debug)] struct Inner { #[cfg(not(windows))] @@ -64,6 +67,64 @@ pub struct DiskPieceCache { inner: Arc, } +#[async_trait] +impl PieceCache for DiskPieceCache { + fn max_num_elements(&self) -> usize { + self.inner.num_elements as usize + } + + async fn contents( + &self, + ) -> Box)> + Unpin + '_> { + let this = self.clone(); + let (mut sender, receiver) = mpsc::channel(1); + let read_contents = task::spawn_blocking(move || { + let contents = this.contents(); + for (piece_cache_offset, maybe_piece) in contents { + if let Err(error) = + Handle::current().block_on(sender.send((piece_cache_offset, maybe_piece))) + { + debug!(%error, "Aborting contents iteration due to receiver dropping"); + break; + } + } + }); + let read_contents = Mutex::new(Some(AsyncJoinOnDrop::new(read_contents, false))); + // Change order such that in closure below `receiver` is dropped before `read_contents` + let mut receiver = receiver; + + Box::new(stream::poll_fn(move |ctx| { + let poll_result = receiver.poll_next_unpin(ctx); + + if matches!(poll_result, Poll::Ready(None)) { + read_contents.lock().take(); + } + + poll_result + })) + } + + async fn write_piece( + &self, + offset: PieceCacheOffset, + piece_index: PieceIndex, + piece: &Piece, + ) -> Result<(), FarmError> { + Ok(self.write_piece(offset, piece_index, piece)?) 
+ } + + async fn read_piece_index( + &self, + offset: PieceCacheOffset, + ) -> Result, FarmError> { + Ok(self.read_piece_index(offset)?) + } + + async fn read_piece(&self, offset: PieceCacheOffset) -> Result, FarmError> { + Ok(self.read_piece(offset)?) + } +} + impl DiskPieceCache { pub(crate) const FILE_NAME: &'static str = "piece_cache.bin"; @@ -111,20 +172,20 @@ impl DiskPieceCache { (PieceIndex::SIZE + Piece::SIZE + mem::size_of::()) as u32 } - /// Contents of this disk cache + /// Contents of this disk piece cache /// /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this /// doesn't happen for the same piece being accessed! pub(crate) fn contents( &self, - ) -> impl ExactSizeIterator)> + '_ { + ) -> impl ExactSizeIterator)> + '_ { let mut element = vec![0; Self::element_size() as usize]; let mut early_exit = false; // TODO: Parallelize or read in larger batches (0..self.inner.num_elements).map(move |offset| { if early_exit { - return (Offset(offset), None); + return (PieceCacheOffset(offset), None); } match self.read_piece_internal(offset, &mut element) { @@ -134,11 +195,11 @@ impl DiskPieceCache { early_exit = true; } - (Offset(offset), maybe_piece_index) + (PieceCacheOffset(offset), maybe_piece_index) } Err(error) => { warn!(%error, %offset, "Failed to read cache element"); - (Offset(offset), None) + (PieceCacheOffset(offset), None) } } }) @@ -150,11 +211,11 @@ impl DiskPieceCache { /// doesn't happen for the same piece being accessed! pub(crate) fn write_piece( &self, - offset: Offset, + offset: PieceCacheOffset, piece_index: PieceIndex, piece: &Piece, ) -> Result<(), DiskPieceCacheError> { - let Offset(offset) = offset; + let PieceCacheOffset(offset) = offset; if offset >= self.inner.num_elements { return Err(DiskPieceCacheError::OffsetOutsideOfRange { provided: offset, @@ -187,9 +248,9 @@ impl DiskPieceCache { /// doesn't happen for the same piece being accessed! pub(crate) fn read_piece_index( &self, - offset: Offset, + offset: PieceCacheOffset, ) -> Result, DiskPieceCacheError> { - let Offset(offset) = offset; + let PieceCacheOffset(offset) = offset; if offset >= self.inner.num_elements { warn!(%offset, "Trying to read piece out of range, this must be an implementation bug"); return Err(DiskPieceCacheError::OffsetOutsideOfRange { @@ -207,8 +268,11 @@ impl DiskPieceCache { /// /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this /// doesn't happen for the same piece being accessed! 
- pub(crate) fn read_piece(&self, offset: Offset) -> Result, DiskPieceCacheError> { - let Offset(offset) = offset; + pub(crate) fn read_piece( + &self, + offset: PieceCacheOffset, + ) -> Result, DiskPieceCacheError> { + let PieceCacheOffset(offset) = offset; if offset >= self.inner.num_elements { warn!(%offset, "Trying to read piece out of range, this must be an implementation bug"); return Err(DiskPieceCacheError::OffsetOutsideOfRange { diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs index b5e23de140..09d9a5b17b 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache/tests.rs @@ -1,4 +1,4 @@ -use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError, Offset}; +use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError, PieceCacheOffset}; use rand::prelude::*; use std::assert_matches::assert_matches; use subspace_core_primitives::{Piece, PieceIndex}; @@ -21,7 +21,7 @@ fn basic() { // Write first piece into cache { - let offset = Offset(0); + let offset = PieceCacheOffset(0); let piece_index = PieceIndex::ZERO; let piece = { let mut piece = Piece::default(); @@ -54,7 +54,7 @@ fn basic() { // Write second piece into cache { - let offset = Offset(1); + let offset = PieceCacheOffset(1); let piece_index = PieceIndex::from(10); let piece = { let mut piece = Piece::default(); @@ -87,13 +87,13 @@ fn basic() { // Writing beyond capacity fails assert_matches!( - disk_piece_cache.write_piece(Offset(2), PieceIndex::ZERO, &Piece::default()), + disk_piece_cache.write_piece(PieceCacheOffset(2), PieceIndex::ZERO, &Piece::default()), Err(DiskPieceCacheError::OffsetOutsideOfRange { .. }) ); // Override works { - let offset = Offset(0); + let offset = PieceCacheOffset(0); let piece_index = PieceIndex::from(13); let piece = { let mut piece = Piece::default(); diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index c8038d66c1..cc8faf151b 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -1,6 +1,8 @@ +use crate::farm::{FarmError, PieceReader}; #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; +use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use futures::{SinkExt, StreamExt}; #[cfg(not(windows))] @@ -28,6 +30,17 @@ pub struct DiskPieceReader { read_piece_sender: mpsc::Sender, } +#[async_trait] +impl PieceReader for DiskPieceReader { + async fn read_piece( + &self, + sector_index: SectorIndex, + piece_offset: PieceOffset, + ) -> Result, FarmError> { + Ok(self.read_piece(sector_index, piece_offset).await) + } +} + impl DiskPieceReader { /// Creates new piece reader instance and background future that handles reads internally. 
/// @@ -75,12 +88,13 @@ impl DiskPieceReader { /// Read piece from sector by offset, `None` means input parameters are incorrect or piece /// reader was shut down pub async fn read_piece( - &mut self, + &self, sector_index: SectorIndex, piece_offset: PieceOffset, ) -> Option { let (response_sender, response_receiver) = oneshot::channel(); self.read_piece_sender + .clone() .send(ReadPieceRequest { sector_index, piece_offset, diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs index 449cd2930f..be4b3b2c8a 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs @@ -1,9 +1,13 @@ #[cfg(test)] mod tests; +use crate::farm::{FarmError, PlotCache}; #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; +use crate::utils::AsyncJoinOnDrop; use async_lock::RwLock as AsyncRwLock; +use async_trait::async_trait; +use parity_scale_codec::{Decode, Encode}; use parking_lot::RwLock; use std::collections::HashMap; #[cfg(not(windows))] @@ -25,13 +29,16 @@ pub enum DiskPlotCacheError { /// I/O error occurred #[error("Plot cache I/O error: {0}")] Io(#[from] io::Error), + /// Failed to spawn task for blocking thread + #[error("Failed to spawn task for blocking thread: {0}")] + TokioJoinError(#[from] tokio::task::JoinError), /// Checksum mismatch #[error("Checksum mismatch")] ChecksumMismatch, } -#[derive(Debug)] -pub(crate) enum MaybePieceStoredResult { +#[derive(Debug, Copy, Clone, Encode, Decode)] +pub enum MaybePieceStoredResult { /// Definitely not stored No, /// Maybe has vacant slot to store @@ -59,6 +66,28 @@ pub struct DiskPlotCache { sector_size: u64, } +#[async_trait] +impl PlotCache for DiskPlotCache { + async fn is_piece_maybe_stored( + &self, + key: &RecordKey, + ) -> Result { + Ok(self.is_piece_maybe_stored(key)) + } + + async fn try_store_piece( + &self, + piece_index: PieceIndex, + piece: &Piece, + ) -> Result { + Ok(self.try_store_piece(piece_index, piece).await?) 
+ } + + async fn read_piece(&self, key: &RecordKey) -> Result, FarmError> { + Ok(self.read_piece(key).await) + } +} + impl DiskPlotCache { pub(crate) fn new( #[cfg(not(windows))] file: &Arc, @@ -145,6 +174,7 @@ impl DiskPlotCache { }; let element_offset = u64::from(offset) * u64::from(Self::element_size()); + // Blocking read is fine because writes in farmer are very rare and very brief let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64; // Make sure offset is after anything that is already plotted @@ -158,7 +188,7 @@ impl DiskPlotCache { } /// Store piece in cache if there is free space, otherwise `Ok(false)` is returned - pub(crate) fn try_store_piece( + pub(crate) async fn try_store_piece( &self, piece_index: PieceIndex, piece: &Piece, @@ -179,7 +209,7 @@ impl DiskPlotCache { }; let element_offset = u64::from(offset) * u64::from(Self::element_size()); - let sectors_metadata = sectors_metadata.write_blocking(); + let sectors_metadata = sectors_metadata.read().await; let plotted_bytes = self.sector_size * sectors_metadata.len() as u64; // Make sure offset is after anything that is already plotted @@ -196,12 +226,21 @@ impl DiskPlotCache { }; let piece_index_bytes = piece_index.to_bytes(); - file.write_all_at(&piece_index_bytes, element_offset)?; - file.write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?; - file.write_all_at( - &blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]), - element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64, - )?; + let write_fut = tokio::task::spawn_blocking({ + let piece = piece.clone(); + + move || { + file.write_all_at(&piece_index_bytes, element_offset)?; + file.write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?; + file.write_all_at( + &blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]), + element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64, + ) + } + }); + + AsyncJoinOnDrop::new(write_fut, false).await??; + // Just to be safe, avoid any overlap of write locks drop(sectors_metadata); // Store newly written piece in the map @@ -216,24 +255,32 @@ impl DiskPlotCache { /// Read piece from cache. /// /// Returns `None` if not cached. 
- pub(crate) fn read_piece(&self, key: &RecordKey) -> Option { + pub(crate) async fn read_piece(&self, key: &RecordKey) -> Option { let offset = self.cached_pieces.read().map.get(key).copied()?; let file = self.file.upgrade()?; + let cached_pieces = Arc::clone(&self.cached_pieces); + let key = key.clone(); - let mut element = vec![0; Self::element_size() as usize]; - match Self::read_piece_internal(&file, offset, &mut element) { - Ok(Some(_piece_index)) => { - let mut piece = Piece::default(); - piece.copy_from_slice(&element[PieceIndex::SIZE..][..Piece::SIZE]); - Some(piece) - } - _ => { - // Remove entry just in case it was overridden with a sector already - self.cached_pieces.write().map.remove(key); - None + let read_fut = tokio::task::spawn_blocking(move || { + let mut element = vec![0; Self::element_size() as usize]; + match Self::read_piece_internal(&file, offset, &mut element) { + Ok(Some(_piece_index)) => { + let mut piece = Piece::default(); + piece.copy_from_slice(&element[PieceIndex::SIZE..][..Piece::SIZE]); + Some(piece) + } + _ => { + // Remove entry just in case it was overridden with a sector already + cached_pieces.write().map.remove(&key); + None + } } - } + }); + + AsyncJoinOnDrop::new(read_fut, false) + .await + .unwrap_or_default() } fn read_piece_internal( diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs index b2bce1df49..3fd76227af 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs @@ -20,8 +20,8 @@ use tempfile::tempdir; const FAKE_SECTOR_SIZE: usize = 2 * 1024 * 1024; const TARGET_SECTOR_COUNT: SectorIndex = 5; -#[test] -fn basic() { +#[tokio::test] +async fn basic() { let dummy_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata { sector_index: 0, pieces_in_sector: 0, @@ -84,7 +84,7 @@ fn basic() { ); // Initially empty - assert_matches!(disk_plot_cache.read_piece(&record_key_0), None); + assert_matches!(disk_plot_cache.read_piece(&record_key_0).await, None); assert_matches!( disk_plot_cache.is_piece_maybe_stored(&record_key_0), MaybePieceStoredResult::Vacant @@ -97,6 +97,7 @@ fn basic() { ); assert!(!disk_plot_cache .try_store_piece(piece_index_0, &piece_0) + .await .unwrap()); assert_matches!( disk_plot_cache.is_piece_maybe_stored(&record_key_0), @@ -115,12 +116,13 @@ fn basic() { // Successfully stores piece if not all sectors are plotted assert!(disk_plot_cache .try_store_piece(piece_index_0, &piece_0) + .await .unwrap()); assert_matches!( disk_plot_cache.is_piece_maybe_stored(&record_key_0), MaybePieceStoredResult::Yes ); - assert!(disk_plot_cache.read_piece(&record_key_0).unwrap() == piece_0); + assert!(disk_plot_cache.read_piece(&record_key_0).await.unwrap() == piece_0); // Store two more pieces and make sure they can be read assert_matches!( @@ -129,12 +131,13 @@ fn basic() { ); assert!(disk_plot_cache .try_store_piece(piece_index_1, &piece_1) + .await .unwrap()); assert_matches!( disk_plot_cache.is_piece_maybe_stored(&record_key_1), MaybePieceStoredResult::Yes ); - assert!(disk_plot_cache.read_piece(&record_key_1).unwrap() == piece_1); + assert!(disk_plot_cache.read_piece(&record_key_1).await.unwrap() == piece_1); assert_matches!( disk_plot_cache.is_piece_maybe_stored(&record_key_2), @@ -142,12 +145,13 @@ fn basic() { ); assert!(disk_plot_cache .try_store_piece(piece_index_2, &piece_2) + .await .unwrap()); assert_matches!( 
disk_plot_cache.is_piece_maybe_stored(&record_key_2), MaybePieceStoredResult::Yes ); - assert!(disk_plot_cache.read_piece(&record_key_2).unwrap() == piece_2); + assert!(disk_plot_cache.read_piece(&record_key_2).await.unwrap() == piece_2); // Write almost all sectors even without updating metadata, this will result in internal piece // read error due to checksum mismatch and eviction of the piece from cache @@ -160,7 +164,7 @@ fn basic() { disk_plot_cache.is_piece_maybe_stored(&record_key_2), MaybePieceStoredResult::Yes ); - assert_matches!(disk_plot_cache.read_piece(&record_key_2), None); + assert_matches!(disk_plot_cache.read_piece(&record_key_2).await, None); assert_matches!( disk_plot_cache.is_piece_maybe_stored(&record_key_2), MaybePieceStoredResult::Vacant @@ -180,7 +184,7 @@ fn basic() { ); // Closing file will render cache unusable - assert!(disk_plot_cache.read_piece(&record_key_0).unwrap() == piece_0); + assert!(disk_plot_cache.read_piece(&record_key_0).await.unwrap() == piece_0); drop(file); - assert_matches!(disk_plot_cache.read_piece(&record_key_0), None); + assert_matches!(disk_plot_cache.read_piece(&record_key_0).await, None); } diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index 12ab3a50ae..7ab1542d97 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -213,7 +213,7 @@ where .plotted_pieces .lock() .as_ref() - .and_then(|plotted_pieces| plotted_pieces.read_piece(&piece_index)); + .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index)); if let Some(read_piece_fut) = maybe_read_piece_fut { if let Some(piece) = read_piece_fut.await { diff --git a/crates/subspace-farmer/src/utils/plotted_pieces.rs b/crates/subspace-farmer/src/utils/plotted_pieces.rs index 9f843bed6e..fdcf107739 100644 --- a/crates/subspace-farmer/src/utils/plotted_pieces.rs +++ b/crates/subspace-farmer/src/utils/plotted_pieces.rs @@ -1,8 +1,9 @@ -use crate::single_disk_farm::piece_reader::DiskPieceReader; +use crate::farm::PieceReader; use rand::prelude::*; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::future::Future; +use std::sync::Arc; use subspace_core_primitives::{Piece, PieceIndex, PieceOffset, SectorIndex}; use subspace_farmer_components::plotting::PlottedSector; use tracing::{trace, warn}; @@ -17,13 +18,13 @@ struct PieceDetails { /// Wrapper data structure for pieces plotted under multiple plots. #[derive(Debug)] pub struct PlottedPieces { - readers: Vec, + readers: Vec>, pieces: HashMap>, } impl PlottedPieces { /// Initialize with readers for each farm - pub fn new(readers: Vec) -> Self { + pub fn new(readers: Vec>) -> Self { Self { readers, pieces: HashMap::new(), @@ -41,9 +42,9 @@ impl PlottedPieces { /// longer in the plot, future will resolve with `None`. 
pub fn read_piece( &self, - piece_index: &PieceIndex, + piece_index: PieceIndex, ) -> Option> + 'static> { - let piece_details = match self.pieces.get(piece_index) { + let piece_details = match self.pieces.get(&piece_index) { Some(piece_details) => piece_details .choose(&mut thread_rng()) .copied() @@ -56,7 +57,7 @@ impl PlottedPieces { return None; } }; - let mut reader = match self.readers.get(usize::from(piece_details.disk_farm_index)) { + let reader = match self.readers.get(usize::from(piece_details.disk_farm_index)) { Some(reader) => reader.clone(), None => { warn!(?piece_index, ?piece_details, "Plot offset is invalid"); @@ -68,6 +69,16 @@ impl PlottedPieces { reader .read_piece(piece_details.sector_index, piece_details.piece_offset) .await + .unwrap_or_else(|error| { + warn!( + %error, + %piece_index, + disk_farm_index = piece_details.disk_farm_index, + sector_index = piece_details.sector_index, + "Failed to retrieve piece" + ); + None + }) }) } From 90972e11ab08ad946e61f86fa216356a93d5369b Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 14 Mar 2024 19:05:03 +0200 Subject: [PATCH 4/6] Introduce `Farm` trait that abstracts away multiple potential farm implementations --- .../src/bin/subspace-farmer/commands/farm.rs | 47 +++++--- .../subspace-farmer/commands/farm/metrics.rs | 90 ++++----------- crates/subspace-farmer/src/farm.rs | 87 +++++++++++++- .../subspace-farmer/src/single_disk_farm.rs | 106 +++++++++++++----- 4 files changed, 215 insertions(+), 115 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index ad0cb0a231..10f1fbb4c7 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -11,7 +11,7 @@ use bytesize::ByteSize; use clap::{Parser, ValueHint}; use futures::channel::oneshot; use futures::stream::{FuturesOrdered, FuturesUnordered}; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt, TryStreamExt}; use parking_lot::Mutex; use prometheus_client::registry::Registry; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; @@ -25,6 +25,7 @@ use std::{fmt, fs}; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::{PublicKey, Record, SectorIndex}; use subspace_erasure_coding::ErasureCoding; +use subspace_farmer::farm::Farm; use subspace_farmer::farmer_cache::FarmerCache; use subspace_farmer::single_disk_farm::farming::FarmingNotification; use subspace_farmer::single_disk_farm::{ @@ -776,7 +777,7 @@ where let single_disk_farms = single_disk_farms .into_iter() - .map(|(_disk_farm_index, single_disk_farm)| single_disk_farm) + .map(|(_disk_farm_index, single_disk_farm)| Box::new(single_disk_farm) as Box) .collect::>(); (single_disk_farms, plotting_delay_senders) @@ -807,12 +808,12 @@ where .replace_backing_caches( single_disk_farms .iter() - .map(|single_disk_farm| Arc::new(single_disk_farm.piece_cache()) as Arc<_>) + .map(|single_disk_farm| single_disk_farm.piece_cache()) .collect(), if plot_cache { single_disk_farms .iter() - .map(|single_disk_farm| Arc::new(single_disk_farm.plot_cache()) as Arc<_>) + .map(|single_disk_farm| single_disk_farm.plot_cache()) .collect() } else { Vec::new() @@ -824,7 +825,7 @@ where // Store piece readers so we can reference them later let piece_readers = single_disk_farms .iter() - .map(|single_disk_farm| Arc::new(single_disk_farm.piece_reader()) as Arc<_>) + 
.map(|single_disk_farm| single_disk_farm.piece_reader()) .collect::>(); info!("Collecting already plotted pieces (this will take some time)..."); @@ -841,10 +842,11 @@ where ) })?; - (0 as SectorIndex..) - .zip(single_disk_farm.plotted_sectors().await) - .for_each( - |(sector_index, plotted_sector_result)| match plotted_sector_result { + for (sector_index, mut plotted_sectors) in + (0 as SectorIndex..).zip(single_disk_farm.plotted_sectors().await) + { + while let Some(plotted_sector_result) = plotted_sectors.next().await { + match plotted_sector_result { Ok(plotted_sector) => { future_plotted_pieces.add_sector(disk_farm_index, &plotted_sector); } @@ -856,8 +858,9 @@ where "Failed reading plotted sector on startup, skipping" ); } - }, - ); + } + } + } } plotted_pieces.lock().replace(future_plotted_pieces); @@ -867,15 +870,25 @@ where let total_and_plotted_sectors = single_disk_farms .iter() - .map(|single_disk_farm| async { + .enumerate() + .map(|(disk_farm_index, single_disk_farm)| async move { let total_sector_count = single_disk_farm.total_sectors_count(); - let plotted_sectors_count = single_disk_farm.plotted_sectors_count().await; - - (total_sector_count, plotted_sectors_count) + let plotted_sectors_count = + single_disk_farm + .plotted_sectors_count() + .await + .map_err(|error| { + anyhow!( + "Failed to get plotted sectors count from from index \ + {disk_farm_index}: {error}" + ) + })?; + + anyhow::Ok((total_sector_count, plotted_sectors_count)) }) .collect::>() - .collect::>() - .await; + .try_collect::>() + .await?; let mut single_disk_farms_stream = single_disk_farms .into_iter() diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs index 4de52aa86a..3a5abf558e 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs @@ -7,8 +7,9 @@ use std::fmt; use std::sync::atomic::{AtomicI64, AtomicU64}; use std::time::Duration; use subspace_core_primitives::SectorIndex; +use subspace_farmer::farm::FarmId; use subspace_farmer::single_disk_farm::farming::ProvingResult; -use subspace_farmer::single_disk_farm::{FarmingError, SingleDiskFarmId}; +use subspace_farmer::single_disk_farm::FarmingError; #[derive(Debug, Copy, Clone)] pub(super) enum SectorState { @@ -228,41 +229,30 @@ impl FarmerMetrics { } } - pub(super) fn observe_auditing_time( - &self, - single_disk_farm_id: &SingleDiskFarmId, - time: &Duration, - ) { + pub(super) fn observe_auditing_time(&self, farm_id: &FarmId, time: &Duration) { self.auditing_time - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) + .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64()); } pub(super) fn observe_proving_time( &self, - single_disk_farm_id: &SingleDiskFarmId, + farm_id: &FarmId, time: &Duration, result: ProvingResult, ) { self.proving_time .get_or_create(&vec![ - ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("farm_id".to_string(), farm_id.to_string()), ("result".to_string(), result.to_string()), ]) .observe(time.as_secs_f64()); } - pub(super) fn note_farming_error( - &self, - single_disk_farm_id: &SingleDiskFarmId, - error: &FarmingError, - ) { + pub(super) fn note_farming_error(&self, farm_id: &FarmId, error: &FarmingError) { self.farming_errors .get_or_create(&vec![ - ("farm_id".to_string(), single_disk_farm_id.to_string()), + 
("farm_id".to_string(), farm_id.to_string()), ("error".to_string(), error.str_variant().to_string()), ]) .inc(); @@ -270,26 +260,22 @@ impl FarmerMetrics { pub(super) fn update_sectors_total( &self, - single_disk_farm_id: &SingleDiskFarmId, + farm_id: &FarmId, sectors: SectorIndex, state: SectorState, ) { self.sectors_total .get_or_create(&vec![ - ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("farm_id".to_string(), farm_id.to_string()), ("state".to_string(), state.to_string()), ]) .set(i64::from(sectors)); } - pub(super) fn update_sector_state( - &self, - single_disk_farm_id: &SingleDiskFarmId, - state: SectorState, - ) { + pub(super) fn update_sector_state(&self, farm_id: &FarmId, state: SectorState) { self.sectors_total .get_or_create(&vec![ - ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("farm_id".to_string(), farm_id.to_string()), ("state".to_string(), state.to_string()), ]) .inc(); @@ -302,7 +288,7 @@ impl FarmerMetrics { // in deadlock otherwise { let not_plotted_sectors = self.sectors_total.get_or_create(&vec![ - ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("farm_id".to_string(), farm_id.to_string()), ("state".to_string(), SectorState::NotPlotted.to_string()), ]); if not_plotted_sectors.get() > 0 { @@ -313,7 +299,7 @@ impl FarmerMetrics { } { let expired_sectors = self.sectors_total.get_or_create(&vec![ - ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("farm_id".to_string(), farm_id.to_string()), ("state".to_string(), SectorState::Expired.to_string()), ]); if expired_sectors.get() > 0 { @@ -325,7 +311,7 @@ impl FarmerMetrics { // Replaced about to expire sector self.sectors_total .get_or_create(&vec![ - ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("farm_id".to_string(), farm_id.to_string()), ("state".to_string(), SectorState::AboutToExpire.to_string()), ]) .dec(); @@ -333,7 +319,7 @@ impl FarmerMetrics { SectorState::AboutToExpire | SectorState::Expired => { self.sectors_total .get_or_create(&vec![ - ("farm_id".to_string(), single_disk_farm_id.to_string()), + ("farm_id".to_string(), farm_id.to_string()), ("state".to_string(), SectorState::Plotted.to_string()), ]) .dec(); @@ -341,55 +327,27 @@ impl FarmerMetrics { } } - pub(super) fn observe_sector_downloading_time( - &self, - single_disk_farm_id: &SingleDiskFarmId, - time: &Duration, - ) { + pub(super) fn observe_sector_downloading_time(&self, farm_id: &FarmId, time: &Duration) { self.sector_downloading_time - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) + .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64()); } - pub(super) fn observe_sector_encoding_time( - &self, - single_disk_farm_id: &SingleDiskFarmId, - time: &Duration, - ) { + pub(super) fn observe_sector_encoding_time(&self, farm_id: &FarmId, time: &Duration) { self.sector_encoding_time - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) + .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64()); } - pub(super) fn observe_sector_writing_time( - &self, - single_disk_farm_id: &SingleDiskFarmId, - time: &Duration, - ) { + pub(super) fn observe_sector_writing_time(&self, farm_id: &FarmId, time: &Duration) { self.sector_writing_time - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) + .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64()); } - pub(super) fn 
observe_sector_plotting_time( - &self, - single_disk_farm_id: &SingleDiskFarmId, - time: &Duration, - ) { + pub(super) fn observe_sector_plotting_time(&self, farm_id: &FarmId, time: &Duration) { self.sector_plotting_time - .get_or_create(&vec![( - "farm_id".to_string(), - single_disk_farm_id.to_string(), - )]) + .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64()); } } diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index 8055fd4f55..5efa13c8cf 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -1,14 +1,24 @@ +use crate::single_disk_farm::farming::FarmingNotification; use crate::single_disk_farm::plot_cache::MaybePieceStoredResult; +use crate::single_disk_farm::SectorUpdate; use async_trait::async_trait; -use derive_more::Display; +use derive_more::{Display, From}; use futures::Stream; use parity_scale_codec::{Decode, Encode}; +use serde::{Deserialize, Serialize}; use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; use subspace_core_primitives::{Piece, PieceIndex, PieceOffset, SectorIndex}; +use subspace_farmer_components::plotting::PlottedSector; use subspace_networking::libp2p::kad::RecordKey; +use subspace_rpc_primitives::SolutionResponse; +use ulid::Ulid; /// Erased error type pub type FarmError = Box; +pub type HandlerFn = Arc; /// Offset wrapper for pieces in [`PieceCache`] #[derive(Debug, Display, Copy, Clone, Encode, Decode)] @@ -94,3 +104,78 @@ pub trait PieceReader: Send + Sync + fmt::Debug { piece_offset: PieceOffset, ) -> Result, FarmError>; } + +/// Opaque handler ID for event handlers, once dropped handler will be removed automatically +pub trait HandlerId: Send + fmt::Debug { + /// Consumes [`HandlerId`] and prevents handler from being removed automatically. + fn detach(&self); +} + +impl HandlerId for event_listener_primitives::HandlerId { + fn detach(&self) { + self.detach(); + } +} + +/// An identifier for a farm, can be used for in logs, thread names, etc. 
+#[derive( + Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, +)] +#[serde(untagged)] +pub enum FarmId { + /// Farm ID + Ulid(Ulid), +} + +#[allow(clippy::new_without_default)] +impl FarmId { + /// Creates new ID + pub fn new() -> Self { + Self::Ulid(Ulid::new()) + } +} + +/// Abstract farm implementation +#[async_trait(?Send)] +pub trait Farm { + /// ID of this farm + fn id(&self) -> &FarmId; + + /// Number of sectors in this farm + fn total_sectors_count(&self) -> SectorIndex; + + /// Number of sectors successfully plotted so far + async fn plotted_sectors_count(&self) -> Result; + + /// Read information about sectors plotted so far + async fn plotted_sectors( + &self, + ) -> Result> + Unpin + '_>, FarmError>; + + /// Get piece cache instance + fn piece_cache(&self) -> Arc; + + /// Get plot cache instance + fn plot_cache(&self) -> Arc; + + /// Get piece reader to read plotted pieces later + fn piece_reader(&self) -> Arc; + + /// Subscribe to sector updates + fn on_sector_update( + &self, + callback: HandlerFn<(SectorIndex, SectorUpdate)>, + ) -> Box; + + /// Subscribe to farming notifications + fn on_farming_notification( + &self, + callback: HandlerFn, + ) -> Box; + + /// Subscribe to new solution notification + fn on_solution(&self, callback: HandlerFn) -> Box; + + /// Run and wait for background threads to exit or return an error + fn run(self: Box) -> Pin> + Send>>; +} diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 6009463404..e7cbfe8d91 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -5,6 +5,7 @@ pub mod plot_cache; mod plotting; pub mod unbuffered_io_file_windows; +use crate::farm::{Farm, FarmError, FarmId, HandlerFn, PieceCache, PieceReader, PlotCache}; use crate::identity::{Identity, IdentityError}; use crate::node_client::NodeClient; use crate::reward_signing::reward_signing; @@ -29,11 +30,11 @@ use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; use crate::KNOWN_PEERS_CACHE_SIZE; use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; -use derive_more::{Display, From}; +use async_trait::async_trait; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::{mpsc, oneshot}; use futures::stream::FuturesUnordered; -use futures::{select, FutureExt, StreamExt}; +use futures::{select, stream, FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use rand::prelude::*; @@ -72,7 +73,6 @@ use thiserror::Error; use tokio::runtime::Handle; use tokio::sync::{broadcast, Barrier, Semaphore}; use tracing::{debug, error, info, trace, warn, Instrument, Span}; -use ulid::Ulid; // Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to // usize depending on chain parameters @@ -88,24 +88,6 @@ const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_secs(30); /// 4 seconds is proving time, hence 3 seconds for reads. const INTERNAL_BENCHMARK_READ_TIMEOUT: Duration = Duration::from_secs(3); -/// An identifier for single disk farm, can be used for in logs, thread names, etc. 
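As a usage sketch only (not something introduced by these patches): because the `Farm` trait defined above is object-safe, calling code can hold any mix of farm implementations behind `Box<dyn Farm>` and query them uniformly. A minimal example, assuming the `anyhow` crate the farmer binary already depends on:

    use subspace_farmer::farm::Farm;

    async fn print_farm_summaries(farms: &[Box<dyn Farm>]) -> anyhow::Result<()> {
        for farm in farms {
            let id = farm.id();
            // `plotted_sectors_count()` is async and fallible now, since a remote farm may
            // need a round trip to answer.
            let plotted = farm
                .plotted_sectors_count()
                .await
                .map_err(|error| anyhow::anyhow!("Failed to query farm {id}: {error}"))?;
            println!(
                "Farm {id}: {plotted}/{} sectors plotted",
                farm.total_sectors_count()
            );
        }
        Ok(())
    }
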
-#[derive( - Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From, -)] -#[serde(untagged)] -pub enum SingleDiskFarmId { - /// Farm ID - Ulid(Ulid), -} - -#[allow(clippy::new_without_default)] -impl SingleDiskFarmId { - /// Creates new ID - pub fn new() -> Self { - Self::Ulid(Ulid::new()) - } -} - /// Exclusive lock for single disk farm info file, ensuring no concurrent edits by cooperating processes is done #[must_use = "Lock file must be kept around or as long as farm is used"] pub struct SingleDiskFarmInfoLock { @@ -120,7 +102,7 @@ pub enum SingleDiskFarmInfo { #[serde(rename_all = "camelCase")] V0 { /// ID of the farm - id: SingleDiskFarmId, + id: FarmId, /// Genesis hash of the chain used for farm creation #[serde(with = "hex::serde")] genesis_hash: [u8; 32], @@ -137,7 +119,7 @@ impl SingleDiskFarmInfo { const FILE_NAME: &'static str = "single_disk_farm.json"; pub fn new( - id: SingleDiskFarmId, + id: FarmId, genesis_hash: [u8; 32], public_key: PublicKey, pieces_in_sector: u16, @@ -189,7 +171,7 @@ impl SingleDiskFarmInfo { } // ID of the farm - pub fn id(&self) -> &SingleDiskFarmId { + pub fn id(&self) -> &FarmId { let Self::V0 { id, .. } = self; id } @@ -346,7 +328,7 @@ pub enum SingleDiskFarmError { )] WrongChain { /// Farm ID - id: SingleDiskFarmId, + id: FarmId, /// Hex-encoded genesis hash during farm creation // TODO: Wrapper type with `Display` impl for genesis hash correct_chain: String, @@ -360,7 +342,7 @@ pub enum SingleDiskFarmError { )] IdentityMismatch { /// Farm ID - id: SingleDiskFarmId, + id: FarmId, /// Public key used during farm creation correct_public_key: PublicKey, /// Current public key @@ -373,7 +355,7 @@ pub enum SingleDiskFarmError { )] InvalidPiecesInSector { /// Farm ID - id: SingleDiskFarmId, + id: FarmId, /// Max supported pieces in sector max_supported: u16, /// Number of pieces in sector farm is initialized with @@ -558,7 +540,6 @@ pub enum BackgroundTaskError { type BackgroundTask = Pin> + Send>>; -type HandlerFn = Arc; type Handler = Bag, A>; /// Various sector updates @@ -631,6 +612,69 @@ impl Drop for SingleDiskFarm { } } +#[async_trait(?Send)] +impl Farm for SingleDiskFarm { + fn id(&self) -> &FarmId { + self.id() + } + + fn total_sectors_count(&self) -> SectorIndex { + self.total_sectors_count + } + + async fn plotted_sectors_count(&self) -> Result { + Ok(self.plotted_sectors_count().await) + } + + async fn plotted_sectors( + &self, + ) -> Result> + Unpin + '_>, FarmError> + { + Ok(Box::new(stream::iter( + self.plotted_sectors() + .await + .map(|result| result.map_err(Into::into)), + ))) + } + + fn piece_cache(&self) -> Arc { + Arc::new(self.piece_cache()) + } + + fn plot_cache(&self) -> Arc { + Arc::new(self.plot_cache()) + } + + fn piece_reader(&self) -> Arc { + Arc::new(self.piece_reader()) + } + + fn on_sector_update( + &self, + callback: HandlerFn<(SectorIndex, SectorUpdate)>, + ) -> Box { + Box::new(self.on_sector_update(callback)) + } + + fn on_farming_notification( + &self, + callback: HandlerFn, + ) -> Box { + Box::new(self.on_farming_notification(callback)) + } + + fn on_solution( + &self, + callback: HandlerFn, + ) -> Box { + Box::new(self.on_solution(callback)) + } + + fn run(self: Box) -> Pin> + Send>> { + Box::pin((*self).run()) + } +} + impl SingleDiskFarm { pub const PLOT_FILE: &'static str = "plot.bin"; pub const METADATA_FILE: &'static str = "metadata.bin"; @@ -1130,7 +1174,7 @@ impl SingleDiskFarm { } None => { let single_disk_farm_info = SingleDiskFarmInfo::new( - 
SingleDiskFarmId::new(), + FarmId::new(), farmer_app_info.genesis_hash, public_key, max_pieces_in_sector, @@ -1447,7 +1491,7 @@ impl SingleDiskFarm { } /// ID of this farm - pub fn id(&self) -> &SingleDiskFarmId { + pub fn id(&self) -> &FarmId { self.single_disk_farm_info.id() } @@ -1537,7 +1581,7 @@ impl SingleDiskFarm { } /// Run and wait for background threads to exit or return an error - pub async fn run(mut self) -> anyhow::Result { + pub async fn run(mut self) -> anyhow::Result { if let Some(start_sender) = self.start_sender.take() { // Do not care if anyone is listening on the other side let _ = start_sender.send(()); From 561fd105a7144794515912369154052f784c48e4 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 18 Mar 2024 14:05:22 +0200 Subject: [PATCH 5/6] Rename `single_disk_farm` to `farm` and `disk_farm_index` to `farm_index` in many places, no other code changes --- .../src/bin/subspace-farmer/commands/farm.rs | 275 ++++++++---------- .../src/bin/subspace-farmer/commands/info.rs | 6 +- .../src/bin/subspace-farmer/commands/scrub.rs | 4 +- .../bin/subspace-farmer/commands/shared.rs | 4 +- crates/subspace-farmer/src/farmer_cache.rs | 30 +- .../subspace-farmer/src/single_disk_farm.rs | 10 +- .../src/utils/plotted_pieces.rs | 14 +- 7 files changed, 157 insertions(+), 186 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 10f1fbb4c7..e5b785dd0d 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -650,7 +650,7 @@ where .map(|farming_thread_pool_size| farming_thread_pool_size.get()) .unwrap_or_else(recommended_number_of_farming_threads); - let (single_disk_farms, plotting_delay_senders) = { + let (farms, plotting_delay_senders) = { let node_rpc_url = &node_rpc_url; let global_mutex = Arc::default(); let info_mutex = &AsyncMutex::new(()); @@ -661,12 +661,12 @@ where .map(|_| oneshot::channel()) .unzip::<_, _, Vec<_>, Vec<_>>(); - let mut single_disk_farms = Vec::with_capacity(disk_farms.len()); - let mut single_disk_farms_stream = disk_farms + let mut farms = Vec::with_capacity(disk_farms.len()); + let mut farms_stream = disk_farms .into_iter() .zip(plotting_delay_receivers) .enumerate() - .map(|(disk_farm_index, (disk_farm, plotting_delay_receiver))| { + .map(|(farm_index, (disk_farm, plotting_delay_receiver))| { let farmer_app_info = farmer_app_info.clone(); let kzg = kzg.clone(); let erasure_coding = erasure_coding.clone(); @@ -684,11 +684,11 @@ where let node_client = match NodeRpcClient::new(node_rpc_url).await { Ok(node_client) => node_client, Err(error) => { - return (disk_farm_index, Err(error.into())); + return (farm_index, Err(error.into())); } }; - let single_disk_farm_fut = SingleDiskFarm::new::<_, _, PosTable>( + let farm_fut = SingleDiskFarm::new::<_, _, PosTable>( SingleDiskFarmOptions { directory: disk_farm.directory.clone(), farmer_app_info, @@ -711,17 +711,17 @@ where faster_read_sector_record_chunks_mode_barrier, faster_read_sector_record_chunks_mode_concurrency, }, - disk_farm_index, + farm_index, ); - let single_disk_farm = match single_disk_farm_fut.await { - Ok(single_disk_farm) => single_disk_farm, + let farm = match farm_fut.await { + Ok(farm) => farm, Err(SingleDiskFarmError::InsufficientAllocatedSpace { min_space, allocated_space, }) => { return ( - disk_farm_index, + farm_index, Err(anyhow::anyhow!( "Allocated space {} ({}) is not enough, 
minimum is ~{} (~{}, \ {} bytes to be exact)", @@ -734,15 +734,15 @@ where ); } Err(error) => { - return (disk_farm_index, Err(error.into())); + return (farm_index, Err(error.into())); } }; if !no_info { let _info_guard = info_mutex.lock().await; - let info = single_disk_farm.info(); - info!("Single disk farm {disk_farm_index}:"); + let info = farm.info(); + info!("Farm {farm_index}:"); info!(" ID: {}", info.id()); info!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash())); info!(" Public key: 0x{}", hex::encode(info.public_key())); @@ -754,33 +754,31 @@ where info!(" Directory: {}", disk_farm.directory.display()); } - (disk_farm_index, Ok(single_disk_farm)) + (farm_index, Ok(Box::new(farm) as Box)) } - .instrument(info_span!("", %disk_farm_index)) + .instrument(info_span!("", %farm_index)) }) .collect::>(); - while let Some((disk_farm_index, single_disk_farm)) = single_disk_farms_stream.next().await - { - if let Err(error) = &single_disk_farm { - let span = info_span!("", %disk_farm_index); + while let Some((farm_index, farm)) = farms_stream.next().await { + if let Err(error) = &farm { + let span = info_span!("", %farm_index); let _span_guard = span.enter(); - error!(%error, "Single disk creation failed"); + error!(%error, "Farm creation failed"); } - single_disk_farms.push((disk_farm_index, single_disk_farm?)); + farms.push((farm_index, farm?)); } // Restore order after unordered initialization - single_disk_farms - .sort_unstable_by_key(|(disk_farm_index, _single_disk_farm)| *disk_farm_index); + farms.sort_unstable_by_key(|(farm_index, _farm)| *farm_index); - let single_disk_farms = single_disk_farms + let farms = farms .into_iter() - .map(|(_disk_farm_index, single_disk_farm)| Box::new(single_disk_farm) as Box) + .map(|(_farm_index, farm)| farm) .collect::>(); - (single_disk_farms, plotting_delay_senders) + (farms, plotting_delay_senders) }; { @@ -806,15 +804,9 @@ where } farmer_cache .replace_backing_caches( - single_disk_farms - .iter() - .map(|single_disk_farm| single_disk_farm.piece_cache()) - .collect(), + farms.iter().map(|farm| farm.piece_cache()).collect(), if plot_cache { - single_disk_farms - .iter() - .map(|single_disk_farm| single_disk_farm.plot_cache()) - .collect() + farms.iter().map(|farm| farm.plot_cache()).collect() } else { Vec::new() }, @@ -823,9 +815,9 @@ where drop(farmer_cache); // Store piece readers so we can reference them later - let piece_readers = single_disk_farms + let piece_readers = farms .iter() - .map(|single_disk_farm| single_disk_farm.piece_reader()) + .map(|farm| farm.piece_reader()) .collect::>(); info!("Collecting already plotted pieces (this will take some time)..."); @@ -834,8 +826,8 @@ where { let mut future_plotted_pieces = PlottedPieces::new(piece_readers); - for (disk_farm_index, single_disk_farm) in single_disk_farms.iter().enumerate() { - let disk_farm_index = disk_farm_index.try_into().map_err(|_error| { + for (farm_index, farm) in farms.iter().enumerate() { + let farm_index = farm_index.try_into().map_err(|_error| { anyhow!( "More than 256 plots are not supported, consider running multiple farmer \ instances" @@ -843,17 +835,17 @@ where })?; for (sector_index, mut plotted_sectors) in - (0 as SectorIndex..).zip(single_disk_farm.plotted_sectors().await) + (0 as SectorIndex..).zip(farm.plotted_sectors().await) { while let Some(plotted_sector_result) = plotted_sectors.next().await { match plotted_sector_result { Ok(plotted_sector) => { - future_plotted_pieces.add_sector(disk_farm_index, &plotted_sector); + 
future_plotted_pieces.add_sector(farm_index, &plotted_sector); } Err(error) => { error!( %error, - %disk_farm_index, + %farm_index, %sector_index, "Failed reading plotted sector on startup, skipping" ); @@ -868,21 +860,17 @@ where info!("Finished collecting already plotted pieces successfully"); - let total_and_plotted_sectors = single_disk_farms + let total_and_plotted_sectors = farms .iter() .enumerate() - .map(|(disk_farm_index, single_disk_farm)| async move { - let total_sector_count = single_disk_farm.total_sectors_count(); - let plotted_sectors_count = - single_disk_farm - .plotted_sectors_count() - .await - .map_err(|error| { - anyhow!( - "Failed to get plotted sectors count from from index \ - {disk_farm_index}: {error}" - ) - })?; + .map(|(farm_index, farm)| async move { + let total_sector_count = farm.total_sectors_count(); + let plotted_sectors_count = farm.plotted_sectors_count().await.map_err(|error| { + anyhow!( + "Failed to get plotted sectors count from from index {farm_index}: \ + {error}" + ) + })?; anyhow::Ok((total_sector_count, plotted_sectors_count)) }) @@ -890,16 +878,12 @@ where .try_collect::>() .await?; - let mut single_disk_farms_stream = single_disk_farms - .into_iter() - .enumerate() + let mut farms_stream = (0u8..) + .zip(farms) .zip(total_and_plotted_sectors) - .map(|((disk_farm_index, single_disk_farm), sector_counts)| { - let disk_farm_index = disk_farm_index.try_into().expect( - "More than 256 plots are not supported, this is checked above already; qed", - ); + .map(|((farm_index, farm), sector_counts)| { let plotted_pieces = Arc::clone(&plotted_pieces); - let span = info_span!("", %disk_farm_index); + let span = info_span!("", %farm_index); // Collect newly plotted pieces let on_plotted_sector_callback = @@ -914,111 +898,98 @@ where .expect("Initial value was populated above; qed"); if let Some(old_plotted_sector) = &maybe_old_plotted_sector { - plotted_pieces.delete_sector(disk_farm_index, old_plotted_sector); + plotted_pieces.delete_sector(farm_index, old_plotted_sector); } - plotted_pieces.add_sector(disk_farm_index, plotted_sector); + plotted_pieces.add_sector(farm_index, plotted_sector); } }; let (total_sector_count, plotted_sectors_count) = sector_counts; farmer_metrics.update_sectors_total( - single_disk_farm.id(), + farm.id(), total_sector_count - plotted_sectors_count, SectorState::NotPlotted, ); farmer_metrics.update_sectors_total( - single_disk_farm.id(), + farm.id(), plotted_sectors_count, SectorState::Plotted, ); - single_disk_farm - .on_sector_update(Arc::new({ - let single_disk_farm_id = *single_disk_farm.id(); - let farmer_metrics = farmer_metrics.clone(); - - move |(_sector_index, sector_state)| match sector_state { - SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. 
}) => { - farmer_metrics.sector_plotting.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => { - farmer_metrics.sector_downloading.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => { - farmer_metrics - .observe_sector_downloading_time(&single_disk_farm_id, time); - farmer_metrics.sector_downloaded.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => { - farmer_metrics.sector_encoding.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => { - farmer_metrics.observe_sector_encoding_time(&single_disk_farm_id, time); - farmer_metrics.sector_encoded.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Writing) => { - farmer_metrics.sector_writing.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => { - farmer_metrics.observe_sector_writing_time(&single_disk_farm_id, time); - farmer_metrics.sector_written.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Finished { - plotted_sector, - old_plotted_sector, - time, - }) => { - on_plotted_sector_callback(plotted_sector, old_plotted_sector); - farmer_metrics.observe_sector_plotting_time(&single_disk_farm_id, time); - farmer_metrics.sector_plotted.inc(); - farmer_metrics - .update_sector_state(&single_disk_farm_id, SectorState::Plotted); - } - SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => { - farmer_metrics.update_sector_state( - &single_disk_farm_id, - SectorState::AboutToExpire, - ); - } - SectorUpdate::Expiration(SectorExpirationDetails::Expired) => { - farmer_metrics - .update_sector_state(&single_disk_farm_id, SectorState::Expired); - } - SectorUpdate::Expiration(SectorExpirationDetails::Determined { - .. - }) => { - // Not interested in here - } + farm.on_sector_update(Arc::new({ + let farm_id = *farm.id(); + let farmer_metrics = farmer_metrics.clone(); + + move |(_sector_index, sector_state)| match sector_state { + SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. 
}) => { + farmer_metrics.sector_plotting.inc(); } - })) - .detach(); - - single_disk_farm - .on_farming_notification(Arc::new({ - let single_disk_farm_id = *single_disk_farm.id(); - let farmer_metrics = farmer_metrics.clone(); - - move |farming_notification| match farming_notification { - FarmingNotification::Auditing(auditing_details) => { - farmer_metrics.observe_auditing_time( - &single_disk_farm_id, - &auditing_details.time, - ); - } - FarmingNotification::Proving(proving_details) => { - farmer_metrics.observe_proving_time( - &single_disk_farm_id, - &proving_details.time, - proving_details.result, - ); - } - FarmingNotification::NonFatalError(error) => { - farmer_metrics.note_farming_error(&single_disk_farm_id, error); - } + SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => { + farmer_metrics.sector_downloading.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => { + farmer_metrics.observe_sector_downloading_time(&farm_id, time); + farmer_metrics.sector_downloaded.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => { + farmer_metrics.sector_encoding.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => { + farmer_metrics.observe_sector_encoding_time(&farm_id, time); + farmer_metrics.sector_encoded.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Writing) => { + farmer_metrics.sector_writing.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => { + farmer_metrics.observe_sector_writing_time(&farm_id, time); + farmer_metrics.sector_written.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Finished { + plotted_sector, + old_plotted_sector, + time, + }) => { + on_plotted_sector_callback(plotted_sector, old_plotted_sector); + farmer_metrics.observe_sector_plotting_time(&farm_id, time); + farmer_metrics.sector_plotted.inc(); + farmer_metrics.update_sector_state(&farm_id, SectorState::Plotted); } - })) - .detach(); + SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => { + farmer_metrics.update_sector_state(&farm_id, SectorState::AboutToExpire); + } + SectorUpdate::Expiration(SectorExpirationDetails::Expired) => { + farmer_metrics.update_sector_state(&farm_id, SectorState::Expired); + } + SectorUpdate::Expiration(SectorExpirationDetails::Determined { .. 
}) => { + // Not interested in here + } + } + })) + .detach(); + + farm.on_farming_notification(Arc::new({ + let farm_id = *farm.id(); + let farmer_metrics = farmer_metrics.clone(); + + move |farming_notification| match farming_notification { + FarmingNotification::Auditing(auditing_details) => { + farmer_metrics.observe_auditing_time(&farm_id, &auditing_details.time); + } + FarmingNotification::Proving(proving_details) => { + farmer_metrics.observe_proving_time( + &farm_id, + &proving_details.time, + proving_details.result, + ); + } + FarmingNotification::NonFatalError(error) => { + farmer_metrics.note_farming_error(&farm_id, error); + } + } + })) + .detach(); - single_disk_farm.run() + farm.run() }) .collect::>(); @@ -1028,7 +999,7 @@ where let farm_fut = run_future_in_dedicated_thread( move || async move { - while let Some(result) = single_disk_farms_stream.next().await { + while let Some(result) = farms_stream.next().await { let id = result?; info!(%id, "Farm exited successfully"); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/info.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/info.rs index 149bd453ed..2eb3d4b390 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/info.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/info.rs @@ -2,11 +2,11 @@ use crate::commands::shared::print_disk_farm_info; use std::path::PathBuf; pub(crate) fn info(disk_farms: Vec) { - for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() { - if disk_farm_index > 0 { + for (farm_index, disk_farm) in disk_farms.into_iter().enumerate() { + if farm_index > 0 { println!(); } - print_disk_farm_info(disk_farm, disk_farm_index); + print_disk_farm_info(disk_farm, farm_index); } } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/scrub.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/scrub.rs index 8af94c906c..186e8851de 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/scrub.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/scrub.rs @@ -7,8 +7,8 @@ pub(crate) fn scrub(disk_farms: &[PathBuf], disable_farm_locking: bool) { disk_farms .into_par_iter() .enumerate() - .for_each(|(disk_farm_index, directory)| { - let span = info_span!("", %disk_farm_index); + .for_each(|(farm_index, directory)| { + let span = info_span!("", %farm_index); let _span_guard = span.enter(); info!( path = %directory.display(), diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs index c135d394d4..422c13099d 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs @@ -1,8 +1,8 @@ use std::path::PathBuf; use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary}; -pub(crate) fn print_disk_farm_info(directory: PathBuf, disk_farm_index: usize) { - println!("Single disk farm {disk_farm_index}:"); +pub(crate) fn print_disk_farm_info(directory: PathBuf, farm_index: usize) { + println!("Single disk farm {farm_index}:"); match SingleDiskFarm::collect_summary(directory) { SingleDiskFarmSummary::Found { info, directory } => { println!(" ID: {}", info.id()); diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 60e2799bc6..2fa2ce0d0f 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -168,7 +168,7 @@ 
where WorkerCommand::ForgetKey { key } => { let mut caches = self.caches.write().await; - for (disk_farm_index, cache) in caches.iter_mut().enumerate() { + for (farm_index, cache) in caches.iter_mut().enumerate() { let Some(offset) = cache.stored_pieces.remove(&key) else { // Not this disk farm continue; @@ -182,7 +182,7 @@ where } Ok(None) => { warn!( - %disk_farm_index, + %farm_index, %offset, "Piece index out of range, this is likely an implementation bug, \ not freeing heap element" @@ -191,7 +191,7 @@ where Err(error) => { error!( %error, - %disk_farm_index, + %farm_index, ?key, %offset, "Error while reading piece from cache, might be a disk corruption" @@ -435,7 +435,7 @@ where // populated first sorted_caches.sort_by_key(|(_, cache)| cache.stored_pieces.len()); if !stream::iter(sorted_caches) - .any(|(disk_farm_index, cache)| async move { + .any(|(farm_index, cache)| async move { let Some(offset) = cache.free_offsets.pop_front() else { return false; }; @@ -444,7 +444,7 @@ where { error!( %error, - %disk_farm_index, + %farm_index, %piece_index, %offset, "Failed to write piece into cache" @@ -674,7 +674,7 @@ where match worker_state.heap.insert(heap_key) { // Entry is already occupied, we need to find and replace old piece with new one Some(KeyWrapper(old_piece_index)) => { - for (disk_farm_index, cache) in caches.iter_mut().enumerate() { + for (farm_index, cache) in caches.iter_mut().enumerate() { let old_record_key = RecordKey::from(old_piece_index.to_multihash()); let Some(offset) = cache.stored_pieces.remove(&old_record_key) else { // Not this disk farm @@ -685,14 +685,14 @@ where { error!( %error, - %disk_farm_index, + %farm_index, %piece_index, %offset, "Failed to write piece into cache" ); } else { trace!( - %disk_farm_index, + %farm_index, %old_piece_index, %piece_index, %offset, @@ -716,7 +716,7 @@ where // Sort piece caches by number of stored pieces to fill those that are less // populated first sorted_caches.sort_by_key(|(_, cache)| cache.stored_pieces.len()); - for (disk_farm_index, cache) in sorted_caches { + for (farm_index, cache) in sorted_caches { let Some(offset) = cache.free_offsets.pop_front() else { // Not this disk farm continue; @@ -726,14 +726,14 @@ where { error!( %error, - %disk_farm_index, + %farm_index, %piece_index, %offset, "Failed to write piece into cache" ); } else { trace!( - %disk_farm_index, + %farm_index, %piece_index, %offset, "Successfully stored piece in cache" @@ -802,7 +802,7 @@ impl FarmerCache { /// Get piece from cache pub async fn get_piece(&self, key: RecordKey) -> Option { - for (disk_farm_index, cache) in self.piece_caches.read().await.iter().enumerate() { + for (farm_index, cache) in self.piece_caches.read().await.iter().enumerate() { let Some(&offset) = cache.stored_pieces.get(&key) else { continue; }; @@ -813,7 +813,7 @@ impl FarmerCache { Err(error) => { error!( %error, - %disk_farm_index, + %farm_index, ?key, %offset, "Error while reading piece from cache, might be a disk corruption" @@ -853,7 +853,7 @@ impl FarmerCache { } let mut should_store = false; - for (disk_farm_index, cache) in self.plot_caches.read().await.iter().enumerate() { + for (farm_index, cache) in self.plot_caches.read().await.iter().enumerate() { match cache.is_piece_maybe_stored(&key).await { Ok(MaybePieceStoredResult::No) => { // Try another one if there is any @@ -868,7 +868,7 @@ impl FarmerCache { } Err(error) => { warn!( - %disk_farm_index, + %farm_index, %piece_index, %error, "Failed to check piece stored in cache" diff --git 
a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index e7cbfe8d91..39a15eb733 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -683,7 +683,7 @@ impl SingleDiskFarm { /// Create new single disk farm instance pub async fn new( options: SingleDiskFarmOptions, - disk_farm_index: usize, + farm_index: usize, ) -> Result where NC: NodeClient, @@ -772,7 +772,7 @@ impl SingleDiskFarm { }; let farming_thread_pool = ThreadPoolBuilder::new() - .thread_name(move |thread_index| format!("farming-{disk_farm_index}.{thread_index}")) + .thread_name(move |thread_index| format!("farming-{farm_index}.{thread_index}")) .num_threads(farming_thread_pool_size) .spawn_handler(tokio_rayon_spawn_handler()) .build() @@ -909,7 +909,7 @@ impl SingleDiskFarm { // Panic will already be printed by now plotting_join_handle.await.map_err(|_error| { BackgroundTaskError::BackgroundTaskPanicked { - task: format!("plotting-{disk_farm_index}"), + task: format!("plotting-{farm_index}"), } }) })); @@ -1014,7 +1014,7 @@ impl SingleDiskFarm { // Panic will already be printed by now farming_join_handle.await.map_err(|_error| { BackgroundTaskError::BackgroundTaskPanicked { - task: format!("farming-{disk_farm_index}"), + task: format!("farming-{farm_index}"), } }) })); @@ -1053,7 +1053,7 @@ impl SingleDiskFarm { // Panic will already be printed by now reading_join_handle.await.map_err(|_error| { BackgroundTaskError::BackgroundTaskPanicked { - task: format!("reading-{disk_farm_index}"), + task: format!("reading-{farm_index}"), } }) })); diff --git a/crates/subspace-farmer/src/utils/plotted_pieces.rs b/crates/subspace-farmer/src/utils/plotted_pieces.rs index fdcf107739..c88fab264c 100644 --- a/crates/subspace-farmer/src/utils/plotted_pieces.rs +++ b/crates/subspace-farmer/src/utils/plotted_pieces.rs @@ -10,7 +10,7 @@ use tracing::{trace, warn}; #[derive(Debug, Copy, Clone, Eq, PartialEq)] struct PieceDetails { - disk_farm_index: u8, + farm_index: u8, sector_index: SectorIndex, piece_offset: PieceOffset, } @@ -57,7 +57,7 @@ impl PlottedPieces { return None; } }; - let reader = match self.readers.get(usize::from(piece_details.disk_farm_index)) { + let reader = match self.readers.get(usize::from(piece_details.farm_index)) { Some(reader) => reader.clone(), None => { warn!(?piece_index, ?piece_details, "Plot offset is invalid"); @@ -73,7 +73,7 @@ impl PlottedPieces { warn!( %error, %piece_index, - disk_farm_index = piece_details.disk_farm_index, + farm_index = piece_details.farm_index, sector_index = piece_details.sector_index, "Failed to retrieve piece" ); @@ -83,12 +83,12 @@ impl PlottedPieces { } /// Add new sector to collect plotted pieces - pub fn add_sector(&mut self, disk_farm_index: u8, plotted_sector: &PlottedSector) { + pub fn add_sector(&mut self, farm_index: u8, plotted_sector: &PlottedSector) { for (piece_offset, &piece_index) in (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter()) { let piece_details = PieceDetails { - disk_farm_index, + farm_index, sector_index: plotted_sector.sector_index, piece_offset, }; @@ -105,12 +105,12 @@ impl PlottedPieces { } /// Add old sector from plotted pieces (happens on replotting) - pub fn delete_sector(&mut self, disk_farm_index: u8, plotted_sector: &PlottedSector) { + pub fn delete_sector(&mut self, farm_index: u8, plotted_sector: &PlottedSector) { for (piece_offset, &piece_index) in (PieceOffset::ZERO..).zip(plotted_sector.piece_indexes.iter()) { let 
searching_piece_details = PieceDetails { - disk_farm_index, + farm_index, sector_index: plotted_sector.sector_index, piece_offset, }; From 7d050c3cf52ab8d521af0719c715f29c031a3398 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 18 Mar 2024 14:17:07 +0200 Subject: [PATCH 6/6] Move some generic data structures from under `single_disk_farm` into `farm` module, no other code changes --- .../src/bin/subspace-farmer/commands/farm.rs | 8 +- .../subspace-farmer/commands/farm/metrics.rs | 4 +- crates/subspace-farmer/src/farm.rs | 225 +++++++++++++++++- crates/subspace-farmer/src/farmer_cache.rs | 3 +- .../subspace-farmer/src/single_disk_farm.rs | 23 +- .../src/single_disk_farm/farming.rs | 152 +----------- .../src/single_disk_farm/plot_cache.rs | 13 +- .../src/single_disk_farm/plot_cache/tests.rs | 3 +- .../src/single_disk_farm/plotting.rs | 54 +---- 9 files changed, 242 insertions(+), 243 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index e5b785dd0d..f8d7a78af2 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -25,12 +25,12 @@ use std::{fmt, fs}; use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::{PublicKey, Record, SectorIndex}; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer::farm::Farm; +use subspace_farmer::farm::{ + Farm, FarmingNotification, SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, +}; use subspace_farmer::farmer_cache::FarmerCache; -use subspace_farmer::single_disk_farm::farming::FarmingNotification; use subspace_farmer::single_disk_farm::{ - SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, SingleDiskFarm, - SingleDiskFarmError, SingleDiskFarmOptions, + SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, }; use subspace_farmer::utils::farmer_piece_getter::{DsnCacheRetryPolicy, FarmerPieceGetter}; use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator; diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs index 3a5abf558e..888f69716b 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs @@ -7,9 +7,7 @@ use std::fmt; use std::sync::atomic::{AtomicI64, AtomicU64}; use std::time::Duration; use subspace_core_primitives::SectorIndex; -use subspace_farmer::farm::FarmId; -use subspace_farmer::single_disk_farm::farming::ProvingResult; -use subspace_farmer::single_disk_farm::FarmingError; +use subspace_farmer::farm::{FarmId, FarmingError, ProvingResult}; #[derive(Debug, Copy, Clone)] pub(super) enum SectorState { diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index 5efa13c8cf..16e59237e1 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -1,19 +1,21 @@ -use crate::single_disk_farm::farming::FarmingNotification; -use crate::single_disk_farm::plot_cache::MaybePieceStoredResult; -use crate::single_disk_farm::SectorUpdate; +use crate::node_client; use async_trait::async_trait; use derive_more::{Display, From}; use futures::Stream; -use parity_scale_codec::{Decode, Encode}; +use parity_scale_codec::{Decode, Encode, Input, Output}; use serde::{Deserialize, Serialize}; 
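To make the practical effect of this move concrete, here is the import migration it implies for downstream code, with module paths taken from the hunks above (the grouping itself is illustrative):

    // Before this patch:
    //     use subspace_farmer::single_disk_farm::farming::{FarmingNotification, ProvingResult};
    //     use subspace_farmer::single_disk_farm::{
    //         FarmingError, SectorExpirationDetails, SectorPlottingDetails, SectorUpdate,
    //     };
    //
    // After this patch, the same items come from the shared `farm` module:
    use subspace_farmer::farm::{
        FarmingError, FarmingNotification, ProvingResult, SectorExpirationDetails,
        SectorPlottingDetails, SectorUpdate,
    };
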
-use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use subspace_core_primitives::{Piece, PieceIndex, PieceOffset, SectorIndex}; +use std::time::Duration; +use std::{fmt, io}; +use subspace_core_primitives::{Piece, PieceIndex, PieceOffset, SectorIndex, SegmentIndex}; +use subspace_farmer_components::auditing::AuditingError; use subspace_farmer_components::plotting::PlottedSector; +use subspace_farmer_components::proving::ProvingError; use subspace_networking::libp2p::kad::RecordKey; use subspace_rpc_primitives::SolutionResponse; +use thiserror::Error; use ulid::Ulid; /// Erased error type @@ -70,6 +72,16 @@ pub trait PieceCache: Send + Sync + fmt::Debug { async fn read_piece(&self, offset: PieceCacheOffset) -> Result, FarmError>; } +#[derive(Debug, Copy, Clone, Encode, Decode)] +pub enum MaybePieceStoredResult { + /// Definitely not stored + No, + /// Maybe has vacant slot to store + Vacant, + /// Maybe still stored + Yes, +} + /// Abstract plot cache implementation #[async_trait] pub trait PlotCache: Send + Sync + fmt::Debug { @@ -93,6 +105,207 @@ pub trait PlotCache: Send + Sync + fmt::Debug { async fn read_piece(&self, key: &RecordKey) -> Result, FarmError>; } +/// Auditing details +#[derive(Debug, Copy, Clone, Encode, Decode)] +pub struct AuditingDetails { + /// Number of sectors that were audited + pub sectors_count: SectorIndex, + /// Audit duration + pub time: Duration, +} + +/// Result of the proving +#[derive(Debug, Copy, Clone, Encode, Decode)] +pub enum ProvingResult { + /// Proved successfully and accepted by the node + Success, + /// Proving took too long + Timeout, + /// Managed to prove within time limit, but node rejected solution, likely due to timeout on its + /// end + Rejected, +} + +impl fmt::Display for ProvingResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + ProvingResult::Success => "Success", + ProvingResult::Timeout => "Timeout", + ProvingResult::Rejected => "Rejected", + }) + } +} + +/// Proving details +#[derive(Debug, Copy, Clone, Encode, Decode)] +pub struct ProvingDetails { + /// Whether proving ended up being successful + pub result: ProvingResult, + /// Audit duration + pub time: Duration, +} + +/// Special decoded farming error +#[derive(Debug, Encode, Decode)] +pub struct DecodedFarmingError { + /// String representation of an error + error: String, + /// Whether error is fatal + is_fatal: bool, +} + +impl fmt::Display for DecodedFarmingError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.error.fmt(f) + } +} + +/// Errors that happen during farming +#[derive(Debug, Error)] +pub enum FarmingError { + /// Failed to subscribe to slot info notifications + #[error("Failed to subscribe to slot info notifications: {error}")] + FailedToSubscribeSlotInfo { + /// Lower-level error + error: node_client::Error, + }, + /// Failed to retrieve farmer info + #[error("Failed to retrieve farmer info: {error}")] + FailedToGetFarmerInfo { + /// Lower-level error + error: node_client::Error, + }, + /// Slot info notification stream ended + #[error("Slot info notification stream ended")] + SlotNotificationStreamEnded, + /// Low-level auditing error + #[error("Low-level auditing error: {0}")] + LowLevelAuditing(#[from] AuditingError), + /// Low-level proving error + #[error("Low-level proving error: {0}")] + LowLevelProving(#[from] ProvingError), + /// I/O error occurred + #[error("Farming I/O error: {0}")] + Io(#[from] io::Error), + /// Decoded farming error + #[error("Decoded 
farming error {0}")] + Decoded(DecodedFarmingError), +} + +impl Encode for FarmingError { + fn encode_to(&self, dest: &mut O) { + let error = DecodedFarmingError { + error: self.to_string(), + is_fatal: self.is_fatal(), + }; + + error.encode_to(dest) + } +} + +impl Decode for FarmingError { + fn decode(input: &mut I) -> Result { + DecodedFarmingError::decode(input).map(FarmingError::Decoded) + } +} + +impl FarmingError { + /// String variant of the error, primarily for monitoring purposes + pub fn str_variant(&self) -> &str { + match self { + FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo", + FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo", + FarmingError::LowLevelAuditing(_) => "LowLevelAuditing", + FarmingError::LowLevelProving(_) => "LowLevelProving", + FarmingError::Io(_) => "Io", + FarmingError::Decoded(_) => "Decoded", + FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded", + } + } + + /// Whether this error is fatal and makes farm unusable + pub fn is_fatal(&self) -> bool { + match self { + FarmingError::FailedToSubscribeSlotInfo { .. } => true, + FarmingError::FailedToGetFarmerInfo { .. } => true, + FarmingError::LowLevelAuditing(_) => true, + FarmingError::LowLevelProving(error) => error.is_fatal(), + FarmingError::Io(_) => true, + FarmingError::Decoded(error) => error.is_fatal, + FarmingError::SlotNotificationStreamEnded => true, + } + } +} + +/// Various farming notifications +#[derive(Debug, Clone, Encode, Decode)] +pub enum FarmingNotification { + /// Auditing + Auditing(AuditingDetails), + /// Proving + Proving(ProvingDetails), + /// Non-fatal farming error + NonFatalError(Arc), +} + +/// Details about sector currently being plotted +#[derive(Debug, Clone, Encode, Decode)] +pub enum SectorPlottingDetails { + /// Starting plotting of a sector + Starting { + /// Progress so far in % (not including this sector) + progress: f32, + /// Whether sector is being replotted + replotting: bool, + /// Whether this is the last sector queued so far + last_queued: bool, + }, + /// Downloading sector pieces + Downloading, + /// Downloaded sector pieces + Downloaded(Duration), + /// Encoding sector pieces + Encoding, + /// Encoded sector pieces + Encoded(Duration), + /// Writing sector + Writing, + /// Written sector + Written(Duration), + /// Finished plotting + Finished { + /// Information about plotted sector + plotted_sector: PlottedSector, + /// Information about old plotted sector that was replaced + old_plotted_sector: Option, + /// How much time it took to plot a sector + time: Duration, + }, +} + +/// Details about sector expiration +#[derive(Debug, Clone, Encode, Decode)] +pub enum SectorExpirationDetails { + /// Sector expiration became known + Determined { + /// Segment index at which sector expires + expires_at: SegmentIndex, + }, + /// Sector will expire at the next segment index and should be replotted + AboutToExpire, + /// Sector already expired + Expired, +} + +/// Various sector updates +#[derive(Debug, Clone, Encode, Decode)] +pub enum SectorUpdate { + /// Sector is being plotted + Plotting(SectorPlottingDetails), + /// Sector expiration information updated + Expiration(SectorExpirationDetails), +} + /// Abstract piece reader implementation #[async_trait] pub trait PieceReader: Send + Sync + fmt::Debug { diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index 2fa2ce0d0f..0a48970231 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ 
b/crates/subspace-farmer/src/farmer_cache.rs @@ -1,9 +1,8 @@ #[cfg(test)] mod tests; -use crate::farm::{PieceCache, PieceCacheOffset, PlotCache}; +use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheOffset, PlotCache}; use crate::node_client::NodeClient; -use crate::single_disk_farm::plot_cache::MaybePieceStoredResult; use crate::utils::run_future_in_dedicated_thread; use async_lock::RwLock as AsyncRwLock; use event_listener_primitives::{Bag, HandlerId}; diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 39a15eb733..58bfb172ed 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -5,24 +5,24 @@ pub mod plot_cache; mod plotting; pub mod unbuffered_io_file_windows; -use crate::farm::{Farm, FarmError, FarmId, HandlerFn, PieceCache, PieceReader, PlotCache}; +use crate::farm::{ + Farm, FarmError, FarmId, HandlerFn, PieceCache, PieceReader, PlotCache, SectorUpdate, +}; +pub use crate::farm::{FarmingError, FarmingNotification}; use crate::identity::{Identity, IdentityError}; use crate::node_client::NodeClient; use crate::reward_signing::reward_signing; use crate::single_disk_farm::farming::rayon_files::RayonFiles; -pub use crate::single_disk_farm::farming::FarmingError; use crate::single_disk_farm::farming::{ - farming, slot_notification_forwarder, FarmingNotification, FarmingOptions, PlotAudit, + farming, slot_notification_forwarder, FarmingOptions, PlotAudit, }; use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError}; use crate::single_disk_farm::piece_reader::DiskPieceReader; use crate::single_disk_farm::plot_cache::DiskPlotCache; +pub use crate::single_disk_farm::plotting::PlottingError; use crate::single_disk_farm::plotting::{ plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, }; -pub use crate::single_disk_farm::plotting::{ - PlottingError, SectorExpirationDetails, SectorPlottingDetails, -}; #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE; @@ -153,7 +153,7 @@ impl SingleDiskFarmInfo { .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error)) } - /// Store `SingleDiskFarm` info to path so it can be loaded again upon restart. + /// Store `SingleDiskFarm` info to path, so it can be loaded again upon restart. 
pub fn store_to(&self, directory: &Path) -> io::Result<()> { fs::write( directory.join(Self::FILE_NAME), @@ -542,15 +542,6 @@ type BackgroundTask = Pin = Bag, A>; -/// Various sector updates -#[derive(Debug, Clone, Encode, Decode)] -pub enum SectorUpdate { - /// Sector is being plotted - Plotting(SectorPlottingDetails), - /// Sector expiration information updated - Expiration(SectorExpirationDetails), -} - #[derive(Default, Debug)] struct Handlers { sector_update: Handler<(SectorIndex, SectorUpdate)>, diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index d72b1981a1..6d26e7e2a6 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -1,17 +1,17 @@ pub mod rayon_files; -use crate::node_client; +use crate::farm::{ + AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult, +}; use crate::node_client::NodeClient; use crate::single_disk_farm::Handlers; use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use futures::channel::mpsc; use futures::StreamExt; -use parity_scale_codec::{Decode, Encode, Error, Input, Output}; use parking_lot::Mutex; use rayon::ThreadPool; use std::sync::Arc; -use std::time::{Duration, Instant}; -use std::{fmt, io}; +use std::time::Instant; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{PosSeed, PublicKey, SectorIndex, Solution, SolutionRange}; use subspace_erasure_coding::ErasureCoding; @@ -22,152 +22,8 @@ use subspace_farmer_components::sector::SectorMetadataChecksummed; use subspace_farmer_components::ReadAtSync; use subspace_proof_of_space::{Table, TableGenerator}; use subspace_rpc_primitives::{SlotInfo, SolutionResponse}; -use thiserror::Error; use tracing::{debug, error, info, trace, warn, Span}; -/// Auditing details -#[derive(Debug, Copy, Clone, Encode, Decode)] -pub struct AuditingDetails { - /// Number of sectors that were audited - pub sectors_count: SectorIndex, - /// Audit duration - pub time: Duration, -} - -/// Result of the proving -#[derive(Debug, Copy, Clone, Encode, Decode)] -pub enum ProvingResult { - /// Proved successfully and accepted by the node - Success, - /// Proving took too long - Timeout, - /// Managed to prove within time limit, but node rejected solution, likely due to timeout on its - /// end - Rejected, -} - -impl fmt::Display for ProvingResult { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(match self { - ProvingResult::Success => "Success", - ProvingResult::Timeout => "Timeout", - ProvingResult::Rejected => "Rejected", - }) - } -} - -/// Proving details -#[derive(Debug, Copy, Clone, Encode, Decode)] -pub struct ProvingDetails { - /// Whether proving ended up being successful - pub result: ProvingResult, - /// Audit duration - pub time: Duration, -} - -/// Various farming notifications -#[derive(Debug, Clone, Encode, Decode)] -pub enum FarmingNotification { - /// Auditing - Auditing(AuditingDetails), - /// Proving - Proving(ProvingDetails), - /// Non-fatal farming error - NonFatalError(Arc), -} - -/// Special decoded farming error -#[derive(Debug, Encode, Decode)] -pub struct DecodedFarmingError { - /// String representation of an error - error: String, - /// Whether error is fatal - is_fatal: bool, -} - -impl fmt::Display for DecodedFarmingError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.error.fmt(f) - } -} - -/// Errors that happen during farming -#[derive(Debug, 
Error)] -pub enum FarmingError { - /// Failed to subscribe to slot info notifications - #[error("Failed to subscribe to slot info notifications: {error}")] - FailedToSubscribeSlotInfo { - /// Lower-level error - error: node_client::Error, - }, - /// Failed to retrieve farmer info - #[error("Failed to retrieve farmer info: {error}")] - FailedToGetFarmerInfo { - /// Lower-level error - error: node_client::Error, - }, - /// Slot info notification stream ended - #[error("Slot info notification stream ended")] - SlotNotificationStreamEnded, - /// Low-level auditing error - #[error("Low-level auditing error: {0}")] - LowLevelAuditing(#[from] AuditingError), - /// Low-level proving error - #[error("Low-level proving error: {0}")] - LowLevelProving(#[from] ProvingError), - /// I/O error occurred - #[error("Farming I/O error: {0}")] - Io(#[from] io::Error), - /// Decoded farming error - #[error("Decoded farming error {0}")] - Decoded(DecodedFarmingError), -} - -impl Encode for FarmingError { - fn encode_to(&self, dest: &mut O) { - let error = DecodedFarmingError { - error: self.to_string(), - is_fatal: self.is_fatal(), - }; - - error.encode_to(dest) - } -} - -impl Decode for FarmingError { - fn decode(input: &mut I) -> Result { - DecodedFarmingError::decode(input).map(FarmingError::Decoded) - } -} - -impl FarmingError { - /// String variant of the error, primarily for monitoring purposes - pub fn str_variant(&self) -> &str { - match self { - FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo", - FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo", - FarmingError::LowLevelAuditing(_) => "LowLevelAuditing", - FarmingError::LowLevelProving(_) => "LowLevelProving", - FarmingError::Io(_) => "Io", - FarmingError::Decoded(_) => "Decoded", - FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded", - } - } - - /// Whether this error is fatal and makes farm unusable - pub fn is_fatal(&self) -> bool { - match self { - FarmingError::FailedToSubscribeSlotInfo { .. } => true, - FarmingError::FailedToGetFarmerInfo { .. 
} => true, - FarmingError::LowLevelAuditing(_) => true, - FarmingError::LowLevelProving(error) => error.is_fatal(), - FarmingError::Io(_) => true, - FarmingError::Decoded(error) => error.is_fatal, - FarmingError::SlotNotificationStreamEnded => true, - } - } -} - pub(super) async fn slot_notification_forwarder( node_client: &NC, mut slot_info_forwarder_sender: mpsc::Sender, diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs index be4b3b2c8a..a3dc7c5e74 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs @@ -1,13 +1,12 @@ #[cfg(test)] mod tests; -use crate::farm::{FarmError, PlotCache}; +use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache}; #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; use crate::utils::AsyncJoinOnDrop; use async_lock::RwLock as AsyncRwLock; use async_trait::async_trait; -use parity_scale_codec::{Decode, Encode}; use parking_lot::RwLock; use std::collections::HashMap; #[cfg(not(windows))] @@ -37,16 +36,6 @@ pub enum DiskPlotCacheError { ChecksumMismatch, } -#[derive(Debug, Copy, Clone, Encode, Decode)] -pub enum MaybePieceStoredResult { - /// Definitely not stored - No, - /// Maybe has vacant slot to store - Vacant, - /// Maybe still stored - Yes, -} - #[derive(Debug)] struct CachedPieces { /// Map of piece index into offset diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs index 3fd76227af..0e58176f3c 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs @@ -1,4 +1,5 @@ -use crate::single_disk_farm::plot_cache::{DiskPlotCache, MaybePieceStoredResult}; +use crate::farm::MaybePieceStoredResult; +use crate::single_disk_farm::plot_cache::DiskPlotCache; #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE; diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 82a5c3aee0..c29ed05dbb 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -1,7 +1,8 @@ +use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate}; #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; use crate::single_disk_farm::{ - BackgroundTaskError, Handlers, PlotMetadataHeader, SectorUpdate, RESERVED_PLOT_METADATA, + BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA, }; use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::AsyncJoinOnDrop; @@ -11,7 +12,7 @@ use atomic::Atomic; use futures::channel::{mpsc, oneshot}; use futures::{select, FutureExt, SinkExt, StreamExt}; use lru::LruCache; -use parity_scale_codec::{Decode, Encode}; +use parity_scale_codec::Encode; use std::collections::HashMap; #[cfg(not(windows))] use std::fs::File; @@ -44,55 +45,6 @@ const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500); /// Size of the cache of archived segments for the purposes of faster sector expiration checks. 
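One behaviour of the `FarmingError` plumbing that this series relocates from `single_disk_farm/farming.rs` into `farm.rs` is easy to miss: the hand-written `Encode` impl flattens every variant into a `DecodedFarmingError` (message string plus fatality flag), so the receiving side always decodes to the `Decoded` variant while the message and `is_fatal()` survive the round trip. A minimal sketch of that round trip (illustrative only, not part of the patches), using `parity-scale-codec` as the crate already does:

    use parity_scale_codec::{Decode, Encode};
    use subspace_farmer::farm::FarmingError;

    fn main() {
        let original = FarmingError::SlotNotificationStreamEnded;
        let encoded = original.encode();

        // Decoding always yields the `Decoded` variant carrying the original message.
        let decoded = FarmingError::decode(&mut encoded.as_slice()).unwrap();
        assert_eq!(decoded.str_variant(), "Decoded");
        assert_eq!(decoded.is_fatal(), original.is_fatal());
        assert!(decoded.to_string().contains(&original.to_string()));
    }
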
const ARCHIVED_SEGMENTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).expect("Not zero; qed"); -/// Details about sector currently being plotted -#[derive(Debug, Clone, Encode, Decode)] -pub enum SectorPlottingDetails { - /// Starting plotting of a sector - Starting { - /// Progress so far in % (not including this sector) - progress: f32, - /// Whether sector is being replotted - replotting: bool, - /// Whether this is the last sector queued so far - last_queued: bool, - }, - /// Downloading sector pieces - Downloading, - /// Downloaded sector pieces - Downloaded(Duration), - /// Encoding sector pieces - Encoding, - /// Encoded sector pieces - Encoded(Duration), - /// Writing sector - Writing, - /// Written sector - Written(Duration), - /// Finished plotting - Finished { - /// Information about plotted sector - plotted_sector: PlottedSector, - /// Information about old plotted sector that was replaced - old_plotted_sector: Option, - /// How much time it took to plot a sector - time: Duration, - }, -} - -/// Details about sector expiration -#[derive(Debug, Clone, Encode, Decode)] -pub enum SectorExpirationDetails { - /// Sector expiration became known - Determined { - /// Segment index at which sector expires - expires_at: SegmentIndex, - }, - /// Sector will expire at the next segment index and should be replotted - AboutToExpire, - /// Sector already expired - Expired, -} - pub(super) struct SectorToPlot { sector_index: SectorIndex, /// Progress so far in % (not including this sector)
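Finally, a note on where the relocated sector-update types surface for consumers: they arrive through the `Farm` trait's subscription methods. The sketch below is illustrative only and assumes the handler callback keeps the `Arc`-wrapped `Fn(&A)` shape used by the command-line code earlier in the series; the same pattern applies to `on_farming_notification` and `on_solution`.

    use std::sync::Arc;
    use subspace_farmer::farm::{Farm, SectorPlottingDetails, SectorUpdate};

    fn log_finished_sectors(farm: &dyn Farm) {
        // The returned handler ID unregisters the callback when dropped; `detach()` keeps it alive.
        farm.on_sector_update(Arc::new(|(sector_index, sector_update)| {
            if let SectorUpdate::Plotting(SectorPlottingDetails::Finished { time, .. }) =
                sector_update
            {
                println!("sector {sector_index} plotted in {time:?}");
            }
        }))
        .detach();
    }
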