From 49cdc51a9c0d920f94158710ea341b69c59b4d56 Mon Sep 17 00:00:00 2001 From: xwb1136021767 <1136021767@qq.com> Date: Tue, 8 Aug 2023 19:20:11 +0800 Subject: [PATCH] nydus-image: Adapt local cas to the latest version of nydus. Original version from https://github.com/dragonflyoss/image-service/pull/956. Signed-off-by: xwb1136021767 <1136021767@qq.com> Signed-off-by: mofishzz --- Cargo.lock | 221 +++++++++-- Cargo.toml | 1 + api/src/config.rs | 111 +++++- builder/Cargo.toml | 10 +- builder/src/core/bootstrap.rs | 235 ++++++++++- builder/src/core/bootstrap_dedup.rs | 166 ++++++++ builder/src/core/context.rs | 9 +- builder/src/core/mod.rs | 2 + builder/src/core/node.rs | 250 +++++++++++- builder/src/core/v6.rs | 4 +- builder/src/lib.rs | 2 +- rafs/Cargo.toml | 2 +- rafs/src/metadata/cached_v5.rs | 4 + rafs/src/metadata/chunk.rs | 30 +- rafs/src/metadata/direct_v5.rs | 4 + rafs/src/metadata/direct_v6.rs | 10 + rafs/src/metadata/layout/v5.rs | 15 +- rafs/src/metadata/layout/v6.rs | 7 +- rafs/src/metadata/md_v5.rs | 4 + rafs/src/metadata/mod.rs | 5 +- rafs/src/mock/mock_chunk.rs | 4 + src/bin/nydus-image/main.rs | 49 ++- src/bin/nydus-image/unpack/pax/test.rs | 4 + storage/src/cache/state/blob_state_map.rs | 5 +- storage/src/device.rs | 129 +++++- storage/src/meta/mod.rs | 4 + storage/src/test.rs | 4 + utils/Cargo.toml | 3 +- utils/src/cas.rs | 456 ++++++++++++++++++++++ utils/src/compress/mod.rs | 2 +- utils/src/crypt.rs | 4 +- utils/src/digest.rs | 6 +- utils/src/lib.rs | 1 + 33 files changed, 1687 insertions(+), 76 deletions(-) create mode 100644 builder/src/core/bootstrap_dedup.rs create mode 100644 utils/src/cas.rs diff --git a/Cargo.lock b/Cargo.lock index eb25be9cca2..67eb54da9e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.18" @@ -26,6 +37,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -109,6 +126,25 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" + +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "serde", + "tap", + "wyz", +] + [[package]] name = "blake3" version = "1.3.1" @@ -194,7 +230,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "335867764ed2de42325fafe6d18b8af74ba97ee0c590fa016f157535b42ab04b" dependencies = [ "atty", - "bitflags", + "bitflags 1.3.2", "clap_derive", "clap_lex", "once_cell", @@ -212,7 +248,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -326,7 +362,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn", + "syn 1.0.98", ] [[package]] @@ -343,7 +379,7 @@ checksum = "81bbeb29798b407ccd82a3324ade1a7286e0d29851475990b612670f6f5124d2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -385,6 +421,15 @@ dependencies = [ "cfg-if", ] 
+[[package]] +name = "erased-serde" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f94c0e13118e7d7533271f754a168ae8400e6a1cc043f2bfd53cc7290f1a1de3" +dependencies = [ + "serde", +] + [[package]] name = "errno" version = "0.2.8" @@ -406,6 +451,18 @@ dependencies = [ "libc", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "1.7.0" @@ -486,6 +543,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "fuse-backend-rs" version = "0.10.5" @@ -493,7 +556,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f85357722be4bf3d0b7548bedf7499686c77628c2c61cb99c6519463f7a9e5f0" dependencies = [ "arc-swap", - "bitflags", + "bitflags 1.3.2", "caps", "core-foundation-sys", "lazy_static", @@ -536,7 +599,7 @@ checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -606,7 +669,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dd7365d734a70ac5dd7be791b0c96083852188df015b8c665bb2dadb108a743" dependencies = [ - "bitflags", + "bitflags 1.3.2", "crc", "log", "uuid", @@ -637,6 +700,25 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" 
+[[package]] +name = "hashbrown" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +dependencies = [ + "ahash", + "allocator-api2", +] + +[[package]] +name = "hashlink" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "312f66718a2d7789ffef4f4b7b213138ed9f1eb3aa1d0d82fc99f88fb3ffd26f" +dependencies = [ + "hashbrown 0.14.0", +] + [[package]] name = "heck" version = "0.4.1" @@ -793,7 +875,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -821,7 +903,7 @@ version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ba34abb5175052fc1a2227a10d2275b7386c9990167de9786c0b88d8b062330" dependencies = [ - "bitflags", + "bitflags 1.3.2", "libc", ] @@ -878,6 +960,17 @@ version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +[[package]] +name = "libsqlite3-sys" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc22eff61b133b115c6e8c74e818c628d6d5e7a502afea6f64dee076dd94326" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.8" @@ -1027,7 +1120,7 @@ version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "195cdbc1741b8134346d515b3a56a1c94b0912758009cfd53f99ea0f57b065fc" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "libc", "memoffset", @@ -1105,6 +1198,8 @@ version = "0.1.0" dependencies = [ "anyhow", "base64", + "bitvec", + "erased-serde", "hex", "indexmap", "libc", @@ -1118,8 +1213,9 @@ dependencies = [ "serde_json", "sha2", "tar", + "toml", 
"vmm-sys-util", - "xattr", + "xattr 0.2.3", ] [[package]] @@ -1141,7 +1237,7 @@ dependencies = [ "anyhow", "arc-swap", "assert_matches", - "bitflags", + "bitflags 1.3.2", "fuse-backend-rs", "lazy_static", "libc", @@ -1187,13 +1283,14 @@ dependencies = [ "tar", "time", "tokio", + "toml", "vhost", "vhost-user-backend", "virtio-bindings", "virtio-queue", "vm-memory", "vmm-sys-util", - "xattr", + "xattr 1.0.1", ] [[package]] @@ -1232,7 +1329,7 @@ version = "0.6.4" dependencies = [ "arc-swap", "base64", - "bitflags", + "bitflags 1.3.2", "fuse-backend-rs", "gpt", "hex", @@ -1278,6 +1375,7 @@ dependencies = [ "nix", "nydus-api", "openssl", + "rusqlite", "serde", "serde_json", "sha2", @@ -1298,9 +1396,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.13.0" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] name = "openssl" @@ -1308,7 +1406,7 @@ version = "0.10.55" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "foreign-types", "libc", @@ -1325,7 +1423,7 @@ checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1414,7 +1512,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1444,7 +1542,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.98", "version_check", ] @@ -1461,29 +1559,35 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.47" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum 
= "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" +checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.20" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bcdf212e9776fbcb2d23ab029360416bb1706b1aea2d1a5ba002727cbcab804" +checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "redox_syscall" version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -1579,7 +1683,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44237c429621e3606374941c3061fe95686bdaddb9b4f6524e4edc2d21da9c58" dependencies = [ "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1594,7 +1698,7 @@ version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03" dependencies = [ - "bitflags", + "bitflags 1.3.2", "errno", "io-lifetimes", "libc", @@ -1641,7 +1745,7 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -1660,22 +1764,22 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.139" +version = "1.0.166" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"0171ebb889e45aa68b44aee0859b3eede84c6f5f5c228e6f140c0b2a0a46cad6" +checksum = "d01b7404f9d441d3ad40e6a636a7782c377d2abdbe4fa2440e2edcc2f4f10db8" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.139" +version = "1.0.166" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc1d3230c1de7932af58ad8ffbe1d784bd55efd5a9d84ac24f69c72d83543dfb" +checksum = "5dd83d6dde2b6b2d466e14d9d1acce8816dedee94f735eac6395808b3483c6d6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.23", ] [[package]] @@ -1771,6 +1875,23 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59fb7d6d8281a51045d62b8eb3a7d1ce347b76f312af50cd3dc0af39c87c1737" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "tar" version = "0.4.40" @@ -1779,7 +1900,7 @@ checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" dependencies = [ "filetime", "libc", - "xattr", + "xattr 1.0.1", ] [[package]] @@ -1821,7 +1942,7 @@ checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1877,7 +1998,7 @@ checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -1953,7 +2074,7 @@ checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", ] [[package]] @@ -2043,7 +2164,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"c9b791c5b0717a0558888a4cf7240cea836f39a99cb342e12ce633dcaa078072" dependencies = [ - "bitflags", + "bitflags 1.3.2", "libc", "vm-memory", "vmm-sys-util", @@ -2099,7 +2220,7 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd64fe09d8e880e600c324e7d664760a17f56e9672b7495a86381b49e4f72f46" dependencies = [ - "bitflags", + "bitflags 1.3.2", "libc", ] @@ -2140,7 +2261,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn", + "syn 1.0.98", "wasm-bindgen-shared", ] @@ -2174,7 +2295,7 @@ checksum = "7d94ac45fcf608c1f45ef53e748d35660f168490c10b23704c7779ab8f5c3048" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.98", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2292,6 +2413,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + +[[package]] +name = "xattr" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d1526bbe5aaeb5eb06885f4d987bcdfa5e23187055de9b83fe00156a821fabc" +dependencies = [ + "libc", +] + [[package]] name = "xattr" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 9e9c1d16e27..f200be9bcba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ serde = { version = "1.0.110", features = ["serde_derive", "rc"] } serde_json = "1.0.51" tar = "0.4.40" tokio = { version = "1.24", features = ["macros"] } +toml = "0.5" # Build static linked openssl library openssl = { version = "0.10.55", features = ["vendored"] } diff --git a/api/src/config.rs b/api/src/config.rs index e606770ddc9..8d5100dd449 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -29,6 +29,8 @@ pub struct ConfigV2 { pub cache: Option, /// Configuration information for RAFS filesystem. pub rafs: Option, + /// Configuration information for image deduplication. 
+ pub dedup: Option, /// Internal runtime configuration. #[serde(skip)] pub internal: ConfigV2Internal, @@ -42,6 +44,7 @@ impl Default for ConfigV2 { backend: None, cache: None, rafs: None, + dedup: None, internal: ConfigV2Internal::default(), } } @@ -56,6 +59,7 @@ impl ConfigV2 { backend: None, cache: None, rafs: None, + dedup: None, internal: ConfigV2Internal::default(), } } @@ -126,6 +130,16 @@ impl ConfigV2 { }) } + /// Get configuration information for image deduplication. + pub fn get_dedup_config(&self) -> Result<&DeduplicationConfigV2> { + self.dedup.as_ref().ok_or_else(|| { + Error::new( + ErrorKind::InvalidInput, + "no configuration information for deduplication", + ) + }) + } + /// Get configuration information for cache subsystem. pub fn get_cache_config(&self) -> Result<&CacheConfigV2> { self.cache.as_ref().ok_or_else(|| { @@ -962,6 +976,9 @@ pub struct BlobCacheEntryConfigV2 { /// Configuration information for local cache system. #[serde(default)] pub cache: CacheConfigV2, + /// Configuration information for chunk deduplication. + #[serde(default)] + pub dedup: Option, /// Optional file path for metadata blob. #[serde(default)] pub metadata_path: Option, @@ -1024,11 +1041,59 @@ impl From<&BlobCacheEntryConfigV2> for ConfigV2 { backend: Some(c.backend.clone()), cache: Some(c.cache.clone()), rafs: None, + dedup: c.dedup.clone(), internal: ConfigV2Internal::default(), } } } +/// Configuration information for image deduplication. +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] +pub struct DeduplicationConfigV2 { + #[serde(default)] + pub enable: bool, + #[serde(default)] + pub work_dir: String, +} + +impl DeduplicationConfigV2 { + /// Validate image deduplication configuration. 
+ pub fn validate(&self) -> bool { + true + } + + pub fn get_enable(&self) -> bool { + self.enable + } + pub fn get_work_dir(&self) -> Result<&str> { + let path = fs::metadata(&self.work_dir) + .or_else(|_| { + fs::create_dir_all(&self.work_dir)?; + fs::metadata(&self.work_dir) + }) + .map_err(|e| { + log::error!( + "fail to stat deduplication work_dir {}: {}", + self.work_dir, + e + ); + e + })?; + + if path.is_dir() { + Ok(&self.work_dir) + } else { + Err(Error::new( + ErrorKind::NotFound, + format!( + "deduplication work_dir {} is not a directory", + self.work_dir + ), + )) + } + } +} + /// Internal runtime configuration. #[derive(Clone, Debug)] pub struct ConfigV2Internal { @@ -1070,7 +1135,7 @@ pub const BLOB_CACHE_TYPE_META_BLOB: &str = "bootstrap"; pub const BLOB_CACHE_TYPE_DATA_BLOB: &str = "datablob"; /// Configuration information for a cached blob. -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct BlobCacheEntry { /// Type of blob object, bootstrap or data blob. #[serde(rename = "type")] @@ -1325,6 +1390,28 @@ impl TryFrom<&CacheConfig> for CacheConfigV2 { } } +/// Configuration information for image deduplication. +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] +struct DeduplicationConfig { + /// Whether to enable image dedup + #[serde(default)] + pub enable: bool, + /// Work fir for image dedup + #[serde(default)] + pub work_dir: String, +} + +impl TryFrom<&DeduplicationConfig> for DeduplicationConfigV2 { + type Error = std::io::Error; + + fn try_from(v: &DeduplicationConfig) -> std::result::Result { + Ok(DeduplicationConfigV2 { + enable: v.enable, + work_dir: v.work_dir.clone(), + }) + } +} + /// Configuration information to create blob cache manager. #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] struct FactoryConfig { @@ -1336,6 +1423,9 @@ struct FactoryConfig { /// Configuration for blob cache manager. 
#[serde(default)] pub cache: CacheConfig, + /// Configuration information for image deduplication. + #[serde(default)] + pub dedup: Option, } /// Rafs storage backend configuration information. @@ -1374,6 +1464,14 @@ impl TryFrom for ConfigV2 { fn try_from(v: RafsConfig) -> std::result::Result { let backend: BackendConfigV2 = (&v.device.backend).try_into()?; let mut cache: CacheConfigV2 = (&v.device.cache).try_into()?; + let dedup: Option = match &v.device.dedup { + Some(dedup) => { + let dedup_v2: DeduplicationConfigV2 = dedup.try_into()?; + Some(dedup_v2) + } + None => None, + }; + // (&v.device.dedup).try_into()?; let rafs = RafsConfigV2 { mode: v.mode, batch_size: v.amplify_io, @@ -1394,6 +1492,7 @@ impl TryFrom for ConfigV2 { backend: Some(backend), cache: Some(cache), rafs: Some(rafs), + dedup, internal: ConfigV2Internal::default(), }) } @@ -1487,6 +1586,8 @@ pub(crate) struct BlobCacheEntryConfig { /// /// Possible value: `FileCacheConfig`, `FsCacheConfig`. cache_config: Value, + /// Configuration for chunk deduplication + dedup_config: Option, /// Configuration for data prefetch. 
#[serde(default)] prefetch_config: BlobPrefetchConfig, @@ -1510,11 +1611,19 @@ impl TryFrom<&BlobCacheEntryConfig> for BlobCacheEntryConfigV2 { cache_validate: false, prefetch_config: v.prefetch_config.clone(), }; + let dedup_config = match &v.dedup_config { + Some(cfg) => { + let cfg_v2: DeduplicationConfigV2 = cfg.try_into()?; + Some(cfg_v2) + } + None => None, + }; Ok(BlobCacheEntryConfigV2 { version: 2, id: v.id.clone(), backend: (&backend_config).try_into()?, cache: (&cache_config).try_into()?, + dedup: dedup_config, metadata_path: v.metadata_path.clone(), }) } diff --git a/builder/Cargo.toml b/builder/Cargo.toml index 51639e41b9b..01ef2b4e347 100644 --- a/builder/Cargo.toml +++ b/builder/Cargo.toml @@ -16,12 +16,18 @@ indexmap = "1" libc = "0.2" log = "0.4" nix = "0.24" -serde = { version = "1.0.110", features = ["serde_derive", "rc"] } +serde = { version = "1.0.137", features = ["derive", "rc"] } serde_json = "1.0.53" +erased-serde = "0.3" sha2 = "0.10.2" tar = "0.4.40" vmm-sys-util = "0.11.0" -xattr = "1.0.1" +xattr = "0.2.3" +toml = "0.5" +bitvec = { version="1", default-features = false, features = ["alloc", + "atomic", + "serde", + "std",]} nydus-api = { version = "0.3", path = "../api" } nydus-rafs = { version = "0.3", path = "../rafs" } diff --git a/builder/src/core/bootstrap.rs b/builder/src/core/bootstrap.rs index 8999879820e..d58539d44f6 100644 --- a/builder/src/core/bootstrap.rs +++ b/builder/src/core/bootstrap.rs @@ -2,16 +2,34 @@ // Copyright (C) 2023 Alibaba Cloud. All rights reserved. 
// // SPDX-License-Identifier: Apache-2.0 - -use anyhow::{Context, Error, Result}; +#![allow(unused_variables, unused_imports)] +use anyhow::{bail, Context, Error, Result}; +use nydus_rafs::metadata::chunk::ChunkWrapper; +use nydus_rafs::metadata::layout::v5::{ + dedup_rafsv5_align, RafsV5BlobTable, RafsV5ExtBlobEntry, RafsV5SuperBlock, +}; +use nydus_rafs::metadata::layout::v6::{ + align_offset, RafsV6BlobTable, RafsV6Device, RafsV6SuperBlock, RafsV6SuperBlockExt, + EROFS_BLOCK_SIZE_4096, EROFS_SUPER_BLOCK_SIZE, EROFS_SUPER_OFFSET, +}; +use nydus_rafs::{RafsIoReader, RafsIoWrite}; +use nydus_storage::device::BlobFeatures; use nydus_utils::digest::{self, RafsDigest}; + +use std::collections::{BTreeMap, HashMap}; +use std::convert::TryFrom; +use std::io::SeekFrom; +use std::mem::size_of; use std::ops::Deref; +use std::sync::Arc; use nydus_rafs::metadata::layout::{RafsBlobTable, RAFS_V5_ROOT_INODE}; -use nydus_rafs::metadata::{RafsSuper, RafsSuperConfig, RafsSuperFlags}; +use nydus_rafs::metadata::{RafsStore, RafsSuper, RafsSuperConfig, RafsSuperFlags}; use crate::{ArtifactStorage, BlobManager, BootstrapContext, BootstrapManager, BuildContext, Tree}; +use super::chunk_dict::DigestWithBlobIndex; + /// RAFS bootstrap/meta builder. pub struct Bootstrap { pub(crate) tree: Tree, @@ -210,4 +228,215 @@ impl Bootstrap { Tree::from_bootstrap(&rs, &mut blob_mgr.layered_chunk_dict) .context("failed to build tree from bootstrap") } + + pub fn dedup( + &mut self, + build_ctx: &BuildContext, + rs: &RafsSuper, + reader: &mut RafsIoReader, + writer: &mut dyn RafsIoWrite, + blob_table: &RafsBlobTable, + chunk_cache: &BTreeMap>, + ) -> Result<()> { + match blob_table { + RafsBlobTable::V5(table) => { + self.rafsv5_dedup(build_ctx, rs, reader, writer, table, chunk_cache)? + } + RafsBlobTable::V6(table) => { + self.rafsv6_dedup(build_ctx, rs, reader, writer, table, chunk_cache)? 
+ } + } + + Ok(()) + } + + fn rafsv5_dedup( + &mut self, + build_ctx: &BuildContext, + _rs: &RafsSuper, + reader: &mut RafsIoReader, + writer: &mut dyn RafsIoWrite, + blob_table: &RafsV5BlobTable, + chunk_cache: &BTreeMap>, + ) -> Result<()> { + reader.seek_to_offset(0)?; + let mut sb = RafsV5SuperBlock::new(); + reader.read_exact(sb.as_mut())?; + + let old_blob_table_offset = sb.blob_table_offset(); + let old_table_size = sb.blob_table_size() + + dedup_rafsv5_align( + size_of::() * sb.extended_blob_table_entries() as usize, + ) as u32; + let blob_table_size = blob_table.size() as u32; + let bootstrap_end = writer + .seek_to_end() + .context("failed to seek to bootstrap's end for devtable")?; + + let (blob_table_offset, ext_blob_table_offset) = if blob_table_size > old_table_size { + (bootstrap_end, bootstrap_end + blob_table_size as u64) + } else { + (old_blob_table_offset, bootstrap_end) + }; + //write rs + sb.set_blob_table_offset(blob_table_offset as u64); + sb.set_blob_table_size(blob_table_size as u32); + sb.set_extended_blob_table_offset(ext_blob_table_offset as u64); + sb.set_extended_blob_table_entries(u32::try_from(blob_table.extended.entries())?); + writer.seek(SeekFrom::Start(0))?; + sb.store(writer).context("failed to store superblock")?; + //rewrite blob table + writer + .seek_offset(blob_table_offset) + .context("failed seek for extended blob table offset")?; + blob_table + .store(writer) + .context("failed to store blob table")?; + writer + .seek_offset(ext_blob_table_offset) + .context("failed seek for extended blob table offset")?; + blob_table + .store_extended(writer) + .context("failed to store extended blob table")?; + writer.finalize(Some(String::default()))?; + + Ok(()) + } + + fn rafsv6_dedup( + &mut self, + build_ctx: &BuildContext, + rs: &RafsSuper, + reader: &mut RafsIoReader, + writer: &mut dyn RafsIoWrite, + blob_table: &RafsV6BlobTable, + chunk_cache: &BTreeMap>, + ) -> Result<()> { + let mut sb = RafsV6SuperBlock::new(); + 
sb.load(reader)?; + let mut ext_sb = RafsV6SuperBlockExt::new(); + ext_sb.load(reader)?; + + let blobs = blob_table.get_all(); + let devtable_len = (blobs.len() * size_of::()) as u64; + let blob_table_size = blob_table.size() as u64; + let old_devtable_offset = sb.s_devt_slotoff as u64 * size_of::() as u64; + let old_blob_table_offset = rs.meta.blob_table_offset as u64; + let old_blob_table_size = rs.meta.blob_table_size as u64; + let old_table_size = + old_blob_table_offset + old_blob_table_size as u64 - old_devtable_offset; + let chunk_table_offset = ext_sb.chunk_table_offset(); + let chunk_table_size = ext_sb.chunk_table_size(); + + let bootstrap_end = writer + .seek_to_end() + .context("failed to seek to bootstrap's end for devtable")?; + + let (dev_table_offset, blob_table_offset) = if devtable_len > old_table_size { + ( + bootstrap_end, + align_offset(bootstrap_end + devtable_len, EROFS_BLOCK_SIZE_4096 as u64), + ) + } else { + (old_devtable_offset, bootstrap_end) + }; + + // Dump super block + writer.seek(SeekFrom::Start(0))?; + sb.set_devt_slotoff(dev_table_offset); + sb.store(writer).context("failed to store rs")?; + + // Dump ext_sb + ext_sb.set_blob_table_offset(blob_table_offset); + ext_sb.set_blob_table_size(blob_table_size as u32); + writer + .seek_offset((EROFS_SUPER_OFFSET + EROFS_SUPER_BLOCK_SIZE) as u64) + .context("failed to seek for extended super block")?; + ext_sb + .store(writer) + .context("failed to store extended super block")?; + + // Dump chunk info array. 
+ writer + .seek_offset(chunk_table_offset) + .context("failed to seek to bootstrap's end for chunk table")?; + for (_, chunk) in chunk_cache.iter() { + chunk.store(writer).context("failed to dump chunk table")?; + } + + // Dump blob table + writer + .seek_offset(blob_table_offset) + .context("failed seek for extended blob table offset")?; + blob_table + .store(writer) + .context("failed to store extended blob table")?; + + // Dump padding + writer.flush().context("failed to flush bootstrap")?; + + let pos = writer + .seek_to_end() + .context("failed to seek to bootstrap's end")?; + let padding = align_offset(pos, EROFS_BLOCK_SIZE_4096 as u64) - pos; + let padding_data: [u8; 4096] = [0u8; 4096]; + writer + .write_all(&padding_data[0..padding as usize]) + .context("failed to write 0 to padding of bootstrap's end")?; + writer.flush().context("failed to flush bootstrap")?; + + //prepare devtable + let block_size = build_ctx.v6_block_size(); + let mut devtable: Vec = Vec::new(); + let mut pos = writer + .seek_to_end() + .context("failed to seek to bootstrap's end for chunk table")?; + assert_eq!(pos % block_size, 0); + let mut block_count = 0u32; + for entry in blobs.iter() { + let mut devslot = RafsV6Device::new(); + // blob id is String, which is processed by sha256.finalize(). 
+ if entry.blob_id().is_empty() { + bail!(" blob id is empty"); + } else if entry.blob_id().len() > 64 { + bail!(format!( + "blob id length is bigger than 64 bytes, blob id {:?}", + entry.blob_id() + )); + } else if entry.uncompressed_size() / block_size > u32::MAX as u64 { + bail!(format!( + "uncompressed blob size (0x:{:x}) is too big", + entry.uncompressed_size() + )); + } + + let cnt = (entry.uncompressed_size() / block_size) as u32; + if block_count.checked_add(cnt).is_none() { + bail!("Too many data blocks in RAFS filesystem, block size 0x{:x}, block count 0x{:x}", block_size, block_count as u64 + cnt as u64); + } + let mapped_blkaddr = Self::v6_align_mapped_blkaddr(block_size, pos)?; + pos = (mapped_blkaddr + cnt) as u64 * block_size; + block_count += cnt; + + let id = entry.blob_id(); + let id = id.as_bytes(); + let mut blob_id = [0u8; 64]; + blob_id[..id.len()].copy_from_slice(id); + devslot.set_blob_id(&blob_id); + devslot.set_blocks(cnt); + devslot.set_mapped_blkaddr(mapped_blkaddr); + devtable.push(devslot); + } + + // Dump devslot table + writer + .seek_offset(dev_table_offset) + .context("failed to seek devtslot")?; + + for slot in devtable.iter() { + slot.store(writer).context("failed to store device slot")?; + } + + Ok(()) + } } diff --git a/builder/src/core/bootstrap_dedup.rs b/builder/src/core/bootstrap_dedup.rs new file mode 100644 index 00000000000..ab619f2499a --- /dev/null +++ b/builder/src/core/bootstrap_dedup.rs @@ -0,0 +1,166 @@ +// // Copyright (C) 2022 Alibaba Cloud. All rights reserved. 
+// // +// // SPDX-License-Identifier: Apache-2.0 + +use std::collections::{BTreeMap, HashMap}; +use std::convert::TryFrom; +use std::fs; +use std::path::PathBuf; +use std::sync::Arc; + +use anyhow::{Ok, Result}; + +use nydus_api::{BackendConfigV2, ConfigV2}; +use nydus_rafs::metadata::chunk::ChunkWrapper; +use nydus_rafs::metadata::{RafsSuper, RafsSuperFlags, RafsVersion}; +use nydus_rafs::{RafsIoReader, RafsIoWrite}; +use nydus_utils::cas::CasMgr; +use nydus_utils::digest::{self, RafsDigest}; +use nydus_utils::{root_tracer, timing_tracer}; + +use crate::core::bootstrap::Bootstrap; +use crate::core::context::{ + ArtifactFileWriter, ArtifactWriter, BlobManager, BuildContext, ConversionType, +}; + +use crate::core::feature::Features; +use crate::core::overlay::WhiteoutSpec; +use crate::core::tree::Tree; +use crate::ArtifactStorage; + +pub struct BootstrapDedup { + cas_mgr: CasMgr, + rs: RafsSuper, + cache_chunks: HashMap, + insert_chunks: Vec<(String, String, String)>, + insert_blobs: Vec<(String, String, String)>, + reader: RafsIoReader, + writer: Box, + backend: BackendConfigV2, + encrypt: bool, +} + +impl BootstrapDedup { + pub fn new(bootstrap_path: PathBuf, output_path: PathBuf, cfg: &Arc) -> Result { + let (rs, _reader) = RafsSuper::load_from_file(&bootstrap_path, cfg.clone(), false)?; + + let reader = Box::new( + fs::OpenOptions::new() + .read(true) + .write(false) + .open(&bootstrap_path)?, + ) as RafsIoReader; + + let dedup_config = cfg.get_dedup_config()?; + let db_dir = dedup_config.get_work_dir()?; + let cas_mgr = CasMgr::new(db_dir)?; + + let cache_chunks = HashMap::new(); + let insert_chunks = vec![]; + let insert_blobs = vec![]; + let backend = cfg.get_backend_config().unwrap().clone(); + let encrypt = !rs.meta.flags.contains(RafsSuperFlags::ENCRYPTION_NONE); + let writer = Box::new(ArtifactFileWriter(ArtifactWriter::new( + ArtifactStorage::SingleFile(PathBuf::from(&output_path)), + )?)) as Box; + + fs::copy(&bootstrap_path, &output_path)?; + + 
Ok(BootstrapDedup { + cas_mgr, + rs, + cache_chunks, + insert_chunks, + insert_blobs, + reader, + writer, + backend, + encrypt, + }) + } + + fn is_tarfs(&self) -> bool { + self.rs.meta.flags.contains(RafsSuperFlags::TARTFS_MODE) + } + + pub fn do_dedup(&mut self) -> Result<()> { + let conversion_type = if self.is_tarfs() { + ConversionType::TarToTarfs + } else { + ConversionType::DirectoryToRafs + }; + let mut build_ctx = BuildContext::new( + "".to_string(), + false, + 0, + self.rs.meta.get_compressor(), + self.rs.meta.get_digester(), + self.rs.meta.explicit_uidgid(), + WhiteoutSpec::Oci, + conversion_type, + PathBuf::from(""), + Default::default(), + None, + false, + Features::new(), + self.encrypt, + ); + + build_ctx.set_fs_version(RafsVersion::try_from(self.rs.meta.version).unwrap()); + + let is_v6 = match &build_ctx.fs_version { + RafsVersion::V5 => false, + RafsVersion::V6 => true, + }; + let tree = Tree::from_bootstrap(&self.rs, &mut ())?; + + let mut chunk_cache = BTreeMap::new(); + let mut blob_mgr = BlobManager::new(digest::Algorithm::Sha256); + blob_mgr + .extend_from_blob_table(&build_ctx, self.rs.superblock.get_blob_infos()) + .unwrap(); + + // Dump bootstrap + timing_tracer!( + { + tree.walk_bfs(true, &mut |n| { + n.lock_node().dedup_chunk_for_node( + &build_ctx, + &mut blob_mgr, + &self.rs.meta, + self.writer.as_mut(), + &mut self.cache_chunks, + &mut self.insert_chunks, + &self.cas_mgr, + &mut chunk_cache, + ) + }) + }, + "chunk_dedup" + )?; + + let blob_table = blob_mgr.to_blob_table(&build_ctx)?; + let mut bootstrap = Bootstrap::new(tree)?; + bootstrap.dedup( + &build_ctx, + &self.rs, + &mut self.reader, + self.writer.as_mut(), + &blob_table, + &chunk_cache, + )?; + + let blobs = self.rs.superblock.get_blob_infos(); + for blob in &blobs { + self.insert_blobs.push(( + blob.blob_id(), + serde_json::to_string(&blob)?, + serde_json::to_string(&self.backend)?, + )); + } + self.cas_mgr.add_blobs(&self.insert_blobs, is_v6)?; + 
self.cas_mgr.add_chunks(&self.insert_chunks, is_v6)?; + + Ok(()) + } +} diff --git a/builder/src/core/context.rs b/builder/src/core/context.rs index 36f48a3c325..8cd10867e98 100644 --- a/builder/src/core/context.rs +++ b/builder/src/core/context.rs @@ -193,7 +193,7 @@ impl Write for ArtifactMemoryWriter { } } -struct ArtifactFileWriter(ArtifactWriter); +pub struct ArtifactFileWriter(pub ArtifactWriter); impl RafsIoWrite for ArtifactFileWriter { fn as_any(&self) -> &dyn Any { @@ -913,6 +913,13 @@ impl BlobManager { self.blobs.iter().map(|b| b.blob_id.to_owned()).collect() } + pub fn get_blob_id_by_idx(&mut self, idx: usize) -> Option { + if idx >= self.len() { + return None; + } + self.blobs[idx].blob_id() + } + /// Prepend all blobs from `blob_table` to the blob manager. pub fn extend_from_blob_table( &mut self, diff --git a/builder/src/core/mod.rs b/builder/src/core/mod.rs index 311625c5fc1..ca1290480d6 100644 --- a/builder/src/core/mod.rs +++ b/builder/src/core/mod.rs @@ -14,3 +14,5 @@ pub(crate) mod prefetch; pub(crate) mod tree; pub(crate) mod v5; pub(crate) mod v6; + +pub mod bootstrap_dedup; diff --git a/builder/src/core/node.rs b/builder/src/core/node.rs index 5357d639a28..16e72873fa0 100644 --- a/builder/src/core/node.rs +++ b/builder/src/core/node.rs @@ -2,11 +2,13 @@ // Copyright (C) 2021-2023 Alibaba Cloud. All rights reserved. 
// // SPDX-License-Identifier: Apache-2.0 - +#![allow(unused_variables, unused_imports)] +use std::collections::{BTreeMap, HashMap}; use std::ffi::{OsStr, OsString}; use std::fmt::{self, Display, Formatter, Result as FmtResult}; use std::fs::{self, File}; -use std::io::{Read, Write}; +use std::io::{Read, SeekFrom, Write}; +use std::mem::size_of; use std::ops::Deref; #[cfg(target_os = "linux")] use std::os::linux::fs::MetadataExt; @@ -17,13 +19,19 @@ use std::path::{Component, Path, PathBuf}; use std::sync::Arc; use anyhow::{anyhow, bail, Context, Error, Result}; -use nydus_rafs::metadata::chunk::ChunkWrapper; -use nydus_rafs::metadata::inode::InodeWrapper; -use nydus_rafs::metadata::layout::v6::EROFS_INODE_FLAT_PLAIN; -use nydus_rafs::metadata::layout::RafsXAttrs; -use nydus_rafs::metadata::{Inode, RafsVersion}; -use nydus_storage::device::BlobFeatures; +use nydus_rafs::metadata::chunk::{convert_ref_to_rafs_v5_chunk_info, ChunkWrapper}; +use nydus_rafs::metadata::inode::{InodeWrapper, RafsV6Inode}; +use nydus_rafs::metadata::layout::v5::{RafsV5ChunkInfo, RafsV5Inode, RafsV5InodeWrapper}; +use nydus_rafs::metadata::layout::v6::{ + align_offset, RafsV6InodeChunkAddr, EROFS_BLOCK_SIZE_4096, EROFS_BLOCK_SIZE_512, + EROFS_INODE_FLAT_PLAIN, EROFS_INODE_SLOT_SIZE, +}; +use nydus_rafs::metadata::layout::{RafsXAttrs, RAFS_SUPER_VERSION_V6}; +use nydus_rafs::metadata::{Inode, RafsStore, RafsSuperFlags, RafsSuperMeta, RafsVersion}; +use nydus_rafs::RafsIoWrite; +use nydus_storage::device::{BlobFeatures, BlobInfo}; use nydus_storage::meta::{BlobChunkInfoV2Ondisk, BlobMetaChunkInfo}; +use nydus_utils::cas::CasMgr; use nydus_utils::digest::{DigestHasher, RafsDigest}; use nydus_utils::{compress, crypt}; use nydus_utils::{div_round_up, event_tracer, root_tracer, try_round_up_4k, ByteSize}; @@ -33,6 +41,8 @@ use crate::{ ArtifactWriter, BlobContext, BlobManager, BuildContext, ChunkDict, ConversionType, Overlay, }; +use super::chunk_dict::DigestWithBlobIndex; + /// Filesystem root 
path for Unix OSs. const ROOT_PATH_NAME: &[u8] = &[b'/']; @@ -108,7 +118,7 @@ impl NodeChunk { } /// Struct to host sharable fields of [Node]. -#[derive(Clone, Default, Debug)] +#[derive(Clone, Default)] pub struct NodeInfo { /// Whether the explicit UID/GID feature is enabled or not. pub explicit_uidgid: bool, @@ -725,6 +735,228 @@ impl Node { .symlink_metadata() .with_context(|| format!("failed to get metadata of {}", self.path().display())) } + + pub fn get_chunk_ofs(&mut self, meta: &RafsSuperMeta) -> Result<(u64, u64)> { + if meta.version == RAFS_SUPER_VERSION_V6 { + self.get_chunk_ofs_v6(meta) + } else { + self.get_chunk_ofs_v5(meta) + } + } + + fn get_chunk_ofs_v5(&mut self, meta: &RafsSuperMeta) -> Result<(u64, u64)> { + unimplemented!() + } + + fn get_chunk_ofs_v6(&mut self, meta: &RafsSuperMeta) -> Result<(u64, u64)> { + let unit = size_of::() as u64; + let block_size = if meta.flags.contains(RafsSuperFlags::TARTFS_MODE) { + EROFS_BLOCK_SIZE_512 + } else { + EROFS_BLOCK_SIZE_4096 + }; + let meta_offset = meta.meta_blkaddr as usize * block_size as usize; + let nid = self.info.src_ino; + let inode_offset = meta_offset + .checked_add(nid as usize * EROFS_INODE_SLOT_SIZE) + .unwrap(); + let inode = match self.inode.clone() { + InodeWrapper::V5(i) => InodeWrapper::V5(i), + InodeWrapper::V6(i) => InodeWrapper::V6(i), + InodeWrapper::Ref(i) => { + if meta.version == RAFS_SUPER_VERSION_V6 { + InodeWrapper::V6(RafsV6Inode::from(i.deref())) + } else { + InodeWrapper::V5(RafsV5Inode::from(i.deref())) + } + } + }; + self.inode = inode; + let base = self.v6_size_with_xattr(); + let chunk_ofs = align_offset(inode_offset as u64 + base, unit); + + Ok((chunk_ofs, unit)) + } + + #[allow(clippy::too_many_arguments)] + pub fn dedup_chunk_for_node( + &mut self, + build_ctx: &BuildContext, + blob_mgr: &mut BlobManager, + meta: &RafsSuperMeta, + writer: &mut dyn RafsIoWrite, + cache_chunks: &mut HashMap, + insert_chunks: &mut Vec<(String, String, String)>, + cas_mgr: &CasMgr, + 
chunk_cache: &mut BTreeMap>, + ) -> Result<()> { + let (mut chunk_ofs, chunk_size) = self.get_chunk_ofs(meta)?; + + for chunk in &self.chunks { + let chunk_id = chunk.inner.id(); + let origin_blob_index = chunk.inner.blob_index() as usize; + let blob_id = blob_mgr + .get_blob_id_by_idx(chunk.inner.blob_index() as usize) + .unwrap(); + + writer + .seek(SeekFrom::Start(chunk_ofs)) + .context("failed seek for chunk_ofs") + .unwrap(); + + match cache_chunks.get(chunk_id) { + // dedup chunk between layers + Some(new_chunk) => { + // if the chunk belongs to another image's blob + let mut new_chunk = new_chunk.deref().clone(); + let blob_index = new_chunk.blob_index() as usize; + if origin_blob_index != blob_index { + new_chunk.set_deduped(true); + } + + chunk_cache.insert( + DigestWithBlobIndex(*new_chunk.id(), new_chunk.blob_index() + 1), + Arc::new(new_chunk.clone()), + ); + self.dedup_bootstrap(build_ctx, &new_chunk, writer)? + } + None => match cas_mgr.get_chunk(chunk_id, &blob_id, true)?
{ + Some((new_blob_id, chunk_info)) => { + let blob_idx = match blob_mgr.get_blob_idx_by_id(&new_blob_id) { + Some(blob_idx) => blob_idx, + None => { + // Safe to use unwrap since we get blob_id from the chunk table + let blob_info = cas_mgr.get_blob(&new_blob_id, true)?.unwrap(); + let blob = serde_json::from_str::(&blob_info)?; + let blob_idx = blob_mgr.alloc_index()?; + blob_mgr.add_blob(BlobContext::from( + build_ctx, + &blob, + ChunkSource::Parent, + )?); + + blob_idx + } + }; + + let new_chunk = serde_json::from_str::(&chunk_info)?; + let mut new_chunk = match &build_ctx.fs_version { + RafsVersion::V5 => ChunkWrapper::V5(new_chunk), + RafsVersion::V6 => ChunkWrapper::V6(new_chunk), + }; + + // if this chunk is from another blob, mark it as deduped + if origin_blob_index != blob_idx as usize { + new_chunk.set_deduped(true); + } + new_chunk.set_blob_index(blob_idx); + chunk_cache.insert( + DigestWithBlobIndex(*new_chunk.id(), new_chunk.blob_index() + 1), + Arc::new(new_chunk.clone()), + ); + + self.dedup_bootstrap(build_ctx, &new_chunk, writer)?; + cache_chunks.insert(*chunk_id, new_chunk); + } + None => { + let new_chunk = chunk.inner.as_ref().clone(); + cache_chunks.insert(*chunk_id, new_chunk.clone()); + chunk_cache.insert( + DigestWithBlobIndex(*new_chunk.id(), new_chunk.blob_index() + 1), + Arc::new(new_chunk.clone()), + ); + + let chunk_info = match new_chunk.clone() { + ChunkWrapper::V5(c) => serde_json::to_string(&c).unwrap(), + ChunkWrapper::V6(c) => serde_json::to_string(&c).unwrap(), + ChunkWrapper::Ref(c) => { + let chunk = convert_ref_to_rafs_v5_chunk_info(c.deref()); + serde_json::to_string(&chunk).unwrap() + } + }; + insert_chunks.push((String::from(*chunk_id), chunk_info, blob_id)); + } + }, + } + + chunk_ofs += chunk_size; + } + Ok(()) + } + + pub fn dedup_bootstrap( + &self, + build_ctx: &BuildContext, + chunk: &ChunkWrapper, + writer: &mut dyn RafsIoWrite, + ) -> Result<()> { + match chunk { + ChunkWrapper::V5(_) => self.dedup_bootstrap_v5(build_ctx, 
chunk, writer), + ChunkWrapper::V6(_) => self.dedup_bootstrap_v6(build_ctx, chunk, writer), + ChunkWrapper::Ref(_) => match &build_ctx.fs_version { + RafsVersion::V5 => self.dedup_bootstrap_v5(build_ctx, chunk, writer), + RafsVersion::V6 => self.dedup_bootstrap_v6(build_ctx, chunk, writer), + }, + } + } + + pub fn update_inode_digest_for_bootstrap(&self, writer: &mut dyn RafsIoWrite) -> Result<()> { + // Dump inode info + if let InodeWrapper::V5(raw_inode) = &self.inode { + let name = self.name(); + let inode = RafsV5InodeWrapper { + name, + symlink: self.info.symlink.as_deref(), + inode: raw_inode, + }; + inode + .store(writer) + .context("failed to dump inode to bootstrap")?; + } + + Ok(()) + } + + fn dedup_bootstrap_v5( + &self, + build_ctx: &BuildContext, + chunk: &ChunkWrapper, + writer: &mut dyn RafsIoWrite, + ) -> Result<()> { + chunk + .store(writer) + .context("failed to dump chunk info to bootstrap") + .unwrap(); + anyhow::Ok(()) + } + + fn dedup_bootstrap_v6( + &self, + build_ctx: &BuildContext, + chunk: &ChunkWrapper, + writer: &mut dyn RafsIoWrite, + ) -> Result<()> { + let mut v6_chunk = RafsV6InodeChunkAddr::new(); + // for erofs, bump id by 1 since device id 0 is bootstrap. 
+ let offset = chunk.uncompressed_offset(); + let blk_addr = build_ctx.v6_block_addr(offset).with_context(|| { + format!( + "failed to compute blk_addr for chunk with uncompressed offset 0x{:x}", + offset + ) + })?; + v6_chunk.set_blob_index(chunk.blob_index()); + v6_chunk.set_blob_ci_index(chunk.index()); + v6_chunk.set_block_addr(blk_addr); + + let mut chunks: Vec = Vec::new(); + chunks.extend(v6_chunk.as_ref()); + writer + .write(chunks.as_slice()) + .context("failed to write chunkindexes") + .unwrap(); + anyhow::Ok(()) + } } // Access Methods diff --git a/builder/src/core/v6.rs b/builder/src/core/v6.rs index f81e0b95432..4d5b81daf5e 100644 --- a/builder/src/core/v6.rs +++ b/builder/src/core/v6.rs @@ -199,7 +199,7 @@ impl Node { Ok(d_size) } - fn v6_size_with_xattr(&self) -> u64 { + pub fn v6_size_with_xattr(&self) -> u64 { self.inode .get_inode_size_with_xattr(&self.info.xattrs, self.v6_compact_inode) as u64 } @@ -882,7 +882,7 @@ impl Bootstrap { Ok(()) } - fn v6_align_mapped_blkaddr(block_size: u64, addr: u64) -> Result { + pub fn v6_align_mapped_blkaddr(block_size: u64, addr: u64) -> Result { match addr.checked_add(V6_BLOCK_SEG_ALIGNMENT - 1) { None => bail!("address 0x{:x} is too big", addr), Some(v) => { diff --git a/builder/src/lib.rs b/builder/src/lib.rs index d2409a16bc0..c4d9db93f30 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -40,7 +40,7 @@ pub use self::stargz::StargzBuilder; pub use self::tarball::TarballBuilder; mod compact; -mod core; +pub mod core; mod directory; mod merge; mod stargz; diff --git a/rafs/Cargo.toml b/rafs/Cargo.toml index 4581d5e0524..f0e4960d753 100644 --- a/rafs/Cargo.toml +++ b/rafs/Cargo.toml @@ -20,7 +20,7 @@ serde = { version = "1.0.110", features = ["serde_derive", "rc"] } serde_json = "1.0.53" vm-memory = "0.10" fuse-backend-rs = "^0.10.3" - + nydus-api = { version = "0.3", path = "../api" } nydus-storage = { version = "0.6", path = "../storage", features = ["backend-localfs"] } nydus-utils = { version = 
"0.4", path = "../utils" } diff --git a/rafs/src/metadata/cached_v5.rs b/rafs/src/metadata/cached_v5.rs index 597e1ddb06a..98f56631d2b 100644 --- a/rafs/src/metadata/cached_v5.rs +++ b/rafs/src/metadata/cached_v5.rs @@ -737,6 +737,10 @@ impl BlobChunkInfo for CachedChunkInfoV5 { false } + fn is_deduped(&self) -> bool { + false + } + fn as_any(&self) -> &dyn Any { self } diff --git a/rafs/src/metadata/chunk.rs b/rafs/src/metadata/chunk.rs index 5e8a3446d37..f695498c17c 100644 --- a/rafs/src/metadata/chunk.rs +++ b/rafs/src/metadata/chunk.rs @@ -12,6 +12,7 @@ use nydus_storage::device::v5::BlobV5ChunkInfo; use nydus_storage::device::{BlobChunkFlags, BlobChunkInfo}; use nydus_storage::meta::BlobMetaChunk; use nydus_utils::digest::RafsDigest; +use serde::{Deserialize, Serialize}; use crate::metadata::cached_v5::CachedChunkInfoV5; use crate::metadata::direct_v5::DirectChunkInfoV5; @@ -21,13 +22,14 @@ use crate::metadata::{RafsStore, RafsVersion}; use crate::RafsIoWrite; /// A wrapper to encapsulate different versions of chunk information objects. -#[derive(Clone)] +#[derive(Clone, Deserialize, Serialize)] pub enum ChunkWrapper { /// Chunk info for RAFS v5. V5(RafsV5ChunkInfo), /// Chunk info RAFS v6, reuse `RafsV5ChunkInfo` as IR for v6. V6(RafsV5ChunkInfo), /// Reference to a `BlobChunkInfo` object. + #[serde(skip_deserializing, skip_serializing)] Ref(Arc), } @@ -259,6 +261,27 @@ impl ChunkWrapper { } } + /// Check whether the chunk is deduped or not. + pub fn is_deduped(&self) -> bool { + match self { + ChunkWrapper::V5(c) => c.flags.contains(BlobChunkFlags::DEDUPED), + ChunkWrapper::V6(c) => c.flags.contains(BlobChunkFlags::DEDUPED), + ChunkWrapper::Ref(c) => as_blob_v5_chunk_info(c.deref()) + .flags() + .contains(BlobChunkFlags::DEDUPED), + } + } + + /// Set flag for whether chunk is deduped. 
+ pub fn set_deduped(&mut self, deduped: bool) { + self.ensure_owned(); + match self { + ChunkWrapper::V5(c) => c.flags.set(BlobChunkFlags::DEDUPED, deduped), + ChunkWrapper::V6(c) => c.flags.set(BlobChunkFlags::DEDUPED, deduped), + ChunkWrapper::Ref(_c) => panic!("unexpected"), + } + } + /// Set flag for whether chunk is encrypted. pub fn set_encrypted(&mut self, encrypted: bool) { self.ensure_owned(); @@ -421,3 +444,8 @@ fn to_rafs_v5_chunk_info(cki: &dyn BlobV5ChunkInfo) -> RafsV5ChunkInfo { reserved: 0u32, } } + +pub fn convert_ref_to_rafs_v5_chunk_info(cki: &dyn BlobChunkInfo) -> RafsV5ChunkInfo { + let chunk = to_rafs_v5_chunk_info(as_blob_v5_chunk_info(cki.deref())); + chunk +} diff --git a/rafs/src/metadata/direct_v5.rs b/rafs/src/metadata/direct_v5.rs index 2b7221525eb..a1010562a07 100644 --- a/rafs/src/metadata/direct_v5.rs +++ b/rafs/src/metadata/direct_v5.rs @@ -846,6 +846,10 @@ impl BlobChunkInfo for DirectChunkInfoV5 { false } + fn is_deduped(&self) -> bool { + false + } + fn as_any(&self) -> &dyn Any { self } diff --git a/rafs/src/metadata/direct_v6.rs b/rafs/src/metadata/direct_v6.rs index f7731aa1c9c..a2237dc594b 100644 --- a/rafs/src/metadata/direct_v6.rs +++ b/rafs/src/metadata/direct_v6.rs @@ -1373,6 +1373,7 @@ pub(crate) struct DirectChunkInfoV6 { mapping: DirectSuperBlockV6, offset: usize, digest: RafsDigest, + is_dedup: bool, } // This is *direct* metadata mode in-memory chunk info object. 
@@ -1394,6 +1395,7 @@ impl DirectChunkInfoV6 { mapping, offset, digest: chunk.block_id, + is_dedup: chunk.is_deduped(), }) } @@ -1436,6 +1438,10 @@ impl BlobChunkInfo for DirectChunkInfoV6 { .contains(BlobChunkFlags::ENCYPTED) } + fn is_deduped(&self) -> bool { + self.is_dedup + } + fn as_any(&self) -> &dyn Any { self } @@ -1523,6 +1529,10 @@ impl BlobChunkInfo for TarfsChunkInfoV6 { false } + fn is_deduped(&self) -> bool { + false + } + fn as_any(&self) -> &dyn Any { self } diff --git a/rafs/src/metadata/layout/v5.rs b/rafs/src/metadata/layout/v5.rs index 859e56ee989..abd73dde564 100644 --- a/rafs/src/metadata/layout/v5.rs +++ b/rafs/src/metadata/layout/v5.rs @@ -45,6 +45,7 @@ use std::sync::Arc; use nydus_utils::digest::{self, DigestHasher, RafsDigest}; use nydus_utils::{compress, ByteSize}; +use serde::{Deserialize, Serialize}; use vm_memory::VolatileMemory; // With Rafs v5, the storage manager needs to access file system metadata to decompress the // compressed blob file. To avoid circular dependency, the following Rafs v5 metadata structures @@ -1089,7 +1090,7 @@ impl<'a> RafsStore for RafsV5InodeWrapper<'a> { /// Rafs v5 chunk on disk metadata. #[repr(C)] -#[derive(Default, Clone, Copy, Debug)] +#[derive(Default, Clone, Copy, Debug, Deserialize, Serialize)] pub struct RafsV5ChunkInfo { /// sha256(chunk), [char; RAFS_SHA256_LENGTH] pub block_id: RafsDigest, // 32 @@ -1123,6 +1124,10 @@ impl RafsV5ChunkInfo { pub fn load(&mut self, r: &mut RafsIoReader) -> Result<()> { r.read_exact(self.as_mut()) } + + pub fn is_deduped(&self) -> bool { + self.flags.contains(BlobChunkFlags::DEDUPED) + } } impl RafsStore for RafsV5ChunkInfo { @@ -1382,6 +1387,10 @@ pub(crate) fn rafsv5_align(size: usize) -> usize { } } +pub fn dedup_rafsv5_align(size: usize) -> usize { + rafsv5_align(size) +} + /// Validate inode metadata, include children, chunks and symblink etc. /// /// The default implementation is for rafs v5. 
The chunk data is not validated here, which will @@ -1628,6 +1637,10 @@ pub mod tests { self.flags.contains(BlobChunkFlags::COMPRESSED) } + fn is_deduped(&self) -> bool { + self.flags.contains(BlobChunkFlags::DEDUPED) + } + fn is_encrypted(&self) -> bool { false } diff --git a/rafs/src/metadata/layout/v6.rs b/rafs/src/metadata/layout/v6.rs index d386855105c..fa2e2d71f54 100644 --- a/rafs/src/metadata/layout/v6.rs +++ b/rafs/src/metadata/layout/v6.rs @@ -133,7 +133,7 @@ pub struct RafsV6SuperBlock { /// # of devices besides the primary device. s_extra_devices: u16, /// Offset of the device table, `startoff = s_devt_slotoff * 128`. - s_devt_slotoff: u16, + pub s_devt_slotoff: u16, /// Padding. s_reserved: [u8; 38], } @@ -328,6 +328,11 @@ impl RafsV6SuperBlock { self.s_blkszbits = block_bits; } + /// Set Offset of the device table. + pub fn set_devt_slotoff(&mut self, count: u64) { + self.s_devt_slotoff = ((count / size_of::() as u64) as u16).to_le(); + } + impl_pub_getter_setter!(magic, set_magic, s_magic, u32); impl_pub_getter_setter!(extra_devices, set_extra_devices, s_extra_devices, u16); } diff --git a/rafs/src/metadata/md_v5.rs b/rafs/src/metadata/md_v5.rs index 039dcd05811..0c0fa757023 100644 --- a/rafs/src/metadata/md_v5.rs +++ b/rafs/src/metadata/md_v5.rs @@ -245,6 +245,10 @@ impl BlobChunkInfo for V5IoChunk { false } + fn is_deduped(&self) -> bool { + false + } + fn as_any(&self) -> &dyn Any { self } diff --git a/rafs/src/metadata/mod.rs b/rafs/src/metadata/mod.rs index c04c3b4da1d..601b40aae84 100644 --- a/rafs/src/metadata/mod.rs +++ b/rafs/src/metadata/mod.rs @@ -28,6 +28,7 @@ use nydus_storage::device::{ use nydus_storage::meta::toc::TocEntryList; use nydus_utils::digest::{self, RafsDigest}; use nydus_utils::{compress, crypt}; +use serde::Deserialize; use serde::Serialize; use self::layout::v5::RafsV5PrefetchTable; @@ -264,7 +265,7 @@ pub trait RafsStore { bitflags! { /// Rafs filesystem feature flags. 
- #[derive(Serialize)] + #[derive(Deserialize, Serialize)] pub struct RafsSuperFlags: u64 { /// Data chunks are not compressed. const COMPRESSION_NONE = 0x0000_0001; @@ -441,7 +442,7 @@ impl RafsSuperConfig { } /// Rafs filesystem meta-data cached from on disk RAFS super block. -#[derive(Clone, Copy, Debug, Serialize)] +#[derive(Clone, Copy, Debug, Deserialize, Serialize)] pub struct RafsSuperMeta { /// Filesystem magic number. pub magic: u32, diff --git a/rafs/src/mock/mock_chunk.rs b/rafs/src/mock/mock_chunk.rs index 9a01340ffd9..1eb26852537 100644 --- a/rafs/src/mock/mock_chunk.rs +++ b/rafs/src/mock/mock_chunk.rs @@ -66,6 +66,10 @@ impl BlobChunkInfo for MockChunkInfo { false } + fn is_deduped(&self) -> bool { + self.c_flags.contains(BlobChunkFlags::DEDUPED) + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/bin/nydus-image/main.rs b/src/bin/nydus-image/main.rs index f551930b78b..5f2c0330607 100644 --- a/src/bin/nydus-image/main.rs +++ b/src/bin/nydus-image/main.rs @@ -26,6 +26,7 @@ use clap::{Arg, ArgAction, ArgMatches, Command as App}; use nix::unistd::{getegid, geteuid}; use nydus::{get_build_time_info, setup_logging}; use nydus_api::{BuildTimeInfo, ConfigV2, LocalFsConfig}; +use nydus_builder::core::bootstrap_dedup::BootstrapDedup; use nydus_builder::{ parse_chunk_dict_arg, ArtifactStorage, BlobCompactor, BlobManager, BootstrapManager, BuildContext, BuildOutput, Builder, ConversionType, DirectoryBuilder, Feature, Features, @@ -705,8 +706,33 @@ fn prepare_cmd_args(bti_string: &'static str) -> App { .long("output") .help("path for output tar file") .required(true), - ), - ) + ) + ) + .subcommand( + App::new("dedup") + .about("(experimental)Rebuild the given bootstrap, remap duplicate blocks to those that already exist locally.") + .arg( + Arg::new("bootstrap") + .long("bootstrap") + .short('B') + .help("Path to nydus image's metadata blob (required)") + .required(true), + ). 
+ arg( + Arg::new("output-bootstrap") + .long("output") + .short('O') + .help("Bootstrap to output, default is the source bootstrap with the .dedup suffix appended") + .required(false), + ) + .arg( + Arg::new("config") + .long("config") + .short('C') + .help("configuration file for deduplication") + .required(true), + ) + ) } fn init_log(matches: &ArgMatches) -> Result<()> { @@ -766,6 +792,8 @@ fn main() -> Result<()> { Command::compact(matches, &build_info) } else if let Some(matches) = cmd.subcommand_matches("unpack") { Command::unpack(matches) + } else if let Some(matches) = cmd.subcommand_matches("dedup") { + Command::dedup(matches) } else { println!("{}", usage); Ok(()) @@ -1442,6 +1470,23 @@ impl Command { Ok(()) } + fn dedup(matches: &clap::ArgMatches) -> Result<()> { + let bootstrap_path = PathBuf::from(Self::get_bootstrap(matches)?); + let output_path = match matches.get_one::("output-bootstrap") { + None => bootstrap_path.with_extension("boot.dedup"), + Some(s) => PathBuf::from(s), + }; + + //config + let config = + Self::get_configuration(matches).context("failed to get configuration information")?; + config.internal.set_blob_accessible(true); + + let mut dedup = BootstrapDedup::new(bootstrap_path, output_path, &config)?; + dedup.do_dedup()?; + Ok(()) + } + + fn get_bootstrap(matches: &ArgMatches) -> Result<&Path> { + match matches.get_one::("bootstrap") { + Some(s) => Ok(Path::new(s)), diff --git a/src/bin/nydus-image/unpack/pax/test.rs b/src/bin/nydus-image/unpack/pax/test.rs index 35c43be9068..3dc8f1e24e4 100644 --- a/src/bin/nydus-image/unpack/pax/test.rs +++ b/src/bin/nydus-image/unpack/pax/test.rs @@ -106,6 +106,10 @@ impl BlobChunkInfo for MockChunkInfo { false } + fn is_deduped(&self) -> bool { + false + } + fn as_any(&self) -> &dyn std::any::Any { todo!(); } diff --git a/storage/src/cache/state/blob_state_map.rs b/storage/src/cache/state/blob_state_map.rs index d58e5869a8a..bbc85157cf1 100644 --- a/storage/src/cache/state/blob_state_map.rs +++ 
b/storage/src/cache/state/blob_state_map.rs @@ -104,7 +104,6 @@ where fn check_ready_and_mark_pending(&self, chunk: &dyn BlobChunkInfo) -> StorageResult { let mut ready = self.c.is_ready(chunk).map_err(StorageError::CacheIndex)?; - if ready { return Ok(true); } @@ -436,6 +435,10 @@ pub(crate) mod tests { false } + fn is_deduped(&self) -> bool { + false + } + fn as_any(&self) -> &dyn Any { self } diff --git a/storage/src/device.rs b/storage/src/device.rs index 6c559cf481c..766db2bb189 100644 --- a/storage/src/device.rs +++ b/storage/src/device.rs @@ -40,6 +40,7 @@ use nydus_api::ConfigV2; use nydus_utils::compress; use nydus_utils::crypt::{self, Cipher, CipherContext}; use nydus_utils::digest::{self, RafsDigest}; +use serde::{de, Deserialize, Deserializer, Serialize}; use crate::cache::BlobCache; use crate::factory::BLOB_FACTORY; @@ -48,6 +49,7 @@ pub(crate) const BLOB_FEATURE_INCOMPAT_MASK: u32 = 0x0000_ffff; pub(crate) const BLOB_FEATURE_INCOMPAT_VALUE: u32 = 0x0000_0fff; bitflags! { + #[derive(Deserialize, Serialize)] /// Features bits for blob management. pub struct BlobFeatures: u32 { /// Uncompressed chunk data is aligned. @@ -112,7 +114,7 @@ impl TryFrom for BlobFeatures { /// /// The `BlobInfo` structure provides information for the storage subsystem to manage a blob file /// and serve blob IO requests for clients. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct BlobInfo { /// The index of blob in RAFS blob table. blob_index: u32, @@ -164,16 +166,68 @@ pub struct BlobInfo { // Size of blob ToC content, it's zero for blobs with inlined-meta. blob_toc_size: u32, + #[serde(skip_deserializing, skip_serializing)] /// V6: support fs-cache mode fs_cache_file: Option>, /// V6: support inlined-meta meta_path: Arc>, + #[serde( + serialize_with = "serialize_cipher_object", + deserialize_with = "deserialize_cipher_object" + )] /// V6: support data encryption. cipher_object: Arc, /// Cipher context for encryption. 
cipher_ctx: Option, } +fn serialize_cipher_object(cipher_object: &Arc, serializer: S) -> Result +where + S: serde::Serializer, +{ + let cipher_object_str = match cipher_object.deref() { + Cipher::None => "none", + Cipher::Aes128Xts(_) => "aes128_xts", + Cipher::Aes256Xts(_) => "aes256_xts", + Cipher::Aes256Gcm(_) => "aes256_gcm", + }; + serializer.serialize_str(cipher_object_str) +} + +fn deserialize_cipher_object<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let cipher_object_str = String::deserialize(deserializer)?; + + let cipher_object = match cipher_object_str.as_str() { + "none" => Ok(Arc::new(Cipher::None)), + "aes128_xts" => { + let cipher: Result = crypt::Algorithm::Aes128Xts + .new_cipher() + .map_err(de::Error::custom); + Ok(Arc::new(cipher.unwrap())) + } + "aes256_xts" => { + let cipher: Result = crypt::Algorithm::Aes256Xts + .new_cipher() + .map_err(de::Error::custom); + Ok(Arc::new(cipher.unwrap())) + } + "aes256_gcm" => { + let cipher: Result = crypt::Algorithm::Aes256Gcm + .new_cipher() + .map_err(de::Error::custom); + Ok(Arc::new(cipher.unwrap())) + } + _ => { + unimplemented!() + } + }; + + cipher_object +} + impl BlobInfo { /// Create a new instance of `BlobInfo`. pub fn new( @@ -571,6 +625,7 @@ impl BlobInfo { bitflags! { /// Blob chunk flags. + #[derive(Deserialize, Serialize)] pub struct BlobChunkFlags: u32 { /// Chunk data is compressed. const COMPRESSED = 0x0000_0001; @@ -580,6 +635,8 @@ bitflags! { const ENCYPTED = 0x0000_0004; /// Chunk data is merged into a batch chunk. const BATCH = 0x0000_0008; + /// Chunk data is deduped. + const DEDUPED = 0x0000_0010; } } @@ -643,6 +700,8 @@ pub trait BlobChunkInfo: Any + Sync + Send { /// Check whether the chunk is encrypted or not. fn is_encrypted(&self) -> bool; + /// Check whether the chunk is deduped or not. 
+ fn is_deduped(&self) -> bool; fn as_any(&self) -> &dyn Any; } @@ -697,6 +756,10 @@ impl BlobChunkInfo for BlobIoChunk { self.0.is_encrypted() } + fn is_deduped(&self) -> bool { + self.0.is_deduped() + } + fn as_any(&self) -> &dyn Any { self } @@ -1611,4 +1674,68 @@ mod tests { assert_eq!(size, iovec.size()); assert_eq!(chunk_count, iovec.len() as u32); } + + #[test] + fn test_serialize_cipher_object() { + let mut blob_info = BlobInfo::new( + 1, + "test1".to_owned(), + 0x200000, + 0x100000, + 0x100000, + 512, + BlobFeatures::_V5_NO_EXT_BLOB_TABLE, + ); + + let algos = vec![ + crypt::Algorithm::None, + crypt::Algorithm::Aes128Xts, + crypt::Algorithm::Aes256Xts, + crypt::Algorithm::Aes256Gcm, + ]; + + for algo in algos { + let (cipher_object, cipher_ctx) = match algo { + crypt::Algorithm::None => (Default::default(), None), + crypt::Algorithm::Aes128Xts => { + let key = crypt::Cipher::generate_random_key(algo).unwrap(); + let iv = crypt::Cipher::generate_random_iv().unwrap(); + let cipher_ctx = CipherContext::new(key, iv, false, algo).unwrap(); + ( + algo.new_cipher().ok().unwrap_or(Default::default()), + Some(cipher_ctx), + ) + } + crypt::Algorithm::Aes256Xts => { + let key = crypt::Cipher::generate_random_key(algo).unwrap(); + let iv = crypt::Cipher::generate_random_iv().unwrap(); + let cipher_ctx = CipherContext::new(key, iv, false, algo).unwrap(); + ( + algo.new_cipher().ok().unwrap_or(Default::default()), + Some(cipher_ctx), + ) + } + crypt::Algorithm::Aes256Gcm => { + let key = crypt::Cipher::generate_random_key(algo).unwrap(); + let iv = crypt::Cipher::generate_random_iv().unwrap(); + let cipher_ctx = CipherContext::new(key, iv, false, algo).unwrap(); + ( + algo.new_cipher().ok().unwrap_or(Default::default()), + Some(cipher_ctx), + ) + } + }; + blob_info.set_cipher_info(algo, Arc::new(cipher_object), cipher_ctx); + + let content = serde_json::to_string(&blob_info).unwrap(); + let blob_info_from_deserialize: BlobInfo = serde_json::from_str(&content).unwrap(); 
+ println!("{}", algo); + println!("origin blob_info: {:?}", blob_info); + + println!( + "after deserialize, blob_info: {:?}", + blob_info_from_deserialize + ); + } + } } diff --git a/storage/src/meta/mod.rs b/storage/src/meta/mod.rs index a8d1e23d0a4..fb6d460bf6c 100644 --- a/storage/src/meta/mod.rs +++ b/storage/src/meta/mod.rs @@ -1844,6 +1844,10 @@ impl BlobChunkInfo for BlobMetaChunk { self.meta.chunk_info_array.is_encrypted(self.chunk_index) } + fn is_deduped(&self) -> bool { + false + } + fn as_any(&self) -> &dyn Any { self } diff --git a/storage/src/test.rs b/storage/src/test.rs index fc115f3b867..fd782ec7810 100644 --- a/storage/src/test.rs +++ b/storage/src/test.rs @@ -91,6 +91,10 @@ impl BlobChunkInfo for MockChunkInfo { false } + fn is_deduped(&self) -> bool { + self.flags.contains(BlobChunkFlags::DEDUPED) + } + fn as_any(&self) -> &dyn Any { self } diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 3ffb9206b54..3f6a832e7bc 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -16,8 +16,9 @@ libc = "0.2" log = "0.4" lz4-sys = "1.9.4" lz4 = "1.24.0" +rusqlite = { version = "0.29.0", features = ["bundled"] } openssl = { version = "0.10.48", features = ["vendored"], optional = true } -serde = { version = ">=1.0.27", features = ["serde_derive", "rc"] } +serde = { version = "1.0.137", features = ["derive", "rc"] } serde_json = ">=1.0.9" sha2 = "0.10.0" tokio = { version = "1.19.0", features = ["rt", "sync"] } diff --git a/utils/src/cas.rs b/utils/src/cas.rs new file mode 100644 index 00000000000..65364c6fba1 --- /dev/null +++ b/utils/src/cas.rs @@ -0,0 +1,456 @@ +// Copyright (C) 2022 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +use std::fmt::{self, Display, Formatter}; +use std::io; +use std::path::{Path, PathBuf}; + +use rusqlite::{Connection, OptionalExtension}; + +use crate::digest::RafsDigest; + +/// Error codes related to local cas. 
+#[derive(Debug)] +pub enum CasError { + Io(std::io::Error), + Db(rusqlite::Error), +} + +impl Display for CasError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl std::error::Error for CasError {} + +impl From for CasError { + fn from(e: rusqlite::Error) -> Self { + CasError::Db(e) + } +} + +impl From for CasError { + fn from(e: io::Error) -> Self { + CasError::Io(e) + } +} + +/// Specialized `Result` for local cas. +type Result = std::result::Result; + +pub struct CasMgr { + conn: Connection, +} + +impl CasMgr { + pub fn new(path: impl AsRef) -> Result { + let mut db_path = path.as_ref().to_owned(); + db_path.push("cas.db"); + let conn = Connection::open(db_path)?; + + // create blob and chunk table for nydus v5 + conn.execute( + "CREATE TABLE IF NOT EXISTS BlobInfos_V5 ( + BlobId TEXT NOT NULL PRIMARY KEY, + BlobInfo TEXT NOT NULL, + Backend TEXT NOT NULL + )", + (), + )?; + conn.execute( + "CREATE INDEX IF NOT EXISTS BlobIndex_V5 ON BlobInfos_V5(BlobId)", + (), + )?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS ChunkInfos_V5 ( + ChunkId INTEGER PRIMARY KEY, + ChunkDigestValue TEXT NOT NULL, + ChunkInfo TEXT NOT NULL, + BlobId TEXT NOT NULL, + UNIQUE(ChunkDigestValue, BlobId) ON CONFLICT IGNORE, + FOREIGN KEY(BlobId) REFERENCES BlobInfos_V5(BlobId) + )", + (), + )?; + conn.execute( + "CREATE INDEX IF NOT EXISTS ChunkIndex_V5 ON ChunkInfos_V5(ChunkDigestValue)", + (), + )?; + + // create blob and chunk table for nydus v6 + conn.execute( + "CREATE TABLE IF NOT EXISTS BlobInfos_V6 ( + BlobId TEXT NOT NULL PRIMARY KEY, + BlobInfo TEXT NOT NULL, + Backend TEXT NOT NULL + )", + (), + )?; + conn.execute( + "CREATE INDEX IF NOT EXISTS BlobIndex_V6 ON BlobInfos_V6(BlobId)", + (), + )?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS ChunkInfos_V6 ( + ChunkId INTEGER PRIMARY KEY, + ChunkDigestValue TEXT NOT NULL, + ChunkInfo TEXT NOT NULL, + BlobId TEXT NOT NULL, + UNIQUE(ChunkDigestValue, BlobId) ON CONFLICT IGNORE, + 
FOREIGN KEY(BlobId) REFERENCES BlobInfos_V6(BlobId) + )", + (), + )?; + conn.execute( + "CREATE INDEX IF NOT EXISTS ChunkIndex_V6 ON ChunkInfos_V6(ChunkDigestValue)", + (), + )?; + + Ok(CasMgr { conn }) + } + + pub fn get_bootstrap(&mut self, _file: impl AsRef) -> Option { + unimplemented!() + } + + pub fn add_bootstrap( + &mut self, + _source: impl AsRef, + _target: impl AsRef, + ) -> Result<()> { + unimplemented!() + } + + pub fn remove_bootstrap(&mut self, _file: impl AsRef) -> Result<()> { + unimplemented!() + } + + pub fn get_blob(&self, blob_id: &str, is_v6: bool) -> Result> { + let sql = if is_v6 { + "SELECT BlobInfo FROM BlobInfos_V6 WHERE BlobId = ?" + } else { + "SELECT BlobInfo FROM BlobInfos_V5 WHERE BlobId = ?" + }; + + if let Some(blob_info) = self + .conn + .query_row(sql, [blob_id], |row| row.get::(0)) + .optional()? + { + return Ok(Some(blob_info)); + }; + + Ok(None) + } + + pub fn get_backend_by_blob_id(&self, blob_id: &str, is_v6: bool) -> Result> { + let sql = if is_v6 { + "SELECT Backend FROM BlobInfos_V6 WHERE BlobId = ?" + } else { + "SELECT Backend FROM BlobInfos_V5 WHERE BlobId = ?" + }; + + if let Some(backend) = self + .conn + .query_row(sql, [blob_id], |row| row.get::(0)) + .optional()? + { + return Ok(Some(backend)); + }; + + Ok(None) + } + + pub fn get_all_blobs(&self, is_v6: bool) -> Result> { + let mut stmt = if is_v6 { + self.conn + .prepare("SELECT BlobId, BlobInfo FROM BlobInfos_V6")? + } else { + self.conn + .prepare("SELECT BlobId, BlobInfo FROM BlobInfos_V5")? 
+        };
+
+        let rows = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?;
+
+        // Collecting into Result short-circuits on the first row error.
+        let results: Vec<(String, String)> = rows.collect::<rusqlite::Result<_>>()?;
+        Ok(results)
+    }
+
+    /// Insert `(BlobId, BlobInfo, Backend)` rows inside one exclusive
+    /// transaction; duplicate blob ids are ignored by `OR IGNORE`.
+    pub fn add_blobs(&mut self, blobs: &Vec<(String, String, String)>, is_v6: bool) -> Result<()> {
+        let sql = if is_v6 {
+            "INSERT OR IGNORE INTO BlobInfos_V6 (BlobId, BlobInfo, Backend)
+            VALUES (?1, ?2, ?3)"
+        } else {
+            "INSERT OR IGNORE INTO BlobInfos_V5 (BlobId, BlobInfo, Backend)
+            VALUES (?1, ?2, ?3)"
+        };
+
+        let tran = self
+            .conn
+            .transaction_with_behavior(rusqlite::TransactionBehavior::Exclusive)?;
+        for blob in blobs {
+            // Propagate the error instead of panicking; dropping `tran`
+            // rolls the whole batch back.
+            tran.execute(sql, (&blob.0, &blob.1, &blob.2))?;
+        }
+        tran.commit()?;
+
+        Ok(())
+    }
+
+    /// Delete the given blobs and every chunk record referencing them,
+    /// atomically per call.
+    pub fn delete_blobs(&mut self, blobs: &[String], is_v6: bool) -> Result<()> {
+        let delete_blobs_sql = if is_v6 {
+            "DELETE FROM BlobInfos_V6 WHERE BlobId = (?1)"
+        } else {
+            "DELETE FROM BlobInfos_V5 WHERE BlobId = (?1)"
+        };
+
+        let delete_chunks_sql = if is_v6 {
+            "DELETE FROM ChunkInfos_V6 WHERE BlobId = (?1)"
+        } else {
+            "DELETE FROM ChunkInfos_V5 WHERE BlobId = (?1)"
+        };
+
+        let tran = self
+            .conn
+            .transaction_with_behavior(rusqlite::TransactionBehavior::Exclusive)?;
+        for blob_id in blobs {
+            tran.execute(delete_blobs_sql, [blob_id])?;
+            tran.execute(delete_chunks_sql, [blob_id])?;
+        }
+        tran.commit()?;
+
+        Ok(())
+    }
+
+    /// Find a chunk with digest `chunk_id` stored in a blob *other* than
+    /// `blob_id`; returns `(other_blob_id, chunk_info)` on a dedup hit.
+    pub fn get_chunk(
+        &self,
+        chunk_id: &RafsDigest,
+        blob_id: &str,
+        is_v6: bool,
+    ) -> Result<Option<(String, String)>> {
+        let sql = if is_v6 {
+            "SELECT BlobId, ChunkInfo
+            FROM ChunkInfos_V6 INDEXED BY ChunkIndex_V6
+            WHERE ChunkDigestValue = ?"
+        } else {
+            "SELECT BlobId, ChunkInfo
+            FROM ChunkInfos_V5 INDEXED BY ChunkIndex_V5
+            WHERE ChunkDigestValue = ?"
+        };
+
+        if let Some((new_blob_id, chunk_info)) = self
+            .conn
+            .query_row(sql, [String::from(chunk_id.to_owned()).as_str()], |row| {
+                Ok((row.get(0)?, row.get(1)?))
+            })
+            .optional()?
+        {
+            trace!("new_blob_id = {}, chunk_info = {}", new_blob_id, chunk_info);
+
+            // Only report a hit when the chunk lives in a different blob.
+            if new_blob_id != *blob_id {
+                return Ok(Some((new_blob_id, chunk_info)));
+            }
+        }
+
+        Ok(None)
+    }
+
+    /// Insert `(ChunkDigestValue, ChunkInfo, BlobId)` rows inside one
+    /// exclusive transaction; duplicates are ignored by `OR IGNORE`.
+    pub fn add_chunks(
+        &mut self,
+        chunks: &Vec<(String, String, String)>,
+        is_v6: bool,
+    ) -> Result<()> {
+        let sql = if is_v6 {
+            "INSERT OR IGNORE INTO ChunkInfos_V6 (ChunkDigestValue, ChunkInfo, BlobId)
+            VALUES (?1, ?2, ?3)"
+        } else {
+            "INSERT OR IGNORE INTO ChunkInfos_V5 (ChunkDigestValue, ChunkInfo, BlobId)
+            VALUES (?1, ?2, ?3)"
+        };
+
+        let tran = self
+            .conn
+            .transaction_with_behavior(rusqlite::TransactionBehavior::Exclusive)?;
+        for chunk in chunks {
+            // Propagate instead of panicking; dropping `tran` rolls back.
+            tran.execute(sql, (&chunk.0, &chunk.1, &chunk.2))?;
+        }
+        tran.commit()?;
+
+        Ok(())
+    }
+
+    /// Delete one chunk row identified by `blob_id` + `chunk_id`.
+    pub fn delete_chunks(&mut self, blob_id: &str, chunk_id: &str, is_v6: bool) -> Result<()> {
+        // SQLite's DELETE statement has no OR IGNORE conflict clause, so the
+        // previous "DELETE OR IGNORE FROM ..." text was a runtime syntax error.
+        let sql = if is_v6 {
+            "DELETE FROM ChunkInfos_V6 WHERE BlobId = (?1) AND ChunkId = (?2)"
+        } else {
+            "DELETE FROM ChunkInfos_V5 WHERE BlobId = (?1) AND ChunkId = (?2)"
+        };
+
+        let tran = self
+            .conn
+            .transaction_with_behavior(rusqlite::TransactionBehavior::Exclusive)?;
+        tran.execute(sql, [blob_id, chunk_id])?;
+        tran.commit()?;
+
+        Ok(())
+    }
+
+    /// Delete every chunk row belonging to `blob_id`.
+    pub fn delete_chunks_by_blobid(&mut self, blob_id: &str, is_v6: bool) -> Result<()> {
+        let sql = if is_v6 {
+            "DELETE FROM ChunkInfos_V6 WHERE BlobId = (?1)"
+        } else {
+            "DELETE FROM ChunkInfos_V5 WHERE BlobId = (?1)"
+        };
+
+        let tran = self
+            .conn
+            .transaction_with_behavior(rusqlite::TransactionBehavior::Exclusive)?;
+        tran.execute(sql, [blob_id])?;
+        tran.commit()?;
+
+        Ok(())
+    }
+
+    /// Return `(BlobId, ChunkInfo)` rows for every chunk stored in `blob_id`.
+    pub fn get_chunks_by_blobid(
+        &self,
+        blob_id: &str,
+        is_v6: bool,
+    ) -> Result<Vec<(String, String)>> {
+        let sql = if is_v6 {
+            "SELECT BlobId, ChunkInfo FROM ChunkInfos_V6 WHERE BlobId = (?1)"
+        } else {
+            "SELECT BlobId, ChunkInfo FROM ChunkInfos_V5 WHERE BlobId = (?1)"
+        };
+
+        let mut stmt = self.conn.prepare(sql)?;
+        let rows = stmt.query_map([blob_id], |row|
+            Ok((row.get(0)?, row.get(1)?)))?;
+
+        let results: Vec<(String, String)> = rows.collect::<rusqlite::Result<_>>()?;
+        Ok(results)
+    }
+
+    /// Return `(BlobId, ChunkInfo)` for every chunk of the given RAFS version.
+    pub fn get_all_chunks(&self, is_v6: bool) -> Result<Vec<(String, String)>> {
+        let sql = if is_v6 {
+            "SELECT BlobId, ChunkInfo FROM ChunkInfos_V6"
+        } else {
+            "SELECT BlobId, ChunkInfo FROM ChunkInfos_V5"
+        };
+
+        let mut stmt = self.conn.prepare(sql)?;
+        let rows = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?;
+
+        let results: Vec<(String, String)> = rows.collect::<rusqlite::Result<_>>()?;
+        Ok(results)
+    }
+}
+
+// NOTE(review): these tests depend on pre-populated databases at
+// machine-specific paths, so they are only meaningful on the author's setup.
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use std::path::PathBuf;
+
+    #[test]
+    fn test_get_all_blobs() {
+        let path = PathBuf::from("/home/t4/containerd/io.containerd.snapshotter.v1.nydus/");
+        let cas_mgr = CasMgr::new(path).unwrap();
+        let vec = cas_mgr.get_all_blobs(false).unwrap();
+
+        println!("v5 blobs");
+        for (blob_id, _blob_info) in vec {
+            let backend = cas_mgr
+                .get_backend_by_blob_id(&blob_id, false)
+                .unwrap()
+                .unwrap();
+            println!("blob_id: {}, backend: {}", blob_id, backend);
+        }
+
+        println!("v6 blobs");
+        let vec = cas_mgr.get_all_blobs(true).unwrap();
+        for (blob_id, _blob_info) in vec {
+            let backend = cas_mgr
+                .get_backend_by_blob_id(&blob_id, true)
+                .unwrap()
+                .unwrap();
+            println!("blob_id: {}, backend: {}", blob_id, backend);
+        }
+    }
+
+    #[test]
+    fn test_get_all_chunks() {
+        let path = PathBuf::from("/home/t4/containerd/io.containerd.snapshotter.v1.nydus/");
+        let cas_mgr = CasMgr::new(path).unwrap();
+        let vec = cas_mgr.get_all_chunks(false).unwrap();
+        for (blob_id, chunk_info) in vec {
+            println!("[{}, {}]", blob_id, chunk_info);
+        }
+
+        let vec = cas_mgr.get_all_chunks(true).unwrap();
+        for (blob_id, chunk_info) in vec {
+            println!("[{}, {}]", blob_id, chunk_info);
+        }
+    }
+
+    #[test]
+    fn test_get_blob() {
+        let path = PathBuf::from("/tmp/nydus-cas");
+        let cas_mgr = CasMgr::new(path).unwrap();
+        let blob_id =
"c0f347fe9faa635093e4a197eaae35166a748ad3c34d63af3cc1a39c6d1ed803"; + let blob_info: Option = cas_mgr.get_blob(blob_id, false).unwrap(); + println!("{}", blob_info.unwrap()); + } + + #[test] + fn test_delete_chunks_by_blobid() { + let path = PathBuf::from("/tmp/nydus-cas"); + let mut cas_mgr = CasMgr::new(path).unwrap(); + let blob_id = "32d9d48ee65b3526fe40627dabd12330f3aa1f370676972e566ccba446aeeda9"; + + println!("delete chunks for blob_id: [{}]", blob_id); + cas_mgr.delete_chunks_by_blobid(blob_id, false).unwrap(); + + println!("delete chunks for blob_id: [{}]", blob_id); + cas_mgr.delete_chunks_by_blobid(blob_id, false).unwrap(); + } + + #[test] + fn test_delete_chunks() { + let path = PathBuf::from("/tmp/nydus-cas"); + let mut cas_mgr = CasMgr::new(path).unwrap(); + let blobs = cas_mgr.get_all_blobs(false).unwrap(); + for (blob_id, _blob_info) in &blobs { + println!("delete blob: [{}]", blob_id); + cas_mgr.delete_chunks_by_blobid(blob_id, false).unwrap(); + cas_mgr.delete_chunks_by_blobid(blob_id, true).unwrap(); + } + } + + #[test] + fn test_delete_blobs() { + let path = PathBuf::from("/tmp/nydus-cas"); + let mut cas_mgr = CasMgr::new(path).unwrap(); + let blobs = cas_mgr.get_all_blobs(false).unwrap(); + for (blob_id, _blob_info) in &blobs { + println!("delete blob: [{}]", blob_id); + let blob_id = vec![blob_id.to_owned()]; + cas_mgr.delete_blobs(&blob_id, false).unwrap(); + cas_mgr.delete_blobs(&blob_id, true).unwrap(); + } + } +} diff --git a/utils/src/compress/mod.rs b/utils/src/compress/mod.rs index fd19ca811f7..e4104240cd1 100644 --- a/utils/src/compress/mod.rs +++ b/utils/src/compress/mod.rs @@ -18,7 +18,7 @@ const COMPRESSION_MINIMUM_RATIO: usize = 100; /// Supported compression algorithms. 
#[repr(u32)]
-#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
+#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq, Deserialize, Serialize)]
 pub enum Algorithm {
     #[default]
     None = 0,
diff --git a/utils/src/crypt.rs b/utils/src/crypt.rs
index 225898e4bfb..b648292432a 100644
--- a/utils/src/crypt.rs
+++ b/utils/src/crypt.rs
@@ -42,7 +42,7 @@ const DEFAULT_CE_KEY_64: [u8; 64] = [
 
 /// Supported cipher algorithms.
 #[repr(u32)]
-#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, Deserialize, Serialize)]
 pub enum Algorithm {
     #[default]
     None = 0,
@@ -380,7 +380,7 @@ impl Cipher {
 }
 
 /// Struct to provide context information for data encryption/decryption.
-#[derive(Default, Debug, Clone)]
+#[derive(Default, Debug, Clone, Deserialize, Serialize)]
 pub struct CipherContext {
     key: Vec<u8>,
     iv: Vec<u8>,
diff --git a/utils/src/digest.rs b/utils/src/digest.rs
index 7e8ab74593a..84f908f3392 100644
--- a/utils/src/digest.rs
+++ b/utils/src/digest.rs
@@ -20,7 +20,7 @@ pub const RAFS_DIGEST_LENGTH: usize = 32;
 pub type DigestData = [u8; RAFS_DIGEST_LENGTH];
 
 #[repr(u32)]
-#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, Deserialize, Serialize)]
 pub enum Algorithm {
     #[default]
     Blake3 = 0,
@@ -144,7 +144,9 @@ impl DigestHasher for Sha256 {
 }
 
 #[repr(C)]
-#[derive(Clone, Copy, Hash, PartialEq, Eq, Debug, Default, Ord, PartialOrd)]
+#[derive(
+    Clone, Copy, Hash, PartialEq, Eq, Debug, Default, Ord, PartialOrd, Deserialize, Serialize,
+)]
 pub struct RafsDigest {
     pub data: DigestData,
 }
diff --git a/utils/src/lib.rs b/utils/src/lib.rs
index f5d2cb44246..9fcdbdb1787 100644
--- a/utils/src/lib.rs
+++ b/utils/src/lib.rs
@@ -20,6 +20,7 @@ pub use self::reader::*;
 pub use self::types::*;
 
 pub mod async_helper;
+pub mod cas;
 pub mod compact;
 pub mod compress;
 #[cfg(feature = "encryption")]