From 4738da18515fa3c4886ffcdd518c37be2163baf8 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 21 Dec 2023 13:11:54 -0800 Subject: [PATCH 1/5] Custom reed-solomon decoding --- Cargo.lock | 147 +++++------------ Cargo.toml | 3 - README.md | 5 - python/Cargo.toml | 2 +- rust/Cargo.toml | 4 +- rust/src/client.rs | 2 +- rust/src/ec/gf256.rs | 169 +++++++++++++++++++ rust/src/ec/matrix.rs | 252 +++++++++++++++++++++++++++++ rust/src/{hdfs/ec.rs => ec/mod.rs} | 117 +++----------- rust/src/error.rs | 5 +- rust/src/file.rs | 2 +- rust/src/hdfs/datanode.rs | 9 +- rust/src/hdfs/mod.rs | 1 - rust/src/lib.rs | 5 +- rust/tests/test_ec.rs | 2 +- 15 files changed, 493 insertions(+), 232 deletions(-) create mode 100644 rust/src/ec/gf256.rs create mode 100644 rust/src/ec/matrix.rs rename rust/src/{hdfs/ec.rs => ec/mod.rs} (54%) diff --git a/Cargo.lock b/Cargo.lock index f32b6ce..9a00086 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,17 +17,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "ahash" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" -dependencies = [ - "getrandom", - "once_cell", - "version_check", -] - [[package]] name = "aho-corasick" version = "1.0.5" @@ -232,7 +221,7 @@ dependencies = [ "hashbrown 0.14.0", "lock_api", "once_cell", - "parking_lot_core 0.9.8", + "parking_lot_core", ] [[package]] @@ -412,6 +401,34 @@ dependencies = [ "slab", ] +[[package]] +name = "g2gen" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2c7625b2fc250dd90b63f7887a6bb0f7ec1d714c8278415bea2669ef20820e" +dependencies = [ + "g2poly", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "g2p" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc36d9bdc3d2da057775a9f4fa7d7b09edab3e0eda7a92cc353358fa63b8519e" +dependencies = [ + "g2gen", + "g2poly", +] + +[[package]] +name = "g2poly" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af6a86e750338603ea2c14b1c0bfe58cd61f87ca67a0021d9334996024608e12" + [[package]] name = "getrandom" version = "0.2.10" @@ -465,9 +482,6 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -dependencies = [ - "ahash", -] [[package]] name = "hashbrown" @@ -486,16 +500,17 @@ dependencies = [ "crc", "env_logger", "futures", + "g2p", "gsasl-sys", "libc", "libgssapi", "log", + "num-traits", "object_store", "prost", "prost-build", "prost-types", "protobuf-src", - "reed-solomon-erasure", "roxmltree", "serial_test", "socket2 0.5.4", @@ -679,15 +694,6 @@ version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e186cfbae8084e513daff4240b4797e342f988cecda4fb6c939150f96315fd8" -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", -] - [[package]] name = "ipnet" version = "2.8.0" @@ -778,12 +784,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "libm" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" - [[package]] name = "linux-raw-sys" version = "0.4.7" @@ -806,15 +806,6 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -[[package]] -name = "lru" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" -dependencies = [ - "hashbrown 0.12.3", -] - [[package]] name = "memchr" version = "2.6.3" @@ -880,9 +871,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", ] @@ -920,7 +911,7 @@ dependencies = [ "humantime", "hyper", "itertools", - "parking_lot 0.12.1", + "parking_lot", "percent-encoding", "quick-xml", "rand", @@ -941,17 +932,6 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.6", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -959,21 +939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.8", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall 0.2.16", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -984,7 +950,7 @@ checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.3.5", + "redox_syscall", "smallvec", "windows-targets", ] @@ -1121,7 +1087,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.12.1", + "parking_lot", "pyo3-build-config", "pyo3-ffi", "pyo3-macros", @@ -1221,15 +1187,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "redox_syscall" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "redox_syscall" version = "0.3.5" @@ -1239,18 +1196,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "reed-solomon-erasure" -version = "6.0.0" -source = "git+https://github.com/Kimahriman/reed-solomon-erasure.git?branch=SNB/23C24_external_matrix#4307577d92ffd4a75e3e4a619ef25b79d99d2c0b" -dependencies = [ - "libm", - "lru", - "parking_lot 0.11.2", - "smallvec", - "spin 0.9.8", -] - [[package]] name = "regex" version = "1.9.6" @@ -1330,7 +1275,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin 0.5.2", + "spin", "untrusted", "web-sys", "winapi", @@ -1485,7 +1430,7 @@ dependencies = [ "futures", "lazy_static", "log", - "parking_lot 0.12.1", + "parking_lot", "serial_test_derive", ] @@ -1569,12 +1514,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" - [[package]] name = "syn" version = "1.0.109" @@ -1611,7 +1550,7 @@ checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", "fastrand", - "redox_syscall 0.3.5", + "redox_syscall", "rustix", "windows-sys", ] @@ -1818,12 +1757,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "version_check" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" - [[package]] name = "walkdir" version = "2.4.0" diff --git a/Cargo.toml b/Cargo.toml index b4efe0a..f74c567 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,3 @@ members = [ "python", ] resolver = "2" - -[patch.crates-io] -reed-solomon-erasure = { git = "https://github.com/Kimahriman/reed-solomon-erasure.git", branch = "SNB/23C24_external_matrix" } \ No newline at end of file diff --git a/README.md b/README.md index bb4f667..154115d 100644 --- a/README.md +++ b/README.md @@ -67,11 +67,6 @@ cargo build --features token,kerberos - `token` - enables token based DIGEST-MD5 authentication support. This uses the `gsasl` native library and only supports authentication, not integrity or confidentiality - `kerberos` - enables kerberos GSSAPI authentication support. This uses the `libgssapi` crate and supports integrity as well as confidentiality - `object_store` - provides an `object_store` wrapper around the HDFS client -- `rs` - support Reed-Solomon codecs for erasure coded reads. It relies on a fork of https://github.com/rust-rse/reed-solomon-erasure, so you must include a `patch` for it to compile: - ```toml - [patch.crates-io] - reed-solomon-erasure = { git = "https://github.com/Kimahriman/reed-solomon-erasure.git", branch = "SNB/23C24_external_matrix" } - ``` ## Running tests The tests are mostly integration tests that utilize a small Java application in `rust/mindifs/` that runs a custom `MiniDFSCluster`. To run the tests, you need to have Java, Maven, Hadoop binaries, and Kerberos tools available and on your path. Any Java version between 8 and 17 should work. diff --git a/python/Cargo.toml b/python/Cargo.toml index 7fd8417..b8e29e2 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -27,7 +27,7 @@ name = "hdfs_native._internal" [dependencies] bytes = "1.4" env_logger = "0.10" -hdfs-native = { path = "../rust", features=["token", "kerberos", "rs"] } +hdfs-native = { path = "../rust", features=["token", "kerberos"] } log = "0.4" pyo3 = { version = "0.20", features = ["extension-module", "abi3", "abi3-py38"] } thiserror = "1.0.43" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 96b8cd7..f5303ef 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -17,14 +17,15 @@ bytes = "1" chrono = { version = "0.4", optional = true } crc = "3" futures = "0.3" +g2p = "1" gsasl-sys = { version = "0.2", default-features = false, optional = true } libc = "0.2" libgssapi = { version = "0.6", default-features = false, optional = true } log = "0.4" +num-traits = "0.2" object_store = { version = "0.7", optional = true, features = ["cloud"] } prost = "0.11" prost-types = "0.11" -reed-solomon-erasure = { version = "6.0.0", optional = true } roxmltree = "0.18" socket2 = "0.5" thiserror = "1" @@ -48,7 +49,6 @@ which = "4" kerberos = ["libgssapi"] object_store = ["dep:object_store", "async-trait", "chrono"] token = ["gsasl-sys"] -rs = ["reed-solomon-erasure"] generate-protobuf = ["prost-build", "protobuf-src"] integration-test = ["which"] diff --git a/rust/src/client.rs b/rust/src/client.rs index 332454a..a89c3c8 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -7,9 +7,9 @@ use futures::{stream, StreamExt}; use url::Url; use crate::common::config::{self, Configuration}; +use crate::ec::resolve_ec_policy; use crate::error::{HdfsError, Result}; use crate::file::{FileReader, FileWriter}; -use crate::hdfs::ec::resolve_ec_policy; use crate::hdfs::protocol::NamenodeProtocol; use crate::hdfs::proxy::NameServiceProxy; use crate::proto::hdfs::hdfs_file_status_proto::FileType; diff --git a/rust/src/ec/gf256.rs b/rust/src/ec/gf256.rs new file mode 100644 index 0000000..8d1a41e --- /dev/null +++ b/rust/src/ec/gf256.rs @@ -0,0 +1,169 @@ +use bytes::Bytes; +use num_traits::{One, Zero}; + +use super::matrix::Matrix; +use crate::Result; + +g2p::g2p!(GF256, 8, modulus: 0b1_0001_1101); + +impl Zero for GF256 { + fn zero() -> Self { + GF256(0) + } + + fn is_zero(&self) -> bool { + self.0 == 0 + } +} + +impl One for GF256 { + fn one() -> Self { + GF256(1) + } +} + +pub(crate) struct Coder { + data_units: usize, + parity_units: usize, +} + +impl Coder { + pub(crate) fn new(data_units: usize, parity_units: usize) -> Self { + Self { + data_units, + parity_units, + } + } + + fn gen_rs_matrix(data_units: usize, parity_units: usize) -> Matrix { + let mut matrix = Matrix::zeroes(data_units + parity_units, data_units); + for r in 0..data_units { + matrix[(r, r)] = GF256::one(); + } + + // For parity rows, inverse of i ^ j, or 0 (described as 1 / (i + j) | i != j) + for r in data_units..(data_units + parity_units) { + for c in 0..data_units { + matrix[(r, c)] = match GF256(r as u8) + GF256(c as u8) { + z @ GF256(0) => z, + s => GF256(1) / s, + }; + } + } + + matrix + } + + pub(crate) fn decode(&self, data: &mut [Option]) -> Result<()> { + let mut valid_indices: Vec = Vec::new(); + let mut invalid_indices: Vec = Vec::new(); + + let mut data_matrix: Vec<&[u8]> = Vec::new(); + + for (i, slice) in data.iter().enumerate() { + if let Some(slice) = slice.as_ref() { + data_matrix.push(slice); + valid_indices.push(i); + } else if i < self.data_units { + // We don't care about missing parity data for decoding + invalid_indices.push(i); + } + } + + if valid_indices.len() < self.data_units { + return Err(crate::HdfsError::ErasureCodingError( + "Not enough valid shards".to_string(), + )); + } + + // Build the encoding matrix + let mut decode_matrix = Self::gen_rs_matrix(self.data_units, self.parity_units); + + // Select just the rows we have data for + decode_matrix.select_rows(valid_indices.iter().cloned().take(self.data_units)); + + // Invert the matrix to get the decode matrix + decode_matrix.invert(); + + // Select just the rows from the decode matrix we are missing data for. + decode_matrix.select_rows(invalid_indices.iter().cloned()); + + // Construct the missing slices + let recovered_slices = decode_matrix * &data_matrix[..]; + + for (i, slice) in recovered_slices.into_inner().into_iter().enumerate() { + // This may require copying the slice to convert it from GF256 to u8, but hopefully the + // compiler can optimize this to avoid that. Can do some benchmarking in the future to test + data[invalid_indices[i]] = Some(Bytes::from( + slice.into_iter().map(Into::into).collect::>(), + )); + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use crate::ec::{gf256::Coder, matrix::Matrix}; + + #[test] + fn test_build_rs_matrix() { + // These examples were taken directly from the matrices created by Hadoop via RSUtil.genCauchyMatrix + assert_eq!( + Coder::gen_rs_matrix(3, 2), + Matrix::new(vec![ + vec![1, 0, 0,], + vec![0, 1, 0,], + vec![0, 0, 1,], + vec![244, 142, 1,], + vec![71, 167, 122,], + ]), + ); + + assert_eq!( + Coder::gen_rs_matrix(6, 3), + Matrix::new(vec![ + vec![1, 0, 0, 0, 0, 0,], + vec![0, 1, 0, 0, 0, 0,], + vec![0, 0, 1, 0, 0, 0,], + vec![0, 0, 0, 1, 0, 0,], + vec![0, 0, 0, 0, 1, 0,], + vec![0, 0, 0, 0, 0, 1,], + vec![122, 186, 71, 167, 142, 244,], + vec![186, 122, 167, 71, 244, 142,], + vec![173, 157, 221, 152, 61, 170,], + ]), + ); + + assert_eq!( + Coder::gen_rs_matrix(10, 4), + Matrix::new(vec![ + vec![1, 0, 0, 0, 0, 0, 0, 0, 0, 0,], + vec![0, 1, 0, 0, 0, 0, 0, 0, 0, 0,], + vec![0, 0, 1, 0, 0, 0, 0, 0, 0, 0,], + vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0,], + vec![0, 0, 0, 0, 1, 0, 0, 0, 0, 0,], + vec![0, 0, 0, 0, 0, 1, 0, 0, 0, 0,], + vec![0, 0, 0, 0, 0, 0, 1, 0, 0, 0,], + vec![0, 0, 0, 0, 0, 0, 0, 1, 0, 0,], + vec![0, 0, 0, 0, 0, 0, 0, 0, 1, 0,], + vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 1,], + vec![221, 152, 173, 157, 93, 150, 61, 170, 142, 244,], + vec![152, 221, 157, 173, 150, 93, 170, 61, 244, 142,], + vec![61, 170, 93, 150, 173, 157, 221, 152, 71, 167,], + vec![170, 61, 150, 93, 157, 173, 152, 221, 167, 71,], + ]), + ); + } + + #[test] + fn test_invert_matrix() { + let mut matrix = Coder::gen_rs_matrix(3, 2); + matrix.select_rows([2, 3, 4].into_iter()); + let original_matrix = matrix.clone(); + matrix.invert(); + + assert_eq!(matrix * original_matrix, Matrix::identity(3)); + } +} diff --git a/rust/src/ec/matrix.rs b/rust/src/ec/matrix.rs new file mode 100644 index 0000000..1a5a9a7 --- /dev/null +++ b/rust/src/ec/matrix.rs @@ -0,0 +1,252 @@ +use std::{ + collections::HashSet, + ops::{Add, AddAssign, Div, Index, IndexMut, Mul, MulAssign, SubAssign}, +}; + +use num_traits::{One, Zero}; + +#[derive(PartialEq, Debug, Clone)] +pub(crate) struct Matrix { + data: Vec>, +} + +impl Matrix { + #[cfg(test)] + pub(crate) fn new(data: impl AsRef<[V]>) -> Self + where + U: Into + Copy, + V: AsRef<[U]>, + { + let data = data.as_ref(); + assert!(!data.is_empty()); + let cols = data[0].as_ref().len(); + assert!(cols > 0); + for row in data.iter() { + assert_eq!(row.as_ref().len(), cols); + } + Self { + data: data + .iter() + .map(|r| { + r.as_ref() + .iter() + .map(|d| Into::::into(*d)) + .collect::>() + }) + .collect::>(), + } + } + + #[inline] + fn rows(&self) -> usize { + self.data.len() + } + + #[inline] + fn cols(&self) -> usize { + self.data[0].len() + } + + pub(crate) fn zeroes(rows: usize, cols: usize) -> Self + where + T: Zero + Clone, + { + assert!(rows > 0 && cols > 0); + let data = vec![vec![T::zero(); cols]; rows]; + Self { data } + } + + pub(crate) fn identity(rows: usize) -> Self + where + T: Zero + One + Clone, + { + let mut matrix = Self::zeroes(rows, rows); + for i in 0..rows { + matrix[(i, i)] = T::one(); + } + matrix + } + + /// Filter this matrix down to only selected rows + pub(crate) fn select_rows(&mut self, rows: impl Iterator) { + let rows: HashSet = rows.collect(); + + let data = std::mem::take(&mut self.data); + + self.data = data + .into_iter() + .enumerate() + .filter_map(|(i, row)| if rows.contains(&i) { Some(row) } else { None }) + .collect(); + } + + pub(crate) fn into_inner(self) -> Vec> { + self.data + } + + /// Extend this matrix by inserting another matrix on the right hand side + fn extend_cols(&mut self, other: Matrix) + where + T: Clone, + { + assert_eq!(self.data.len(), other.data.len()); + for (lhr, rhr) in self.data.iter_mut().zip(other.data.iter()) { + lhr.extend(rhr.iter().cloned()); + } + } + + pub(crate) fn invert(&mut self) + where + T: Zero + One + MulAssign + SubAssign + Div + PartialEq + Clone + Copy, + { + if self.rows() != self.cols() { + panic!("Cannot invert a non-square matrix"); + } + + // Add an identity matrix to the right hand side + self.extend_cols(Matrix::identity(self.data.len())); + + for r in 0..self.rows() { + if self[(r, r)].is_zero() { + for r_swap in r + 1..self.rows() { + if !self[(r_swap, r)].is_zero() { + self.data.swap(r, r_swap); + } + } + } + + if self[(r, r)].is_zero() { + panic!("Matrix is singular"); + } + + // Scale the row + if !self[(r, r)].is_one() { + let scale = T::one() / self[(r, r)]; + for c in 0..self.cols() { + self[(r, c)] *= scale; + } + } + + // Clear below the diagonal + for r_below in r + 1..self.rows() { + if !self[(r_below, r)].is_zero() { + let scale = self[(r_below, r)]; + for c in 0..self.cols() { + let val = self[(r, c)]; + self[(r_below, c)] -= val * scale; + } + } + } + } + + // Clear above the diagonal + for r in 1..self.rows() { + for r_above in 0..r { + if !self[(r_above, r)].is_zero() { + let scale = self[(r_above, r)]; + for c in 0..self.cols() { + let val = self[(r, c)]; + self[(r_above, c)] -= val * scale; + } + } + } + } + + // Take just the right hand side now + for row in self.data.iter_mut() { + *row = row[row.len() / 2..].to_vec(); + } + } +} + +impl Index<(usize, usize)> for Matrix { + type Output = T; + + #[inline] + fn index(&self, index: (usize, usize)) -> &Self::Output { + &self.data[index.0][index.1] + } +} + +impl IndexMut<(usize, usize)> for Matrix { + #[inline] + fn index_mut(&mut self, index: (usize, usize)) -> &mut Self::Output { + &mut self.data[index.0][index.1] + } +} + +impl Mul for Matrix { + type Output = Matrix; + + fn mul(self, rhs: Self) -> Self::Output { + assert_eq!(self.cols(), rhs.rows()); + + let mut result: Matrix = Matrix::zeroes(self.rows(), rhs.cols()); + + for (i, row) in self.data.iter().enumerate() { + for j in 0..rhs.cols() { + for rhs_row in 0..rhs.rows() { + result[(i, j)] += row[rhs_row] * rhs[(rhs_row, j)]; + } + } + } + + result + } +} + +impl Mul<&[&[U]]> for Matrix +where + U: Into + Copy, +{ + type Output = Matrix; + + fn mul(self, rhs: &[&[U]]) -> Self::Output { + assert_eq!(self.cols(), rhs.len()); + let rhs_cols = rhs[0].len(); + for row in rhs.iter().skip(1) { + assert_eq!(rhs_cols, row.len()); + } + + let mut result: Matrix = Matrix::zeroes(self.rows(), rhs_cols); + + for j in 0..rhs_cols { + for rhs_row in 0..rhs.len() { + let rhs_val = rhs[rhs_row][j].into(); + for (i, row) in self.data.iter().enumerate() { + result[(i, j)] += row[rhs_row] * rhs_val; + } + } + } + + result + } +} + +#[cfg(test)] +mod test { + use super::Matrix; + + #[test] + fn test_matrix_multiply() { + let matrix1: Matrix = Matrix::new(vec![vec![1, 2], vec![3, 4], vec![5, 6]]); + let matrix2: Matrix = Matrix::new(vec![vec![1, 2, 3], vec![4, 5, 6]]); + + assert_eq!( + matrix1 * matrix2, + Matrix::new(vec![vec![9, 12, 15], vec![19, 26, 33], vec![29, 40, 51]]) + ); + } + + #[test] + fn test_matrix_invert() { + let mut matrix: Matrix = Matrix::new(vec![vec![4.0, 7.0], vec![2.0, 6.0]]); + matrix.invert(); + + // Floating point math makes it not exact + assert!(f64::abs(matrix[(0, 0)] - 0.6) < 0.0001); + assert!(f64::abs(matrix[(0, 1)] + 0.7) < 0.0001); + assert!(f64::abs(matrix[(1, 0)] + 0.2) < 0.0001); + assert!(f64::abs(matrix[(1, 1)] - 0.4) < 0.0001); + } +} diff --git a/rust/src/hdfs/ec.rs b/rust/src/ec/mod.rs similarity index 54% rename from rust/src/hdfs/ec.rs rename to rust/src/ec/mod.rs index c574158..9c0f5be 100644 --- a/rust/src/hdfs/ec.rs +++ b/rust/src/ec/mod.rs @@ -1,11 +1,10 @@ -use bytes::{Bytes, BytesMut}; -#[cfg(feature = "rs")] -use reed_solomon_erasure::{ - galois_8::{add, div, Field, ReedSolomon}, - matrix::Matrix, -}; +pub(crate) mod gf256; +mod matrix; use crate::{proto::hdfs, HdfsError, Result}; +use bytes::Bytes; + +use self::gf256::Coder; const RS_CODEC_NAME: &str = "rs"; const RS_LEGACY_CODEC_NAME: &str = "rs-legacy"; @@ -60,10 +59,7 @@ impl EcSchema { full_rows * self.cell_size + bytes_in_last_row } - pub(crate) fn ec_decode( - &self, - mut vertical_stripes: Vec>, - ) -> Result> { + pub(crate) fn ec_decode(&self, mut vertical_stripes: Vec>) -> Result> { let mut cells: Vec = Vec::new(); if !vertical_stripes .iter() @@ -71,12 +67,9 @@ impl EcSchema { .all(|(index, stripe)| stripe.is_some() || index >= self.data_units) { match self.codec_name.as_str() { - #[cfg(feature = "rs")] "rs" => { - let matrix = gen_rs_matrix(self.data_units, self.parity_units)?; - let decoder = - ReedSolomon::new_with_matrix(self.data_units, self.parity_units, matrix)?; - decoder.reconstruct_data(&mut vertical_stripes)?; + let coder = Coder::new(self.data_units, self.parity_units); + coder.decode(&mut vertical_stripes)?; } codec => { return Err(HdfsError::UnsupportedErasureCodingPolicy(format!( @@ -89,7 +82,7 @@ impl EcSchema { while vertical_stripes[0].as_ref().is_some_and(|b| !b.is_empty()) { for stripe in vertical_stripes.iter_mut().take(self.data_units) { - cells.push(stripe.as_mut().unwrap().split_to(self.cell_size).freeze()) + cells.push(stripe.as_mut().unwrap().split_to(self.cell_size)) } } @@ -151,91 +144,19 @@ pub(crate) fn resolve_ec_policy(policy: &hdfs::ErasureCodingPolicyProto) -> Resu } } -#[cfg(feature = "rs")] -fn gen_rs_matrix(data_units: usize, parity_units: usize) -> Result> { - // Identity matrix for the first `data_rows` rows - let mut data_rows: Vec> = (0..data_units) - .map(|i| { - let mut row = vec![0u8; data_units]; - row[i] = 1; - row - }) - .collect(); - - // For parity rows, inverse of i ^ j, or 0 (described as 1 / (i + j) | i != j) - let mut parity_rows: Vec> = (data_units..(data_units + parity_units)) - .map(|i| { - let row: Vec = (0..data_units) - .map(|j| match add(i as u8, j as u8) { - 0 => 0, - mult => div(1, mult), - }) - .collect(); - row - }) - .collect(); - - data_rows.append(&mut parity_rows); - - Ok(Matrix::new_with_data(data_rows)) -} - -#[cfg(feature = "rs")] #[cfg(test)] mod test { - use crate::Result; + use crate::ec::matrix::Matrix; + + use super::gf256::GF256; #[test] - fn test_build_rs_matrix() -> Result<()> { - use super::gen_rs_matrix; - use reed_solomon_erasure::matrix::Matrix; - - // These examples were taken directly from the matrices created by Hadoop via RSUtil.genCauchyMatrix - assert_eq!( - gen_rs_matrix(3, 2)?, - Matrix::new_with_data(vec![ - vec![1, 0, 0,], - vec![0, 1, 0,], - vec![0, 0, 1,], - vec![244, 142, 1,], - vec![71, 167, 122,], - ]), - ); - - assert_eq!( - gen_rs_matrix(6, 3)?, - Matrix::new_with_data(vec![ - vec![1, 0, 0, 0, 0, 0,], - vec![0, 1, 0, 0, 0, 0,], - vec![0, 0, 1, 0, 0, 0,], - vec![0, 0, 0, 1, 0, 0,], - vec![0, 0, 0, 0, 1, 0,], - vec![0, 0, 0, 0, 0, 1,], - vec![122, 186, 71, 167, 142, 244,], - vec![186, 122, 167, 71, 244, 142,], - vec![173, 157, 221, 152, 61, 170,], - ]), - ); - - assert_eq!( - gen_rs_matrix(10, 4)?, - Matrix::new_with_data(vec![ - vec![1, 0, 0, 0, 0, 0, 0, 0, 0, 0,], - vec![0, 1, 0, 0, 0, 0, 0, 0, 0, 0,], - vec![0, 0, 1, 0, 0, 0, 0, 0, 0, 0,], - vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0,], - vec![0, 0, 0, 0, 1, 0, 0, 0, 0, 0,], - vec![0, 0, 0, 0, 0, 1, 0, 0, 0, 0,], - vec![0, 0, 0, 0, 0, 0, 1, 0, 0, 0,], - vec![0, 0, 0, 0, 0, 0, 0, 1, 0, 0,], - vec![0, 0, 0, 0, 0, 0, 0, 0, 1, 0,], - vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 1,], - vec![221, 152, 173, 157, 93, 150, 61, 170, 142, 244,], - vec![152, 221, 157, 173, 150, 93, 170, 61, 244, 142,], - vec![61, 170, 93, 150, 173, 157, 221, 152, 71, 167,], - vec![170, 61, 150, 93, 157, 173, 152, 221, 167, 71,], - ]), - ); - Ok(()) + fn test_invert_matrix() { + let mut matrix: Matrix = + Matrix::new(vec![vec![0, 0, 1], vec![244, 142, 1], vec![71, 167, 122]]); + let original_matrix = matrix.clone(); + matrix.invert(); + + assert_eq!(matrix * original_matrix, Matrix::identity(3)); } } diff --git a/rust/src/error.rs b/rust/src/error.rs index 6c2a92c..e2aff56 100644 --- a/rust/src/error.rs +++ b/rust/src/error.rs @@ -3,8 +3,6 @@ use std::io; #[cfg(feature = "kerberos")] use libgssapi::error::Error as GssapiError; use prost::DecodeError; -#[cfg(feature = "rs")] -use reed_solomon_erasure::Error as RSError; use thiserror::Error; #[derive(Error, Debug)] @@ -33,9 +31,8 @@ pub enum HdfsError { IsADirectoryError(String), #[error("unsupported erasure coding policy")] UnsupportedErasureCodingPolicy(String), - #[cfg(feature = "rs")] #[error("erasure coding error")] - ErasureCodingError(#[from] RSError), + ErasureCodingError(String), #[error("operation not supported")] UnsupportedFeature(String), #[error("interal error, this shouldn't happen")] diff --git a/rust/src/file.rs b/rust/src/file.rs index c4b10de..2e6ea5b 100644 --- a/rust/src/file.rs +++ b/rust/src/file.rs @@ -4,8 +4,8 @@ use bytes::{BufMut, Bytes, BytesMut}; use futures::stream::BoxStream; use futures::{stream, Stream, StreamExt}; +use crate::ec::EcSchema; use crate::hdfs::datanode::{get_block_stream, BlockWriter}; -use crate::hdfs::ec::EcSchema; use crate::hdfs::protocol::NamenodeProtocol; use crate::proto::hdfs; use crate::Result; diff --git a/rust/src/hdfs/datanode.rs b/rust/src/hdfs/datanode.rs index 464860c..c42acf3 100644 --- a/rust/src/hdfs/datanode.rs +++ b/rust/src/hdfs/datanode.rs @@ -10,12 +10,13 @@ use log::{debug, error}; use tokio::sync::{mpsc, oneshot}; use crate::{ + ec::EcSchema, hdfs::connection::{DatanodeConnection, Op}, proto::{common, hdfs}, HdfsError, Result, }; -use super::{connection::Packet, ec::EcSchema}; +use super::connection::Packet; const HEART_BEAT_SEQNO: i64 = -1; const UNKNOWN_SEQNO: i64 = -1; @@ -208,7 +209,7 @@ impl StripedBlockStream { .zip(self.block.locs.iter()) .collect(); - let mut stripe_results: Vec> = + let mut stripe_results: Vec> = vec![None; self.ec_schema.data_units + self.ec_schema.parity_units]; let mut futures = Vec::new(); @@ -289,7 +290,7 @@ impl StripedBlockStream { datanode: Option<&&hdfs::DatanodeInfoProto>, offset: usize, len: usize, - ) -> Result { + ) -> Result { #[cfg(feature = "integration-test")] if let Some(fault_injection) = crate::test::EC_FAULT_INJECTOR.lock().unwrap().as_ref() { if fault_injection.fail_blocks.contains(&(index as usize)) { @@ -322,7 +323,7 @@ impl StripedBlockStream { .await?; } - Ok(buf) + Ok(buf.freeze()) } async fn read_from_datanode( diff --git a/rust/src/hdfs/mod.rs b/rust/src/hdfs/mod.rs index 0b12076..717b2fb 100644 --- a/rust/src/hdfs/mod.rs +++ b/rust/src/hdfs/mod.rs @@ -1,5 +1,4 @@ pub(crate) mod connection; pub(crate) mod datanode; -pub(crate) mod ec; pub(crate) mod protocol; pub(crate) mod proxy; diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 88d688d..43c4907 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -28,14 +28,11 @@ //! - `token` - include support for Token authentication. Uses the gsasl native library. Only //! supports authentication, not integrity or privacy modes. //! - `object_store` - an `object_store` implementation for HDFS. -//! - `rs` - support Reed-Solomon codecs for erasure coded reads. It relies on a fork of with the PR , so you must include a `patch` for it to compile: -//! ```toml -//! [patch.crates-io] -//! reed-solomon-erasure = { git = "https://github.com/Kimahriman/reed-solomon-erasure.git", branch = "SNB/23C24_external_matrix" } //! ``` pub mod client; pub(crate) mod common; +pub(crate) mod ec; pub(crate) mod error; pub mod file; pub(crate) mod hdfs; diff --git a/rust/tests/test_ec.rs b/rust/tests/test_ec.rs index 03505a4..272e765 100644 --- a/rust/tests/test_ec.rs +++ b/rust/tests/test_ec.rs @@ -1,4 +1,4 @@ -#[cfg(all(feature = "integration-test", feature = "rs"))] +#[cfg(feature = "integration-test")] mod test { use bytes::{Buf, Bytes}; From 846c5d7cff39c317e7ee3c49fb74cf1f277343f6 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 21 Dec 2023 15:05:04 -0800 Subject: [PATCH 2/5] Remove remaining rs feature usage --- .github/workflows/rust-test.yml | 6 +++--- README.md | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/rust-test.yml b/.github/workflows/rust-test.yml index 87547f3..031151b 100644 --- a/.github/workflows/rust-test.yml +++ b/.github/workflows/rust-test.yml @@ -48,7 +48,7 @@ jobs: run: sudo apt-get install -y libkrb5-dev libgsasl-dev - name: build and lint with clippy - run: cargo clippy --tests --features kerberos,token,integration-test,rs + run: cargo clippy --tests --features kerberos,token,integration-test - name: Check docs run: cargo doc @@ -66,7 +66,7 @@ jobs: run: cargo check --features object_store - name: Check all features - run: cargo check --features kerberos,token,object_store,rs,integration-test + run: cargo check --features kerberos,token,object_store,integration-test test: strategy: @@ -109,4 +109,4 @@ jobs: echo "$GITHUB_WORKSPACE/hadoop-3.3.6/bin" >> $GITHUB_PATH - name: Run tests - run: cargo test --features kerberos,token,object_store,integration-test,rs + run: cargo test --features kerberos,token,object_store,integration-test diff --git a/README.md b/README.md index 154115d..3c2b47c 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ cargo build --features token,kerberos The tests are mostly integration tests that utilize a small Java application in `rust/mindifs/` that runs a custom `MiniDFSCluster`. To run the tests, you need to have Java, Maven, Hadoop binaries, and Kerberos tools available and on your path. Any Java version between 8 and 17 should work. ```bash -cargo test -p hdfs-native --features token,kerberos,rs,intergation-test +cargo test -p hdfs-native --features token,kerberos,intergation-test ``` ### Python tests From f2b96edbe49640424e7e6aae24fa0da380e4c5f9 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 24 Dec 2023 14:35:42 -0800 Subject: [PATCH 3/5] Make matrix multiply algorithm a little faster --- Cargo.lock | 213 +++++++++++++++++++++++++++++++++++++++++- rust/Cargo.toml | 6 ++ rust/benches/ec.rs | 67 +++++++++++++ rust/src/ec/gf256.rs | 34 ++++++- rust/src/ec/matrix.rs | 33 ++++--- rust/src/ec/mod.rs | 2 +- rust/src/lib.rs | 3 + 7 files changed, 337 insertions(+), 21 deletions(-) create mode 100644 rust/benches/ec.rs diff --git a/Cargo.lock b/Cargo.lock index 9a00086..6fa01d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,6 +41,18 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]] +name = "anstyle" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" + [[package]] name = "anyhow" version = "1.0.75" @@ -140,6 +152,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.0.83" @@ -179,6 +197,33 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "ciborium" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "effd91f6c78e5a4ace8a5d3c0b6bfaec9e2baaef55f3efc00e45fb2e477ee926" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdf919175532b369853f5d5e20b26b43112613fd6fe7aee757e35f7a44642656" + +[[package]] +name = "ciborium-ll" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clang-sys" version = "1.6.1" @@ -190,6 +235,31 @@ dependencies = [ "libloading", ] +[[package]] +name = "clap" +version = "4.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfaff671f6b22ca62406885ece523383b9b64022e341e53e009a62ebc47a45f2" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a216b506622bb1d316cd51328dce24e07bdff4a6128a47c7e7fad11878d5adbb" +dependencies = [ + "anstyle", + "clap_lex", +] + +[[package]] +name = "clap_lex" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" + [[package]] name = "core-foundation-sys" version = "0.8.4" @@ -211,6 +281,74 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "is-terminal", + "itertools", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2fe95351b870527a5d09bf563ed3c97c0cffb87cf1c78a591bf48bb218d9aa" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" +dependencies = [ + "cfg-if", +] + [[package]] name = "dashmap" version = "5.5.3" @@ -477,6 +615,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" + [[package]] name = "hashbrown" version = "0.12.3" @@ -498,6 +642,7 @@ dependencies = [ "bytes", "chrono", "crc", + "criterion", "env_logger", "futures", "g2p", @@ -932,6 +1077,12 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + [[package]] name = "parking_lot" version = "0.12.1" @@ -989,6 +1140,34 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "plotters" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c224ba00d7cadd4d5c660deaf2098e5e80e07846537c51f9cfa4be50c1fd45" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e76628b4d3a7581389a35d5b6e2139607ad7c75b17aed325f210aa91f4a9609" + +[[package]] +name = "plotters-svg" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f6d39893cca0701371e3c27294f09797214b86f1fb951b89ade8ec04e2abab" +dependencies = [ + "plotters-backend", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1187,6 +1366,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -1462,9 +1661,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" [[package]] name = "snafu" @@ -1584,6 +1783,16 @@ dependencies = [ "syn 2.0.32", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.6.0" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index f5303ef..a86fc78 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -40,6 +40,7 @@ prost-build = { version = "0.11", optional = true } protobuf-src = { version = "1.1", optional = true } [dev-dependencies] +criterion = "0.5" env_logger = "0.10" serial_test = "2.0.0" tempfile = "3" @@ -52,3 +53,8 @@ token = ["gsasl-sys"] generate-protobuf = ["prost-build", "protobuf-src"] integration-test = ["which"] +benchmark = [] + +[[bench]] +name = "ec" +harness = false diff --git a/rust/benches/ec.rs b/rust/benches/ec.rs new file mode 100644 index 0000000..244147b --- /dev/null +++ b/rust/benches/ec.rs @@ -0,0 +1,67 @@ +use bytes::{BufMut, Bytes, BytesMut}; +use criterion::*; +use hdfs_native::ec::gf256::Coder; + +fn bench(c: &mut Criterion) { + let mut matrix = Coder::gen_rs_matrix(6, 3); + matrix.select_rows([3, 4, 5, 6, 7, 8].into_iter()); + + let mut group = c.benchmark_group("matrix-inversion"); + group.bench_function("invert", |b| { + let mut matrix = matrix.clone(); + b.iter(move || matrix.invert()); + }); + group.finish(); + + let coder = Coder::new(6, 3); + let slice_size: u64 = 16 * 1024 * 1024; + + let slices: Vec<_> = (0..6) + .map(|i| { + let mut buf = BytesMut::with_capacity(slice_size as usize); + for v in 0..slice_size / 4 { + buf.put_i32((v + i * slice_size) as i32); + } + buf.freeze() + }) + .collect(); + + let mut group = c.benchmark_group("rs-encode"); + group.throughput(Throughput::Bytes(slice_size * 6)); + group.sample_size(30); + group.bench_function("encode", |b| b.iter(|| coder.encode(&slices[..]))); + group.finish(); + + // Get the actual encoded slices + let parity_shards = coder.encode(&slices[..]); + let mut decode_slices: Vec> = Vec::new(); + decode_slices.extend(slices.iter().cloned().map(Some)); + decode_slices.extend(parity_shards.iter().cloned().map(Some)); + + let mut group = c.benchmark_group("rs-decode"); + group.throughput(Throughput::Bytes(slice_size * 6)); + group.bench_function("decode-1-slice", |b| { + b.iter(|| { + decode_slices[0] = None; + coder.decode(&mut decode_slices[..]) + }) + }); + group.bench_function("decode-2-slice", |b| { + b.iter(|| { + decode_slices[0] = None; + decode_slices[1] = None; + coder.decode(&mut decode_slices[..]) + }) + }); + group.bench_function("decode-3-slice", |b| { + b.iter(|| { + decode_slices[0] = None; + decode_slices[1] = None; + decode_slices[2] = None; + coder.decode(&mut decode_slices[..]) + }) + }); +} + +criterion_group!(benches, bench); +criterion_main!(benches); diff --git a/rust/src/ec/gf256.rs b/rust/src/ec/gf256.rs index 8d1a41e..c44222d 100644 --- a/rust/src/ec/gf256.rs +++ b/rust/src/ec/gf256.rs @@ -22,20 +22,20 @@ impl One for GF256 { } } -pub(crate) struct Coder { +pub struct Coder { data_units: usize, parity_units: usize, } impl Coder { - pub(crate) fn new(data_units: usize, parity_units: usize) -> Self { + pub fn new(data_units: usize, parity_units: usize) -> Self { Self { data_units, parity_units, } } - fn gen_rs_matrix(data_units: usize, parity_units: usize) -> Matrix { + pub fn gen_rs_matrix(data_units: usize, parity_units: usize) -> Matrix { let mut matrix = Matrix::zeroes(data_units + parity_units, data_units); for r in 0..data_units { matrix[(r, r)] = GF256::one(); @@ -54,7 +54,29 @@ impl Coder { matrix } - pub(crate) fn decode(&self, data: &mut [Option]) -> Result<()> { + /// Takes a slice of data slices and returns a vector of parity slices + #[allow(dead_code)] + pub fn encode(&self, data: &[Bytes]) -> Vec { + assert_eq!(data.len(), self.data_units); + let shard_bytes = data[0].len(); + + assert!(data.iter().skip(1).all(|s| s.len() == shard_bytes)); + + let mut encode_matrix = Self::gen_rs_matrix(self.data_units, self.parity_units); + // We only care about generating the parity rows + encode_matrix.select_rows(self.data_units..self.data_units + self.parity_units); + + let parity_shards = + encode_matrix * &data.iter().map(|r| &r[..]).collect::>()[..]; + + parity_shards + .into_inner() + .into_iter() + .map(|shard| Bytes::from(shard.into_iter().map(Into::into).collect::>())) + .collect() + } + + pub fn decode(&self, data: &mut [Option]) -> Result<()> { let mut valid_indices: Vec = Vec::new(); let mut invalid_indices: Vec = Vec::new(); @@ -62,7 +84,9 @@ impl Coder { for (i, slice) in data.iter().enumerate() { if let Some(slice) = slice.as_ref() { - data_matrix.push(slice); + if data_matrix.len() < self.data_units { + data_matrix.push(slice); + } valid_indices.push(i); } else if i < self.data_units { // We don't care about missing parity data for decoding diff --git a/rust/src/ec/matrix.rs b/rust/src/ec/matrix.rs index 1a5a9a7..905ec15 100644 --- a/rust/src/ec/matrix.rs +++ b/rust/src/ec/matrix.rs @@ -6,12 +6,11 @@ use std::{ use num_traits::{One, Zero}; #[derive(PartialEq, Debug, Clone)] -pub(crate) struct Matrix { +pub struct Matrix { data: Vec>, } impl Matrix { - #[cfg(test)] pub(crate) fn new(data: impl AsRef<[V]>) -> Self where U: Into + Copy, @@ -38,12 +37,12 @@ impl Matrix { } #[inline] - fn rows(&self) -> usize { + pub fn rows(&self) -> usize { self.data.len() } #[inline] - fn cols(&self) -> usize { + pub fn cols(&self) -> usize { self.data[0].len() } @@ -68,7 +67,7 @@ impl Matrix { } /// Filter this matrix down to only selected rows - pub(crate) fn select_rows(&mut self, rows: impl Iterator) { + pub fn select_rows(&mut self, rows: impl Iterator) { let rows: HashSet = rows.collect(); let data = std::mem::take(&mut self.data); @@ -84,6 +83,13 @@ impl Matrix { self.data } + pub fn convert(self) -> Matrix + where + T: Into + Copy, + { + Matrix::new(self.into_inner()) + } + /// Extend this matrix by inserting another matrix on the right hand side fn extend_cols(&mut self, other: Matrix) where @@ -95,7 +101,7 @@ impl Matrix { } } - pub(crate) fn invert(&mut self) + pub fn invert(&mut self) where T: Zero + One + MulAssign + SubAssign + Div + PartialEq + Clone + Copy, { @@ -195,9 +201,10 @@ impl Mul for Matrix { } } -impl Mul<&[&[U]]> for Matrix +impl Mul<&[&[U]]> for Matrix where - U: Into + Copy, + T: Zero + One + Add + AddAssign + Mul + Clone + Copy + std::fmt::Debug, + U: Into + Copy + std::fmt::Debug, { type Output = Matrix; @@ -210,11 +217,11 @@ where let mut result: Matrix = Matrix::zeroes(self.rows(), rhs_cols); - for j in 0..rhs_cols { - for rhs_row in 0..rhs.len() { - let rhs_val = rhs[rhs_row][j].into(); - for (i, row) in self.data.iter().enumerate() { - result[(i, j)] += row[rhs_row] * rhs_val; + for (i, rhs_row) in rhs.iter().enumerate() { + for (lhs_row, result_row) in self.data.iter().zip(result.data.iter_mut()) { + let lhs_value = lhs_row[i]; + for (rhs_cell, result_cell) in rhs_row.iter().zip(result_row.iter_mut()) { + *result_cell += lhs_value * (*rhs_cell).into(); } } } diff --git a/rust/src/ec/mod.rs b/rust/src/ec/mod.rs index 9c0f5be..a3f73eb 100644 --- a/rust/src/ec/mod.rs +++ b/rust/src/ec/mod.rs @@ -1,4 +1,4 @@ -pub(crate) mod gf256; +pub mod gf256; mod matrix; use crate::{proto::hdfs, HdfsError, Result}; diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 43c4907..6cac320 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -32,6 +32,9 @@ pub mod client; pub(crate) mod common; +#[cfg(feature = "benchmark")] +pub mod ec; +#[cfg(not(feature = "benchmark"))] pub(crate) mod ec; pub(crate) mod error; pub mod file; From a6760081611ff4688c1aca24271557d39d4de2a1 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 25 Dec 2023 12:50:53 -0800 Subject: [PATCH 4/5] Fix lints --- rust/src/ec/matrix.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/rust/src/ec/matrix.rs b/rust/src/ec/matrix.rs index 905ec15..91aeecc 100644 --- a/rust/src/ec/matrix.rs +++ b/rust/src/ec/matrix.rs @@ -11,7 +11,8 @@ pub struct Matrix { } impl Matrix { - pub(crate) fn new(data: impl AsRef<[V]>) -> Self + #[cfg(test)] + pub fn new(data: impl AsRef<[V]>) -> Self where U: Into + Copy, V: AsRef<[U]>, @@ -83,13 +84,6 @@ impl Matrix { self.data } - pub fn convert(self) -> Matrix - where - T: Into + Copy, - { - Matrix::new(self.into_inner()) - } - /// Extend this matrix by inserting another matrix on the right hand side fn extend_cols(&mut self, other: Matrix) where From 4295223b12dafbed4d0ceab89d9df10d99f54a46 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 26 Dec 2023 18:54:59 -0800 Subject: [PATCH 5/5] Add some comments so I maybe remember how this works in the future --- rust/src/ec/gf256.rs | 7 +++++++ rust/src/ec/matrix.rs | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/rust/src/ec/gf256.rs b/rust/src/ec/gf256.rs index c44222d..8bd0f03 100644 --- a/rust/src/ec/gf256.rs +++ b/rust/src/ec/gf256.rs @@ -76,6 +76,8 @@ impl Coder { .collect() } + /// Takes a slice of Option, and fills in any missing data shards using parity shards. + /// Returns an error if there aren't enough parity shards to recompute missing data shards. pub fn decode(&self, data: &mut [Option]) -> Result<()> { let mut valid_indices: Vec = Vec::new(); let mut invalid_indices: Vec = Vec::new(); @@ -94,6 +96,11 @@ impl Coder { } } + if invalid_indices.is_empty() { + // We have all the data shards so just return + return Ok(()); + } + if valid_indices.len() < self.data_units { return Err(crate::HdfsError::ErasureCodingError( "Not enough valid shards".to_string(), diff --git a/rust/src/ec/matrix.rs b/rust/src/ec/matrix.rs index 91aeecc..7bdfb16 100644 --- a/rust/src/ec/matrix.rs +++ b/rust/src/ec/matrix.rs @@ -5,6 +5,9 @@ use std::{ use num_traits::{One, Zero}; +/// Helper struct for matrix operations needed for erasure coded. Mostly used for inverting +/// matrices for reed-solomon decoding, and matrix multiplication for recovering data shards +/// or computing parity shards. #[derive(PartialEq, Debug, Clone)] pub struct Matrix { data: Vec>, @@ -195,6 +198,9 @@ impl Mul for Matrix { } } +/// Maybe overly convoluted generic function for computing matrix multiplication with a 2D +/// slice of entries that can be turned into the type stored in this Matrix. In practice this +/// is only used for multiplying a Matrix of GF256 by a 2D slice of u8. impl Mul<&[&[U]]> for Matrix where T: Zero + One + Add + AddAssign + Mul + Clone + Copy + std::fmt::Debug,