Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

moss: parallelize 'state verify' #421

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 70 additions & 51 deletions moss/src/client/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
//
// SPDX-License-Identifier: MPL-2.0

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::{collections::BTreeSet, fmt, io, path::PathBuf};

use itertools::Itertools;

use fs_err as fs;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use stone::{payload::layout, write::digest};
use tui::{
dialoguer::{theme::ColorfulTheme, Confirm},
Expand Down Expand Up @@ -37,9 +40,6 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
})
.into_group_map();

let mut issues = vec![];
let mut hasher = digest::Hasher::new();

let pb = ProgressBar::new(unique_assets.len() as u64)
.with_message("Verifying")
.with_style(
Expand All @@ -49,14 +49,18 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
);
pb.tick();

// For each asset, ensure it exists in the content store and isn't corrupt (hash is correct)
for (hash, meta) in unique_assets
let issues_arcrw = Arc::new(RwLock::new(Vec::new()));

let sorted_unique_assets = unique_assets
.into_iter()
.sorted_by_key(|(key, _)| format!("{key:0>32}"))
{
.collect::<Vec<_>>();

// For each asset, ensure it exists in the content store and isn't corrupt (hash is correct)
sorted_unique_assets.par_iter().for_each(|(hash, meta)| {
let display_hash = format!("{hash:0>32}");

let path = cache::asset_path(&client.installation, &hash);
let path = cache::asset_path(&client.installation, hash);

let files = meta.iter().map(|(_, file)| file).cloned().collect::<BTreeSet<_>>();

Expand All @@ -67,43 +71,55 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
if verbose {
pb.suspend(|| println!(" {} {display_hash} - {files:?}", "×".yellow()));
}
issues.push(Issue::MissingAsset {
hash: display_hash,
files,
packages: meta.into_iter().map(|(package, _)| package).collect(),
});
continue;
{
let mut lock = issues_arcrw.write().unwrap();
lock.push(Issue::MissingAsset {
hash: display_hash,
files,
packages: meta.iter().map(|(package, _)| package).collect(),
});
}
return;
}

let mut hasher = digest::Hasher::new();
hasher.reset();

let mut digest_writer = digest::Writer::new(io::sink(), &mut hasher);
let mut file = fs::File::open(&path)?;
let res = fs::File::open(&path);

if res.is_err() {
return;
}

let mut file = res.unwrap();

// Copy bytes to null sink so we don't
// explode memory
io::copy(&mut file, &mut digest_writer)?;
// Copy bytes to null sink so we don't explode memory
io::copy(&mut file, &mut digest_writer).unwrap_or_default();

let verified_hash = format!("{:02x}", hasher.digest128());

if verified_hash != hash {
if &verified_hash != hash {
pb.inc(1);
if verbose {
pb.suspend(|| println!(" {} {display_hash} - {files:?}", "×".yellow()));
}
issues.push(Issue::CorruptAsset {
hash: display_hash,
files,
packages: meta.into_iter().map(|(package, _)| package).collect(),
});
continue;
{
let mut lock = issues_arcrw.write().unwrap();
lock.push(Issue::CorruptAsset {
hash: display_hash,
files,
packages: meta.iter().map(|(package, _)| package).collect(),
});
}
return;
}

pb.inc(1);
if verbose {
pb.suspend(|| println!(" {} {display_hash} - {files:?}", "»".green()));
}
}
});

// Get all states
let states = client.state_db.all()?;
Expand All @@ -114,13 +130,11 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
println!("Verifying states");
});

// Check the VFS of each state exists properly on the FS
for state in &states {
states.par_iter().for_each(|state| {
pb.set_message(format!("Verifying state #{}", state.id));

let is_active = client.installation.active_state == Some(state.id);

let vfs = client.vfs(state.selections.iter().map(|s| &s.package))?;
let vfs = client.vfs(state.selections.iter().map(|s| &s.package)).unwrap();

let base = if is_active {
client.installation.root.join("usr")
Expand All @@ -130,7 +144,7 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E

let files = vfs.iter().collect::<Vec<_>>();

let mut num_issues = 0;
let counter = Arc::new(AtomicUsize::new(0));

for file in files {
let path = base.join(file.path().strip_prefix("/usr/").unwrap_or_default());
Expand All @@ -144,33 +158,38 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
Ok(true) => {}
Ok(false) if path.is_symlink() => {}
_ => {
num_issues += 1;
issues.push(Issue::MissingVFSPath { path, state: state.id });
counter.fetch_add(1, Ordering::Relaxed);
{
let mut lock = issues_arcrw.write().unwrap();
lock.push(Issue::MissingVFSPath { path, state: state.id });
}
}
}
}

pb.inc(1);
if verbose {
let mark = if num_issues > 0 { "×".yellow() } else { "»".green() };
let mark = if counter.load(Ordering::Relaxed) > 0 {
"×".yellow()
} else {
"»".green()
};
pb.suspend(|| println!(" {mark} state #{}", state.id));
}
}
});

pb.finish_and_clear();

if issues.is_empty() {
let lock = issues_arcrw.write().unwrap();

if lock.is_empty() {
println!("No issues found");
return Ok(());
}

println!(
"Found {} issue{}",
issues.len(),
if issues.len() == 1 { "" } else { "s" }
);
println!("Found {} issue{}", lock.len(), if lock.len() == 1 { "" } else { "s" });

for issue in &issues {
for issue in lock.iter() {
println!(" {} {issue}", "×".yellow());
}

Expand All @@ -187,15 +206,15 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
}

// Calculate and resolve the unique set of packages with asset issues
let issue_packages = issues
let issue_packages = lock
.iter()
.filter_map(Issue::packages)
.flatten()
.collect::<BTreeSet<_>>()
.into_iter()
.map(|id| {
client.install_db.get(id).map(|meta| Package {
id: id.clone(),
id: id.to_owned().to_owned(),
meta,
flags: package::Flags::default(),
})
Expand All @@ -205,7 +224,7 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
// We had some corrupt or missing assets, let's resolve that!
if !issue_packages.is_empty() {
// Remove all corrupt assets
for corrupt_hash in issues.iter().filter_map(Issue::corrupt_hash) {
for corrupt_hash in lock.iter().filter_map(Issue::corrupt_hash) {
let path = cache::asset_path(&client.installation, corrupt_hash);
fs::remove_file(&path)?;
}
Expand All @@ -227,7 +246,7 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
.any(|s| issue_packages.iter().any(|p| p.id == s.package))
.then_some(&state.id)
})
.chain(issues.iter().filter_map(Issue::state))
.chain(lock.iter().filter_map(Issue::state))
.collect::<BTreeSet<_>>();

println!("Reblitting affected states");
Expand Down Expand Up @@ -277,24 +296,24 @@ pub fn verify(client: &Client, yes: bool, verbose: bool) -> Result<(), client::E
}

#[derive(Debug)]
enum Issue {
enum Issue<'a> {
CorruptAsset {
hash: String,
files: BTreeSet<String>,
packages: BTreeSet<package::Id>,
packages: BTreeSet<&'a package::Id>,
},
MissingAsset {
hash: String,
files: BTreeSet<String>,
packages: BTreeSet<package::Id>,
packages: BTreeSet<&'a package::Id>,
},
MissingVFSPath {
path: PathBuf,
state: state::Id,
},
}

impl Issue {
impl Issue<'_> {
fn corrupt_hash(&self) -> Option<&str> {
match self {
Issue::CorruptAsset { hash, .. } => Some(hash),
Expand All @@ -303,7 +322,7 @@ impl Issue {
}
}

fn packages(&self) -> Option<&BTreeSet<package::Id>> {
fn packages(&self) -> Option<&BTreeSet<&package::Id>> {
match self {
Issue::CorruptAsset { packages, .. } | Issue::MissingAsset { packages, .. } => Some(packages),
Issue::MissingVFSPath { .. } => None,
Expand All @@ -318,7 +337,7 @@ impl Issue {
}
}

impl fmt::Display for Issue {
impl fmt::Display for Issue<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Issue::CorruptAsset { hash, files, .. } => write!(f, "Corrupt asset {hash} - {files:?}"),
Expand Down