Skip to content

Commit

Permalink
Fix: poisoned lock
Browse files Browse the repository at this point in the history
Replace std::sync::RwLock with parking_lot::RwLock.

For some reason the lock is poisoned after a panic.
This fix the crash of openvasd and running scans can continue running and new scans can be started.
However, it doesn't fix the original issue, still unknown.
  • Loading branch information
jjnicola committed Jan 30, 2025
1 parent 27ab641 commit 3c22e0d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 18 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ nasl-c-lib = { path = "crates/nasl-c-lib", optional = true }
openssl = { version = "0.10.66", features = ["vendored"] }
blowfish = "0.9.1"
rc4 = "0.1.0"
parking_lot = "0.12.3"

[workspace]
resolver = "2"
Expand Down
36 changes: 18 additions & 18 deletions rust/src/openvasd/storage/inmemory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
//
// SPDX-License-Identifier: GPL-2.0-or-later WITH x11vnc-openssl-exception

use std::{collections::HashSet, sync::RwLock};

use super::*;
use parking_lot::RwLock;
use scannerlib::{
models, notus,
storage::{item::Nvt, ContextKey, DefaultDispatcher, StorageError},
};
use std::collections::HashSet;
use tokio::task::JoinSet;

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -95,7 +95,7 @@ where
from: Option<usize>,
to: Option<usize>,
) -> Result<Box<dyn Iterator<Item = Vec<u8>> + Send>, Error> {
let scans = self.scans.read().unwrap();
let scans = self.scans.read();
let progress = scans.get(id).ok_or(Error::NotFound)?;
Ok(Self::decrypt_results_sync(
self.crypter.clone(),
Expand Down Expand Up @@ -128,7 +128,7 @@ where
scan_id: String,
client_id: ClientHash,
) -> Result<(), Error> {
let mut ids = self.client_id.write().unwrap();
let mut ids = self.client_id.write();
ids.push((client_id, scan_id));

Ok(())
Expand All @@ -138,7 +138,7 @@ where
where
I: AsRef<str> + Send + 'static,
{
let mut ids = self.client_id.write().unwrap();
let mut ids = self.client_id.write();
let ssid = scan_id.as_ref();
let mut to_remove = vec![];
for (i, (_, sid)) in ids.iter().enumerate() {
Expand All @@ -154,7 +154,7 @@ where
}

async fn get_scans_of_client_id(&self, client_id: &ClientHash) -> Result<Vec<String>, Error> {
let ids = self.client_id.read().unwrap();
let ids = self.client_id.read();
Ok(ids
.iter()
.filter(|(cid, _)| cid == client_id)
Expand All @@ -171,7 +171,7 @@ where
let scans = self.scans.clone();
let crypter = self.crypter.clone();
tokio::task::spawn_blocking(move || {
let mut scans = scans.write().unwrap();
let mut scans = scans.write();
let id = sp.scan_id.clone();
if let Some(prgs) = scans.get_mut(&id) {
prgs.scan = sp;
Expand All @@ -186,14 +186,14 @@ where
}

async fn remove_scan(&self, id: &str) -> Result<(), Error> {
let mut scans = self.scans.write().unwrap();
let mut scans = self.scans.write();

scans.remove(id);
Ok(())
}

async fn update_status(&self, id: &str, status: models::Status) -> Result<(), Error> {
let mut scans = self.scans.write().unwrap();
let mut scans = self.scans.write();
let progress = scans.get_mut(id).ok_or(Error::NotFound)?;
progress.status = status;
Ok(())
Expand All @@ -209,7 +209,7 @@ where
let scans = self.scans.clone();
let crypter = self.crypter.clone();
tokio::task::spawn_blocking(move || {
let mut scans = scans.write().unwrap();
let mut scans = scans.write();
for r in results {
let id = &r.id;
let progress = scans.get_mut(id).ok_or(Error::NotFound)?;
Expand All @@ -236,7 +236,7 @@ where
E: crate::crypt::Crypt + Send + Sync + 'static,
{
async fn get_scan_ids(&self) -> Result<Vec<String>, Error> {
let scans = self.scans.read().unwrap();
let scans = self.scans.read();
let mut result = Vec::with_capacity(scans.len());
for (_, progress) in scans.iter() {
result.push(progress.scan.scan_id.clone());
Expand All @@ -245,7 +245,7 @@ where
}

async fn get_scan(&self, id: &str) -> Result<(models::Scan, models::Status), Error> {
let scans = self.scans.read().unwrap();
let scans = self.scans.read();
let progress = scans.get(id).ok_or(Error::NotFound)?;
Ok((progress.scan.clone(), progress.status.clone()))
}
Expand All @@ -267,7 +267,7 @@ where
}

async fn get_status(&self, id: &str) -> Result<models::Status, Error> {
let scans = self.scans.read().unwrap();
let scans = self.scans.read();
let progress = scans.get(id).ok_or(Error::NotFound)?;
Ok(progress.status.clone())
}
Expand Down Expand Up @@ -322,7 +322,7 @@ where
tracing::debug!("starting feed update");

{
let mut h = self.hash.write().unwrap();
let mut h = self.hash.write();
for ha in h.iter_mut() {
if let Some(nh) = hash.iter().find(|x| x.typus == ha.typus) {
ha.hash.clone_from(&nh.hash)
Expand Down Expand Up @@ -370,11 +370,11 @@ where
}

async fn feed_hash(&self) -> Vec<FeedHash> {
self.hash.read().unwrap().to_vec()
self.hash.read().to_vec()
}

async fn current_feed_version(&self) -> Result<String, Error> {
let v = self.feed_version.read().unwrap();
let v = self.feed_version.read();
Ok(v.clone())
}
}
Expand All @@ -393,7 +393,7 @@ where
{
// we may already run in an specialized thread therefore we use current thread.
use models::Phase;
let mut scans = self.scans.write().unwrap();
let mut scans = self.scans.write();
let progress = scans
.get_mut(key.as_ref())
.ok_or_else(|| StorageError::UnexpectedData(format!("Expected scan for {key}")))?;
Expand Down Expand Up @@ -424,7 +424,7 @@ where
.map_err(|_| StorageError::NotFound(key.value()))?
.filter_map(|b| serde_json::de::from_slice(&b).ok());

let mut scans = self.scans.write().unwrap();
let mut scans = self.scans.write();
let progress = scans
.get_mut(key.as_ref())
.ok_or_else(|| StorageError::UnexpectedData(format!("Expected scan for {key}")))?;
Expand Down

0 comments on commit 3c22e0d

Please sign in to comment.