diff --git a/Cargo.lock b/Cargo.lock index d4b79cf3f1..55d107e6a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10334,6 +10334,7 @@ name = "subspace-farmer-components" version = "0.1.0" dependencies = [ "async-trait", + "backoff", "criterion", "fs2", "futures", diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml index d2a8c8a39e..950fc9a36c 100644 --- a/crates/subspace-farmer-components/Cargo.toml +++ b/crates/subspace-farmer-components/Cargo.toml @@ -17,6 +17,7 @@ bench = false [dependencies] async-trait = "0.1.68" +backoff = { version = "0.4.0", features = ["futures", "tokio"] } fs2 = "0.4.3" futures = "0.3.28" libc = "0.2.139" diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 9a8f8142a6..ecabc9feff 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -2,12 +2,16 @@ use crate::piece_caching::PieceMemoryCache; use crate::segment_reconstruction::recover_missing_piece; use crate::{FarmerProtocolInfo, SectorMetadata}; use async_trait::async_trait; +use backoff::future::retry; +use backoff::{Error as BackoffError, ExponentialBackoff}; use futures::stream::FuturesOrdered; use futures::StreamExt; use parity_scale_codec::Encode; +use parking_lot::Mutex; use std::error::Error; use std::io; use std::sync::Arc; +use std::time::Duration; use subspace_core_primitives::crypto::kzg::{Commitment, Kzg}; use subspace_core_primitives::crypto::{Scalar, ScalarLegacy}; use subspace_core_primitives::sector_codec::{SectorCodec, SectorCodecError}; @@ -15,7 +19,17 @@ use subspace_core_primitives::{ LegacySectorId, Piece, PieceIndex, PieceIndexHash, PublicKey, SectorIndex, PLOT_SECTOR_SIZE, }; use thiserror::Error; -use tracing::info; +use tracing::{info, warn}; + +fn default_backoff() -> ExponentialBackoff { + ExponentialBackoff { + initial_interval: Duration::from_secs(15), + max_interval: Duration::from_secs(10 * 60), + // Try until we get a valid piece + max_elapsed_time: None, + ..ExponentialBackoff::default() + } +} /// Defines retry policy on error during piece acquiring. #[derive(PartialEq, Eq, Clone, Debug, Copy)] @@ -139,20 +153,36 @@ where }) .collect(); - let mut in_memory_sector_scalars = - Vec::with_capacity(PLOT_SECTOR_SIZE as usize / Scalar::FULL_BYTES); + let in_memory_sector_scalars = Mutex::new(Vec::with_capacity( + PLOT_SECTOR_SIZE as usize / Scalar::FULL_BYTES, + )); - plot_pieces_in_batches_non_blocking( - &mut in_memory_sector_scalars, - sector_index, - piece_getter, - piece_getter_retry_policy, - kzg, - &piece_indexes, - piece_memory_cache, - ) + retry(default_backoff(), || async { + let mut in_memory_sector_scalars = in_memory_sector_scalars.lock(); + in_memory_sector_scalars.clear(); + + if let Err(error) = download_pieces_in_batches_non_blocking( + &mut in_memory_sector_scalars, + sector_index, + piece_getter, + piece_getter_retry_policy, + kzg, + &piece_indexes, + piece_memory_cache.clone(), + ) + .await + { + warn!(%error, "Sector plotting attempt failed, will retry later"); + + return Err(BackoffError::transient(error)); + } + + Ok(()) + }) .await?; + let mut in_memory_sector_scalars = in_memory_sector_scalars.into_inner(); + sector_codec .encode(&mut in_memory_sector_scalars) .map_err(PlottingError::FailedToEncodeSector)?; @@ -213,7 +243,7 @@ where }) } -async fn plot_pieces_in_batches_non_blocking( +async fn download_pieces_in_batches_non_blocking( in_memory_sector_scalars: &mut Vec, sector_index: u64, piece_getter: &PG, diff --git a/crates/subspace-farmer/src/single_disk_plot.rs b/crates/subspace-farmer/src/single_disk_plot.rs index c9ae37744e..61da1c92ea 100644 --- a/crates/subspace-farmer/src/single_disk_plot.rs +++ b/crates/subspace-farmer/src/single_disk_plot.rs @@ -48,7 +48,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument, Span}; use ulid::Ulid; /// Get piece retry attempts number. -const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(30).expect("Not zero; qed"); +const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(3).expect("Not zero; qed"); // Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to // usize depending on chain parameters diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index 3db932ef63..3e430e95c3 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -12,9 +12,9 @@ use subspace_core_primitives::{Piece, PieceIndex, PieceIndexHash}; use tracing::{debug, error, trace, warn}; /// Defines initial duration between get_piece calls. -const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1); +const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(3); /// Defines max duration between get_piece calls. -const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5); +const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(10); #[async_trait] pub trait PieceValidator: Sync + Send {