Commit bba74a0
Merge pull request #1138 from subspace/use-cache-for-providers
Use cache for providers
nazar-pc authored Feb 2, 2023
2 parents e6b6387 + a6a97cb commit bba74a0
Showing 6 changed files with 95 additions and 26 deletions.
@@ -45,7 +45,7 @@ pub(super) async fn configure_dsn(
 ) -> Result<
     (
         Node,
-        NodeRunner<FarmerProviderStorage<ParityDbProviderStorage>>,
+        NodeRunner<FarmerProviderStorage<ParityDbProviderStorage, FarmerPieceCache>>,
         FarmerPieceCache,
     ),
     anyhow::Error,
@@ -81,13 +81,17 @@ pub(super) async fn configure_dsn(
         ParityDbProviderStorage::new(&provider_cache_db_path, provider_cache_size, peer_id)
             .map_err(|err| anyhow::anyhow!(err.to_string()))?;

-    let farmer_provider_storage =
-        FarmerProviderStorage::new(peer_id, readers_and_pieces.clone(), db_provider_storage);
-
     let piece_store =
         ParityDbStore::new(&piece_cache_db_path).map_err(|err| anyhow::anyhow!(err.to_string()))?;
     let piece_cache = FarmerPieceCache::new(piece_store.clone(), piece_cache_size, peer_id);
+
+    let farmer_provider_storage = FarmerProviderStorage::new(
+        peer_id,
+        readers_and_pieces.clone(),
+        db_provider_storage,
+        piece_cache.clone(),
+    );

     let config = Config {
         reserved_peers,
         keypair,
22 changes: 18 additions & 4 deletions crates/subspace-farmer/src/utils/farmer_piece_cache.rs
@@ -1,18 +1,21 @@
 use crate::utils::parity_db_store::ParityDbStore;
 use crate::utils::piece_cache::PieceCache;
+use parking_lot::Mutex;
 use std::num::NonZeroUsize;
+use std::sync::Arc;
 use subspace_core_primitives::Piece;
 use subspace_networking::libp2p::kad::record::Key;
 use subspace_networking::libp2p::PeerId;
 use subspace_networking::RecordBinaryHeap;
 use tracing::{info, trace, warn};

 /// Piece cache with limited size where pieces closer to provided peer ID are retained.
+#[derive(Clone)]
 pub struct FarmerPieceCache {
     // Underlying unbounded store.
     store: ParityDbStore<Key, Piece>,
     // Maintains a heap to limit total number of entries.
-    heap: RecordBinaryHeap,
+    heap: Arc<Mutex<RecordBinaryHeap>>,
 }

 impl FarmerPieceCache {
@@ -40,19 +43,24 @@ impl FarmerPieceCache {
             }
         }

-        Self { store, heap }
+        Self {
+            store,
+            heap: Arc::new(Mutex::new(heap)),
+        }
     }
 }

 impl PieceCache for FarmerPieceCache {
+    type KeysIterator = impl IntoIterator<Item = Key>;
+
     fn should_cache(&self, key: &Key) -> bool {
-        self.heap.should_include_key(key)
+        self.heap.lock().should_include_key(key)
     }

     fn add_piece(&mut self, key: Key, piece: Piece) {
         self.store.update([(&key, Some(piece.into()))]);

-        let evicted_key = self.heap.insert(key);
+        let evicted_key = self.heap.lock().insert(key);

         if let Some(key) = evicted_key {
             trace!(?key, "Record evicted from cache.");
@@ -64,4 +72,10 @@ impl PieceCache for FarmerPieceCache {
     fn get_piece(&self, key: &Key) -> Option<Piece> {
         self.store.get(key)
     }
+
+    fn keys(&self) -> Self::KeysIterator {
+        // It is not great that we're cloning here, but dealing with the self-referential
+        // lifetimes that arise from the mutex has proven to be challenging
+        self.heap.lock().keys().cloned().collect::<Vec<_>>()
+    }
 }
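The farmer_piece_cache.rs change is the standard Rust recipe for making a stateful handle cheaply cloneable: the heap moves behind Arc<Mutex<...>> and the struct derives Clone, so every clone shares the same underlying heap. Below is a minimal std-only sketch of that pattern; BoundedKeySet and its methods are illustrative stand-ins, not the actual RecordBinaryHeap API (which additionally orders keys by distance to the local peer ID).

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Stand-in for `RecordBinaryHeap`: a size-capped set of keys.
struct BoundedKeySet {
    keys: HashSet<String>,
    limit: usize,
}

// Cheaply cloneable handle; all clones share one underlying set.
#[derive(Clone)]
struct SharedCache {
    inner: Arc<Mutex<BoundedKeySet>>,
}

impl SharedCache {
    fn new(limit: usize) -> Self {
        Self {
            inner: Arc::new(Mutex::new(BoundedKeySet {
                keys: HashSet::new(),
                limit,
            })),
        }
    }

    // Takes `&self`: the mutex provides the interior mutability.
    fn insert(&self, key: String) -> bool {
        let mut set = self.inner.lock().unwrap();
        if set.keys.len() < set.limit {
            set.keys.insert(key)
        } else {
            false
        }
    }

    fn contains(&self, key: &str) -> bool {
        self.inner.lock().unwrap().keys.contains(key)
    }
}

fn main() {
    let cache = SharedCache::new(2);
    let handle = cache.clone(); // shares the same state
    handle.insert("a".to_string());
    assert!(cache.contains("a")); // visible through the original handle
}
```

This shared-handle shape is what lets the dsn.rs hunk above pass piece_cache.clone() into FarmerProviderStorage::new while keeping another handle for the cache itself; the parking_lot::Mutex used in the real code behaves the same, except lock() returns the guard directly instead of a Result.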
72 changes: 57 additions & 15 deletions crates/subspace-farmer/src/utils/farmer_provider_storage.rs
@@ -1,6 +1,8 @@
+use crate::utils::piece_cache::PieceCache;
 use crate::utils::readers_and_pieces::ReadersAndPieces;
 use parking_lot::Mutex;
 use std::borrow::Cow;
+use std::collections::HashSet;
 use std::sync::Arc;
 use subspace_core_primitives::{Blake2b256Hash, BLAKE2B_256_HASH_SIZE};
 use subspace_networking::libp2p::kad::record::Key;
@@ -11,37 +13,62 @@ use subspace_networking::utils::multihash::{MultihashCode, ToMultihash};
 use subspace_networking::ProviderStorage;
 use tracing::trace;

-pub struct FarmerProviderStorage<PersistentProviderStorage> {
+pub struct FarmerProviderStorage<PersistentProviderStorage, LocalPieceCache> {
     local_peer_id: PeerId,
     readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
     persistent_provider_storage: PersistentProviderStorage,
+    piece_cache: LocalPieceCache,
 }

-impl<PersistentProviderStorage> FarmerProviderStorage<PersistentProviderStorage> {
+impl<PersistentProviderStorage, LocalPieceCache>
+    FarmerProviderStorage<PersistentProviderStorage, LocalPieceCache>
+where
+    PersistentProviderStorage: ProviderStorage,
+{
     pub fn new(
         local_peer_id: PeerId,
         readers_and_pieces: Arc<Mutex<Option<ReadersAndPieces>>>,
-        persistent_provider_storage: PersistentProviderStorage,
+        mut persistent_provider_storage: PersistentProviderStorage,
+        piece_cache: LocalPieceCache,
     ) -> Self {
+        // TODO: Transitional upgrade code, should be removed in the future; this is because we no
+        // longer persist locally provided records
+        for key in persistent_provider_storage
+            .provided()
+            .map(|provided_record| provided_record.key.clone())
+            .collect::<Vec<_>>()
+        {
+            persistent_provider_storage.remove_provider(&key, &local_peer_id);
+        }
         Self {
             local_peer_id,
             readers_and_pieces,
             persistent_provider_storage,
+            piece_cache,
         }
     }
 }

-impl<PersistentProviderStorage> ProviderStorage for FarmerProviderStorage<PersistentProviderStorage>
+impl<PersistentProviderStorage, LocalPieceCache> ProviderStorage
+    for FarmerProviderStorage<PersistentProviderStorage, LocalPieceCache>
 where
     PersistentProviderStorage: ProviderStorage,
+    LocalPieceCache: PieceCache,
 {
-    type ProvidedIter<'a> = impl Iterator<Item = Cow<'a, ProviderRecord>> where Self:'a;
+    type ProvidedIter<'a> = impl Iterator<Item = Cow<'a, ProviderRecord>>
+    where
+        Self: 'a;

     fn add_provider(
         &mut self,
         record: ProviderRecord,
     ) -> subspace_networking::libp2p::kad::store::Result<()> {
-        self.persistent_provider_storage.add_provider(record)
+        // Local providers are implicit and should not be put into persistent storage
+        if record.provider != self.local_peer_id {
+            self.persistent_provider_storage.add_provider(record)
+        } else {
+            Ok(())
+        }
     }

     fn providers(&self, key: &Key) -> Vec<ProviderRecord> {
@@ -76,12 +103,14 @@ where

         let mut provider_records = self.persistent_provider_storage.providers(key);

+        // `ReadersAndPieces` is much cheaper than getting from piece cache, so try it first
         if self
             .readers_and_pieces
             .lock()
             .as_ref()
             .expect("Should be populated at this point.")
             .contains_piece(&piece_index_hash)
+            || self.piece_cache.get_piece(key).is_some()
         {
             provider_records.push(ProviderRecord {
                 key: piece_index_hash.to_multihash().into(),
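The added comment encodes an ordering decision: contains_piece is an in-memory lookup, get_piece hits the on-disk store, and Rust's || only evaluates its right operand when the left is false. A tiny sketch of the short-circuit behavior being relied on (names are illustrative):

```rust
fn has_piece(in_plots: bool, check_cache: impl Fn() -> bool) -> bool {
    // `||` short-circuits: the potentially expensive cache lookup only runs
    // when the cheap in-memory plot check comes back false.
    in_plots || check_cache()
}

fn main() {
    // The closure is never invoked when the first operand is already true.
    assert!(has_piece(true, || panic!("cache should not be consulted")));
    assert!(!has_piece(false, || false));
}
```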
@@ -95,33 +124,46 @@ where
     }

     fn provided(&self) -> Self::ProvidedIter<'_> {
+        // We are not using interior mutability in this context, so this is fine
+        #[allow(clippy::mutable_key_type)]
+        let provided_by_cache = self.piece_cache.keys().into_iter().collect::<HashSet<_>>();
         let provided_by_plots = self
             .readers_and_pieces
             .lock()
             .as_ref()
             .map(|readers_and_pieces| {
                 readers_and_pieces
                     .piece_index_hashes()
-                    .map(|hash| {
-                        ProviderRecord {
-                            key: hash.to_multihash().into(),
-                            provider: self.local_peer_id,
-                            expires: None,
-                            addresses: Vec::new(), // TODO: add address hints
+                    .filter_map(|hash| {
+                        let key = hash.to_multihash().into();
+
+                        if provided_by_cache.contains(&key) {
+                            None
+                        } else {
+                            Some(key)
                         }
                     })
-                    .map(Cow::Owned)
                     .collect::<Vec<_>>()
             })
             .unwrap_or_default();

-        provided_by_plots
+        provided_by_cache
             .into_iter()
+            .chain(provided_by_plots)
+            .map(|key| {
+                ProviderRecord {
+                    key,
+                    provider: self.local_peer_id,
+                    expires: None,
+                    addresses: Vec::new(), // TODO: add address hints
+                }
+            })
+            .map(Cow::Owned)
             .chain(self.persistent_provider_storage.provided())
     }

     fn remove_provider(&mut self, key: &Key, peer_id: &PeerId) {
         self.persistent_provider_storage
-            .remove_provider(key, peer_id)
+            .remove_provider(key, peer_id);
     }
 }
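The reworked provided() announces each locally held piece exactly once even when it sits both in the cache and in a plot: cache keys go into a HashSet, plot keys are filtered against that set, and the two sources are chained before being mapped into provider records. A self-contained sketch of that filter-then-chain pattern, with String keys and a bare Record struct standing in for Key and ProviderRecord:

```rust
use std::collections::HashSet;

#[derive(Debug)]
struct Record {
    key: String,
}

fn provided(cache_keys: Vec<String>, plot_keys: Vec<String>) -> Vec<Record> {
    // Collect cache keys into a set for O(1) duplicate checks.
    let provided_by_cache = cache_keys.into_iter().collect::<HashSet<_>>();

    // Keep only plot keys not already covered by the cache.
    let provided_by_plots = plot_keys
        .into_iter()
        .filter(|key| !provided_by_cache.contains(key))
        .collect::<Vec<_>>();

    // Chain both sources and build one record per unique key.
    provided_by_cache
        .into_iter()
        .chain(provided_by_plots)
        .map(|key| Record { key })
        .collect()
}

fn main() {
    let records = provided(
        vec!["a".into(), "b".into()],
        vec!["b".into(), "c".into()],
    );
    assert_eq!(records.len(), 3); // "b" is reported only once
    println!("{records:?}");
}
```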
5 changes: 2 additions & 3 deletions crates/subspace-farmer/src/utils/parity_db_store.rs
@@ -1,6 +1,5 @@
 use parity_db::{ColumnOptions, Db, Options};
 use std::error::Error;
-use std::fmt;
 use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::path::Path;
@@ -71,9 +70,9 @@
         }
     }

-    pub fn update<'a, I>(&'a mut self, values: I) -> bool
+    pub fn update<'a, I>(&'a self, values: I) -> bool
     where
-        I: IntoIterator<Item = (&'a StoreKey, Option<Vec<u8>>)> + fmt::Debug,
+        I: IntoIterator<Item = (&'a StoreKey, Option<Vec<u8>>)> + Debug,
     {
         trace!(?values, "Updating records in DB");

5 changes: 5 additions & 0 deletions crates/subspace-farmer/src/utils/piece_cache.rs
@@ -4,6 +4,8 @@ use subspace_networking::libp2p::kad::record::Key;
 /// Defines persistent piece cache interface.
 // TODO: This should be elsewhere, like in `subspace-dsn`
 pub trait PieceCache: Sync + Send + 'static {
+    type KeysIterator: IntoIterator<Item = Key>;
+
     /// Check whether key should be cached based on current cache size and key-to-peer-id distance.
     fn should_cache(&self, key: &Key) -> bool;

@@ -12,4 +14,7 @@ pub trait PieceCache: Sync + Send + 'static {

     /// Get piece from the cache.
     fn get_piece(&self, key: &Key) -> Option<Piece>;
+
+    /// Iterator over the keys of pieces in the cache
+    fn keys(&self) -> Self::KeysIterator;
 }
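The new associated type lets each PieceCache implementation choose its own concrete keys iterator: FarmerPieceCache satisfies it with impl IntoIterator (which relies on the nightly type_alias_impl_trait feature), while a stable implementation can simply name a type like Vec. A minimal in-memory sketch of the same shape, assuming a simplified trait with String keys in place of the libp2p Key type:

```rust
use std::collections::HashMap;

// Simplified stand-in for the `PieceCache` trait with `String` keys.
trait PieceCache {
    type KeysIterator: IntoIterator<Item = String>;

    fn add_piece(&mut self, key: String, piece: Vec<u8>);
    fn get_piece(&self, key: &str) -> Option<Vec<u8>>;
    fn keys(&self) -> Self::KeysIterator;
}

struct MemoryCache {
    pieces: HashMap<String, Vec<u8>>,
}

impl PieceCache for MemoryCache {
    // `Vec<String>` satisfies `IntoIterator<Item = String>` on stable Rust.
    type KeysIterator = Vec<String>;

    fn add_piece(&mut self, key: String, piece: Vec<u8>) {
        self.pieces.insert(key, piece);
    }

    fn get_piece(&self, key: &str) -> Option<Vec<u8>> {
        self.pieces.get(key).cloned()
    }

    fn keys(&self) -> Self::KeysIterator {
        // Owned keys, mirroring how `FarmerPieceCache::keys` clones out of the mutex.
        self.pieces.keys().cloned().collect()
    }
}

fn main() {
    let mut cache = MemoryCache { pieces: HashMap::new() };
    cache.add_piece("piece-1".to_string(), vec![0u8; 4]);
    for key in cache.keys() {
        println!("cached: {key}");
    }
}
```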
5 changes: 5 additions & 0 deletions crates/subspace-networking/src/utils/record_binary_heap.rs
@@ -106,6 +106,11 @@ impl RecordBinaryHeap {
         }
     }

+    /// Iterator over all keys in arbitrary order
+    pub fn keys(&self) -> impl Iterator<Item = &'_ Key> {
+        self.max_heap.iter().map(|key| key.key.preimage())
+    }
+
     fn is_limit_reached(&self) -> bool {
         self.size() >= self.limit
     }
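RecordBinaryHeap::keys returns borrowed keys, which is exactly why FarmerPieceCache::keys above has to clone: a borrowing iterator cannot escape the mutex guard it borrows from. A short std-only sketch of that boundary, with a toy heap standing in for the real type:

```rust
use std::sync::Mutex;

struct ToyHeap {
    entries: Vec<String>,
}

impl ToyHeap {
    // Borrowing iterator, analogous to `RecordBinaryHeap::keys`.
    fn keys(&self) -> impl Iterator<Item = &String> {
        self.entries.iter()
    }
}

fn main() {
    let heap = Mutex::new(ToyHeap {
        entries: vec!["a".into(), "b".into()],
    });

    // The borrowed iterator cannot outlive the lock guard, so the keys are
    // cloned into an owned Vec before the guard is released.
    let owned: Vec<String> = heap.lock().unwrap().keys().cloned().collect();
    assert_eq!(owned, ["a", "b"]);
}
```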
