Skip to content

Commit

Permalink
feat(storage): improve storage memory (web-infra-dev#8847)
Browse files Browse the repository at this point in the history
* feat: add storage generation fields

* feat: improve storage memory usage

* feat: improve storage memory usage

* feat: improve storage memory usage

* feat: improve storage memory usage

* feat: improve storage memory usage

* feat: improve storage memory usage
  • Loading branch information
LingyuCoder authored Dec 27, 2024
1 parent fdb2868 commit 7939321
Show file tree
Hide file tree
Showing 22 changed files with 861 additions and 249 deletions.
2 changes: 2 additions & 0 deletions crates/rspack_core/src/cache/persistent/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub fn create_storage(
pack_size: 500 * 1024,
expire: 7 * 24 * 60 * 60 * 1000,
fs: Arc::new(BridgeFileSystem(fs)),
fresh_generation: Some(1),
release_generation: Some(2),
version,
};
Arc::new(PackStorage::new(option))
Expand Down
3 changes: 3 additions & 0 deletions crates/rspack_storage/src/pack/data/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct PackFileMeta {
pub name: String,
pub size: usize,
pub wrote: bool,
pub generation: usize,
}

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -52,6 +53,7 @@ pub struct ScopeMeta {
pub path: Utf8PathBuf,
pub bucket_size: usize,
pub pack_size: usize,
pub generation: usize,
pub packs: Vec<Vec<PackFileMeta>>,
}

Expand All @@ -65,6 +67,7 @@ impl ScopeMeta {
path: Self::get_path(dir),
bucket_size: options.bucket_size,
pack_size: options.pack_size,
generation: 0,
packs,
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/rspack_storage/src/pack/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ mod scope;

pub use meta::{current_time, PackFileMeta, RootMeta, RootMetaFrom, ScopeMeta};
pub use options::{PackOptions, RootOptions};
pub use pack::{Pack, PackContents, PackKeys};
pub use pack::{Pack, PackContents, PackGenerations, PackKeys};
pub use scope::{PackScope, RootMetaState};
30 changes: 26 additions & 4 deletions crates/rspack_storage/src/pack/data/pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{ItemKey, ItemValue};

pub type PackKeys = Vec<Arc<ItemKey>>;
pub type PackContents = Vec<Arc<ItemValue>>;
pub type PackGenerations = Vec<usize>;

#[derive(Debug, Default)]
pub enum PackKeysState {
Expand Down Expand Up @@ -46,6 +47,7 @@ pub enum PackContentsState {
#[default]
Pending,
Value(PackContents),
Released,
}

impl PackContentsState {
Expand All @@ -59,12 +61,14 @@ impl PackContentsState {
match self {
PackContentsState::Value(v) => Some(v),
PackContentsState::Pending => None,
PackContentsState::Released => None,
}
}
pub fn expect_value(&self) -> &PackContents {
match self {
PackContentsState::Value(v) => v,
PackContentsState::Pending => panic!("pack content is not ready"),
PackContentsState::Released => panic!("pack content has been released"),
}
}
pub fn take_value(&mut self) -> Option<PackContents> {
Expand All @@ -73,13 +77,20 @@ impl PackContentsState {
_ => None,
}
}
pub fn release(&mut self) {
*self = PackContentsState::Released;
}
pub fn is_released(&self) -> bool {
matches!(self, Self::Released)
}
}

#[derive(Debug)]
pub struct Pack {
pub path: Utf8PathBuf,
pub keys: PackKeysState,
pub contents: PackContentsState,
pub generations: PackGenerations,
}

impl Pack {
Expand All @@ -88,20 +99,31 @@ impl Pack {
path,
keys: Default::default(),
contents: Default::default(),
generations: Default::default(),
}
}

pub fn loaded(&self) -> bool {
matches!(self.keys, PackKeysState::Value(_))
&& matches!(self.contents, PackContentsState::Value(_))
&& (matches!(self.contents, PackContentsState::Value(_))
|| matches!(self.contents, PackContentsState::Released))
}

pub fn size(&self) -> usize {
self
let key_size = self
.keys
.expect_value()
.iter()
.chain(self.contents.expect_value().iter())
.fold(0_usize, |acc, item| acc + item.len())
.fold(0_usize, |acc, item| acc + item.len());
let content_size = self
.contents
.expect_value()
.iter()
.fold(0_usize, |acc, item| acc + item.len());
let generation_size = self
.generations
.iter()
.fold(0_usize, |acc, item| acc + item.to_string().len());
key_size + content_size + generation_size
}
}
77 changes: 44 additions & 33 deletions crates/rspack_storage/src/pack/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ mod queue;
use std::sync::Arc;

use futures::future::join_all;
use itertools::Itertools;
use pollster::block_on;
use queue::TaskQueue;
use rayon::iter::{ParallelBridge, ParallelIterator};
use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
use tokio::sync::oneshot::Receiver;
use tokio::sync::{oneshot, Mutex};
Expand Down Expand Up @@ -57,7 +55,9 @@ impl ScopeManager {
updates,
pack_options.clone(),
strategy.as_ref(),
) {
)
.await
{
Ok(_) => {
*root_meta.lock().await = RootMetaState::Value(Some(RootMeta::new(
scopes_guard
Expand Down Expand Up @@ -204,7 +204,7 @@ impl ScopeManager {
}
}

fn update_scopes(
async fn update_scopes(
scopes: &mut ScopeMap,
mut updates: ScopeUpdates,
pack_options: Arc<PackOptions>,
Expand All @@ -220,22 +220,25 @@ fn update_scopes(
});
}

scopes
.iter_mut()
.filter_map(|(name, scope)| {
updates
.remove(name.to_string().as_str())
.and_then(|scope_update| {
if scope_update.is_empty() {
None
} else {
Some((scope, scope_update))
}
})
})
.par_bridge()
.map(|(scope, scope_update)| strategy.update_scope(scope, scope_update))
.collect::<Result<Vec<_>>>()?;
join_all(
scopes
.iter_mut()
.filter_map(|(name, scope)| {
updates
.remove(name.to_string().as_str())
.and_then(|scope_update| {
if scope_update.is_empty() {
None
} else {
Some((scope, scope_update))
}
})
})
.map(|(scope, scope_update)| strategy.update_scope(scope, scope_update)),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

Ok(())
}
Expand All @@ -250,21 +253,25 @@ async fn save_scopes(

strategy.before_all(&mut scopes).await?;

let changed = join_all(
join_all(
scopes
.values_mut()
.map(|scope| async move {
let mut res = WriteScopeResult::default();
if scope.loaded() {
res.extend(strategy.write_packs(scope).await?);
res.extend(strategy.write_meta(scope).await?);
}
Ok(res)
})
.collect_vec(),
.map(|scope| strategy.optimize_scope(scope)),
)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

let changed = join_all(scopes.values_mut().map(|scope| async move {
let mut res = WriteScopeResult::default();
if scope.loaded() {
res.extend(strategy.write_packs(scope).await?);
res.extend(strategy.write_meta(scope).await?);
}
Ok(res)
}))
.await
.into_iter()
.collect::<Result<Vec<WriteScopeResult>>>()?
.into_iter()
.fold(WriteScopeResult::default(), |mut acc, res| {
Expand All @@ -275,9 +282,7 @@ async fn save_scopes(
strategy.write_root_meta(root_meta).await?;
strategy.merge_changed(changed).await?;
strategy.after_all(&mut scopes).await?;
strategy
.clean_unused(root_meta, &scopes, root_options)
.await?;
strategy.clean(root_meta, &scopes, root_options).await?;

Ok(scopes.into_iter().collect())
}
Expand Down Expand Up @@ -341,6 +346,8 @@ mod tests {
root.to_path_buf(),
temp.to_path_buf(),
fs.clone(),
Some(1),
Some(2),
));
let manager = ScopeManager::new(root_options, pack_options, strategy);

Expand Down Expand Up @@ -395,6 +402,8 @@ mod tests {
root.to_path_buf(),
temp.to_path_buf(),
fs.clone(),
Some(1),
Some(2),
));
let manager = ScopeManager::new(root_options, pack_options, strategy);

Expand Down Expand Up @@ -478,6 +487,8 @@ mod tests {
root.to_path_buf(),
temp.to_path_buf(),
fs.clone(),
Some(1),
Some(2),
));
let manager = ScopeManager::new(root_options.clone(), pack_options.clone(), strategy.clone());
// should report error when invalid failed
Expand Down
6 changes: 6 additions & 0 deletions crates/rspack_storage/src/pack/manager/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ impl Debug for TaskQueue {
}
}

impl Default for TaskQueue {
fn default() -> Self {
Self::new()
}
}

impl TaskQueue {
pub fn new() -> Self {
TaskQueue(LazyLock::new(|| {
Expand Down
8 changes: 6 additions & 2 deletions crates/rspack_storage/src/pack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::{error::Result, FileSystem, ItemKey, ItemPairs, ItemValue, Storage};
pub type ScopeUpdates = HashMap<&'static str, ScopeUpdate>;
#[derive(Debug)]
pub struct PackStorage {
manager: ScopeManager,
updates: Mutex<ScopeUpdates>,
pub manager: ScopeManager,
pub updates: Mutex<ScopeUpdates>,
}

pub struct PackStorageOptions {
Expand All @@ -32,6 +32,8 @@ pub struct PackStorageOptions {
pub expire: u64,
pub version: String,
pub clean: bool,
pub fresh_generation: Option<usize>,
pub release_generation: Option<usize>,
}

impl PackStorage {
Expand All @@ -51,6 +53,8 @@ impl PackStorage {
options.root.join(&options.version).assert_utf8(),
options.temp_root.join(&options.version).assert_utf8(),
options.fs,
options.fresh_generation,
options.release_generation,
)),
),
updates: Default::default(),
Expand Down
27 changes: 21 additions & 6 deletions crates/rspack_storage/src/pack/strategy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
pub use split::SplitPackStrategy;

use super::data::{
Pack, PackContents, PackFileMeta, PackKeys, PackOptions, PackScope, RootMeta, RootOptions,
Pack, PackContents, PackFileMeta, PackGenerations, PackKeys, PackOptions, PackScope, RootMeta,
RootOptions,
};
use crate::{
error::{Result, ValidateResult},
Expand Down Expand Up @@ -37,29 +38,42 @@ pub trait RootStrategy {
async fn read_root_meta(&self) -> Result<Option<RootMeta>>;
async fn write_root_meta(&self, root_meta: &RootMeta) -> Result<()>;
async fn validate_root(&self, root_meta: &RootMeta) -> Result<ValidateResult>;
async fn clean_unused(
async fn clean(
&self,
root_meta: &RootMeta,
scopes: &HashMap<String, PackScope>,
root_options: &RootOptions,
) -> Result<()>;
}

#[derive(Debug, Default)]
pub struct PackMainContents {
pub contents: PackContents,
pub generations: PackGenerations,
}

#[async_trait]
pub trait PackReadStrategy {
async fn read_pack_keys(&self, path: &Utf8Path) -> Result<Option<PackKeys>>;
async fn read_pack_contents(&self, path: &Utf8Path) -> Result<Option<PackContents>>;
async fn read_pack_contents(&self, path: &Utf8Path) -> Result<Option<PackMainContents>>;
}

#[async_trait]
pub trait PackWriteStrategy {
fn update_packs(
async fn update_packs(
&self,
dir: Utf8PathBuf,
generation: usize,
options: &PackOptions,
packs: HashMap<PackFileMeta, Pack>,
updates: HashMap<ItemKey, Option<ItemValue>>,
) -> UpdatePacksResult;
) -> Result<UpdatePacksResult>;
async fn optimize_packs(
&self,
dir: Utf8PathBuf,
options: &PackOptions,
packs: Vec<(PackFileMeta, Pack)>,
) -> Result<UpdatePacksResult>;
async fn write_pack(&self, pack: &Pack) -> Result<()>;
}

Expand Down Expand Up @@ -94,8 +108,9 @@ impl WriteScopeResult {
pub type ScopeUpdate = HashMap<ItemKey, Option<ItemValue>>;
#[async_trait]
pub trait ScopeWriteStrategy {
fn update_scope(&self, scope: &mut PackScope, updates: ScopeUpdate) -> Result<()>;
async fn update_scope(&self, scope: &mut PackScope, updates: ScopeUpdate) -> Result<()>;
async fn before_all(&self, scopes: &mut HashMap<String, PackScope>) -> Result<()>;
async fn optimize_scope(&self, scope: &mut PackScope) -> Result<()>;
async fn write_packs(&self, scope: &mut PackScope) -> Result<WriteScopeResult>;
async fn write_meta(&self, scope: &mut PackScope) -> Result<WriteScopeResult>;
async fn merge_changed(&self, changed: WriteScopeResult) -> Result<()>;
Expand Down
Loading

0 comments on commit 7939321

Please sign in to comment.