Skip to content

Commit

Permalink
Merge pull request #1382 from subspace/improve-sector-downloading
Browse files Browse the repository at this point in the history
Improve sector downloading
  • Loading branch information
nazar-pc authored Apr 18, 2023
2 parents 6661d49 + 776f678 commit 677ab36
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bench = false

[dependencies]
async-trait = "0.1.68"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
fs2 = "0.4.3"
futures = "0.3.28"
libc = "0.2.139"
Expand Down
78 changes: 57 additions & 21 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,34 @@ use crate::piece_caching::PieceMemoryCache;
use crate::segment_reconstruction::recover_missing_piece;
use crate::{FarmerProtocolInfo, SectorMetadata};
use async_trait::async_trait;
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use parity_scale_codec::Encode;
use parking_lot::Mutex;
use std::error::Error;
use std::io;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::crypto::kzg::{Commitment, Kzg};
use subspace_core_primitives::crypto::{Scalar, ScalarLegacy};
use subspace_core_primitives::sector_codec::{SectorCodec, SectorCodecError};
use subspace_core_primitives::{
LegacySectorId, Piece, PieceIndex, PieceIndexHash, PublicKey, SectorIndex, PLOT_SECTOR_SIZE,
};
use thiserror::Error;
use tracing::info;
use tracing::{info, warn};

fn default_backoff() -> ExponentialBackoff {
ExponentialBackoff {
initial_interval: Duration::from_secs(15),
max_interval: Duration::from_secs(10 * 60),
// Try until we get a valid piece
max_elapsed_time: None,
..ExponentialBackoff::default()
}
}

/// Defines retry policy on error during piece acquiring.
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
Expand Down Expand Up @@ -139,20 +153,36 @@ where
})
.collect();

let mut in_memory_sector_scalars =
Vec::with_capacity(PLOT_SECTOR_SIZE as usize / Scalar::FULL_BYTES);
let in_memory_sector_scalars = Mutex::new(Vec::with_capacity(
PLOT_SECTOR_SIZE as usize / Scalar::FULL_BYTES,
));

plot_pieces_in_batches_non_blocking(
&mut in_memory_sector_scalars,
sector_index,
piece_getter,
piece_getter_retry_policy,
kzg,
&piece_indexes,
piece_memory_cache,
)
retry(default_backoff(), || async {
let mut in_memory_sector_scalars = in_memory_sector_scalars.lock();
in_memory_sector_scalars.clear();

if let Err(error) = download_pieces_in_batches_non_blocking(
&mut in_memory_sector_scalars,
sector_index,
piece_getter,
piece_getter_retry_policy,
kzg,
&piece_indexes,
piece_memory_cache.clone(),
)
.await
{
warn!(%error, "Sector plotting attempt failed, will retry later");

return Err(BackoffError::transient(error));
}

Ok(())
})
.await?;

let mut in_memory_sector_scalars = in_memory_sector_scalars.into_inner();

sector_codec
.encode(&mut in_memory_sector_scalars)
.map_err(PlottingError::FailedToEncodeSector)?;
Expand Down Expand Up @@ -213,7 +243,7 @@ where
})
}

async fn plot_pieces_in_batches_non_blocking<PG: PieceGetter>(
async fn download_pieces_in_batches_non_blocking<PG: PieceGetter>(
in_memory_sector_scalars: &mut Vec<ScalarLegacy>,
sector_index: u64,
piece_getter: &PG,
Expand All @@ -225,24 +255,30 @@ async fn plot_pieces_in_batches_non_blocking<PG: PieceGetter>(
let mut pieces_receiving_futures = piece_indexes
.iter()
.map(|piece_index| async {
let piece_index = *piece_index;

if let Some(piece) = piece_memory_cache.get_piece(&piece_index.hash()) {
return (piece_index, Ok(Some(piece)));
}

let piece_result = piece_getter
.get_piece(*piece_index, piece_getter_retry_policy)
.get_piece(piece_index, piece_getter_retry_policy)
.await;

let failed = piece_result
let succeeded = piece_result
.as_ref()
.map(|piece| piece.is_none())
.unwrap_or(true);
.map(|piece| piece.is_some())
.unwrap_or_default();

// all retries failed
if failed {
if !succeeded {
let recovered_piece =
recover_missing_piece(piece_getter, kzg.clone(), *piece_index).await;
recover_missing_piece(piece_getter, kzg.clone(), piece_index).await;

return (*piece_index, recovered_piece.map(Some).map_err(Into::into));
return (piece_index, recovered_piece.map(Some).map_err(Into::into));
}

(*piece_index, piece_result)
(piece_index, piece_result)
})
.collect::<FuturesOrdered<_>>();

Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/single_disk_plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument, Span};
use ulid::Ulid;

/// Get piece retry attempts number.
const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(30).expect("Not zero; qed");
const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(3).expect("Not zero; qed");

// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
// usize depending on chain parameters
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use subspace_core_primitives::{Piece, PieceIndex, PieceIndexHash};
use tracing::{debug, error, trace, warn};

/// Defines initial duration between get_piece calls.
const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(3);
/// Defines max duration between get_piece calls.
const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5);
const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(10);

#[async_trait]
pub trait PieceValidator: Sync + Send {
Expand Down

0 comments on commit 677ab36

Please sign in to comment.