From 3c22e0d86035d7b60db0d772909a6c91da80aaca Mon Sep 17 00:00:00 2001 From: Juan Jose Nicola Date: Thu, 30 Jan 2025 08:57:28 -0300 Subject: [PATCH] Fix: poisoned lock Replace std::sync::RwLock with parking_lot::RwLock. For some reason the lock is poisoned after a panic. This fix the crash of openvasd and running scans can continue running and new scans can be started. However, it doesn't fix the original issue, still unknown. --- rust/Cargo.lock | 1 + rust/Cargo.toml | 1 + rust/src/openvasd/storage/inmemory.rs | 36 +++++++++++++-------------- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b0428706f..c866a4920 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -3472,6 +3472,7 @@ dependencies = [ "num_cpus", "once_cell", "openssl", + "parking_lot", "pbkdf2 0.12.2", "pcap", "pkcs8", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 39ed3f7cb..9f30f382c 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -96,6 +96,7 @@ nasl-c-lib = { path = "crates/nasl-c-lib", optional = true } openssl = { version = "0.10.66", features = ["vendored"] } blowfish = "0.9.1" rc4 = "0.1.0" +parking_lot = "0.12.3" [workspace] resolver = "2" diff --git a/rust/src/openvasd/storage/inmemory.rs b/rust/src/openvasd/storage/inmemory.rs index 18513fb73..072c324a5 100644 --- a/rust/src/openvasd/storage/inmemory.rs +++ b/rust/src/openvasd/storage/inmemory.rs @@ -2,13 +2,13 @@ // // SPDX-License-Identifier: GPL-2.0-or-later WITH x11vnc-openssl-exception -use std::{collections::HashSet, sync::RwLock}; - use super::*; +use parking_lot::RwLock; use scannerlib::{ models, notus, storage::{item::Nvt, ContextKey, DefaultDispatcher, StorageError}, }; +use std::collections::HashSet; use tokio::task::JoinSet; #[derive(Clone, Debug, Default)] @@ -95,7 +95,7 @@ where from: Option, to: Option, ) -> Result> + Send>, Error> { - let scans = self.scans.read().unwrap(); + let scans = self.scans.read(); let progress = scans.get(id).ok_or(Error::NotFound)?; Ok(Self::decrypt_results_sync( self.crypter.clone(), @@ -128,7 +128,7 @@ where scan_id: String, client_id: ClientHash, ) -> Result<(), Error> { - let mut ids = self.client_id.write().unwrap(); + let mut ids = self.client_id.write(); ids.push((client_id, scan_id)); Ok(()) @@ -138,7 +138,7 @@ where where I: AsRef + Send + 'static, { - let mut ids = self.client_id.write().unwrap(); + let mut ids = self.client_id.write(); let ssid = scan_id.as_ref(); let mut to_remove = vec![]; for (i, (_, sid)) in ids.iter().enumerate() { @@ -154,7 +154,7 @@ where } async fn get_scans_of_client_id(&self, client_id: &ClientHash) -> Result, Error> { - let ids = self.client_id.read().unwrap(); + let ids = self.client_id.read(); Ok(ids .iter() .filter(|(cid, _)| cid == client_id) @@ -171,7 +171,7 @@ where let scans = self.scans.clone(); let crypter = self.crypter.clone(); tokio::task::spawn_blocking(move || { - let mut scans = scans.write().unwrap(); + let mut scans = scans.write(); let id = sp.scan_id.clone(); if let Some(prgs) = scans.get_mut(&id) { prgs.scan = sp; @@ -186,14 +186,14 @@ where } async fn remove_scan(&self, id: &str) -> Result<(), Error> { - let mut scans = self.scans.write().unwrap(); + let mut scans = self.scans.write(); scans.remove(id); Ok(()) } async fn update_status(&self, id: &str, status: models::Status) -> Result<(), Error> { - let mut scans = self.scans.write().unwrap(); + let mut scans = self.scans.write(); let progress = scans.get_mut(id).ok_or(Error::NotFound)?; progress.status = status; Ok(()) @@ -209,7 +209,7 @@ where let scans = self.scans.clone(); let crypter = self.crypter.clone(); tokio::task::spawn_blocking(move || { - let mut scans = scans.write().unwrap(); + let mut scans = scans.write(); for r in results { let id = &r.id; let progress = scans.get_mut(id).ok_or(Error::NotFound)?; @@ -236,7 +236,7 @@ where E: crate::crypt::Crypt + Send + Sync + 'static, { async fn get_scan_ids(&self) -> Result, Error> { - let scans = self.scans.read().unwrap(); + let scans = self.scans.read(); let mut result = Vec::with_capacity(scans.len()); for (_, progress) in scans.iter() { result.push(progress.scan.scan_id.clone()); @@ -245,7 +245,7 @@ where } async fn get_scan(&self, id: &str) -> Result<(models::Scan, models::Status), Error> { - let scans = self.scans.read().unwrap(); + let scans = self.scans.read(); let progress = scans.get(id).ok_or(Error::NotFound)?; Ok((progress.scan.clone(), progress.status.clone())) } @@ -267,7 +267,7 @@ where } async fn get_status(&self, id: &str) -> Result { - let scans = self.scans.read().unwrap(); + let scans = self.scans.read(); let progress = scans.get(id).ok_or(Error::NotFound)?; Ok(progress.status.clone()) } @@ -322,7 +322,7 @@ where tracing::debug!("starting feed update"); { - let mut h = self.hash.write().unwrap(); + let mut h = self.hash.write(); for ha in h.iter_mut() { if let Some(nh) = hash.iter().find(|x| x.typus == ha.typus) { ha.hash.clone_from(&nh.hash) @@ -370,11 +370,11 @@ where } async fn feed_hash(&self) -> Vec { - self.hash.read().unwrap().to_vec() + self.hash.read().to_vec() } async fn current_feed_version(&self) -> Result { - let v = self.feed_version.read().unwrap(); + let v = self.feed_version.read(); Ok(v.clone()) } } @@ -393,7 +393,7 @@ where { // we may already run in an specialized thread therefore we use current thread. use models::Phase; - let mut scans = self.scans.write().unwrap(); + let mut scans = self.scans.write(); let progress = scans .get_mut(key.as_ref()) .ok_or_else(|| StorageError::UnexpectedData(format!("Expected scan for {key}")))?; @@ -424,7 +424,7 @@ where .map_err(|_| StorageError::NotFound(key.value()))? .filter_map(|b| serde_json::de::from_slice(&b).ok()); - let mut scans = self.scans.write().unwrap(); + let mut scans = self.scans.write(); let progress = scans .get_mut(key.as_ref()) .ok_or_else(|| StorageError::UnexpectedData(format!("Expected scan for {key}")))?;