Skip to content

Commit

Permalink
Merge pull request #3189 from autonomys/use-only-4096-for-direct-io
Browse files Browse the repository at this point in the history
Use only 4096 bytes sectors for Direct I/O
  • Loading branch information
nazar-pc authored Oct 30, 2024
2 parents 82e8e0a + 1baecee commit d8693f0
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,9 @@ where
});
}
{
let plot = RayonFiles::open_with(
&disk_farm.join(SingleDiskFarm::PLOT_FILE),
DirectIoFile::open,
)
let plot = RayonFiles::open_with(disk_farm.join(SingleDiskFarm::PLOT_FILE), |path| {
DirectIoFile::open(path)
})
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
let plot_audit = PlotAudit::new(&plot);

Expand Down Expand Up @@ -250,7 +249,7 @@ where
});
}
{
let plot = RayonFiles::open(&disk_farm.join(SingleDiskFarm::PLOT_FILE))
let plot = RayonFiles::open(disk_farm.join(SingleDiskFarm::PLOT_FILE))
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
let plot_audit = PlotAudit::new(&plot);

Expand Down Expand Up @@ -429,10 +428,9 @@ where
});
}
{
let plot = RayonFiles::open_with(
&disk_farm.join(SingleDiskFarm::PLOT_FILE),
DirectIoFile::open,
)
let plot = RayonFiles::open_with(disk_farm.join(SingleDiskFarm::PLOT_FILE), |path| {
DirectIoFile::open(path)
})
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
let plot_audit = PlotAudit::new(&plot);
let mut options = PlotAuditOptions::<PosTable> {
Expand Down Expand Up @@ -504,7 +502,7 @@ where
});
}
{
let plot = RayonFiles::open(&disk_farm.join(SingleDiskFarm::PLOT_FILE))
let plot = RayonFiles::open(disk_farm.join(SingleDiskFarm::PLOT_FILE))
.map_err(|error| anyhow::anyhow!("Failed to open plot: {error}"))?;
let plot_audit = PlotAudit::new(&plot);
let mut options = PlotAuditOptions::<PosTable> {
Expand Down
8 changes: 5 additions & 3 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,9 @@ impl SingleDiskFarm {
let farming_plot_fut = task::spawn_blocking(|| {
farming_thread_pool
.install(move || {
RayonFiles::open_with(&directory.join(Self::PLOT_FILE), DirectIoFile::open)
RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
DirectIoFile::open(path)
})
})
.map(|farming_plot| (farming_plot, farming_thread_pool))
});
Expand Down Expand Up @@ -1466,7 +1468,7 @@ impl SingleDiskFarm {
Arc::new(AsyncRwLock::new(sectors_metadata))
};

let plot_file = DirectIoFile::open(&directory.join(Self::PLOT_FILE))?;
let plot_file = DirectIoFile::open(directory.join(Self::PLOT_FILE))?;

if plot_file.size()? != allocated_space_distribution.plot_file_size {
// Allocating the whole file (`set_len` below can create a sparse file, which will cause
Expand Down Expand Up @@ -1609,7 +1611,7 @@ impl SingleDiskFarm {
pub fn read_all_sectors_metadata(
directory: &Path,
) -> io::Result<Vec<SectorMetadataChecksummed>> {
let metadata_file = DirectIoFile::open(&directory.join(Self::METADATA_FILE))?;
let metadata_file = DirectIoFile::open(directory.join(Self::METADATA_FILE))?;

let metadata_size = metadata_file.size()?;
let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
Expand Down
177 changes: 67 additions & 110 deletions crates/subspace-farmer/src/single_disk_farm/direct_io_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ impl AlignedSectorSize {
#[derive(Debug)]
pub struct DirectIoFile {
file: File,
physical_sector_size: usize,
/// Scratch buffer of aligned memory for reads and writes
scratch_buffer: Mutex<Vec<AlignedSectorSize>>,
}
Expand Down Expand Up @@ -89,7 +88,7 @@ impl FileExt for DirectIoFile {
let mut scratch_buffer = self.scratch_buffer.lock();

// First read up to `MAX_READ_SIZE - padding`
let padding = (offset % self.physical_sector_size as u64) as usize;
let padding = (offset % DISK_SECTOR_SIZE as u64) as usize;
let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
let (unaligned_start, buf) = buf.split_at_mut(first_unaligned_chunk_size);
{
Expand Down Expand Up @@ -128,7 +127,7 @@ impl FileExt for DirectIoFile {
let mut scratch_buffer = self.scratch_buffer.lock();

// First write up to `MAX_READ_SIZE - padding`
let padding = (offset % self.physical_sector_size as u64) as usize;
let padding = (offset % DISK_SECTOR_SIZE as u64) as usize;
let first_unaligned_chunk_size = (MAX_READ_SIZE - padding).min(buf.len());
let (unaligned_start, buf) = buf.split_at(first_unaligned_chunk_size);
{
Expand All @@ -155,7 +154,10 @@ impl DirectIoFile {
/// will be created).
///
/// This is especially important on Windows to prevent huge memory usage.
pub fn open(path: &Path) -> io::Result<Self> {
pub fn open<P>(path: P) -> io::Result<Self>
where
P: AsRef<Path>,
{
let mut open_options = OpenOptions::new();
open_options.use_direct_io();
let file = open_options
Expand All @@ -167,16 +169,8 @@ impl DirectIoFile {

file.disable_cache()?;

// Physical sector size on many SSDs is smaller than 4096 and should improve performance
let physical_sector_size = if file.read_at(&mut [0; 512], 512).is_ok() {
512
} else {
DISK_SECTOR_SIZE
};

Ok(Self {
file,
physical_sector_size,
// In many cases we'll want to read this much at once, so pre-allocate it right away
scratch_buffer: Mutex::new(vec![
AlignedSectorSize::default();
Expand All @@ -196,28 +190,22 @@ impl DirectIoFile {
bytes_to_read: usize,
offset: u64,
) -> io::Result<&'a [u8]> {
let aligned_offset = offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64;
let padding = (offset - aligned_offset) as usize;

// Make scratch buffer of a size that is necessary to read aligned memory, accounting
// for extra bytes at the beginning and the end that will be thrown away
let offset_in_buffer = (offset % DISK_SECTOR_SIZE as u64) as usize;
let desired_buffer_size = (bytes_to_read + offset_in_buffer).div_ceil(DISK_SECTOR_SIZE);
let desired_buffer_size = (padding + bytes_to_read).div_ceil(DISK_SECTOR_SIZE);
if scratch_buffer.len() < desired_buffer_size {
scratch_buffer.resize_with(desired_buffer_size, AlignedSectorSize::default);
}
let scratch_buffer = AlignedSectorSize::slice_mut_to_repr(scratch_buffer)
[..desired_buffer_size]
.as_flattened_mut();

let scratch_buffer =
AlignedSectorSize::slice_mut_to_repr(scratch_buffer).as_flattened_mut();

// While buffer above is allocated with granularity of `MAX_DISK_SECTOR_SIZE`, reads are
// done with granularity of physical sector size
let offset_in_buffer = (offset % self.physical_sector_size as u64) as usize;
self.file.read_exact_at(
&mut scratch_buffer[..(bytes_to_read + offset_in_buffer)
.div_ceil(self.physical_sector_size)
* self.physical_sector_size],
offset / self.physical_sector_size as u64 * self.physical_sector_size as u64,
)?;
self.file.read_exact_at(scratch_buffer, aligned_offset)?;

Ok(&scratch_buffer[offset_in_buffer..][..bytes_to_read])
Ok(&scratch_buffer[padding..][..bytes_to_read])
}

/// Panics on writes over `MAX_READ_SIZE` (including padding on both ends)
Expand All @@ -230,17 +218,17 @@ impl DirectIoFile {
// This is guaranteed by constructor
assert!(
AlignedSectorSize::slice_mut_to_repr(scratch_buffer)
.as_flattened_mut()
.as_flattened()
.len()
>= MAX_READ_SIZE
<= MAX_READ_SIZE
);

let aligned_offset =
offset / self.physical_sector_size as u64 * self.physical_sector_size as u64;
let aligned_offset = offset / DISK_SECTOR_SIZE as u64 * DISK_SECTOR_SIZE as u64;
let padding = (offset - aligned_offset) as usize;

// Calculate the size of the read including padding on both ends
let bytes_to_read = (padding + bytes_to_write.len()).div_ceil(self.physical_sector_size)
* self.physical_sector_size;
let bytes_to_read =
(padding + bytes_to_write.len()).div_ceil(DISK_SECTOR_SIZE) * DISK_SECTOR_SIZE;

if padding == 0 && bytes_to_read == bytes_to_write.len() {
let scratch_buffer =
Expand Down Expand Up @@ -279,83 +267,52 @@ mod tests {
thread_rng().fill(data.as_mut_slice());
fs::write(&file_path, &data).unwrap();

let mut file = DirectIoFile::open(&file_path).unwrap();

for override_physical_sector_size in [None, Some(4096)] {
if let Some(physical_sector_size) = override_physical_sector_size {
file.physical_sector_size = physical_sector_size;
}

let mut buffer = Vec::new();
for (offset, size) in [
(0_usize, 512_usize),
(0_usize, 4096_usize),
(0, 500),
(0, 4000),
(5, 50),
(12, 500),
(96, 4000),
(4000, 96),
(10000, 5),
(0, MAX_READ_SIZE),
(0, MAX_READ_SIZE * 2),
(5, MAX_READ_SIZE - 5),
(5, MAX_READ_SIZE * 2 - 5),
(5, MAX_READ_SIZE),
(5, MAX_READ_SIZE * 2),
(MAX_READ_SIZE, MAX_READ_SIZE),
(MAX_READ_SIZE, MAX_READ_SIZE * 2),
(MAX_READ_SIZE + 5, MAX_READ_SIZE - 5),
(MAX_READ_SIZE + 5, MAX_READ_SIZE * 2 - 5),
(MAX_READ_SIZE + 5, MAX_READ_SIZE),
(MAX_READ_SIZE + 5, MAX_READ_SIZE * 2),
] {
let data = &mut data[offset..][..size];
buffer.resize(size, 0);
// Read contents
file.read_exact_at(buffer.as_mut_slice(), offset as u64)
.unwrap_or_else(|error| {
panic!(
"Offset {offset}, size {size}, override physical sector size \
{override_physical_sector_size:?}: {error}"
)
});

// Ensure it is correct
assert_eq!(
data,
buffer.as_slice(),
"Offset {offset}, size {size}, override physical sector size \
{override_physical_sector_size:?}"
);

// Update data with random contents and write
thread_rng().fill(data);
file.write_all_at(data, offset as u64)
.unwrap_or_else(|error| {
panic!(
"Offset {offset}, size {size}, override physical sector size \
{override_physical_sector_size:?}: {error}"
)
});

// Read contents again
file.read_exact_at(buffer.as_mut_slice(), offset as u64)
.unwrap_or_else(|error| {
panic!(
"Offset {offset}, size {size}, override physical sector size \
{override_physical_sector_size:?}: {error}"
)
});

// Ensure it is correct too
assert_eq!(
data,
buffer.as_slice(),
"Offset {offset}, size {size}, override physical sector size \
{override_physical_sector_size:?}"
);
}
let file = DirectIoFile::open(&file_path).unwrap();

let mut buffer = Vec::new();
for (offset, size) in [
(0_usize, 512_usize),
(0_usize, 4096_usize),
(0, 500),
(0, 4000),
(5, 50),
(12, 500),
(96, 4000),
(4000, 96),
(10000, 5),
(0, MAX_READ_SIZE),
(0, MAX_READ_SIZE * 2),
(5, MAX_READ_SIZE - 5),
(5, MAX_READ_SIZE * 2 - 5),
(5, MAX_READ_SIZE),
(5, MAX_READ_SIZE * 2),
(MAX_READ_SIZE, MAX_READ_SIZE),
(MAX_READ_SIZE, MAX_READ_SIZE * 2),
(MAX_READ_SIZE + 5, MAX_READ_SIZE - 5),
(MAX_READ_SIZE + 5, MAX_READ_SIZE * 2 - 5),
(MAX_READ_SIZE + 5, MAX_READ_SIZE),
(MAX_READ_SIZE + 5, MAX_READ_SIZE * 2),
] {
let data = &mut data[offset..][..size];
buffer.resize(size, 0);
// Read contents
file.read_exact_at(buffer.as_mut_slice(), offset as u64)
.unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}"));

// Ensure it is correct
assert_eq!(data, buffer.as_slice(), "Offset {offset}, size {size}");

// Update data with random contents and write
thread_rng().fill(data);
file.write_all_at(data, offset as u64)
.unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}"));

// Read contents again
file.read_exact_at(buffer.as_mut_slice(), offset as u64)
.unwrap_or_else(|error| panic!("Offset {offset}, size {size}: {error}"));

// Ensure it is correct too
assert_eq!(data, buffer.as_slice(), "Offset {offset}, size {size}");
}
}
}
14 changes: 10 additions & 4 deletions crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ where
impl RayonFiles<File> {
/// Open file at specified path as many times as there is number of threads in current [`rayon`]
/// thread pool.
pub fn open(path: &Path) -> io::Result<Self> {
pub fn open<P>(path: P) -> io::Result<Self>
where
P: AsRef<Path>,
{
let files = (0..rayon::current_num_threads())
.map(|_| {
let file = OpenOptions::new()
.read(true)
.advise_random_access()
.open(path)?;
.open(path.as_ref())?;
file.advise_random_access()?;

Ok::<_, io::Error>(file)
Expand All @@ -63,9 +66,12 @@ where
{
/// Open file at specified path as many times as there is number of threads in current [`rayon`]
/// thread pool with a provided function
pub fn open_with(path: &Path, open: fn(&Path) -> io::Result<File>) -> io::Result<Self> {
pub fn open_with<P>(path: P, open: fn(&Path) -> io::Result<File>) -> io::Result<Self>
where
P: AsRef<Path>,
{
let files = (0..rayon::current_num_threads())
.map(|_| open(path))
.map(|_| open(path.as_ref()))
.collect::<Result<Vec<_>, _>>()?;

Ok(Self { files })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn basic() {
});

let tempdir = tempdir().unwrap();
let file = DirectIoFile::open(&tempdir.path().join("plot.bin")).unwrap();
let file = DirectIoFile::open(tempdir.path().join("plot.bin")).unwrap();

// Align plot file size for disk sector size
file.preallocate(
Expand Down

0 comments on commit d8693f0

Please sign in to comment.