From 087c0b1bafd2975d3000bef8b1eff521db6633f5 Mon Sep 17 00:00:00 2001 From: Lin Wang Date: Tue, 26 Mar 2024 16:13:16 +0800 Subject: [PATCH] nydus-image: Add support for chunkdict generation Signed-off-by: Lin Wang --- builder/src/chunkdict_generator.rs | 280 ++++++ builder/src/core/context.rs | 40 + builder/src/lib.rs | 4 + rafs/src/metadata/cached_v5.rs | 1 + rafs/src/metadata/layout/v5.rs | 4 + rafs/src/metadata/layout/v6.rs | 8 +- src/bin/nydus-image/deduplicate.rs | 1336 ++++++++++++++++++++++++++-- src/bin/nydus-image/main.rs | 228 +++-- storage/src/cache/filecache/mod.rs | 4 +- storage/src/device.rs | 17 + storage/src/meta/mod.rs | 25 +- utils/src/digest.rs | 12 + 12 files changed, 1830 insertions(+), 129 deletions(-) create mode 100644 builder/src/chunkdict_generator.rs diff --git a/builder/src/chunkdict_generator.rs b/builder/src/chunkdict_generator.rs new file mode 100644 index 00000000000..4f7ab105d2b --- /dev/null +++ b/builder/src/chunkdict_generator.rs @@ -0,0 +1,280 @@ +// Copyright (C) 2023 Nydus Developers. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Generate Chunkdict RAFS bootstrap. +//! ------------------------------------------------------------------------------------------------- +//! Bug 1: Inconsistent Chunk Size Leading to Blob Size Less Than 4K(v6_block_size) +//! Description: The size of chunks is not consistent, which results in the possibility that a blob, +//! composed of a group of these chunks, may be less than 4K(v6_block_size) in size. +//! This inconsistency leads to a failure in passing the size check. +//! ------------------------------------------------------------------------------------------------- +//! Bug 2: Incorrect Chunk Number Calculation Due to Premature Check Logic +//! Description: The current logic for calculating the chunk number is based on the formula size/chunk size. +//! However, this approach is flawed as it precedes the actual check which accounts for chunk statistics. +//! Consequently, this leads to inaccurate counting of chunk numbers. + +use super::core::node::{ChunkSource, NodeInfo}; +use super::{BlobManager, Bootstrap, BootstrapManager, BuildContext, BuildOutput, Tree}; +use crate::core::node::Node; +use crate::NodeChunk; +use anyhow::Result; +use nydus_rafs::metadata::chunk::ChunkWrapper; +use nydus_rafs::metadata::inode::InodeWrapper; +use nydus_rafs::metadata::layout::RafsXAttrs; +use nydus_storage::meta::BlobChunkInfoV1Ondisk; +use nydus_utils::compress::Algorithm; +use nydus_utils::digest::RafsDigest; +use std::ffi::OsString; +use std::mem::size_of; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ChunkdictChunkInfo { + pub image_reference: String, + pub version: String, + pub chunk_blob_id: String, + pub chunk_digest: String, + pub chunk_compressed_size: u32, + pub chunk_uncompressed_size: u32, + pub chunk_compressed_offset: u64, + pub chunk_uncompressed_offset: u64, +} + +pub struct ChunkdictBlobInfo { + pub blob_id: String, + pub blob_compressed_size: u64, + pub blob_uncompressed_size: u64, + pub blob_compressor: String, + pub blob_meta_ci_compressed_size: u64, + pub blob_meta_ci_uncompressed_size: u64, + pub blob_meta_ci_offset: u64, +} + +/// Struct to generate chunkdict RAFS bootstrap. +pub struct Generator {} + +impl Generator { + // Generate chunkdict RAFS bootstrap. 
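+    //
+    // A brief summary of the steps implemented below (descriptive only, no extra behavior):
+    //   1. Drop chunks whose owning blob would total less than one RAFS v6 block
+    //      (`validate_and_remove_chunks`).
+    //   2. Build a root directory tree and attach one child tree that carries all
+    //      dictionary chunks (`build_root_tree` / `build_child_tree`).
+    //   3. Validate the tree, then build the bootstrap and dump it together with the
+    //      blob table.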
+ pub fn generate( + ctx: &mut BuildContext, + bootstrap_mgr: &mut BootstrapManager, + blob_mgr: &mut BlobManager, + chunkdict_chunks_origin: Vec, + chunkdict_blobs: Vec, + ) -> Result { + // Validate and remove chunks whose belonged blob sizes are smaller than a block. + let mut chunkdict_chunks = chunkdict_chunks_origin.to_vec(); + Self::validate_and_remove_chunks(ctx, &mut chunkdict_chunks); + // Build root tree. + let mut tree = Self::build_root_tree(ctx)?; + + // Build child tree. + let child = Self::build_child_tree(ctx, blob_mgr, &chunkdict_chunks, &chunkdict_blobs)?; + let result = vec![child]; + tree.children = result; + + Self::validate_tree(&tree)?; + + // Build bootstrap. + let mut bootstrap_ctx = bootstrap_mgr.create_ctx()?; + let mut bootstrap = Bootstrap::new(tree)?; + bootstrap.build(ctx, &mut bootstrap_ctx)?; + + let blob_table = blob_mgr.to_blob_table(ctx)?; + let storage = &mut bootstrap_mgr.bootstrap_storage; + bootstrap.dump(ctx, storage, &mut bootstrap_ctx, &blob_table)?; + + BuildOutput::new(blob_mgr, &bootstrap_mgr.bootstrap_storage) + } + + /// Validate tree. + fn validate_tree(tree: &Tree) -> Result<()> { + let pre = &mut |t: &Tree| -> Result<()> { + let node = t.lock_node(); + debug!("chunkdict tree: "); + debug!("inode: {}", node); + for chunk in &node.chunks { + debug!("\t chunk: {}", chunk); + } + Ok(()) + }; + tree.walk_dfs_pre(pre)?; + debug!("chunkdict tree is valid."); + Ok(()) + } + + /// Validates and removes chunks with a total uncompressed size smaller than the block size limit. + fn validate_and_remove_chunks(ctx: &mut BuildContext, chunkdict: &mut Vec) { + let mut chunk_sizes = std::collections::HashMap::new(); + + // Accumulate the uncompressed size for each chunk_blob_id. + for chunk in chunkdict.iter() { + *chunk_sizes.entry(chunk.chunk_blob_id.clone()).or_insert(0) += + chunk.chunk_uncompressed_size as u64; + } + // Find all chunk_blob_ids with a total uncompressed size > v6_block_size. + let small_chunks: Vec = chunk_sizes + .into_iter() + .filter(|&(_, size)| size < ctx.v6_block_size()) + .inspect(|(id, _)| { + eprintln!( + "Warning: Blob with id '{}' is smaller than {} bytes.", + id, + ctx.v6_block_size() + ) + }) + .map(|(id, _)| id) + .collect(); + + // Retain only chunks with chunk_blob_id that has a total uncompressed size > v6_block_size. + chunkdict.retain(|chunk| !small_chunks.contains(&chunk.chunk_blob_id)); + } + + /// Build the root tree. + pub fn build_root_tree(ctx: &mut BuildContext) -> Result { + let mut inode = InodeWrapper::new(ctx.fs_version); + inode.set_ino(1); + inode.set_uid(1000); + inode.set_gid(1000); + inode.set_projid(0); + inode.set_mode(0o660 | libc::S_IFDIR as u32); + inode.set_nlink(3); + inode.set_name_size("/".len()); + inode.set_rdev(0); + inode.set_blocks(256); + let node_info = NodeInfo { + explicit_uidgid: true, + src_dev: 0, + src_ino: 0, + rdev: 0, + source: PathBuf::from("/"), + path: PathBuf::from("/"), + target: PathBuf::from("/"), + target_vec: vec![OsString::from("/")], + symlink: None, + xattrs: RafsXAttrs::default(), + v6_force_extended_inode: true, + }; + let root_node = Node::new(inode, node_info, 0); + let tree = Tree::new(root_node); + Ok(tree) + } + + /// Build the child tree. 
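+    ///
+    /// The child is a single regular file `/chunkdict` (ino 2) whose chunk list is
+    /// filled from `chunkdict_chunks` via `insert_chunks`; its inode size is the sum
+    /// of the uncompressed sizes of those chunks.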
+ fn build_child_tree( + ctx: &mut BuildContext, + blob_mgr: &mut BlobManager, + chunkdict_chunks: &[ChunkdictChunkInfo], + chunkdict_blobs: &[ChunkdictBlobInfo], + ) -> Result { + let mut inode = InodeWrapper::new(ctx.fs_version); + inode.set_ino(2); + inode.set_uid(0); + inode.set_gid(0); + inode.set_projid(0); + inode.set_mode(0o660 | libc::S_IFREG as u32); + inode.set_nlink(1); + inode.set_name_size("chunkdict".len()); + inode.set_rdev(0); + inode.set_blocks(256); + let node_info = NodeInfo { + explicit_uidgid: true, + src_dev: 0, + src_ino: 1, + rdev: 0, + source: PathBuf::from("/"), + path: PathBuf::from("/chunkdict"), + target: PathBuf::from("/chunkdict"), + target_vec: vec![OsString::from("/"), OsString::from("/chunkdict")], + symlink: None, + xattrs: RafsXAttrs::new(), + v6_force_extended_inode: true, + }; + let mut node = Node::new(inode, node_info, 0); + + // Insert chunks. + Self::insert_chunks(ctx, blob_mgr, &mut node, chunkdict_chunks, chunkdict_blobs)?; + let node_size: u64 = node + .chunks + .iter() + .map(|chunk| chunk.inner.uncompressed_size() as u64) + .sum(); + node.inode.set_size(node_size); + + // Update child count. + node.inode.set_child_count(node.chunks.len() as u32); + let child = Tree::new(node); + child + .lock_node() + .v5_set_dir_size(ctx.fs_version, &child.children); + Ok(child) + } + + /// Insert chunks. + fn insert_chunks( + ctx: &mut BuildContext, + blob_mgr: &mut BlobManager, + node: &mut Node, + chunkdict_chunks: &[ChunkdictChunkInfo], + chunkdict_blobs: &[ChunkdictBlobInfo], + ) -> Result<()> { + for (index, chunk_info) in chunkdict_chunks.iter().enumerate() { + let chunk_size: u32 = chunk_info.chunk_compressed_size; + let file_offset = index as u64 * chunk_size as u64; + let mut chunk = ChunkWrapper::new(ctx.fs_version); + + // Update blob context. + let (blob_index, blob_ctx) = + blob_mgr.get_or_cerate_blob_for_chunkdict(ctx, &chunk_info.chunk_blob_id)?; + let chunk_uncompressed_size = chunk_info.chunk_uncompressed_size; + let pre_d_offset = blob_ctx.current_uncompressed_offset; + blob_ctx.uncompressed_blob_size = pre_d_offset + chunk_uncompressed_size as u64; + blob_ctx.current_uncompressed_offset += chunk_uncompressed_size as u64; + + blob_ctx.blob_meta_header.set_ci_uncompressed_size( + blob_ctx.blob_meta_header.ci_uncompressed_size() + + size_of::() as u64, + ); + blob_ctx.blob_meta_header.set_ci_compressed_size( + blob_ctx.blob_meta_header.ci_uncompressed_size() + + size_of::() as u64, + ); + let chunkdict_blob_info = chunkdict_blobs + .iter() + .find(|blob| blob.blob_id == chunk_info.chunk_blob_id) + .unwrap(); + blob_ctx.blob_compressor = + Algorithm::from_str(chunkdict_blob_info.blob_compressor.as_str())?; + blob_ctx + .blob_meta_header + .set_ci_uncompressed_size(chunkdict_blob_info.blob_meta_ci_uncompressed_size); + blob_ctx + .blob_meta_header + .set_ci_compressed_size(chunkdict_blob_info.blob_meta_ci_compressed_size); + blob_ctx + .blob_meta_header + .set_ci_compressed_offset(chunkdict_blob_info.blob_meta_ci_offset); + blob_ctx.blob_meta_header.set_ci_compressor(Algorithm::Zstd); + + // Update chunk context. 
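+            // Note: sizes, offsets and the digest below are copied verbatim from the
+            // database record; only the blob index, the per-blob chunk index and the
+            // synthetic file offset are assigned locally.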
+ let chunk_index = blob_ctx.alloc_chunk_index()?; + chunk.set_blob_index(blob_index); + chunk.set_index(chunk_index); + chunk.set_file_offset(file_offset); + chunk.set_compressed_size(chunk_info.chunk_compressed_size); + chunk.set_compressed_offset(chunk_info.chunk_compressed_offset); + chunk.set_uncompressed_size(chunk_info.chunk_uncompressed_size); + chunk.set_uncompressed_offset(chunk_info.chunk_uncompressed_offset); + chunk.set_id(RafsDigest::from_string(&chunk_info.chunk_digest)); + + node.chunks.push(NodeChunk { + source: ChunkSource::Build, + inner: Arc::new(chunk.clone()), + }); + } + Ok(()) + } +} diff --git a/builder/src/core/context.rs b/builder/src/core/context.rs index 14f33db855c..eb7a77728c8 100644 --- a/builder/src/core/context.rs +++ b/builder/src/core/context.rs @@ -597,6 +597,9 @@ impl BlobContext { blob_ctx .blob_meta_header .set_encrypted(features.contains(BlobFeatures::ENCRYPTED)); + blob_ctx + .blob_meta_header + .set_is_chunkdict_generated(features.contains(BlobFeatures::IS_CHUNKDICT_GENERATED)); blob_ctx } @@ -955,6 +958,32 @@ impl BlobManager { } } + /// Get or cerate blob for chunkdict, this is used for chunk deduplication. + pub fn get_or_cerate_blob_for_chunkdict( + &mut self, + ctx: &BuildContext, + id: &str, + ) -> Result<(u32, &mut BlobContext)> { + if self.get_blob_idx_by_id(id).is_none() { + let blob_ctx = Self::new_blob_ctx(ctx)?; + self.current_blob_index = Some(self.alloc_index()?); + self.add_blob(blob_ctx); + } else { + self.current_blob_index = self.get_blob_idx_by_id(id); + } + let (_, blob_ctx) = self.get_current_blob().unwrap(); + if blob_ctx.blob_id.is_empty() { + blob_ctx.blob_id = id.to_string(); + } + // Safe to unwrap because the blob context has been added. + Ok(self.get_current_blob().unwrap()) + } + + /// Determine if the given blob has been created. + pub fn has_blob(&self, blob_id: &str) -> bool { + self.get_blob_idx_by_id(blob_id).is_some() + } + /// Set the global chunk dictionary for chunk deduplication. pub fn set_chunk_dict(&mut self, dict: Arc) { self.global_chunk_dict = dict @@ -1097,6 +1126,7 @@ impl BlobManager { compressed_blob_size, blob_features, flags, + build_ctx.is_chunkdict_generated, ); } RafsBlobTable::V6(table) => { @@ -1116,6 +1146,7 @@ impl BlobManager { ctx.blob_toc_digest, ctx.blob_meta_size, ctx.blob_toc_size, + build_ctx.is_chunkdict_generated, ctx.blob_meta_header, ctx.cipher_object.clone(), ctx.cipher_ctx.clone(), @@ -1293,6 +1324,9 @@ pub struct BuildContext { pub configuration: Arc, /// Generate the blob cache and blob meta pub blob_cache_generator: Option, + + /// Whether is chunkdict. 
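+    /// Set through `set_is_chunkdict()`; defaults to `false`.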
+ pub is_chunkdict_generated: bool, } impl BuildContext { @@ -1361,6 +1395,7 @@ impl BuildContext { features, configuration: Arc::new(ConfigV2::default()), blob_cache_generator: None, + is_chunkdict_generated: false, } } @@ -1379,6 +1414,10 @@ impl BuildContext { pub fn set_configuration(&mut self, config: Arc) { self.configuration = config; } + + pub fn set_is_chunkdict(&mut self, is_chunkdict: bool) { + self.is_chunkdict_generated = is_chunkdict; + } } impl Default for BuildContext { @@ -1411,6 +1450,7 @@ impl Default for BuildContext { features: Features::new(), configuration: Arc::new(ConfigV2::default()), blob_cache_generator: None, + is_chunkdict_generated: false, } } } diff --git a/builder/src/lib.rs b/builder/src/lib.rs index 7d785ea3f88..54f47e264a7 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -23,6 +23,9 @@ use sha2::Digest; use self::core::node::{Node, NodeInfo}; +pub use self::chunkdict_generator::ChunkdictBlobInfo; +pub use self::chunkdict_generator::ChunkdictChunkInfo; +pub use self::chunkdict_generator::Generator; pub use self::compact::BlobCompactor; pub use self::core::bootstrap::Bootstrap; pub use self::core::chunk_dict::{parse_chunk_dict_arg, ChunkDict, HashChunkDict}; @@ -40,6 +43,7 @@ pub use self::merge::Merger; pub use self::stargz::StargzBuilder; pub use self::tarball::TarballBuilder; +mod chunkdict_generator; mod compact; mod core; mod directory; diff --git a/rafs/src/metadata/cached_v5.rs b/rafs/src/metadata/cached_v5.rs index d6ba0b02742..61e4dd1d1b4 100644 --- a/rafs/src/metadata/cached_v5.rs +++ b/rafs/src/metadata/cached_v5.rs @@ -994,6 +994,7 @@ mod cached_tests { 0, BlobFeatures::_V5_NO_EXT_BLOB_TABLE, meta.flags, + false, ); let mut cached_inode = CachedInodeV5::new(blob_table, meta.clone()); cached_inode.load(&meta, &mut reader).unwrap(); diff --git a/rafs/src/metadata/layout/v5.rs b/rafs/src/metadata/layout/v5.rs index a1b8db1ea63..52a4c21a358 100644 --- a/rafs/src/metadata/layout/v5.rs +++ b/rafs/src/metadata/layout/v5.rs @@ -563,6 +563,7 @@ impl RafsV5BlobTable { compressed_size: u64, blob_features: BlobFeatures, flags: RafsSuperFlags, + is_chunkdict: bool, ) -> u32 { let blob_index = self.entries.len() as u32; let mut blob_info = BlobInfo::new( @@ -578,6 +579,9 @@ impl RafsV5BlobTable { blob_info.set_compressor(flags.into()); blob_info.set_digester(flags.into()); blob_info.set_prefetch_info(prefetch_offset as u64, prefetch_size as u64); + if is_chunkdict { + blob_info.set_chunkdict_generated(true); + } self.entries.push(Arc::new(blob_info)); self.extended.add( diff --git a/rafs/src/metadata/layout/v6.rs b/rafs/src/metadata/layout/v6.rs index 5099e4a722c..6a64607fb07 100644 --- a/rafs/src/metadata/layout/v6.rs +++ b/rafs/src/metadata/layout/v6.rs @@ -1754,7 +1754,8 @@ impl RafsV6Blob { blob_features.bits() ); return false; - } else if !tarfs_mode + } else if !blob_features.contains(BlobFeatures::IS_CHUNKDICT_GENERATED) + && !tarfs_mode && ci_uncompr_size != count * size_of::() as u64 { error!( @@ -1819,6 +1820,7 @@ impl RafsV6BlobTable { blob_toc_digest: [u8; 32], blob_meta_size: u64, blob_toc_size: u32, + is_chunkdict: bool, header: BlobCompressionContextHeader, cipher_object: Arc, cipher_context: Option, @@ -1851,6 +1853,8 @@ impl RafsV6BlobTable { blob_info.set_blob_toc_size(blob_toc_size); blob_info.set_cipher_info(flags.into(), cipher_object, cipher_context); + blob_info.set_chunkdict_generated(is_chunkdict); + self.entries.push(Arc::new(blob_info)); blob_index @@ -2725,6 +2729,7 @@ mod tests { [0; 32], 0, 0, + false, 
BlobCompressionContextHeader::default(), Arc::new(crypt::Algorithm::Aes128Xts.new_cipher().unwrap()), Some(CipherContext::default()), @@ -2767,6 +2772,7 @@ mod tests { [0; 32], 0, 0, + false, BlobCompressionContextHeader::default(), Arc::new(crypt::Algorithm::Aes128Xts.new_cipher().unwrap()), Some(CipherContext::default()), diff --git a/src/bin/nydus-image/deduplicate.rs b/src/bin/nydus-image/deduplicate.rs index 83de9188940..c28130e023f 100644 --- a/src/bin/nydus-image/deduplicate.rs +++ b/src/bin/nydus-image/deduplicate.rs @@ -4,13 +4,21 @@ //! Deduplicate for Chunk. use anyhow::{Context, Result}; +use core::cmp::Ordering; use nydus_api::ConfigV2; +use nydus_builder::BuildContext; +use nydus_builder::ConversionType; use nydus_builder::Tree; -use nydus_rafs::metadata::RafsSuper; +use nydus_builder::{ChunkdictBlobInfo, ChunkdictChunkInfo}; +use nydus_rafs::metadata::{RafsSuper, RafsVersion}; use nydus_storage::device::BlobInfo; use rusqlite::{params, Connection}; +use std::collections::HashSet; +use std::collections::{BTreeMap, HashMap}; +use std::convert::TryFrom; use std::fs; -use std::path::Path; +use std::path::{Path, PathBuf}; +use std::result::Result::Ok; use std::sync::{Arc, Mutex}; #[derive(Debug)] @@ -18,7 +26,7 @@ pub enum DatabaseError { SqliteError(rusqlite::Error), PoisonError(String), // Add other database error variants here as needed, e.g.: - // MysqlError(mysql::Error), + // MysqlError(mysql::Error). } impl std::fmt::Display for DatabaseError { @@ -26,7 +34,7 @@ impl std::fmt::Display for DatabaseError { match *self { DatabaseError::SqliteError(ref err) => err.fmt(f), DatabaseError::PoisonError(ref err) => write!(f, "PoisonError: {}", err), - // Add other error type formatting here + // Add other error type formatting here. } } } @@ -47,16 +55,22 @@ pub trait Database { fn create_blob_table(&self) -> Result<()>; /// Inserts chunk information into the database. - fn insert_chunk(&self, chunk_info: &Chunk) -> Result<()>; + fn insert_chunk(&self, chunk_info: &ChunkdictChunkInfo) -> Result<()>; /// Inserts blob information into the database. - fn insert_blob(&self, blob_info: &Blob) -> Result<()>; + fn insert_blob(&self, blob_info: &ChunkdictBlobInfo) -> Result<()>; /// Retrieves all chunk information from the database. - fn get_chunks(&self) -> Result>; + fn get_chunks(&self) -> Result>; + + /// Retrieves all chunk information from the database filtered by blob ID. + fn get_chunks_by_blob_id(&self, blob_id: &str) -> Result>; /// Retrieves all blob information from the database. - fn get_blobs(&self) -> Result>; + fn get_blobs(&self) -> Result>; + + /// Retrieves blob information from the database filtered by blob ID. + fn get_blob_by_id(&self, blob_id: &str) -> Result; } pub struct SqliteDatabase { @@ -66,15 +80,11 @@ pub struct SqliteDatabase { impl SqliteDatabase { pub fn new(database_url: &str) -> Result { - // Delete the database file if it exists. + // Connect to a database that already exists. 
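+        // An existing regular file at `database_url` is accepted as-is; if the path
+        // exists but is not a regular file, construction panics.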
if let Ok(metadata) = fs::metadata(database_url) { if metadata.is_file() { - if let Err(err) = fs::remove_file(database_url) { - warn!( - "Warning: Unable to delete existing database file: {:?}.", - err - ); - } + } else { + panic!("Warning: Unable to find existing database file."); } } @@ -106,25 +116,93 @@ impl Database for SqliteDatabase { BlobTable::create(&self.blob_table).context("Failed to create blob table") } - fn insert_chunk(&self, chunk: &Chunk) -> Result<()> { + fn insert_chunk(&self, chunk: &ChunkdictChunkInfo) -> Result<()> { self.chunk_table .insert(chunk) .context("Failed to insert chunk") } - fn insert_blob(&self, blob: &Blob) -> Result<()> { + fn insert_blob(&self, blob: &ChunkdictBlobInfo) -> Result<()> { self.blob_table .insert(blob) .context("Failed to insert blob") } - fn get_chunks(&self) -> Result> { + fn get_chunks(&self) -> Result> { ChunkTable::list_all(&self.chunk_table).context("Failed to get chunks") } - fn get_blobs(&self) -> Result> { + fn get_chunks_by_blob_id(&self, blob_id: &str) -> Result> { + ChunkTable::list_all_by_blob_id(&self.chunk_table, blob_id).context("Failed to get chunks") + } + + fn get_blobs(&self) -> Result> { BlobTable::list_all(&self.blob_table).context("Failed to get blobs") } + + fn get_blob_by_id(&self, blob_id: &str) -> Result { + BlobTable::list_by_id(&self.blob_table, blob_id).context("Failed to get blob") + } +} + +/// Get fs version from bootstrap file. +fn get_fs_version(bootstrap_path: &Path) -> Result { + let (sb, _) = RafsSuper::load_from_file(bootstrap_path, Arc::new(ConfigV2::default()), false)?; + RafsVersion::try_from(sb.meta.version).context("Failed to get RAFS version number") +} + +/// Checks if all Bootstrap versions are consistent. +/// If they are inconsistent, returns an error and prints the version of each Bootstrap. +pub fn check_bootstrap_versions_consistency( + ctx: &mut BuildContext, + bootstrap_paths: &[PathBuf], +) -> Result<()> { + let mut versions = Vec::new(); + + for bootstrap_path in bootstrap_paths { + let version = get_fs_version(bootstrap_path)?; + versions.push((bootstrap_path.clone(), version)); + } + + if !versions.is_empty() { + let first_version = versions[0].1; + ctx.fs_version = first_version; + if versions.iter().any(|(_, v)| *v != first_version) { + for (path, version) in &versions { + println!("Bootstrap path {:?} has version {:?}", path, version); + } + return Err(anyhow!( + "Bootstrap versions are inconsistent, cannot use chunkdict." + )); + } + } + + Ok(()) +} + +// Get parent bootstrap context for chunkdict bootstrap. +pub fn update_ctx_from_parent_bootstrap( + ctx: &mut BuildContext, + bootstrap_path: &PathBuf, +) -> Result<()> { + let (sb, _) = RafsSuper::load_from_file(bootstrap_path, Arc::new(ConfigV2::default()), false)?; + + // Obtain the features of the first blob to use as the features for the blobs in chunkdict. 
+ if let Some(first_blob) = sb.superblock.get_blob_infos().first() { + ctx.blob_features = first_blob.features(); + } + + let config = sb.meta.get_config(); + config.check_compatibility(&sb.meta)?; + + if config.is_tarfs_mode { + ctx.conversion_type = ConversionType::TarToTarfs; + } + ctx.fs_version = + RafsVersion::try_from(sb.meta.version).context("Failed to get RAFS version")?; + ctx.compressor = config.compressor; + + Ok(()) } pub struct Deduplicate { @@ -147,12 +225,14 @@ impl Deduplicate { &mut self, bootstrap_path: &Path, config: Arc, + image_reference: String, + version: String, ) -> anyhow::Result>> { let (sb, _) = RafsSuper::load_from_file(bootstrap_path, config, false)?; self.create_tables()?; let blob_infos = sb.superblock.get_blob_infos(); self.insert_blobs(&blob_infos)?; - self.insert_chunks(&blob_infos, &sb)?; + self.insert_chunks(&blob_infos, &sb, image_reference, version)?; Ok(blob_infos) } @@ -169,10 +249,14 @@ impl Deduplicate { fn insert_blobs(&mut self, blob_infos: &[Arc]) -> anyhow::Result<()> { for blob in blob_infos { self.db - .insert_blob(&Blob { + .insert_blob(&ChunkdictBlobInfo { blob_id: blob.blob_id().to_string(), blob_compressed_size: blob.compressed_size(), blob_uncompressed_size: blob.uncompressed_size(), + blob_compressor: blob.compressor().to_string(), + blob_meta_ci_compressed_size: blob.meta_ci_compressed_size(), + blob_meta_ci_uncompressed_size: blob.meta_ci_uncompressed_size(), + blob_meta_ci_offset: blob.meta_ci_offset(), }) .context("Failed to insert blob")?; } @@ -183,6 +267,8 @@ impl Deduplicate { &mut self, blob_infos: &[Arc], sb: &RafsSuper, + image_reference: String, + version: String, ) -> anyhow::Result<()> { let process_chunk = &mut |t: &Tree| -> Result<()> { let node = t.lock_node(); @@ -190,7 +276,9 @@ impl Deduplicate { let index = chunk.inner.blob_index(); let chunk_blob_id = blob_infos[index as usize].blob_id(); self.db - .insert_chunk(&Chunk { + .insert_chunk(&ChunkdictChunkInfo { + image_reference: image_reference.to_string(), + version: version.to_string(), chunk_blob_id, chunk_digest: chunk.inner.id().to_string(), chunk_compressed_size: chunk.inner.compressed_size(), @@ -209,27 +297,630 @@ impl Deduplicate { } } +pub struct Algorithm { + algorithm_name: String, + db: D, +} + +// Generate deduplicated chunkdict by exponential_smoothing algorithm. +type VersionMap = HashMap>; +// Generate deduplicated chunkdict by cluster algorithm. +type ImageMap = Vec, Vec>>; + +impl Algorithm { + pub fn new(algorithm: String, db_url: &str) -> anyhow::Result { + let algorithm_name = algorithm; + let db = SqliteDatabase::new(db_url)?; + Ok(Self { algorithm_name, db }) + } + + // Call the algorithm to generate a dictionary. 
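+    // Returns (chunkdict_chunks, chunkdict_blobs, noise_points): the deduplicated chunk
+    // dictionary, the blob records it references, and the image references that did not
+    // fall into any cluster.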
+ pub fn chunkdict_generate( + &mut self, + ) -> anyhow::Result<(Vec, Vec, Vec)> { + let all_chunks: Vec = self.db.chunk_table.list_all()?; + let mut chunkdict_chunks: Vec = Vec::new(); + let mut chunkdict_blobs: Vec = Vec::new(); + let mut core_image = Vec::new(); + let mut noise_points = Vec::new(); + + let (chunkdict_version, chunkdict_image) = match &self.algorithm_name as &str { + "exponential_smoothing" => Self::deduplicate_version(&all_chunks)?, + _ => { + bail!("Unsupported algorithm name:, please use a valid algorithm name, such as exponential_smoothing") + } + }; + for single_clustering in chunkdict_image { + for (image_list, cluster_dictionary) in single_clustering { + core_image.extend(image_list); + chunkdict_chunks.extend(cluster_dictionary); + } + } + for (_, dictionary) in chunkdict_version { + chunkdict_chunks.extend(dictionary); + } + let mut chunkdict_size = 0; + for i in &chunkdict_chunks { + chunkdict_size += i.chunk_compressed_size; + } + info!( + "Chunkdict size is {}", + chunkdict_size as f64 / 1024 as f64 / 1024 as f64 + ); + for chunk in all_chunks { + if !core_image.contains(&chunk.image_reference) + && !noise_points.contains(&chunk.image_reference) + { + noise_points.push(chunk.image_reference.clone()); + } + } + Self::fill_chunkdict(self, &mut chunkdict_chunks, &mut chunkdict_blobs)?; + Ok((chunkdict_chunks, chunkdict_blobs, noise_points)) + } + + /// Baseed chunk list to fill chunkdict, including all chunks in the same blob and all blobs in the chunkdict. + fn fill_chunkdict( + &mut self, + chunkdict_chunks: &mut Vec, + chunkdict_blobs: &mut Vec, + ) -> Result<()> { + let mut blob_ids = std::collections::HashSet::new(); + for chunk in chunkdict_chunks.iter() { + blob_ids.insert(chunk.chunk_blob_id.clone()); + } + for blob_id in blob_ids { + let mut chunks = self.db.get_chunks_by_blob_id(&blob_id)?; + chunks = chunks + .into_iter() + .collect::>() + .into_iter() + .collect::>(); + for chunk in chunks { + if !chunkdict_chunks.contains(&chunk) { + chunkdict_chunks.push(chunk); + } + } + chunkdict_blobs.push(self.db.get_blob_by_id(&blob_id)?); + } + Ok(()) + } + + // Algorithm "exponential_smoothing" + // List all chunk and sort them by the order in chunk table. + // Score each chunk by "exponential_smoothing" formula. + // Select chunks whose score is greater than threshold and generate chunk dictionary. 
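+    //
+    // For reference, the scoring recurrence used below (alpha = 0.5): chunks of the
+    // first version score 0.0; for every later chunk i,
+    //
+    //     smoothed[i] = alpha * is_duplicate(i) + (1.0 - alpha) * smoothed[i - 1]
+    //
+    // where is_duplicate(i) is 1.0 if the chunk digest also occurs in the previous
+    // version's chunk list and 0.0 otherwise. Chunks with a score above `threshold`
+    // are kept and then deduplicated by digest.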
+ fn exponential_smoothing( + all_chunks: Vec, + threshold: f64, + ) -> anyhow::Result> { + let alpha = 0.5; + let mut smoothed_data = Vec::new(); + + let mut last_start_version_index = 0; + let mut start_version_index = 0; + let mut last_end_version_index = 0; + + for (chunk_index, chunk) in all_chunks.iter().enumerate() { + let mut is_duplicate: f64 = 0.0; + if chunk.version == all_chunks[0].version { + let smoothed_score: f64 = 0.0; + smoothed_data.push(smoothed_score); + } else { + if all_chunks[chunk_index - 1].version != all_chunks[chunk_index].version { + last_start_version_index = start_version_index; + start_version_index = chunk_index; + last_end_version_index = chunk_index - 1; + } + for last_chunk in all_chunks + .iter() + .take(last_end_version_index + 1) + .skip(last_start_version_index) + { + if chunk.chunk_digest == last_chunk.chunk_digest { + is_duplicate = 1.0; + break; + } + } + let smoothed_score: f64 = + alpha * is_duplicate + (1.0 - alpha) * smoothed_data[chunk_index - 1]; + smoothed_data.push(smoothed_score); + } + } + + let mut chunkdict: Vec = Vec::new(); + for i in 0..smoothed_data.len() { + let chunk = ChunkdictChunkInfo { + image_reference: all_chunks[i].image_reference.clone(), + version: all_chunks[i].version.clone(), + chunk_blob_id: all_chunks[i].chunk_blob_id.clone(), + chunk_digest: all_chunks[i].chunk_digest.clone(), + chunk_compressed_offset: all_chunks[i].chunk_compressed_offset, + chunk_uncompressed_offset: all_chunks[i].chunk_uncompressed_offset, + chunk_compressed_size: all_chunks[i].chunk_compressed_size, + chunk_uncompressed_size: all_chunks[i].chunk_uncompressed_size, + }; + if smoothed_data[i] > threshold { + chunkdict.push(chunk); + } + } + + // Deduplicate chunk dictionary. + let mut unique_chunks: BTreeMap = BTreeMap::new(); + for chunk in &chunkdict { + if !unique_chunks.contains_key(&chunk.chunk_digest) { + unique_chunks.insert(chunk.chunk_digest.clone(), chunk.clone()); + } + } + let unique_chunk_list: Vec = unique_chunks.values().cloned().collect(); + Ok(unique_chunk_list) + } + + /// Calculate the distance between two images. + fn distance( + image1: &[ChunkdictChunkInfo], + image2: &[ChunkdictChunkInfo], + ) -> anyhow::Result { + // The total size of all chunks in both images. + let mut image1_size: u64 = 0; + let mut image2_size: u64 = 0; + + for chunk1 in image1 { + image1_size += chunk1.chunk_compressed_size as u64; + } + for chunk2 in image2 { + image2_size += chunk2.chunk_compressed_size as u64; + } + + // The total size of the chunk repeated between two images. + let all_chunks: Vec<&ChunkdictChunkInfo> = image1.iter().chain(image2.iter()).collect(); + let mut compressed_size_map: std::collections::HashMap = + std::collections::HashMap::new(); + let mut processed_digests: HashSet<&String> = HashSet::new(); + + for chunk in all_chunks { + if processed_digests.contains(&chunk.chunk_digest) { + let size = compressed_size_map + .entry(chunk.chunk_digest.clone()) + .or_insert(0); + *size += chunk.chunk_compressed_size as u64; + } + processed_digests.insert(&chunk.chunk_digest); + } + + let repeat_size: u64 = compressed_size_map.values().cloned().sum(); + let distance: f64 = 1.0 - (repeat_size as f64 / ((image1_size + image2_size) as f64)); + Ok(distance) + } + + /// Divide the chunk list into sublists by image name. 
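+    ///
+    /// Each `image_reference` becomes one `DataPoint` holding its chunks, with the
+    /// clustering state (`visited`, `clustered`, `cluster_id`) reset to defaults.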
+ fn divide_by_image(all_chunks: &[ChunkdictChunkInfo]) -> anyhow::Result> { + let mut image_chunks: std::collections::HashMap> = + std::collections::HashMap::new(); + let mut datadict: Vec = Vec::new(); + for chunk in all_chunks { + image_chunks + .entry(chunk.image_reference.clone()) + .or_insert(Vec::new()) + .push(chunk.clone()); + } + for (index, chunks) in image_chunks { + let data_point = DataPoint { + image_reference: index, + chunk_list: chunks, + visited: false, + clustered: false, + cluster_id: 0, + }; + datadict.push(data_point); + } + Ok(datadict) + } + + fn divide_set( + chunks: &[ChunkdictChunkInfo], + train_percentage: f64, + ) -> anyhow::Result<(Vec, Vec)> { + // Create a HashMap to store the list of chunks for each image_reference. + let mut image_chunks: BTreeMap> = BTreeMap::new(); + + // Group chunks into image_reference. + for chunk in chunks { + let entry = image_chunks + .entry(chunk.image_reference.clone()) + .or_insert(Vec::new()); + entry.push(chunk.clone()); + } + + // Create the final training and testing sets. + let mut train_set: Vec = Vec::new(); + let mut test_set: Vec = Vec::new(); + + // Iterate through the list of Chunks for each image_reference. + for (_, chunk_list) in image_chunks.iter_mut() { + let mut version_chunks: BTreeMap> = + BTreeMap::new(); + // Group the chunks in the image into version. + for chunk in chunk_list { + let entry = version_chunks + .entry(CustomString(chunk.version.clone())) + .or_insert(Vec::new()); + entry.push(chunk.clone()); + } + + let num_version_groups = version_chunks.len(); + let num_train_groups = (num_version_groups as f64 * train_percentage) as usize; + let version_groups = version_chunks.into_iter().collect::>(); + let (train_version_groups, test_version_groups) = + version_groups.split_at(num_train_groups); + + for (_, train_chunks) in train_version_groups { + for chunk in train_chunks { + train_set.push(chunk.clone()); + } + } + + for (_, test_chunks) in test_version_groups { + for chunk in test_chunks { + test_set.push(chunk.clone()); + } + } + } + Ok((train_set, test_set)) + } + + /// Dbscan clustering algorithm. + fn dbsacn(data_point: &mut Vec, radius: f64) -> anyhow::Result<&Vec> { + let min_points = 10; + let mut cluster_id = 1; + + for i in 0..data_point.len() { + if data_point[i].visited { + continue; + } + if data_point[i].clustered { + continue; + } + + let mut neighbors = Vec::new(); + for j in 0..data_point.len() { + let distance = + Self::distance(&data_point[i].chunk_list, &data_point[j].chunk_list)?; + if !data_point[j].visited && distance <= radius { + neighbors.push(j); + } + } + if neighbors.len() < min_points { + data_point[i].clustered = false; + } else { + Self::expand_cluster(data_point, i, cluster_id, radius, min_points)?; + cluster_id += 1; + } + } + Ok(data_point) + } + + /// Core point expansion cluster in dbscan algorithm. 
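+    ///
+    /// Starting from core point `i`, the cluster grows via an explicit stack: each
+    /// popped point is marked visited, and if it has at least `min_points` unvisited
+    /// neighbors within `radius`, those neighbors join cluster `cluster_id` and are
+    /// pushed for further expansion.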
+ fn expand_cluster( + data_point: &mut Vec, + i: usize, + cluster_id: i32, + radius: f64, + min_points: usize, + ) -> anyhow::Result<()> { + data_point[i].clustered = true; + data_point[i].cluster_id = cluster_id; + + let mut stack = vec![i]; + while let Some(q) = stack.pop() { + if data_point[q].visited { + continue; + } + data_point[q].visited = true; + let mut q_neighbors = Vec::new(); + for j in 0..data_point.len() { + let distance = + Self::distance(&data_point[q].chunk_list, &data_point[j].chunk_list)?; + if !data_point[j].visited && distance <= radius { + q_neighbors.push(j); + } + } + if q_neighbors.len() >= min_points { + for &r_index in &q_neighbors { + if !data_point[r_index].visited { + data_point[r_index].visited = true; + stack.push(r_index) + } + if !data_point[r_index].clustered { + data_point[r_index].clustered = true; + data_point[r_index].cluster_id = cluster_id; + } + } + } else { + data_point[i].clustered = false; + } + } + Ok(()) + } + + /// Aggregate the chunks in each cluster into a dictionary. + fn aggregate_chunk( + data_point: &[DataPoint], + ) -> anyhow::Result, Vec>> { + // Divide chunk list according to clusters. + let mut cluster_map: HashMap> = HashMap::new(); + for (index, point) in data_point.iter().enumerate() { + if point.clustered { + let cluster_id = point.cluster_id; + cluster_map + .entry(cluster_id) + .or_insert(Vec::new()) + .push(index); + } + } + + // Iterate through each cluster. + let mut dictionary: HashMap, Vec> = HashMap::new(); + for (_, cluster_points) in cluster_map.iter() { + let mut image_total_counts: HashMap<&str, usize> = HashMap::new(); + let mut image_list: Vec = Vec::new(); + // Count the total number of images in the cluster. + for &point_index in cluster_points { + let point = &data_point[point_index]; + let image_total_count = image_total_counts + .entry(&point.image_reference) + .or_insert(0); + *image_total_count += 1; + + image_list.push(point.image_reference.clone()); + } + + // Count the number of images in which chunks appear in the cluster. + let mut chunk_digest_counts: HashMap = HashMap::new(); + for &point_index in cluster_points { + let point = &data_point[point_index]; + let chunk_digest_set: HashSet = point + .chunk_list + .iter() + .map(|chunk| chunk.chunk_digest.clone()) + .collect(); + for chunk_digest in chunk_digest_set { + let count = chunk_digest_counts + .entry(chunk_digest.to_string()) + .or_insert(0); + *count += 1; + } + } + + let mut chunk_list: Vec = Vec::new(); + let mut added_chunk_digests: HashSet = HashSet::new(); + for &point_index in cluster_points { + let point = &data_point[point_index]; + for chunk in &point.chunk_list { + let chunk_digest = &chunk.chunk_digest; + if !added_chunk_digests.contains(chunk_digest) { + let count = chunk_digest_counts.get(chunk_digest).unwrap_or(&0); + if *count as f64 / image_total_counts.len() as f64 >= 0.9 { + chunk_list.push(chunk.clone()); + added_chunk_digests.insert(chunk_digest.to_string()); + } + } + } + } + dictionary.insert(image_list, chunk_list); + } + Ok(dictionary) + } + + fn deduplicate_image( + all_chunks: Vec, + ) -> anyhow::Result, Vec>>> { + let train_percentage = 0.7; + let max_cluster_count = 7; + let mut counter = 0; + let all_chunks_clone = all_chunks; + let mut data_dict: Vec, Vec>> = Vec::new(); + + let (mut train, mut test) = Self::divide_set(&all_chunks_clone, train_percentage)?; + while counter < max_cluster_count { + // Parameter settings. 
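+            // The DBSCAN radius below is swept from 0.5 to 0.9 in steps of 0.05; for
+            // each radius the resulting dictionary is scored against the test set and
+            // the one leaving the smallest residual test-set size is kept (`min_data_dict`).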
+ let mut data_point = Self::divide_by_image(&train)?; + let all_train_length = data_point.len(); + let mut radius = 0.5; + let max_radius = 0.9; + let mut test_chunk_sizes = Vec::new(); + let mut min_test_size: u64 = std::u64::MAX; + let mut min_data_dict = HashMap::new(); + let mut data_cluster_length = 0; + + // Adjust the radius size to select the dictionary that tests best. + while radius <= max_radius { + let data_cluster = Self::dbsacn(&mut data_point, radius)?; + data_cluster_length = data_cluster.len(); + + let data_dict = Self::aggregate_chunk(data_cluster)?; + + let all_chunks: HashSet<&ChunkdictChunkInfo> = + data_dict.values().flat_map(|v| v.iter()).collect(); + let mut total_test_set_size: u64 = 0; + + for chunk in test.iter() { + if !all_chunks.contains(chunk) { + total_test_set_size += chunk.chunk_compressed_size as u64; + } + } + test_chunk_sizes.push((radius, total_test_set_size)); + min_test_size = total_test_set_size; + if total_test_set_size <= min_test_size { + min_test_size = total_test_set_size; + min_data_dict = data_dict; + } + radius += 0.05; + } + debug!("test set size is {}", min_test_size); + + let min_chunk_list: Vec = min_data_dict + .values() + .flat_map(|chunk_list| chunk_list.iter()) + .cloned() + .collect(); + let mut to_remove = Vec::new(); + for chunk in train.iter() { + if min_chunk_list.contains(chunk) { + to_remove.push(chunk.clone()); + } + } + for chunk in &to_remove { + train.retain(|c| c.chunk_digest != chunk.chunk_digest); + } + for chunk in &to_remove { + test.retain(|c| c.chunk_digest != chunk.chunk_digest); + } + if (data_cluster_length as f64 / all_train_length as f64) < 0.2 { + break; + } + data_dict.push(min_data_dict); + counter += 1; + } + Ok(data_dict) + } + + pub fn deduplicate_version( + all_chunks: &[ChunkdictChunkInfo], + ) -> anyhow::Result<(VersionMap, ImageMap)> { + let mut all_chunks_size = 0; + for i in all_chunks { + all_chunks_size += i.chunk_compressed_size; + } + info!( + "All chunk size is {}", + all_chunks_size as f64 / 1024 as f64 / 1024 as f64 + ); + + let train_percentage = 0.7; + let datadict = Self::deduplicate_image(all_chunks.to_owned())?; + let (train, test) = Self::divide_set(all_chunks, train_percentage)?; + let mut train_set_size = 0; + for i in &train { + train_set_size += i.chunk_compressed_size; + } + info!( + "Train set size is {}", + train_set_size as f64 / 1024 as f64 / 1024 as f64 + ); + + let mut test_set_size = 0; + for i in &test { + test_set_size += i.chunk_compressed_size; + } + info!( + "Test set size is {}", + test_set_size as f64 / 1024 as f64 / 1024 as f64 + ); + + let mut version_datadict: HashMap> = HashMap::new(); + let mut data_point = Self::divide_by_image(&train)?; + + let mut threshold = 0.5; + let max_threshold = 0.8; + + let mut test_total_size: u32 = 0; + let mut min_test_size: u32 = std::u32::MAX; + let mut min_data_dict = HashMap::new(); + + while threshold <= max_threshold { + version_datadict.clear(); + for point in data_point.iter_mut() { + for single_dictionary in &datadict { + for (key, value) in single_dictionary.iter() { + if key.contains(&point.image_reference) { + let mut to_remove = Vec::new(); + for chunk in point.chunk_list.iter() { + if value.contains(chunk) { + to_remove.push(chunk.clone()); + } + } + for chunk in to_remove { + point.chunk_list.retain(|c| c != &chunk); + } + } + } + } + let chunk_dict = Self::exponential_smoothing(point.chunk_list.clone(), threshold)?; + version_datadict.insert(point.image_reference.clone(), chunk_dict); + } + + let mut test_by_image = 
Self::divide_by_image(&test)?; + for point in test_by_image.iter_mut() { + if version_datadict.contains_key(&point.image_reference.clone()) { + let mut to_remove = Vec::new(); + let mut vec_string = Vec::new(); + let chunkdict_option = version_datadict.get(&point.image_reference); + if let Some(chunkdict) = chunkdict_option { + for i in chunkdict { + vec_string.push(i.chunk_digest.clone()); + } + } + for chunk in point.chunk_list.iter() { + if vec_string.contains(&chunk.chunk_digest) { + to_remove.push(chunk.clone()); + } + } + for chunk in to_remove { + point.chunk_list.retain(|c| c != &chunk); + } + } + for chunk in point.chunk_list.iter() { + test_total_size = test_total_size + .checked_add(chunk.chunk_compressed_size) + .unwrap_or(test_total_size); + } + } + if test_total_size <= min_test_size { + min_test_size = test_total_size; + min_data_dict = version_datadict.clone(); + } + threshold += 0.05; + } + info!( + "After deduplicating test set size is {} and deduplicating rate is {} ", + min_test_size as f64 / 1024 as f64 / 1024 as f64, + 1.0 - (min_test_size as f64) / (test_set_size as f64) + ); + Ok((min_data_dict, datadict)) + } +} + +#[allow(dead_code)] +#[derive(Debug)] +struct DataPoint { + image_reference: String, + chunk_list: Vec, + visited: bool, + clustered: bool, + cluster_id: i32, +} + pub trait Table: Sync + Send + Sized + 'static where Err: std::error::Error + 'static, { - /// clear table. + /// Clear table. fn clear(&self) -> Result<(), Err>; - /// create table. + /// Create table. fn create(&self) -> Result<(), Err>; - /// insert data. + /// Insert data. fn insert(&self, table: &T) -> Result<(), Err>; - /// select all data. + /// Select all data. fn list_all(&self) -> Result, Err>; - /// select data with offset and limit. + /// Select data with offset and limit. fn list_paged(&self, offset: i64, limit: i64) -> Result, Err>; } -#[derive(Debug)] +#[derive()] pub struct ChunkTable { conn: Arc>, } @@ -248,19 +939,128 @@ impl ChunkTable { conn: Arc::new(Mutex::new(conn)), }) } + + /// Select all data filtered by blob ID. + fn list_all_by_blob_id(&self, blob_id: &str) -> Result, DatabaseError> { + let mut offset = 0; + let limit: i64 = 100; + let mut all_chunks_by_blob_id = Vec::new(); + + loop { + let chunks = self.list_paged_by_blob_id(blob_id, offset, limit)?; + if chunks.is_empty() { + break; + } + + all_chunks_by_blob_id.extend(chunks); + offset += limit; + } + + Ok(all_chunks_by_blob_id) + } + + /// Select data with offset and limit filtered by blob ID. 
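+    ///
+    /// Rows are filtered by `chunk_blob_id`, ordered by `id` and fetched with
+    /// `LIMIT`/`OFFSET`; `list_all_by_blob_id` calls this in pages of 100 rows.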
+ fn list_paged_by_blob_id( + &self, + blob_id: &str, + offset: i64, + limit: i64, + ) -> Result, DatabaseError> { + let conn_guard = self + .conn + .lock() + .map_err(|e| DatabaseError::PoisonError(e.to_string()))?; + let mut stmt: rusqlite::Statement<'_> = conn_guard + .prepare( + "SELECT id, image_reference, version, chunk_blob_id, chunk_digest, chunk_compressed_size, + chunk_uncompressed_size, chunk_compressed_offset, chunk_uncompressed_offset from chunk + WHERE chunk_blob_id = ?1 + ORDER BY id LIMIT ?2 OFFSET ?3", + )?; + let chunk_iterator = stmt.query_map(params![blob_id, limit, offset], |row| { + Ok(ChunkdictChunkInfo { + image_reference: row.get(1)?, + version: row.get(2)?, + chunk_blob_id: row.get(3)?, + chunk_digest: row.get(4)?, + chunk_compressed_size: row.get(5)?, + chunk_uncompressed_size: row.get(6)?, + chunk_compressed_offset: row.get(7)?, + chunk_uncompressed_offset: row.get(8)?, + }) + })?; + let mut chunks = Vec::new(); + for chunk in chunk_iterator { + chunks.push(chunk.map_err(DatabaseError::SqliteError)?); + } + Ok(chunks) + } } -#[derive(Debug)] -pub struct Chunk { - chunk_blob_id: String, - chunk_digest: String, - chunk_compressed_size: u32, - chunk_uncompressed_size: u32, - chunk_compressed_offset: u64, - chunk_uncompressed_offset: u64, +#[derive(Debug, Clone)] +struct CustomString(String); + +impl Ord for CustomString { + /// Extract the numbers in the string. + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + let mut current_number = String::new(); + + // Parse numbers in strings. + let mut numbers1 = Vec::new(); + let mut numbers2 = Vec::new(); + + for ch in self.0.chars() { + if ch.is_ascii_digit() { + current_number.push(ch); + } else if !current_number.is_empty() { + if let Ok(number) = current_number.parse::() { + numbers1.push(number); + } + current_number.clear(); + } + } + if !current_number.is_empty() { + if let Ok(number) = current_number.parse::() { + numbers1.push(number); + } + } + current_number.clear(); + + for ch in other.0.chars() { + if ch.is_ascii_digit() { + current_number.push(ch); + } else if !current_number.is_empty() { + if let Ok(number) = current_number.parse::() { + numbers2.push(number); + } + current_number.clear(); + } + } + if !current_number.is_empty() { + if let Ok(number) = current_number.parse::() { + numbers2.push(number); + } + } + current_number.clear(); + numbers1.cmp(&numbers2) + } +} + +impl PartialOrd for CustomString { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for CustomString { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } } -impl Table for ChunkTable { +impl Eq for CustomString {} + +impl Table for ChunkTable { fn clear(&self) -> Result<(), DatabaseError> { self.conn .lock() @@ -277,6 +1077,8 @@ impl Table for ChunkTable { .execute( "CREATE TABLE IF NOT EXISTS chunk ( id INTEGER PRIMARY KEY, + image_reference TEXT, + version TEXT, chunk_blob_id TEXT NOT NULL, chunk_digest TEXT, chunk_compressed_size INT, @@ -290,12 +1092,14 @@ impl Table for ChunkTable { Ok(()) } - fn insert(&self, chunk: &Chunk) -> Result<(), DatabaseError> { + fn insert(&self, chunk: &ChunkdictChunkInfo) -> Result<(), DatabaseError> { self.conn .lock() .map_err(|e| DatabaseError::PoisonError(e.to_string()))? 
.execute( "INSERT INTO chunk( + image_reference, + version, chunk_blob_id, chunk_digest, chunk_compressed_size, @@ -303,9 +1107,11 @@ impl Table for ChunkTable { chunk_compressed_offset, chunk_uncompressed_offset ) - VALUES (?1, ?2, ?3, ?4, ?5, ?6); + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8); ", rusqlite::params![ + chunk.image_reference, + chunk.version, chunk.chunk_blob_id, chunk.chunk_digest, chunk.chunk_compressed_size, @@ -318,7 +1124,7 @@ impl Table for ChunkTable { Ok(()) } - fn list_all(&self) -> Result, DatabaseError> { + fn list_all(&self) -> Result, DatabaseError> { let mut offset = 0; let limit: i64 = 100; let mut all_chunks = Vec::new(); @@ -336,25 +1142,31 @@ impl Table for ChunkTable { Ok(all_chunks) } - fn list_paged(&self, offset: i64, limit: i64) -> Result, DatabaseError> { + fn list_paged( + &self, + offset: i64, + limit: i64, + ) -> Result, DatabaseError> { let conn_guard = self .conn .lock() .map_err(|e| DatabaseError::PoisonError(e.to_string()))?; let mut stmt: rusqlite::Statement<'_> = conn_guard .prepare( - "SELECT id, chunk_blob_id, chunk_digest, chunk_compressed_size, + "SELECT id, image_reference, version, chunk_blob_id, chunk_digest, chunk_compressed_size, chunk_uncompressed_size, chunk_compressed_offset, chunk_uncompressed_offset from chunk ORDER BY id LIMIT ?1 OFFSET ?2", )?; let chunk_iterator = stmt.query_map(params![limit, offset], |row| { - Ok(Chunk { - chunk_blob_id: row.get(1)?, - chunk_digest: row.get(2)?, - chunk_compressed_size: row.get(3)?, - chunk_uncompressed_size: row.get(4)?, - chunk_compressed_offset: row.get(5)?, - chunk_uncompressed_offset: row.get(6)?, + Ok(ChunkdictChunkInfo { + image_reference: row.get(1)?, + version: row.get(2)?, + chunk_blob_id: row.get(3)?, + chunk_digest: row.get(4)?, + chunk_compressed_size: row.get(5)?, + chunk_uncompressed_size: row.get(6)?, + chunk_compressed_offset: row.get(7)?, + chunk_uncompressed_offset: row.get(8)?, }) })?; let mut chunks = Vec::new(); @@ -384,15 +1196,38 @@ impl BlobTable { conn: Arc::new(Mutex::new(conn)), }) } -} -pub struct Blob { - blob_id: String, - blob_compressed_size: u64, - blob_uncompressed_size: u64, + pub fn list_by_id(&self, blob_id: &str) -> Result { + let conn_guard = self + .conn + .lock() + .map_err(|e| DatabaseError::PoisonError(e.to_string()))?; + let mut stmt = conn_guard.prepare( + "SELECT blob_id, blob_compressed_size, blob_uncompressed_size, blob_compressor, blob_meta_ci_compressed_size, blob_meta_ci_uncompressed_size, blob_meta_ci_offset FROM blob WHERE blob_id = ?1", + )?; + let mut blob_iterator = stmt.query_map([blob_id], |row| { + Ok(ChunkdictBlobInfo { + blob_id: row.get(0)?, + blob_compressed_size: row.get(1)?, + blob_uncompressed_size: row.get(2)?, + blob_compressor: row.get(3)?, + blob_meta_ci_compressed_size: row.get(4)?, + blob_meta_ci_uncompressed_size: row.get(5)?, + blob_meta_ci_offset: row.get(6)?, + }) + })?; + + if let Some(blob) = blob_iterator.next() { + blob.map_err(DatabaseError::SqliteError) + } else { + Err(DatabaseError::SqliteError( + rusqlite::Error::QueryReturnedNoRows, + )) + } + } } -impl Table for BlobTable { +impl Table for BlobTable { fn clear(&self) -> Result<(), DatabaseError> { self.conn .lock() @@ -408,10 +1243,14 @@ impl Table for BlobTable { .map_err(|e| DatabaseError::PoisonError(e.to_string()))? 
.execute( "CREATE TABLE IF NOT EXISTS blob ( - id INTEGER PRIMARY KEY, - blob_id TEXT NOT NULL, - blob_compressed_size INT, - blob_uncompressed_size INT + id INTEGER PRIMARY KEY, + blob_id TEXT NOT NULL, + blob_compressed_size INT, + blob_uncompressed_size INT, + blob_compressor TEXT, + blob_meta_ci_compressed_size INT, + blob_meta_ci_uncompressed_size INT, + blob_meta_ci_offset INT )", [], ) @@ -419,7 +1258,7 @@ impl Table for BlobTable { Ok(()) } - fn insert(&self, blob: &Blob) -> Result<(), DatabaseError> { + fn insert(&self, blob: &ChunkdictBlobInfo) -> Result<(), DatabaseError> { self.conn .lock() .map_err(|e| DatabaseError::PoisonError(e.to_string()))? @@ -427,21 +1266,29 @@ impl Table for BlobTable { "INSERT INTO blob ( blob_id, blob_compressed_size, - blob_uncompressed_size + blob_uncompressed_size, + blob_compressor, + blob_meta_ci_compressed_size, + blob_meta_ci_uncompressed_size, + blob_meta_ci_offset ) - VALUES (?1, ?2, ?3); + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7); ", rusqlite::params![ blob.blob_id, blob.blob_compressed_size, - blob.blob_uncompressed_size + blob.blob_uncompressed_size, + blob.blob_compressor, + blob.blob_meta_ci_compressed_size, + blob.blob_meta_ci_uncompressed_size, + blob.blob_meta_ci_offset, ], ) .map_err(DatabaseError::SqliteError)?; Ok(()) } - fn list_all(&self) -> Result, DatabaseError> { + fn list_all(&self) -> Result, DatabaseError> { let mut offset = 0; let limit: i64 = 100; let mut all_blobs = Vec::new(); @@ -459,20 +1306,24 @@ impl Table for BlobTable { Ok(all_blobs) } - fn list_paged(&self, offset: i64, limit: i64) -> Result, DatabaseError> { + fn list_paged(&self, offset: i64, limit: i64) -> Result, DatabaseError> { let conn_guard = self .conn .lock() .map_err(|e| DatabaseError::PoisonError(e.to_string()))?; let mut stmt: rusqlite::Statement<'_> = conn_guard.prepare( - "SELECT blob_id, blob_compressed_size, blob_uncompressed_size from blob + "SELECT blob_id, blob_compressed_size, blob_uncompressed_size, blob_compressor, blob_meta_ci_compressed_size, blob_meta_ci_uncompressed_size, blob_meta_ci_offset from blob ORDER BY id LIMIT ?1 OFFSET ?2", )?; let blob_iterator = stmt.query_map(params![limit, offset], |row| { - Ok(Blob { + Ok(ChunkdictBlobInfo { blob_id: row.get(0)?, blob_compressed_size: row.get(1)?, blob_uncompressed_size: row.get(2)?, + blob_compressor: row.get(3)?, + blob_meta_ci_compressed_size: row.get(4)?, + blob_meta_ci_uncompressed_size: row.get(5)?, + blob_meta_ci_offset: row.get(6)?, }) })?; let mut blobs = Vec::new(); @@ -488,14 +1339,45 @@ mod tests { use super::*; use rusqlite::Result; + #[test] + fn test_partial_cmp() -> Result<(), Box> { + let custom_string1 = CustomString("nydus_1.2.3".to_string()); + let custom_string2 = CustomString("nydus_1.2.10".to_string()); + let custom_string3 = CustomString("nydus_2.0".to_string()); + + assert!(custom_string1 < custom_string2); + assert!(custom_string2 < custom_string3); + assert!(custom_string1 < custom_string3); + + assert!(custom_string1 <= custom_string2); + assert!(custom_string2 <= custom_string3); + assert!(custom_string1 <= custom_string3); + + assert!(custom_string2 > custom_string1); + assert!(custom_string3 > custom_string2); + assert!(custom_string3 > custom_string1); + + assert!(custom_string2 >= custom_string1); + assert!(custom_string3 >= custom_string2); + assert!(custom_string3 >= custom_string1); + + assert_eq!(custom_string1, CustomString("nydus_1.2.3".to_string())); + assert_ne!(custom_string1, custom_string2); + Ok(()) + } + #[test] fn test_blob_table() -> Result<(), 
Box> { let blob_table = BlobTable::new_in_memory()?; blob_table.create()?; - let blob = Blob { + let blob = ChunkdictBlobInfo { blob_id: "BLOB123".to_string(), blob_compressed_size: 1024, blob_uncompressed_size: 2048, + blob_compressor: "zstd".to_string(), + blob_meta_ci_compressed_size: 1024, + blob_meta_ci_uncompressed_size: 2048, + blob_meta_ci_offset: 0, }; blob_table.insert(&blob)?; let blobs = blob_table.list_all()?; @@ -503,6 +1385,16 @@ mod tests { assert_eq!(blobs[0].blob_id, blob.blob_id); assert_eq!(blobs[0].blob_compressed_size, blob.blob_compressed_size); assert_eq!(blobs[0].blob_uncompressed_size, blob.blob_uncompressed_size); + assert_eq!(blobs[0].blob_compressor, blob.blob_compressor); + assert_eq!( + blobs[0].blob_meta_ci_compressed_size, + blob.blob_meta_ci_compressed_size + ); + assert_eq!( + blobs[0].blob_meta_ci_uncompressed_size, + blob.blob_meta_ci_uncompressed_size + ); + assert_eq!(blobs[0].blob_meta_ci_offset, blob.blob_meta_ci_offset); Ok(()) } @@ -510,7 +1402,9 @@ mod tests { fn test_chunk_table() -> Result<(), Box> { let chunk_table = ChunkTable::new_in_memory()?; chunk_table.create()?; - let chunk = Chunk { + let chunk = ChunkdictChunkInfo { + image_reference: "REDIS".to_string(), + version: "1.0.0".to_string(), chunk_blob_id: "BLOB123".to_string(), chunk_digest: "DIGEST123".to_string(), chunk_compressed_size: 512, @@ -519,8 +1413,21 @@ mod tests { chunk_uncompressed_offset: 0, }; chunk_table.insert(&chunk)?; + let chunk2 = ChunkdictChunkInfo { + image_reference: "REDIS02".to_string(), + version: "1.0.0".to_string(), + chunk_blob_id: "BLOB456".to_string(), + chunk_digest: "DIGEST123".to_string(), + chunk_compressed_size: 512, + chunk_uncompressed_size: 1024, + chunk_compressed_offset: 0, + chunk_uncompressed_offset: 0, + }; + chunk_table.insert(&chunk2)?; let chunks = chunk_table.list_all()?; - assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].image_reference, chunk.image_reference); + assert_eq!(chunks[0].version, chunk.version); + assert_eq!(chunks.len(), 2); assert_eq!(chunks[0].chunk_blob_id, chunk.chunk_blob_id); assert_eq!(chunks[0].chunk_digest, chunk.chunk_digest); assert_eq!(chunks[0].chunk_compressed_size, chunk.chunk_compressed_size); @@ -536,6 +1443,11 @@ mod tests { chunks[0].chunk_uncompressed_offset, chunk.chunk_uncompressed_offset ); + + let chunks = chunk_table.list_all_by_blob_id(&chunk.chunk_blob_id)?; + assert_eq!(chunks[0].chunk_blob_id, chunk.chunk_blob_id); + assert_eq!(chunks.len(), 1); + Ok(()) } @@ -544,10 +1456,14 @@ mod tests { let blob_table = BlobTable::new_in_memory()?; blob_table.create()?; for i in 0..200 { - let blob = Blob { + let blob = ChunkdictBlobInfo { blob_id: format!("BLOB{}", i), blob_compressed_size: i, blob_uncompressed_size: i * 2, + blob_compressor: "zstd".to_string(), + blob_meta_ci_compressed_size: i, + blob_meta_ci_uncompressed_size: i * 2, + blob_meta_ci_offset: i * 3, }; blob_table.insert(&blob)?; } @@ -556,6 +1472,10 @@ mod tests { assert_eq!(blobs[0].blob_id, "BLOB100"); assert_eq!(blobs[0].blob_compressed_size, 100); assert_eq!(blobs[0].blob_uncompressed_size, 200); + assert_eq!(blobs[0].blob_compressor, "zstd"); + assert_eq!(blobs[0].blob_meta_ci_compressed_size, 100); + assert_eq!(blobs[0].blob_meta_ci_uncompressed_size, 200); + assert_eq!(blobs[0].blob_meta_ci_offset, 300); Ok(()) } @@ -565,7 +1485,9 @@ mod tests { chunk_table.create()?; for i in 0..200 { let i64 = i as u64; - let chunk = Chunk { + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", i), + version: format!("1.0.0{}", 
i), chunk_blob_id: format!("BLOB{}", i), chunk_digest: format!("DIGEST{}", i), chunk_compressed_size: i, @@ -577,6 +1499,8 @@ mod tests { } let chunks = chunk_table.list_paged(100, 100)?; assert_eq!(chunks.len(), 100); + assert_eq!(chunks[0].image_reference, "REDIS100"); + assert_eq!(chunks[0].version, "1.0.0100"); assert_eq!(chunks[0].chunk_blob_id, "BLOB100"); assert_eq!(chunks[0].chunk_digest, "DIGEST100"); assert_eq!(chunks[0].chunk_compressed_size, 100); @@ -585,4 +1509,272 @@ mod tests { assert_eq!(chunks[0].chunk_uncompressed_offset, 400); Ok(()) } + + #[test] + fn test_algorithm_exponential_smoothing() -> Result<(), Box> { + let threshold = 0.1; + let mut all_chunk: Vec = Vec::new(); + for i in 0..199 { + let i64 = i as u64; + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", 0), + version: format!("1.0.0{}", (i + 1) / 100), + chunk_blob_id: format!("BLOB{}", i), + chunk_digest: format!("DIGEST{}", (i + 1) % 2), + chunk_compressed_size: i, + chunk_uncompressed_size: i * 2, + chunk_compressed_offset: i64 * 3, + chunk_uncompressed_offset: i64 * 4, + }; + all_chunk.push(chunk); + } + let chunkdict = Algorithm::::exponential_smoothing(all_chunk, threshold)?; + assert_eq!(chunkdict.len(), 2); + assert_eq!(chunkdict[0].image_reference, "REDIS0"); + assert_eq!(chunkdict[0].version, "1.0.01"); + assert_eq!(chunkdict[0].chunk_blob_id, "BLOB99"); + assert_eq!(chunkdict[0].chunk_digest, "DIGEST0"); + assert_eq!(chunkdict[0].chunk_compressed_size, 99); + assert_eq!(chunkdict[0].chunk_uncompressed_size, 198); + assert_eq!(chunkdict[0].chunk_compressed_offset, 297); + assert_eq!(chunkdict[0].chunk_uncompressed_offset, 396); + Ok(()) + } + + #[test] + fn test_divide_by_image() -> Result<(), Box> { + let db_url = "./metadata.db"; + let chunk_table = ChunkTable::new(db_url)?; + chunk_table.create()?; + for i in 0..200 { + let i64 = i as u64; + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", i / 50), + version: format!("1.0.0{}", (i + 1) / 100), + chunk_blob_id: format!("BLOB{}", i), + chunk_digest: format!("DIGEST{}", (i + 1) % 2), + chunk_compressed_size: i, + chunk_uncompressed_size: i * 2, + chunk_compressed_offset: i64 * 3, + chunk_uncompressed_offset: i64 * 4, + }; + chunk_table.insert(&chunk)?; + } + let algorithm = String::from("exponential_smoothing"); + let algorithm = Algorithm::::new(algorithm, db_url)?; + let all_chunks = algorithm.db.chunk_table.list_all()?; + assert_eq!(all_chunks.len(), 200); + let datadict = Algorithm::::divide_by_image(&all_chunks)?; + assert_eq!(datadict.len(), 4); + assert_eq!(datadict[3].cluster_id, 0); + assert_eq!(datadict[3].chunk_list.len(), 50); + chunk_table.clear()?; + Ok(()) + } + + #[test] + fn test_distance() -> Result<(), Box> { + let mut all_chunks1: Vec = Vec::new(); + for i in 0..200 { + let i64 = i as u64; + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", 0), + version: format!("1.0.0{}", (i + 1) / 100), + chunk_blob_id: format!("BLOB{}", i), + chunk_digest: format!("DIGEST{}", (i + 1) % 4), + chunk_compressed_size: 1, + chunk_uncompressed_size: 1, + chunk_compressed_offset: i64 * 3, + chunk_uncompressed_offset: i64 * 4, + }; + all_chunks1.push(chunk); + } + let mut all_chunks2: Vec = Vec::new(); + for i in 0..200 { + let i64 = i as u64; + let chunk = ChunkdictChunkInfo { + image_reference: format!("REDIS{}", 1), + version: format!("1.0.0{}", (i + 1) / 100), + chunk_blob_id: format!("BLOB{}", i), + chunk_digest: format!("DIGEST{}", (i + 1) % 4), + chunk_compressed_size: 1, + 
+                chunk_uncompressed_size: 1,
+                chunk_compressed_offset: i64 * 3,
+                chunk_uncompressed_offset: i64 * 4,
+            };
+            all_chunks2.push(chunk);
+        }
+        let datadict = Algorithm::<SqliteDatabase>::distance(&all_chunks1, &all_chunks2)?;
+        assert!(
+            (datadict - 0.01).abs() <= 0.0001,
+            "Expected {} to be approximately equal to {} with tolerance {}",
+            datadict,
+            0.01,
+            0.0001
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_divide_set() -> Result<(), Box<dyn std::error::Error>> {
+        let mut all_chunks: Vec<ChunkdictChunkInfo> = Vec::new();
+        for i in 0..200 {
+            for j in 0..100 {
+                let chunk = ChunkdictChunkInfo {
+                    image_reference: format!("REDIS{}", i),
+                    version: format!("1.0.0{}", j / 10),
+                    chunk_blob_id: format!("BLOB{}", j),
+                    chunk_digest: format!("DIGEST{}", j + (i / 100) * 100),
+                    chunk_compressed_size: 1,
+                    chunk_uncompressed_size: 1,
+                    chunk_compressed_offset: 1,
+                    chunk_uncompressed_offset: 1,
+                };
+                all_chunks.push(chunk);
+            }
+        }
+        assert_eq!(all_chunks.len(), 20000);
+        let (train, test) = Algorithm::<SqliteDatabase>::divide_set(&all_chunks, 0.7)?;
+        assert_eq!(train.len(), 14000);
+        assert_eq!(train[0].image_reference, "REDIS0");
+        assert_eq!(train[0].version, "1.0.00");
+        assert_eq!(test.len(), 6000);
+        assert_eq!(test[0].image_reference, "REDIS0");
+        assert_eq!(test[0].version, "1.0.07");
+        Ok(())
+    }
+
+    #[test]
+    fn test_dbscan() -> Result<(), Box<dyn std::error::Error>> {
+        let mut all_chunks: Vec<ChunkdictChunkInfo> = Vec::new();
+        let radius = 0.6;
+        for i in 0..200 {
+            for j in 0..100 {
+                let chunk = ChunkdictChunkInfo {
+                    image_reference: format!("REDIS{}", i),
+                    version: format!("1.0.0{}", j / 10),
+                    chunk_blob_id: format!("BLOB{}", j),
+                    chunk_digest: format!("DIGEST{}", j + (i / 100) * 100),
+                    chunk_compressed_size: 1,
+                    chunk_uncompressed_size: 1,
+                    chunk_compressed_offset: 1,
+                    chunk_uncompressed_offset: 1,
+                };
+                all_chunks.push(chunk);
+            }
+        }
+        assert_eq!(all_chunks.len(), 20000);
+        let mut data_point = Algorithm::<SqliteDatabase>::divide_by_image(&all_chunks)?;
+        let datadict = Algorithm::<SqliteDatabase>::dbsacn(&mut data_point, radius)?;
+        assert_eq!(datadict.len(), 200);
+        if datadict[150].chunk_list[0].chunk_digest == datadict[0].chunk_list[0].chunk_digest {
+            assert_eq!(datadict[150].cluster_id, 1);
+        } else {
+            assert_eq!(datadict[150].cluster_id, 2);
+        }
+        assert_eq!(datadict[0].cluster_id, 1);
+        assert!(datadict[150].clustered);
+        assert!(datadict[150].visited);
+        assert_eq!(datadict[0].chunk_list.len(), 100);
+        Ok(())
+    }
+
+    #[test]
+    fn test_aggregate_chunk() -> Result<(), Box<dyn std::error::Error>> {
+        let mut all_chunks: Vec<ChunkdictChunkInfo> = Vec::new();
+        let radius = 0.6;
+        for i in 0..200 {
+            for j in 0..100 {
+                let chunk = ChunkdictChunkInfo {
+                    image_reference: format!("REDIS{}", i),
+                    version: format!("1.0.0{}", (j + 1) / 100),
+                    chunk_blob_id: format!("BLOB{}", j),
+                    chunk_digest: format!("DIGEST{}", j + (i / 100) * 100),
+                    chunk_compressed_size: 1,
+                    chunk_uncompressed_size: 1,
+                    chunk_compressed_offset: 1,
+                    chunk_uncompressed_offset: 1,
+                };
+                all_chunks.push(chunk);
+            }
+        }
+        assert_eq!(all_chunks.len(), 20000);
+        let mut data_point = Algorithm::<SqliteDatabase>::divide_by_image(&all_chunks)?;
+        let data_cluster = Algorithm::<SqliteDatabase>::dbsacn(&mut data_point, radius)?;
+        let datadict = Algorithm::<SqliteDatabase>::aggregate_chunk(&data_cluster)?;
+        assert_eq!(datadict.len(), 2);
+        Ok(())
+    }
+
+    #[test]
+    fn test_deduplicate_image() -> Result<(), Box<dyn std::error::Error>> {
+        let mut all_chunks: Vec<ChunkdictChunkInfo> = Vec::new();
+        for i in 0..200 {
+            for j in 0..100 {
+                let chunk = ChunkdictChunkInfo {
+                    image_reference: format!("REDIS{}", i),
+                    version: format!("1.0.0{}", j / 10),
+                    chunk_blob_id: format!("BLOB{}", j),
+                    chunk_digest: format!("DIGEST{}", j + (i / 100) * 100),
+                    chunk_compressed_size: 1,
+                    chunk_uncompressed_size: 1,
+                    chunk_compressed_offset: 1,
+                    chunk_uncompressed_offset: 1,
+                };
+                all_chunks.push(chunk);
+            }
+        }
+        assert_eq!(all_chunks.len(), 20000);
+        let datadict = Algorithm::<SqliteDatabase>::deduplicate_image(all_chunks)?;
+        for i in datadict.clone() {
+            for (_, b) in i {
+                if !b.is_empty() {
+                    assert_eq!(b.len(), 70);
+                }
+            }
+        }
+        assert_eq!(datadict[0].len(), 2);
+        assert_eq!(datadict[0].values().len(), 2);
+        assert_eq!(datadict[1].len(), 0);
+        assert_eq!(datadict[1].values().len(), 0);
+        assert_eq!(datadict.len(), 7);
+        Ok(())
+    }
+
+    #[test]
+    fn test_deduplicate_version() -> Result<(), Box<dyn std::error::Error>> {
+        let mut all_chunks: Vec<ChunkdictChunkInfo> = Vec::new();
+        let mut chunkdict: Vec<ChunkdictChunkInfo> = Vec::new();
+        for i in 0..200 {
+            let i64 = i as u64;
+            let chunk = ChunkdictChunkInfo {
+                image_reference: format!("REDIS{}", 0),
+                version: format!("1.0.0{}", (i + 1) / 20),
+                chunk_blob_id: format!("BLOB{}", i),
+                chunk_digest: format!("DIGEST{}", (i + 1) % 2),
+                chunk_compressed_size: i,
+                chunk_uncompressed_size: i * 2,
+                chunk_compressed_offset: i64 * 3,
+                chunk_uncompressed_offset: i64 * 4,
+            };
+            all_chunks.push(chunk);
+        }
+        let (chunkdict_version, chunkdict_image) =
+            Algorithm::<SqliteDatabase>::deduplicate_version(&all_chunks)?;
+        for (_, dictionary) in chunkdict_version {
+            chunkdict.extend(dictionary);
+        }
+
+        assert_eq!(chunkdict[0].image_reference, "REDIS0");
+        assert_eq!(chunkdict[0].chunk_compressed_size, 21);
+        assert_eq!(chunkdict.len(), 2);
+
+        for single_clustering in chunkdict_image {
+            for (_, cluster_dictionary) in single_clustering {
+                chunkdict.extend(cluster_dictionary);
+            }
+        }
+        assert_eq!(chunkdict.len(), 2);
+        Ok(())
+    }
 }
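The tests above exercise Algorithm::<SqliteDatabase>::exponential_smoothing without showing its body. As rough orientation only, the sketch below computes an exponentially smoothed per-digest "presence" score across successive image versions and keeps the digests whose final score clears a threshold; the helper name, the alpha parameter and the scoring details are illustrative assumptions, not the implementation in deduplicate.rs.

use std::collections::{HashMap, HashSet};

use nydus_builder::ChunkdictChunkInfo;

/// Illustrative only: smoothed "presence" score per chunk digest over the
/// versions of one image, seeded at 1.0 when a digest is first seen.
fn smoothed_digests(chunks: &[ChunkdictChunkInfo], alpha: f64, threshold: f64) -> Vec<String> {
    // Preserve the order in which versions first appear in `chunks`.
    let mut versions: Vec<String> = Vec::new();
    let mut per_version: HashMap<String, HashSet<String>> = HashMap::new();
    for c in chunks {
        if !per_version.contains_key(&c.version) {
            versions.push(c.version.clone());
        }
        per_version
            .entry(c.version.clone())
            .or_default()
            .insert(c.chunk_digest.clone());
    }

    // s_new = alpha * present + (1 - alpha) * s_old for every known digest.
    let mut scores: HashMap<String, f64> = HashMap::new();
    for v in &versions {
        let present = &per_version[v];
        for (digest, score) in scores.iter_mut() {
            let hit = if present.contains(digest) { 1.0 } else { 0.0 };
            *score = alpha * hit + (1.0 - alpha) * *score;
        }
        for digest in present {
            scores.entry(digest.clone()).or_insert(1.0);
        }
    }

    scores
        .into_iter()
        .filter(|(_, score)| *score >= threshold)
        .map(|(digest, _)| digest)
        .collect()
}

A caller would then pick one representative ChunkdictChunkInfo per surviving digest to form the dictionary, which is the shape of result the assertions above check.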
diff --git a/src/bin/nydus-image/main.rs b/src/bin/nydus-image/main.rs
index de4e1a7e276..5fa3a3a3c10 100644
--- a/src/bin/nydus-image/main.rs
+++ b/src/bin/nydus-image/main.rs
@@ -13,7 +13,10 @@ extern crate log;
 extern crate serde_json;
 #[macro_use]
 extern crate lazy_static;
-use crate::deduplicate::SqliteDatabase;
+use crate::deduplicate::{
+    check_bootstrap_versions_consistency, update_ctx_from_parent_bootstrap, Deduplicate,
+    SqliteDatabase,
+};
 use std::convert::TryFrom;
 use std::fs::{self, metadata, DirEntry, File, OpenOptions};
 use std::os::unix::fs::FileTypeExt;
@@ -28,9 +31,9 @@ use nydus::{get_build_time_info, setup_logging};
 use nydus_api::{BuildTimeInfo, ConfigV2, LocalFsConfig};
 use nydus_builder::{
     parse_chunk_dict_arg, ArtifactStorage, BlobCacheGenerator, BlobCompactor, BlobManager,
-    BootstrapManager, BuildContext, BuildOutput, Builder, ConversionType, DirectoryBuilder,
-    Feature, Features, HashChunkDict, Merger, Prefetch, PrefetchPolicy, StargzBuilder,
-    TarballBuilder, WhiteoutSpec,
+    BootstrapManager, BuildContext, BuildOutput, Builder, ChunkdictBlobInfo, ChunkdictChunkInfo,
+    ConversionType, DirectoryBuilder, Feature, Features, Generator, HashChunkDict, Merger,
+    Prefetch, PrefetchPolicy, StargzBuilder, TarballBuilder, WhiteoutSpec,
 };
 use nydus_rafs::metadata::{MergeError, RafsSuper, RafsSuperConfig, RafsVersion};
 use nydus_storage::backend::localfs::LocalFs;
@@ -45,7 +48,6 @@ use nydus_utils::{
 };
 use serde::{Deserialize, Serialize};
 
-use crate::deduplicate::Deduplicate;
 use crate::unpack::{OCIUnpacker, Unpacker};
 use crate::validator::Validator;
 
@@ -386,42 +388,45 @@ fn prepare_cmd_args(bti_string: &'static str) -> App {
             App::new("chunkdict")
                 .about("deduplicate RAFS filesystem metadata")
                 .subcommand(
-                    App::new("save")
-                        .about("Save chunk info to a database")
+                    App::new("generate")
+                        .about("Generate a chunk dictionary based on the database")
+                        .arg(
+                            Arg::new("database")
+                                .long("database")
+                                .help("Database connection URI for assisting chunk dictionary generation, e.g. sqlite:///path/to/database.db")
+                                .default_value("sqlite:///home/runner/output/database.db")
+                                .required(false),
+                        )
                         .arg(
                             Arg::new("bootstrap")
-                                .short('B')
-                                .long("bootstrap")
-                                .help("File path of RAFS meta blob/bootstrap")
-                                .required(false),
+                                .long("bootstrap")
+                                .short('B')
+                                .help("Output path of the generated chunkdict bootstrap"),
+                        )
+                        .arg(
+                            Arg::new("blob-dir")
+                                .long("blob-dir")
+                                .short('D')
+                                .help("Directory path to save generated RAFS metadata and data blobs"),
+                        )
+                        .arg(arg_prefetch_policy.clone())
+                        .arg(arg_output_json.clone())
+                        .arg(arg_config.clone())
+                        .arg(
+                            Arg::new("SOURCE")
+                                .help("Bootstrap paths (one or more)")
+                                .required(true)
+                                .num_args(1..),
+                        )
+                        .arg(
+                            Arg::new("verbose")
+                                .long("verbose")
+                                .short('v')
+                                .help("Output message in verbose mode")
+                                .action(ArgAction::SetTrue)
+                                .required(false),
                         )
-                        .arg(
-                            Arg::new("database")
-                                .long("database")
-                                .help("Database connection URI for assisting chunk dict generation, e.g. sqlite:///path/to/database.db")
-                                .default_value("sqlite://:memory:")
-                                .required(false),
-                        )
-                        .arg(
-                            Arg::new("blob-dir")
-                                .long("blob-dir")
-                                .short('D')
-                                .conflicts_with("config")
-                                .help(
-                                    "Directory for localfs storage backend, hosting data blobs and cache files",
-                                ),
-                        )
-                        .arg(arg_config.clone())
-                        .arg(
-                            Arg::new("verbose")
-                                .long("verbose")
-                                .short('v')
-                                .help("Output message in verbose mode")
-                                .action(ArgAction::SetTrue)
-                                .required(false),
                         )
-                        .arg(arg_output_json.clone())
-                    )
             );
 
     let app = app.subcommand(
@@ -775,7 +780,10 @@ fn main() -> Result<()> {
         Command::create(matches, &build_info)
     } else if let Some(matches) = cmd.subcommand_matches("chunkdict") {
         match matches.subcommand_name() {
-            Some("save") => Command::chunkdict_save(matches.subcommand_matches("save").unwrap()),
+            Some("generate") => Command::chunkdict_generate(
+                matches.subcommand_matches("generate").unwrap(),
+                &build_info,
+            ),
             _ => {
                 println!("{}", usage);
                 Ok(())
             }
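The chunkdict_generate implementation in the next hunk derives each record's image name and tag from the parent directory of every SOURCE bootstrap, which is expected to be named name:tag (for example .../redis:7.0.4/bootstrap). A standalone sketch of that convention; the helper name and sample path are illustrative, and rsplit_once is used here as a simplification of the split(':') handling in the actual code.

use std::path::Path;

/// Split ".../redis:7.0.4/bootstrap" into ("redis", "7.0.4") based on the
/// parent directory name (illustrative helper, not part of the patch).
fn image_reference_from_bootstrap(path: &Path) -> Option<(String, String)> {
    let dir = path.parent()?.file_name()?.to_str()?;
    let (name, tag) = dir.rsplit_once(':')?;
    if name.is_empty() || tag.is_empty() {
        return None;
    }
    Some((name.to_string(), tag.to_string()))
}

fn main() {
    let sample = Path::new("/tmp/redis:7.0.4/bootstrap");
    assert_eq!(
        image_reference_from_bootstrap(sample),
        Some(("redis".to_string(), "7.0.4".to_string()))
    );
}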
@@ -1194,32 +1202,150 @@ impl Command {
         OutputSerializer::dump(matches, build_output, build_info, compressor, version)
     }
 
-    fn chunkdict_save(matches: &ArgMatches) -> Result<()> {
-        let bootstrap_path = Self::get_bootstrap(matches)?;
-        let config = Self::get_configuration(matches)?;
+    fn chunkdict_generate(matches: &ArgMatches, build_info: &BuildTimeInfo) -> Result<()> {
+        let mut build_ctx = BuildContext {
+            prefetch: Self::get_prefetch(matches)?,
+            ..Default::default()
+        };
         let db_url: &String = matches.get_one::<String>("database").unwrap();
-        debug!("db_url: {}", db_url);
-        // For backward compatibility with v2.1.
-        config
-            .internal
-            .set_blob_accessible(matches.get_one::<String>("bootstrap").is_none());
+        // Save chunk and blob info to the database.
+        let source_bootstrap_paths: Vec<PathBuf> = matches
+            .get_many::<String>("SOURCE")
+            .map(|paths| paths.map(PathBuf::from).collect())
+            .unwrap();
+
+        check_bootstrap_versions_consistency(&mut build_ctx, &source_bootstrap_paths)?;
+        update_ctx_from_parent_bootstrap(&mut build_ctx, &source_bootstrap_paths[0])?;
+
+        for (_, bootstrap_path) in source_bootstrap_paths.iter().enumerate() {
+            let path_name = bootstrap_path.as_path();
+
+            // Extract the image name and tag from the bootstrap's parent directory name.
+            let bootstrap_dir = match path_name
+                .parent()
+                .and_then(|p| p.file_name().and_then(|f| f.to_str()))
+            {
+                Some(dir_str) => dir_str.to_string(),
+                None => bail!("Invalid bootstrap directory name"),
+            };
+            let full_image_name: Vec<&str> = bootstrap_dir.split(':').collect();
+            let image_name = match full_image_name.get(full_image_name.len() - 2) {
+                Some(&second_last) => second_last.to_string(),
+                None => bail!(
+                    "Invalid image name {:?}",
+                    full_image_name.get(full_image_name.len() - 2)
+                ),
+            };
+            let image_tag = match full_image_name.last() {
+                Some(&last) => last.to_string(),
+                None => bail!("Invalid image tag {:?}", full_image_name.last()),
+            };
+            // For backward compatibility with v2.1.
+            let config = Self::get_configuration(matches)?;
+            config
+                .internal
+                .set_blob_accessible(matches.get_one::<String>("bootstrap").is_none());
+            let db_strs: Vec<&str> = db_url.split("://").collect();
+            if db_strs.len() != 2 || (!db_strs[1].starts_with('/') && !db_strs[1].starts_with(':'))
+            {
+                bail!("Invalid database URL: {}", db_url);
+            }
+            match db_strs[0] {
+                "sqlite" => {
+                    let mut deduplicate: Deduplicate<SqliteDatabase> =
+                        Deduplicate::<SqliteDatabase>::new(db_strs[1])?;
+                    deduplicate.save_metadata(bootstrap_path, config, image_name, image_tag)?
+                }
+                _ => {
+                    bail!("Unsupported database type: {}, please use a valid database URI, such as 'sqlite:///path/to/chunkdict.db'.", db_strs[0])
+                }
+            };
+        }
+        info!("Chunkdict metadata is saved at: {:?}", db_url);
+        // Connect to the database and generate the chunk dictionary with the "exponential_smoothing" algorithm.
         let db_strs: Vec<&str> = db_url.split("://").collect();
         if db_strs.len() != 2 || (!db_strs[1].starts_with('/') && !db_strs[1].starts_with(':'))
         {
             bail!("Invalid database URL: {}", db_url);
         }
+        let algorithm = String::from("exponential_smoothing");
+        let _source_bootstrap_paths: Vec<PathBuf> = matches
+            .get_many::<String>("SOURCE")
+            .map(|paths| paths.map(PathBuf::from).collect())
+            .unwrap();
+
+        let (chunkdict_chunks, chunkdict_blobs, noise_points): (
+            Vec<ChunkdictChunkInfo>,
+            Vec<ChunkdictBlobInfo>,
+            Vec<String>,
+        );
         match db_strs[0] {
             "sqlite" => {
-                let mut deduplicate: Deduplicate<SqliteDatabase> =
-                    Deduplicate::<SqliteDatabase>::new(db_strs[1])?;
-                deduplicate.save_metadata(bootstrap_path, config)?
+                let mut algorithm: deduplicate::Algorithm<SqliteDatabase> =
+                    deduplicate::Algorithm::<SqliteDatabase>::new(algorithm, db_strs[1])?;
+                let result = algorithm.chunkdict_generate()?;
+                chunkdict_chunks = result.0;
+                chunkdict_blobs = result.1;
+                noise_points = result.2;
             }
             _ => {
-                bail!("Unsupported database type: {}, please use a valid database URI, such as 'sqlite:///path/to/database.db'.", db_strs[0])
+                bail!("Unsupported database type: {}, please use a valid database URI, such as 'sqlite:///path/to/chunkdict.db'.", db_strs[0])
            }
         };
-        info!("Chunkdict metadata is saved at: {:?}", db_url);
+
+        // Report the noise points detected by the DBSCAN clustering algorithm; deduplication is
+        // not recommended for these images.
+        info!(
+            "The length of chunkdict is {}",
+            Vec::<ChunkdictChunkInfo>::len(&chunkdict_chunks)
+        );
+        info!("Image deduplication is not recommended for the following images:");
+        for image_name in noise_points {
+            info!("{}", image_name);
+        }
+
+        // Dump chunkdict to bootstrap.
+        let chunkdict_bootstrap_path = Self::get_bootstrap_storage(matches)?;
+        let config =
+            Self::get_configuration(matches).context("failed to get configuration information")?;
+        config
+            .internal
+            .set_blob_accessible(matches.get_one::<String>("config").is_some());
+        build_ctx.configuration = config;
+        build_ctx.blob_storage = Some(chunkdict_bootstrap_path);
+        build_ctx
+            .blob_features
+            .insert(BlobFeatures::IS_CHUNKDICT_GENERATED);
+        build_ctx.is_chunkdict_generated = true;
+
+        let mut blob_mgr = BlobManager::new(build_ctx.digester);
+
+        let bootstrap_path = Self::get_bootstrap_storage(matches)?;
+        let mut bootstrap_mgr = BootstrapManager::new(Some(bootstrap_path), None);
+
+        let output = Generator::generate(
+            &mut build_ctx,
+            &mut bootstrap_mgr,
+            &mut blob_mgr,
+            chunkdict_chunks,
+            chunkdict_blobs,
+        )?;
+        OutputSerializer::dump(
+            matches,
+            output,
+            build_info,
+            build_ctx.compressor,
+            build_ctx.fs_version,
+        )
+        .unwrap();
+        info!(
+            "The chunkdict bootstrap is saved at: {:?}",
+            matches
+                .get_one::<String>("bootstrap")
+                .map(|s| s.as_str())
+                .unwrap_or_default(),
+        );
+        Ok(())
     }
 
diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs
index fd561c8a60c..e6b8c5b80da 100644
--- a/storage/src/cache/filecache/mod.rs
+++ b/storage/src/cache/filecache/mod.rs
@@ -266,7 +266,9 @@ impl FileCacheEntry {
             );
             return Err(einval!(msg));
         }
-        let meta = if blob_info.meta_ci_is_valid() {
+        let meta = if blob_info.meta_ci_is_valid()
+            || blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED)
+        {
             let meta = FileCacheMeta::new(
                 blob_file_path,
                 blob_info.clone(),
diff --git a/storage/src/device.rs b/storage/src/device.rs
index cdefa5f77b8..6e6cbc15ed6 100644
--- a/storage/src/device.rs
+++ b/storage/src/device.rs
@@ -77,6 +77,8 @@ bitflags! {
         const CAP_TAR_TOC = 0x4000_0000;
         /// Rafs V5 image without extended blob table, this is an internal flag.
         const _V5_NO_EXT_BLOB_TABLE = 0x8000_0000;
+        /// Blob is generated from a chunkdict.
+        const IS_CHUNKDICT_GENERATED = 0x0000_0200;
     }
 }
 
@@ -172,6 +174,9 @@ pub struct BlobInfo {
     cipher_object: Arc<Cipher>,
     /// Cipher context for encryption.
     cipher_ctx: Option<CipherContext>,
+
+    /// Whether the blob is generated from a chunkdict.
+    is_chunkdict_generated: bool,
 }
 
 impl BlobInfo {
@@ -215,6 +220,8 @@ impl BlobInfo {
             meta_path: Arc::new(Mutex::new(String::new())),
             cipher_object: Default::default(),
             cipher_ctx: None,
+
+            is_chunkdict_generated: false,
         };
 
         blob_info.compute_features();
@@ -222,6 +229,16 @@ impl BlobInfo {
         blob_info
     }
 
+    /// Set the is_chunkdict_generated flag.
+    pub fn set_chunkdict_generated(&mut self, is_chunkdict_generated: bool) {
+        self.is_chunkdict_generated = is_chunkdict_generated;
+    }
+
+    /// Get the is_chunkdict_generated flag.
+    pub fn is_chunkdict_generated(&self) -> bool {
+        self.is_chunkdict_generated
+    }
+
     /// Get the blob index in the blob array.
     pub fn blob_index(&self) -> u32 {
         self.blob_index
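The chunkdict marker now lives in two places: the IS_CHUNKDICT_GENERATED bit in BlobFeatures, consumed by readers such as the file cache and the meta checks in the next file, and a boolean carried by BlobInfo. A minimal sketch of keeping the two in sync when emitting a chunkdict blob; the helper functions are hypothetical, and the import paths assume BlobInfo and BlobFeatures are exported from nydus_storage::device and BlobCompressionContextHeader from nydus_storage::meta.

use nydus_storage::device::{BlobFeatures, BlobInfo};
use nydus_storage::meta::BlobCompressionContextHeader;

/// Mark both the in-memory blob descriptor and the on-disk compression
/// context header as chunkdict-generated (illustrative helper).
fn mark_chunkdict_blob(blob: &mut BlobInfo, header: &mut BlobCompressionContextHeader) {
    blob.set_chunkdict_generated(true);
    header.set_is_chunkdict_generated(true);
}

/// Readers can branch on either representation, e.g. to relax the chunk-count
/// and meta-size checks the way the storage hunks in this patch do.
fn is_chunkdict(blob: &BlobInfo) -> bool {
    blob.is_chunkdict_generated() || blob.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED)
}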
diff --git a/storage/src/meta/mod.rs b/storage/src/meta/mod.rs
index ef892b56b97..9e9d40334c3 100644
--- a/storage/src/meta/mod.rs
+++ b/storage/src/meta/mod.rs
@@ -354,6 +354,15 @@ impl BlobCompressionContextHeader {
             )
         }
     }
+
+    /// Set flag indicating whether the blob is generated from a chunkdict or not.
+    pub fn set_is_chunkdict_generated(&mut self, enable: bool) {
+        if enable {
+            self.s_features |= BlobFeatures::IS_CHUNKDICT_GENERATED.bits();
+        } else {
+            self.s_features &= !BlobFeatures::IS_CHUNKDICT_GENERATED.bits();
+        }
+    }
 }
 
 /// Struct to manage blob chunk compression information, a wrapper over [BlobCompressionContext].
@@ -850,7 +859,8 @@ impl BlobCompressionContextInfo {
         if u32::from_le(header.s_magic) != BLOB_CCT_MAGIC
             || u32::from_le(header.s_magic2) != BLOB_CCT_MAGIC
-            || u32::from_le(header.s_ci_entries) != blob_info.chunk_count()
+            || (!blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED)
+                && u32::from_le(header.s_ci_entries) != blob_info.chunk_count())
             || u32::from_le(header.s_ci_compressor) != blob_info.meta_ci_compressor() as u32
             || u64::from_le(header.s_ci_offset) != blob_info.meta_ci_offset()
             || u64::from_le(header.s_ci_compressed_size) != blob_info.meta_ci_compressed_size()
@@ -886,8 +896,9 @@ impl BlobCompressionContextInfo {
             || blob_info.has_feature(BlobFeatures::BATCH)
         {
             return Err(einval!("invalid feature flags in blob meta header!"));
-        } else if info_size != (chunk_count as usize) * (size_of::<BlobChunkInfoV1Ondisk>())
-            || (aligned_info_size as u64) > BLOB_CCT_V1_MAX_SIZE
+        } else if !blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED)
+            && (info_size != (chunk_count as usize) * (size_of::<BlobChunkInfoV1Ondisk>())
+                || (aligned_info_size as u64) > BLOB_CCT_V1_MAX_SIZE)
         {
             return Err(einval!("uncompressed size in blob meta header is invalid!"));
         }
@@ -1776,7 +1787,10 @@ impl BlobMetaChunkArray {
     ) -> Result<&'a T> {
         assert!(index < chunk_info_array.len());
         let entry = &chunk_info_array[index];
-        entry.validate(state)?;
+        // If the chunk belongs to a chunkdict, skip the validation check.
+        if state.blob_features & BlobFeatures::IS_CHUNKDICT_GENERATED.bits() == 0 {
+            entry.validate(state)?;
+        }
         Ok(entry)
     }
 }
@@ -1993,6 +2007,9 @@ pub fn format_blob_features(features: BlobFeatures) -> String {
     if features.contains(BlobFeatures::ENCRYPTED) {
         output += "encrypted ";
     }
+    if features.contains(BlobFeatures::IS_CHUNKDICT_GENERATED) {
+        output += "is-chunkdict-generated ";
+    }
     output.trim_end().to_string()
 }
 
diff --git a/utils/src/digest.rs b/utils/src/digest.rs
index 26a6997a530..12e74486f3b 100644
--- a/utils/src/digest.rs
+++ b/utils/src/digest.rs
@@ -176,6 +176,18 @@ impl RafsDigest {
         }
     }
 
+    /// Parse a digest from its hexadecimal string form, e.g. a sha256 digest rendered as 64 hex characters.
+    pub fn from_string(input: &str) -> Self {
+        let mut digest = RafsDigest::default();
+
+        for (i, byte) in input.as_bytes().chunks(2).enumerate() {
+            let hex_str = std::str::from_utf8(byte).unwrap();
+            digest.data[i] = u8::from_str_radix(hex_str, 16).unwrap();
+        }
+
+        digest
+    }
+
     pub fn hasher(algorithm: Algorithm) -> RafsDigestHasher {
         match algorithm {
             Algorithm::Blake3 => RafsDigestHasher::Blake3(Box::new(blake3::Hasher::new())),
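from_string is the inverse of rendering a digest as 64 lowercase hex characters; as written it unwraps each two-character parse and indexes data directly, so it expects well-formed hex of at most 64 characters. A round-trip check of the kind that could sit in a test module next to the new method (the digest value below is arbitrary):

    #[test]
    fn test_from_string_round_trip() {
        // 64 hex characters == 32 bytes, the width of a sha256 digest.
        let hex = "a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4";
        let digest = RafsDigest::from_string(hex);
        assert_eq!(digest.data[0], 0xa3);
        assert_eq!(digest.data[31], 0xd4);
    }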