diff --git a/Cargo.lock b/Cargo.lock index 5bef06de94..2e35134956 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12602,6 +12602,7 @@ dependencies = [ "futures", "hex", "parity-scale-codec", + "rand", "subspace-archiving", "subspace-core-primitives", "subspace-erasure-coding", diff --git a/crates/subspace-core-primitives/src/pieces.rs b/crates/subspace-core-primitives/src/pieces.rs index e8240e8695..061e58d0ab 100644 --- a/crates/subspace-core-primitives/src/pieces.rs +++ b/crates/subspace-core-primitives/src/pieces.rs @@ -624,11 +624,21 @@ impl Record { pub fn to_raw_record_chunks( &self, ) -> impl Iterator<Item = &'_ [u8; ScalarBytes::SAFE_BYTES]> + '_ { - // We have zero byte padding from [`ScalarBytes::SAFE_BYTES`] to [`ScalarBytes::FULL_BYTES`] that we need - // to skip + // We have zero byte padding from [`ScalarBytes::SAFE_BYTES`] to + // [`ScalarBytes::FULL_BYTES`] that we need to skip self.iter() .map(|bytes| bytes[1..].try_into().expect("Correct length; qed")) } + + /// Convert from a record to mutable raw bytes, assumes dealing with source record that only stores + /// safe bytes in its chunks. + #[inline] + pub fn to_mut_raw_record_chunks( + &mut self, + ) -> impl Iterator<Item = &'_ mut [u8; ScalarBytes::SAFE_BYTES]> + '_ { + self.iter_mut() + .map(|bytes| (&mut bytes[1..]).try_into().expect("Correct length; qed")) + } } /// Record commitment contained within a piece. 
diff --git a/shared/subspace-data-retrieval/Cargo.toml b/shared/subspace-data-retrieval/Cargo.toml index 304856343f..2c789b94d7 100644 --- a/shared/subspace-data-retrieval/Cargo.toml +++ b/shared/subspace-data-retrieval/Cargo.toml @@ -26,6 +26,8 @@ tracing = "0.1.40" [dev-dependencies] subspace-runtime-primitives = { version = "0.1.0", path = "../../crates/subspace-runtime-primitives" } +rand = { version = "0.8.5", features = ["min_const_gen"] } +tokio = { version = "1.40.0", features = ["rt-multi-thread", "macros"] } [features] parallel = [ diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 98e1b8f54f..e057924b22 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -1215,6 +1215,7 @@ fn decode_data_length( mod test { use super::*; use parity_scale_codec::{Compact, CompactLen, Encode}; + use rand::{thread_rng, RngCore}; use subspace_core_primitives::hashes::Blake3Hash; use subspace_core_primitives::segments::{ ArchivedBlockProgress, LastArchivedBlock, SegmentCommitment, SegmentHeader, @@ -1268,4 +1269,66 @@ mod test { }; assert_eq!(segment_header.encoded_size(), MAX_SEGMENT_HEADER_SIZE); } + + #[tokio::test(flavor = "multi_thread")] + async fn get_single_piece_object() { + // TODO: + // We need to cover 6 known good cases: + // - start of segment, offset already excludes segment header + // - middle of segment + // - end of segment, no padding + // - end of segment, end of object goes into padding (but not into the next segment) + // - end of segment, end of object length overlaps start of padding (but object does not cross into the next segment) + // - end of segment, start of object length is in padding (but object does not cross into the next segment) + // + // For multiple pieces, we need to cover 5 known good cases: + // - end of segment, end of object goes into padding (but not into the next segment) + // - end 
of segment, end of object goes into padding, and one piece into the next segment + // - end of segment, end of object goes into padding, and multiple pieces into the next segment + // - end of segment, end of object length overlaps start of padding, and one piece into the next segment + // - end of segment, end of object length overlaps start of padding, and multiple pieces into the next segment + + // Generate random piece data + let mut piece_data = vec![0u8; Piece::SIZE]; + thread_rng().fill_bytes(piece_data.as_mut_slice()); + let mut piece = Piece::try_from(piece_data).unwrap(); + + // Encode the length of the object at the offset + let object_len = 100; + let object_len_encoded = Compact(object_len as u32).encode(); + let offset = MAX_SEGMENT_HEADER_SIZE + 1; + + let raw_data = piece.record_mut().to_mut_raw_record_chunks().flatten(); + raw_data + .skip(offset) + .zip(object_len_encoded.iter()) + .for_each(|(raw_data_byte, len_byte)| { + *raw_data_byte = *len_byte; + }); + + // Set up the mapping + let piece_index = PieceIndex::from(0); + let object_data = piece + .record() + .to_raw_record_chunks() + .flatten() + .skip(offset + object_len_encoded.len()) + .take(object_len) + .copied() + .collect::<Vec<u8>>(); + + let mapping = GlobalObject { + piece_index, + offset: offset as u32, + hash: blake3_hash(&object_data), + }; + + // Set up the object fetcher + let piece_getter = vec![(piece_index, piece)]; + let object_fetcher = ObjectFetcher::new(Arc::new(piece_getter), object_len); + + // Now get the object back + let fetched_data = object_fetcher.fetch_object(mapping).await.unwrap(); + assert_eq!(fetched_data, object_data); + } } diff --git a/shared/subspace-data-retrieval/src/piece_getter.rs b/shared/subspace-data-retrieval/src/piece_getter.rs index f323e5309b..dbadc453e2 100644 --- a/shared/subspace-data-retrieval/src/piece_getter.rs +++ b/shared/subspace-data-retrieval/src/piece_getter.rs @@ -96,6 +96,28 @@ impl PieceGetter for (PieceIndex, Piece) { } } 
+#[async_trait] +impl PieceGetter for Vec<(PieceIndex, Piece)> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { + Ok(self.iter().find_map(|(index, piece)| { + if *index == piece_index { + Some(piece.clone()) + } else { + None + } + })) + } + + async fn get_pieces<'a>( + &'a self, + piece_indices: Vec<PieceIndex>, + ) -> anyhow::Result< + Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>, + > { + get_pieces_individually(|piece_index| self.get_piece(piece_index), piece_indices) + } +} + /// A default implementation which gets each piece individually, using the `get_piece` async /// function. ///