Skip to content

Commit

Permalink
moss: parallelize 'state verify'
Browse files Browse the repository at this point in the history
Use rayon to iterate over entries in parallel and use an Arc<RwLock<>> to
update the vec of issues atomically.

Resolves #390.
  • Loading branch information
joebonrichie committed Feb 10, 2025
1 parent cb1207e commit 4aea0c0
Showing 1 changed file with 69 additions and 50 deletions.
119 changes: 69 additions & 50 deletions moss/src/client/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
//
// SPDX-License-Identifier: MPL-2.0

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::{collections::BTreeSet, fmt, io, path::PathBuf};

use itertools::Itertools;

use fs_err as fs;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use stone::{payload::layout, write::digest};
use tui::{
dialoguer::{theme::ColorfulTheme, Confirm},
Expand Down Expand Up @@ -37,9 +40,6 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
})
.into_group_map();

let mut issues = vec![];
let mut hasher = digest::Hasher::new();

let pb = ProgressBar::new(unique_assets.len() as u64)
.with_message("Verifying")
.with_style(
Expand All @@ -49,11 +49,15 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
);
pb.tick();

// For each asset, ensure it exists in the content store and isn't corrupt (hash is correct)
for (hash, meta) in unique_assets
let issues_arcrw = Arc::new(RwLock::new(Vec::new()));

let sorted_unique_assets = unique_assets
.into_iter()
.sorted_by_key(|(key, _)| format!("{key:0>32}"))
{
.collect::<Vec<_>>();

// For each asset, ensure it exists in the content store and isn't corrupt (hash is correct)
sorted_unique_assets.par_iter().for_each(|(hash, meta)| {
let display_hash = format!("{hash:0>32}");

let path = cache::asset_path(&client.installation, &hash);

Check warning on line 63 in moss/src/client/verify.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] moss/src/client/verify.rs#L63

warning: this expression creates a reference which is immediately dereferenced by the compiler --> moss/src/client/verify.rs:63:60 | 63 | let path = cache::asset_path(&client.installation, &hash); | ^^^^^ help: change this to: `hash` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default
Raw output
moss/src/client/verify.rs:63:60:w:warning: this expression creates a reference which is immediately dereferenced by the compiler
  --> moss/src/client/verify.rs:63:60
   |
63 |         let path = cache::asset_path(&client.installation, &hash);
   |                                                            ^^^^^ help: change this to: `hash`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
   = note: `#[warn(clippy::needless_borrow)]` on by default


__END__
Expand All @@ -67,43 +71,55 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
if verbose {
pb.suspend(|| println!(" {} {display_hash} - {files:?}", "×".yellow()));
}
issues.push(Issue::MissingAsset {
hash: display_hash,
files,
packages: meta.into_iter().map(|(package, _)| package).collect(),
});
continue;
{
let mut lock = issues_arcrw.write().unwrap();
lock.push(Issue::MissingAsset {
hash: display_hash,
files,
packages: meta.into_iter().map(|(package, _)| package).collect(),

Check warning on line 79 in moss/src/client/verify.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] moss/src/client/verify.rs#L79

warning: this `.into_iter()` call is equivalent to `.iter()` and will not consume the `Vec` --> moss/src/client/verify.rs:79:36 | 79 | packages: meta.into_iter().map(|(package, _)| package).collect(), | ^^^^^^^^^ help: call directly: `iter` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#into_iter_on_ref = note: `#[warn(clippy::into_iter_on_ref)]` on by default
Raw output
moss/src/client/verify.rs:79:36:w:warning: this `.into_iter()` call is equivalent to `.iter()` and will not consume the `Vec`
  --> moss/src/client/verify.rs:79:36
   |
79 |                     packages: meta.into_iter().map(|(package, _)| package).collect(),
   |                                    ^^^^^^^^^ help: call directly: `iter`
   |
   = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#into_iter_on_ref
   = note: `#[warn(clippy::into_iter_on_ref)]` on by default


__END__
});
}
return;
}

let mut hasher = digest::Hasher::new();
hasher.reset();

let mut digest_writer = digest::Writer::new(io::sink(), &mut hasher);
let mut file = fs::File::open(&path)?;
let res = fs::File::open(&path);

if res.is_err() {
return;
}

let mut file = res.unwrap();

// Copy bytes to null sink so we don't
// explode memory
io::copy(&mut file, &mut digest_writer)?;
// Copy bytes to null sink so we don't explode memory
io::copy(&mut file, &mut digest_writer).ok().unwrap();

let verified_hash = format!("{:02x}", hasher.digest128());

if verified_hash != hash {
if &verified_hash != hash {
pb.inc(1);
if verbose {
pb.suspend(|| println!(" {} {display_hash} - {files:?}", "×".yellow()));
}
issues.push(Issue::CorruptAsset {
hash: display_hash,
files,
packages: meta.into_iter().map(|(package, _)| package).collect(),
});
continue;
{
let mut lock = issues_arcrw.write().unwrap();
lock.push(Issue::CorruptAsset {
hash: display_hash,
files,
packages: meta.into_iter().map(|(package, _)| package).collect(),

Check warning on line 112 in moss/src/client/verify.rs

View workflow job for this annotation

GitHub Actions / clippy

[clippy] moss/src/client/verify.rs#L112

warning: this `.into_iter()` call is equivalent to `.iter()` and will not consume the `Vec` --> moss/src/client/verify.rs:112:36 | 112 | packages: meta.into_iter().map(|(package, _)| package).collect(), | ^^^^^^^^^ help: call directly: `iter` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#into_iter_on_ref
Raw output
moss/src/client/verify.rs:112:36:w:warning: this `.into_iter()` call is equivalent to `.iter()` and will not consume the `Vec`
   --> moss/src/client/verify.rs:112:36
    |
112 |                     packages: meta.into_iter().map(|(package, _)| package).collect(),
    |                                    ^^^^^^^^^ help: call directly: `iter`
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#into_iter_on_ref


__END__
});
}
return;
}

pb.inc(1);
if verbose {
pb.suspend(|| println!(" {} {display_hash} - {files:?}", "»".green()));
}
}
});

// Get all states
let states = client.state_db.all()?;
Expand All @@ -114,13 +130,11 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
println!("Verifying states");
});

// Check the VFS of each state exists properly on the FS
for state in &states {
states.par_iter().for_each(|state| {
pb.set_message(format!("Verifying state #{}", state.id));

let is_active = client.installation.active_state == Some(state.id);

let vfs = client.vfs(state.selections.iter().map(|s| &s.package))?;
let vfs = client.vfs(state.selections.iter().map(|s| &s.package)).unwrap();

let base = if is_active {
client.installation.root.join("usr")
Expand All @@ -130,7 +144,7 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E

let files = vfs.iter().collect::<Vec<_>>();

let mut num_issues = 0;
let counter = Arc::new(AtomicUsize::new(0));

for file in files {
let path = base.join(file.path().strip_prefix("/usr/").unwrap_or_default());
Expand All @@ -144,33 +158,38 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
Ok(true) => {}
Ok(false) if path.is_symlink() => {}
_ => {
num_issues += 1;
issues.push(Issue::MissingVFSPath { path, state: state.id });
counter.fetch_add(1, Ordering::Relaxed);
{
let mut lock = issues_arcrw.write().unwrap();
lock.push(Issue::MissingVFSPath { path, state: state.id });
}
}
}
}

pb.inc(1);
if verbose {
let mark = if num_issues > 0 { "×".yellow() } else { "»".green() };
let mark = if counter.load(Ordering::Relaxed) > 0 {
"×".yellow()
} else {
"»".green()
};
pb.suspend(|| println!(" {mark} state #{}", state.id));
}
}
});

pb.finish_and_clear();

if issues.is_empty() {
let lock = issues_arcrw.write().unwrap();

if lock.is_empty() {
println!("No issues found");
return Ok(());
}

println!(
"Found {} issue{}",
issues.len(),
if issues.len() == 1 { "" } else { "s" }
);
println!("Found {} issue{}", lock.len(), if lock.len() == 1 { "" } else { "s" });

for issue in &issues {
for issue in lock.iter() {
println!(" {} {issue}", "×".yellow());
}

Expand All @@ -187,15 +206,15 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
}

// Calculate and resolve the unique set of packages with asset issues
let issue_packages = issues
let issue_packages = lock
.iter()
.filter_map(Issue::packages)
.flatten()
.collect::<BTreeSet<_>>()
.into_iter()
.map(|id| {
client.install_db.get(id).map(|meta| Package {
id: id.clone(),
id: id.to_owned().to_owned(),
meta,
flags: package::Flags::default(),
})
Expand All @@ -205,7 +224,7 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
// We had some corrupt or missing assets, let's resolve that!
if !issue_packages.is_empty() {
// Remove all corrupt assets
for corrupt_hash in issues.iter().filter_map(Issue::corrupt_hash) {
for corrupt_hash in lock.iter().filter_map(Issue::corrupt_hash) {
let path = cache::asset_path(&client.installation, corrupt_hash);
fs::remove_file(&path)?;
}
Expand All @@ -227,7 +246,7 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
.any(|s| issue_packages.iter().any(|p| p.id == s.package))
.then_some(&state.id)
})
.chain(issues.iter().filter_map(Issue::state))
.chain(lock.iter().filter_map(Issue::state))
.collect::<BTreeSet<_>>();

println!("Reblitting affected states");
Expand Down Expand Up @@ -277,24 +296,24 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
}

#[derive(Debug)]
enum Issue {
enum Issue<'a> {
CorruptAsset {
hash: String,
files: BTreeSet<String>,
packages: BTreeSet<package::Id>,
packages: BTreeSet<&'a package::Id>,
},
MissingAsset {
hash: String,
files: BTreeSet<String>,
packages: BTreeSet<package::Id>,
packages: BTreeSet<&'a package::Id>,
},
MissingVFSPath {
path: PathBuf,
state: state::Id,
},
}

impl Issue {
impl Issue<'_> {
fn corrupt_hash(&self) -> Option<&str> {
match self {
Issue::CorruptAsset { hash, .. } => Some(hash),
Expand All @@ -303,7 +322,7 @@ impl Issue {
}
}

fn packages(&self) -> Option<&BTreeSet<package::Id>> {
fn packages(&self) -> Option<&BTreeSet<&package::Id>> {
match self {
Issue::CorruptAsset { packages, .. } | Issue::MissingAsset { packages, .. } => Some(packages),
Issue::MissingVFSPath { .. } => None,
Expand All @@ -318,7 +337,7 @@ impl Issue {
}
}

impl fmt::Display for Issue {
impl fmt::Display for Issue<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Issue::CorruptAsset { hash, files, .. } => write!(f, "Corrupt asset {hash} - {files:?}"),
Expand Down

0 comments on commit 4aea0c0

Please sign in to comment.