From 4a8e8212459ddb55202e5bfd3b8436281578fa85 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 29 Oct 2024 14:05:22 +0200 Subject: [PATCH 1/3] Make `DirectIoFile` API a bit more flexible --- .../bin/subspace-farmer/commands/benchmark.rs | 18 ++++++++---------- crates/subspace-farmer/src/single_disk_farm.rs | 8 +++++--- .../src/single_disk_farm/direct_io_file.rs | 5 ++++- .../single_disk_farm/farming/rayon_files.rs | 14 ++++++++++---- .../src/single_disk_farm/plot_cache/tests.rs | 2 +- 5 files changed, 28 insertions(+), 19 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs index 0c534f8033..761c484130 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -212,10 +212,9 @@ where }); } { - let plot = RayonFiles::open_with( - &disk_farm.join(SingleDiskFarm::PLOT_FILE), - DirectIoFile::open, - ) + let plot = RayonFiles::open_with(disk_farm.join(SingleDiskFarm::PLOT_FILE), |path| { + DirectIoFile::open(path) + }) .map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?; let plot_audit = PlotAudit::new(&plot); @@ -250,7 +249,7 @@ where }); } { - let plot = RayonFiles::open(&disk_farm.join(SingleDiskFarm::PLOT_FILE)) + let plot = RayonFiles::open(disk_farm.join(SingleDiskFarm::PLOT_FILE)) .map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?; let plot_audit = PlotAudit::new(&plot); @@ -429,10 +428,9 @@ where }); } { - let plot = RayonFiles::open_with( - &disk_farm.join(SingleDiskFarm::PLOT_FILE), - DirectIoFile::open, - ) + let plot = RayonFiles::open_with(disk_farm.join(SingleDiskFarm::PLOT_FILE), |path| { + DirectIoFile::open(path) + }) .map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?; let plot_audit = PlotAudit::new(&plot); let mut options = PlotAuditOptions:: { @@ -504,7 +502,7 @@ where }); } { - let plot = RayonFiles::open(&disk_farm.join(SingleDiskFarm::PLOT_FILE)) + let plot = RayonFiles::open(disk_farm.join(SingleDiskFarm::PLOT_FILE)) .map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?; let plot_audit = PlotAudit::new(&plot); let mut options = PlotAuditOptions:: { diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index a77f118ee3..db9c9c1b66 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -962,7 +962,9 @@ impl SingleDiskFarm { let farming_plot_fut = task::spawn_blocking(|| { farming_thread_pool .install(move || { - RayonFiles::open_with(&directory.join(Self::PLOT_FILE), DirectIoFile::open) + RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| { + DirectIoFile::open(path) + }) }) .map(|farming_plot| (farming_plot, farming_thread_pool)) }); @@ -1466,7 +1468,7 @@ impl SingleDiskFarm { Arc::new(AsyncRwLock::new(sectors_metadata)) }; - let plot_file = DirectIoFile::open(&directory.join(Self::PLOT_FILE))?; + let plot_file = DirectIoFile::open(directory.join(Self::PLOT_FILE))?; if plot_file.size()? != allocated_space_distribution.plot_file_size { // Allocating the whole file (`set_len` below can create a sparse file, which will cause @@ -1609,7 +1611,7 @@ impl SingleDiskFarm { pub fn read_all_sectors_metadata( directory: &Path, ) -> io::Result> { - let metadata_file = DirectIoFile::open(&directory.join(Self::METADATA_FILE))?; + let metadata_file = DirectIoFile::open(directory.join(Self::METADATA_FILE))?; let metadata_size = metadata_file.size()?; let sector_metadata_size = SectorMetadataChecksummed::encoded_size(); diff --git a/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs b/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs index 1694bcc11a..3206d68a26 100644 --- a/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs +++ b/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs @@ -155,7 +155,10 @@ impl DirectIoFile { /// will be created). /// /// This is especially important on Windows to prevent huge memory usage. - pub fn open(path: &Path) -> io::Result { + pub fn open

(path: P) -> io::Result + where + P: AsRef, + { let mut open_options = OpenOptions::new(); open_options.use_direct_io(); let file = open_options diff --git a/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs b/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs index 61ba4e94ca..8350690984 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs @@ -40,13 +40,16 @@ where impl RayonFiles { /// Open file at specified path as many times as there is number of threads in current [`rayon`] /// thread pool. - pub fn open(path: &Path) -> io::Result { + pub fn open

(path: P) -> io::Result + where + P: AsRef, + { let files = (0..rayon::current_num_threads()) .map(|_| { let file = OpenOptions::new() .read(true) .advise_random_access() - .open(path)?; + .open(path.as_ref())?; file.advise_random_access()?; Ok::<_, io::Error>(file) @@ -63,9 +66,12 @@ where { /// Open file at specified path as many times as there is number of threads in current [`rayon`] /// thread pool with a provided function - pub fn open_with(path: &Path, open: fn(&Path) -> io::Result) -> io::Result { + pub fn open_with

(path: P, open: fn(&Path) -> io::Result) -> io::Result + where + P: AsRef, + { let files = (0..rayon::current_num_threads()) - .map(|_| open(path)) + .map(|_| open(path.as_ref())) .collect::, _>>()?; Ok(Self { files }) diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs index 8c30bc091c..0fcb50f81e 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache/tests.rs @@ -27,7 +27,7 @@ async fn basic() { }); let tempdir = tempdir().unwrap(); - let file = DirectIoFile::open(&tempdir.path().join("plot.bin")).unwrap(); + let file = DirectIoFile::open(tempdir.path().join("plot.bin")).unwrap(); // Align plot file size for disk sector size file.preallocate( From ce21b3b8bdd2f82a91bfad54458d843676f86b44 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 29 Oct 2024 15:10:30 +0200 Subject: [PATCH 2/3] Remove `physical_sector_size` in `DirectIoFile` --- .../src/single_disk_farm/direct_io_file.rs | 154 +++++++----------- 1 file changed, 56 insertions(+), 98 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs b/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs index 3206d68a26..0142349b17 100644 --- a/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs +++ b/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs @@ -38,7 +38,6 @@ impl AlignedSectorSize { #[derive(Debug)] pub struct DirectIoFile { file: File, - physical_sector_size: usize, /// Scratch buffer of aligned memory for reads and writes scratch_buffer: Mutex>, } @@ -89,7 +88,7 @@ impl FileExt for DirectIoFile { let mut scratch_buffer = self.scratch_buffer.lock(); // First read up to `MAX_READ_SIZE - padding` - let padding = (offset % self.physical_sector_size as u64) as usize; + let padding = (offset % DISK_SECTOR_SIZE as u64) as usize; let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len()); let (unaligned_start, buf) = buf.split_at_mut(first_unaligned_chunk_size); { @@ -128,7 +127,7 @@ impl FileExt for DirectIoFile { let mut scratch_buffer = self.scratch_buffer.lock(); // First write up to `MAX_READ_SIZE - padding` - let padding = (offset % self.physical_sector_size as u64) as usize; + let padding = (offset % DISK_SECTOR_SIZE as u64) as usize; let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len()); let (unaligned_start, buf) = buf.split_at(first_unaligned_chunk_size); { @@ -170,16 +169,8 @@ impl DirectIoFile { file.disable_cache()?; - // Physical sector size on many SSDs is smaller than 4096 and should improve performance - let physical_sector_size = if file.read_at(&mut [0; 512], 512).is_ok() { - 512 - } else { - DISK_SECTOR_SIZE - }; - Ok(Self { file, - physical_sector_size, // In many cases we'll want to read this much at once, so pre-allocate it right away scratch_buffer: Mutex::new(vec![ AlignedSectorSize::default(); @@ -210,14 +201,13 @@ impl DirectIoFile { let scratch_buffer = AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut(); - // While buffer above is allocated with granularity of `MAX_DISK_SECTOR_SIZE`, reads are + // While buffer above is allocated with granularity of `DISK_SECTOR_SIZE`, reads are // done with granularity of physical sector size - let offset_in_buffer = (offset % self.physical_sector_size as u64) as usize; + let offset_in_buffer = (offset % DISK_SECTOR_SIZE as u64) as usize; self.file.read_exact_at( - &mut scratch_buffer[..(bytes_to_read + offset_in_buffer) - .div_ceil(self.physical_sector_size) - * self.physical_sector_size], - offset / self.physical_sector_size as u64 * self.physical_sector_size as u64, + &mut scratch_buffer[..(bytes_to_read + offset_in_buffer).div_ceil(DISK_SECTOR_SIZE) + * DISK_SECTOR_SIZE], + offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64, )?; Ok(&scratch_buffer[offset_in_buffer..][..bytes_to_read]) @@ -238,12 +228,11 @@ impl DirectIoFile { >= MAX_READ_SIZE ); - let aligned_offset = - offset / self.physical_sector_size as u64 * self.physical_sector_size as u64; + let aligned_offset = offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64; let padding = (offset - aligned_offset) as usize; // Calculate the size of the read including padding on both ends - let bytes_to_read = (padding + bytes_to_write.len()).div_ceil(self.physical_sector_size) - * self.physical_sector_size; + let bytes_to_read = + (padding + bytes_to_write.len()).div_ceil(DISK_SECTOR_SIZE) * DISK_SECTOR_SIZE; if padding == 0 && bytes_to_read == bytes_to_write.len() { let scratch_buffer = @@ -282,83 +271,52 @@ mod tests { thread_rng().fill(data.as_mut_slice()); fs::write(&file_path, &data).unwrap(); - let mut file = DirectIoFile::open(&file_path).unwrap(); - - for override_physical_sector_size in [None, Some(4096)] { - if let Some(physical_sector_size) = override_physical_sector_size { - file.physical_sector_size = physical_sector_size; - } - - let mut buffer = Vec::new(); - for (offset, size) in [ - (0_usize, 512_usize), - (0_usize, 4096_usize), - (0, 500), - (0, 4000), - (5, 50), - (12, 500), - (96, 4000), - (4000, 96), - (10000, 5), - (0, MAX_READ_SIZE), - (0, MAX_READ_SIZE * 2), - (5, MAX_READ_SIZE - 5), - (5, MAX_READ_SIZE * 2 - 5), - (5, MAX_READ_SIZE), - (5, MAX_READ_SIZE * 2), - (MAX_READ_SIZE, MAX_READ_SIZE), - (MAX_READ_SIZE, MAX_READ_SIZE * 2), - (MAX_READ_SIZE + 5, MAX_READ_SIZE - 5), - (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2 - 5), - (MAX_READ_SIZE + 5, MAX_READ_SIZE), - (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2), - ] { - let data = &mut data[offset..][..size]; - buffer.resize(size, 0); - // Read contents - file.read_exact_at(buffer.as_mut_slice(), offset as u64) - .unwrap_or_else(|error| { - panic!( - "Offset {offset}, size {size}, override physical sector size \ - {override_physical_sector_size:?}: {error}" - ) - }); - - // Ensure it is correct - assert_eq!( - data, - buffer.as_slice(), - "Offset {offset}, size {size}, override physical sector size \ - {override_physical_sector_size:?}" - ); - - // Update data with random contents and write - thread_rng().fill(data); - file.write_all_at(data, offset as u64) - .unwrap_or_else(|error| { - panic!( - "Offset {offset}, size {size}, override physical sector size \ - {override_physical_sector_size:?}: {error}" - ) - }); - - // Read contents again - file.read_exact_at(buffer.as_mut_slice(), offset as u64) - .unwrap_or_else(|error| { - panic!( - "Offset {offset}, size {size}, override physical sector size \ - {override_physical_sector_size:?}: {error}" - ) - }); - - // Ensure it is correct too - assert_eq!( - data, - buffer.as_slice(), - "Offset {offset}, size {size}, override physical sector size \ - {override_physical_sector_size:?}" - ); - } + let file = DirectIoFile::open(&file_path).unwrap(); + + let mut buffer = Vec::new(); + for (offset, size) in [ + (0_usize, 512_usize), + (0_usize, 4096_usize), + (0, 500), + (0, 4000), + (5, 50), + (12, 500), + (96, 4000), + (4000, 96), + (10000, 5), + (0, MAX_READ_SIZE), + (0, MAX_READ_SIZE * 2), + (5, MAX_READ_SIZE - 5), + (5, MAX_READ_SIZE * 2 - 5), + (5, MAX_READ_SIZE), + (5, MAX_READ_SIZE * 2), + (MAX_READ_SIZE, MAX_READ_SIZE), + (MAX_READ_SIZE, MAX_READ_SIZE * 2), + (MAX_READ_SIZE + 5, MAX_READ_SIZE - 5), + (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2 - 5), + (MAX_READ_SIZE + 5, MAX_READ_SIZE), + (MAX_READ_SIZE + 5, MAX_READ_SIZE * 2), + ] { + let data = &mut data[offset..][..size]; + buffer.resize(size, 0); + // Read contents + file.read_exact_at(buffer.as_mut_slice(), offset as u64) + .unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}")); + + // Ensure it is correct + assert_eq!(data, buffer.as_slice(), "Offset {offset}, size {size}"); + + // Update data with random contents and write + thread_rng().fill(data); + file.write_all_at(data, offset as u64) + .unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}")); + + // Read contents again + file.read_exact_at(buffer.as_mut_slice(), offset as u64) + .unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}")); + + // Ensure it is correct too + assert_eq!(data, buffer.as_slice(), "Offset {offset}, size {size}"); } } } From 1baeceebff2fe66aa44a8540e365fd962bf4b99c Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 29 Oct 2024 15:23:29 +0200 Subject: [PATCH 3/3] Fix assertion, simplify and unify code between reads and writes --- .../src/single_disk_farm/direct_io_file.rs | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs b/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs index 0142349b17..b568f5db0b 100644 --- a/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs +++ b/crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs @@ -190,27 +190,22 @@ impl DirectIoFile { bytes_to_read: usize, offset: u64, ) -> io::Result<&'a [u8]> { + let aligned_offset = offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64; + let padding = (offset - aligned_offset) as usize; + // Make scratch buffer of a size that is necessary to read aligned memory, accounting // for extra bytes at the beginning and the end that will be thrown away - let offset_in_buffer = (offset % DISK_SECTOR_SIZE as u64) as usize; - let desired_buffer_size = (bytes_to_read + offset_in_buffer).div_ceil(DISK_SECTOR_SIZE); + let desired_buffer_size = (padding + bytes_to_read).div_ceil(DISK_SECTOR_SIZE); if scratch_buffer.len() < desired_buffer_size { scratch_buffer.resize_with(desired_buffer_size, AlignedSectorSize::default); } + let scratch_buffer = AlignedSectorSize::slice_mut_to_repr(scratch_buffer) + [..desired_buffer_size] + .as_flattened_mut(); - let scratch_buffer = - AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut(); + self.file.read_exact_at(scratch_buffer, aligned_offset)?; - // While buffer above is allocated with granularity of `DISK_SECTOR_SIZE`, reads are - // done with granularity of physical sector size - let offset_in_buffer = (offset % DISK_SECTOR_SIZE as u64) as usize; - self.file.read_exact_at( - &mut scratch_buffer[..(bytes_to_read + offset_in_buffer).div_ceil(DISK_SECTOR_SIZE) - * DISK_SECTOR_SIZE], - offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64, - )?; - - Ok(&scratch_buffer[offset_in_buffer..][..bytes_to_read]) + Ok(&scratch_buffer[padding..][..bytes_to_read]) } /// Panics on writes over `MAX_READ_SIZE` (including padding on both ends) @@ -223,13 +218,14 @@ impl DirectIoFile { // This is guaranteed by constructor assert!( AlignedSectorSize::slice_mut_to_repr(scratch_buffer) - .as_flattened_mut() + .as_flattened() .len() - >= MAX_READ_SIZE + <= MAX_READ_SIZE ); let aligned_offset = offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64; let padding = (offset - aligned_offset) as usize; + // Calculate the size of the read including padding on both ends let bytes_to_read = (padding + bytes_to_write.len()).div_ceil(DISK_SECTOR_SIZE) * DISK_SECTOR_SIZE;