From d1e7303531263fbf27f4915c593e619ee114b73f Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Tue, 31 Oct 2023 10:26:10 +0100 Subject: [PATCH] chore: adding more docs to the storage Signed-off-by: Simon Paitrault --- crates/topos-tce-storage/src/fullnode/mod.rs | 2 +- crates/topos-tce-storage/src/lib.rs | 163 +++------ crates/topos-tce-storage/src/rocks.rs | 319 +----------------- crates/topos-tce-storage/src/validator/mod.rs | 24 +- .../topos-tce-storage/src/validator/tables.rs | 35 +- 5 files changed, 92 insertions(+), 451 deletions(-) diff --git a/crates/topos-tce-storage/src/fullnode/mod.rs b/crates/topos-tce-storage/src/fullnode/mod.rs index 052c394bf..6f1c6511a 100644 --- a/crates/topos-tce-storage/src/fullnode/mod.rs +++ b/crates/topos-tce-storage/src/fullnode/mod.rs @@ -24,7 +24,7 @@ use crate::{ use self::locking::LockGuards; -mod locking; +pub mod locking; /// Store to manage FullNode data /// diff --git a/crates/topos-tce-storage/src/lib.rs b/crates/topos-tce-storage/src/lib.rs index 88005abab..80785a880 100644 --- a/crates/topos-tce-storage/src/lib.rs +++ b/crates/topos-tce-storage/src/lib.rs @@ -19,40 +19,63 @@ //! Text changing depending on mode. Light: 'So light!' Dark: 'So dark!' //! //! -//! ## Usage +//! ### Definitions and Responsibilities //! -//! Each store represents a different kind of capabilities, but they all act and need the same kind -//! of configuration in order to work. +//! As illustrated above, multiple `stores` are exposed in the library using various `tables`. //! -//! For instance, the [`EpochValidatorsStore`](struct@epoch::EpochValidatorsStore) only needs a [`PathBuf`](struct@std::path::PathBuf) -//! argument to be instantiated where [`FullNodeStore`](struct@fullnode::FullNodeStore) needs a little bit more arguments. +//! The difference between a `store` and a `table` is that the `table` is responsible for storing +//! the data while the `store` is responsible for managing the data and its access and behaviour. //! -//! 
The underlying mechanisms of how data is stored is fairly simple, it relies a lot on [`rocksdb`] and will -//! be describe below. +//! Here's the list of the different stores and their responsibilities: +//! +//! - The [`EpochValidatorsStore`](struct@epoch::EpochValidatorsStore) is responsible for managing the list of validators for each `epoch`. +//! - The [`FullNodeStore`](struct@fullnode::FullNodeStore) is responsible for managing all the persistent data such as [`Certificate`] delivered and associated `streams`. +//! - The [`IndexStore`](struct@index::IndexStore) is responsible for managing indexes in order to collect information about the broadcast and the network. +//! - The [`ValidatorStore`](struct@validator::ValidatorStore) is responsible for managing the pending certificates pool and all the transient and volatile data. +//! +//! For more information about a `store`, see the related doc. +//! +//! Next, here's the list of the different tables and their responsibilities: +//! +//! - The [`EpochValidatorsTables`](struct@epoch::EpochValidatorsTables) is responsible for storing the list of validators for each `epoch`. +//! - The [`ValidatorPerpetualTables`](struct@validator::ValidatorPerpetualTables) is responsible for storing the [`Certificate`] delivered and all the persistent data related to the broadcast. +//! - The [`ValidatorPendingTables`](struct@validator::ValidatorPendingTables) is responsible for storing the pending certificates pool and all the transient and volatile data. +//! - The [`IndexTables`](struct@index::IndexTables) is responsible for storing indexes about the delivery of [`Certificate`] such as `target subnet stream`. //! //! ## Special Considerations //! //! When using the storage layer, you need to be aware of the following: -//! - The storage layer is using [`rocksdb`] as a backend, which means that the data is stored on disk. -//! - The storage layer is using [`Arc`](struct@std::sync::Arc) to share the stores between threads. -//! 
- The storage layer is using [`async_trait`](https://docs.rs/async-trait/0.1.51/async_trait/) to expose methods that need to manage locks. (see [`WriteStore`](trait@store::WriteStore)) -//! - Some functions are using [`DBBatch`](struct@rocks::db_column::DBBatch) to batch multiple writes in one transaction. But not all functions are using it. +//! - The storage layer is using [rocksdb](https://rocksdb.org/) as a backend, which means that this storage doesn't need an external service, as `rocksdb` is an embeddable kv store. +//! - The storage layer is using [`Arc`](struct@std::sync::Arc) to share the stores between threads. It also means that a `store` is only instantiated once. +//! - Some functions are batching multiple writes in one transaction. But not all functions are using it. //! //! ## Design Philosophy //! -//! The choice of using [`rocksdb`] as a backend was made because it is a well known and battle tested database. -//! It is also very fast and efficient when it comes to write and read data. However, it is not the best when it comes -//! to compose or filter data. This is why we have multiple store that are used for different purposes. +//! The choice of using [rocksdb](https://rocksdb.org/) as a backend was made because it is a well known and battle tested database. +//! It is also very fast and efficient when it comes to write and read data. +//! +//! Multiple `stores` and `tables` exist in order to allow admins to deal with backups or +//! snapshots as they see fit. You can pick and choose which `tables` you want to backup without having to backup the whole database. +//! +//! By splitting the data in dedicated tables we define strong separation of concerns +//! directly in our storage. +//! +//! `RocksDB` is however not the best fit when it comes to compose or filter data based on the data +//! itself. //! //! 
For complex queries, another database like [`PostgreSQL`](https://www.postgresql.org/) or [`CockroachDB`](https://www.cockroachlabs.com/) could be used as a Storage for projections. -//! The source of truth would still be [`rocksdb`] but the projections would be stored in a relational database. Allowing for more complex queries. +//! The source of truth would still be [rocksdb](https://rocksdb.org/) but the projections would be stored in a relational database, allowing for more complex queries. //! //! As mention above, the different stores are using [`Arc`](struct@std::sync::Arc), allowing a single store to be instantiated once -//! and then shared between threads. This is very useful when it comes to the [`FullNodeStore`](struct@fullnode::FullNodeStore) as it is used in various places. +//! and then shared between threads. This is very useful when it comes to the [`FullNodeStore`](struct@fullnode::FullNodeStore) as it is used in various places but needs to provide a single entrypoint to the data. //! //! It also means that the store is immutable, which is a good thing when it comes to concurrency. +//! //! The burden of managing the locks is handled by the [`async_trait`](https://docs.rs/async-trait/0.1.51/async_trait/) crate when using the [`WriteStore`](trait@store::WriteStore). -//! The rest of the mutation on the data are handled by [`rocksdb`] itself. +//! +//! The locks are responsible for preventing any other query from mutating the data currently being processed. For more information about the locks, see [`locking`](module@fullnode::locking). +//! +//! The rest of the mutations on the data are handled by [rocksdb](https://rocksdb.org/) itself. //! 
use errors::InternalStorageError; use rocks::iterator::ColumnIterator; @@ -72,7 +95,6 @@ pub mod epoch; pub mod fullnode; pub mod index; pub mod types; -/// Everything that is needed to participate to the protocol pub mod validator; // v1 @@ -128,108 +150,3 @@ pub struct SourceHead { /// Position of the Certificate pub position: Position, } - -/// Define possible status of a certificate -#[derive(Debug, Deserialize, Serialize)] -pub enum CertificateStatus { - Pending, - Delivered, -} - -/// The `Storage` trait defines methods to interact and manage with the persistency layer -#[async_trait::async_trait] -pub trait Storage: Sync + Send + 'static { - async fn get_pending_certificate( - &self, - certificate_id: CertificateId, - ) -> Result<(PendingCertificateId, Certificate), InternalStorageError>; - - /// Add a pending certificate to the pool - async fn add_pending_certificate( - &self, - certificate: &Certificate, - ) -> Result; - - /// Persist the certificate with given status - async fn persist( - &self, - certificate: &Certificate, - pending_certificate_id: Option, - ) -> Result; - - /// Update the certificate entry with new status - async fn update( - &self, - certificate_id: &CertificateId, - status: CertificateStatus, - ) -> Result<(), InternalStorageError>; - - /// Returns the source heads of given subnets - async fn get_source_heads( - &self, - subnets: Vec, - ) -> Result, InternalStorageError>; - - /// Returns the certificate data given their id - async fn get_certificates( - &self, - certificate_ids: Vec, - ) -> Result, InternalStorageError>; - - /// Returns the certificate data given its id - async fn get_certificate( - &self, - certificate_id: CertificateId, - ) -> Result; - - /// Returns the certificate emitted by given subnet - /// Ranged by position since emitted Certificate are totally ordered - async fn get_certificates_by_source( - &self, - source_subnet_id: SubnetId, - from: Position, - limit: usize, - ) -> Result, InternalStorageError>; - - /// 
Returns the certificate received by given subnet - /// Ranged by timestamps since received Certificate are not referrable by position - async fn get_certificates_by_target( - &self, - target_subnet_id: SubnetId, - source_subnet_id: SubnetId, - from: Position, - limit: usize, - ) -> Result, InternalStorageError>; - - /// Returns all the known Certificate that are not delivered yet - async fn get_pending_certificates( - &self, - ) -> Result, InternalStorageError>; - - /// Returns the next Certificate that are not delivered yet - async fn get_next_pending_certificate( - &self, - starting_at: Option, - ) -> Result, InternalStorageError>; - - /// Remove a certificate from pending pool - async fn remove_pending_certificate( - &self, - index: PendingCertificateId, - ) -> Result<(), InternalStorageError>; - - async fn get_target_stream_iterator( - &self, - target: SubnetId, - source: SubnetId, - position: Position, - ) -> Result< - ColumnIterator<'_, CertificateTargetStreamPosition, CertificateId>, - InternalStorageError, - >; - - async fn get_source_list_by_target( - &self, - target: SubnetId, - ) -> Result, InternalStorageError>; -} diff --git a/crates/topos-tce-storage/src/rocks.rs b/crates/topos-tce-storage/src/rocks.rs index c451f5f68..08ddcf899 100644 --- a/crates/topos-tce-storage/src/rocks.rs +++ b/crates/topos-tce-storage/src/rocks.rs @@ -10,7 +10,7 @@ use tracing::warn; use crate::{ errors::InternalStorageError, CertificatePositions, CertificateTargetStreamPosition, - PendingCertificateId, Position, SourceHead, Storage, SubnetId, + PendingCertificateId, Position, SourceHead, SubnetId, }; use self::iterator::ColumnIterator; @@ -26,320 +26,3 @@ pub(crate) mod types; pub(crate) use types::*; pub const EMPTY_PREVIOUS_CERT_ID: [u8; CERTIFICATE_ID_LENGTH] = [0u8; CERTIFICATE_ID_LENGTH]; - -#[derive(Debug)] -pub struct RocksDBStorage { - pending_certificates: PendingCertificatesColumn, - certificates: CertificatesColumn, - source_streams: SourceStreamsColumn, - 
target_streams: TargetStreamsColumn, - target_source_list: TargetSourceListColumn, - next_pending_id: AtomicU64, -} - -impl RocksDBStorage { - #[cfg(test)] - #[allow(dead_code)] - pub(crate) fn new( - pending_certificates: PendingCertificatesColumn, - certificates: CertificatesColumn, - source_streams: SourceStreamsColumn, - target_streams: TargetStreamsColumn, - target_source_list: TargetSourceListColumn, - next_pending_id: AtomicU64, - ) -> Self { - Self { - pending_certificates, - certificates, - source_streams, - target_streams, - target_source_list, - next_pending_id, - } - } -} - -#[async_trait::async_trait] -impl Storage for RocksDBStorage { - async fn get_pending_certificate( - &self, - certificate_id: CertificateId, - ) -> Result<(PendingCertificateId, Certificate), InternalStorageError> { - self.pending_certificates - .iter()? - .filter(|(_pending_id, cert)| cert.id == certificate_id) - .collect::>() - .first() - .cloned() - .ok_or(InternalStorageError::CertificateNotFound(certificate_id)) - } - - async fn add_pending_certificate( - &self, - certificate: &Certificate, - ) -> Result { - let key = self.next_pending_id.fetch_add(1, Ordering::Relaxed); - - self.pending_certificates.insert(&key, certificate)?; - - Ok(key) - } - - async fn persist( - &self, - certificate: &Certificate, - pending_certificate_id: Option, - ) -> Result { - let mut batch = self.certificates.batch(); - - // Inserting the certificate data into the CERTIFICATES cf - batch = batch.insert_batch(&self.certificates, [(&certificate.id, certificate)])?; - - if let Some(pending_id) = pending_certificate_id { - match self.pending_certificates.get(&pending_id) { - Ok(Some(ref pending_certificate)) if pending_certificate == certificate => { - batch = batch.delete(&self.pending_certificates, pending_id)?; - } - Ok(_) => { - warn!( - "PendingCertificateId {} ignored during persist execution: Difference in \ - certificates", - pending_id - ); - } - - _ => { - warn!( - "PendingCertificateId {} 
ignored during persist execution: Not Found", - pending_id - ); - } - } - } - - let source_subnet_position = if certificate.prev_id.as_array() == &EMPTY_PREVIOUS_CERT_ID { - Position::ZERO - } else if let Some((CertificateSourceStreamPosition { position, .. }, _)) = self - .source_streams - .prefix_iter(&certificate.source_subnet_id)? - .last() - { - position.increment().map_err(|error| { - InternalStorageError::PositionError(error, certificate.source_subnet_id.into()) - })? - } else { - // TODO: Need to be fixed when dealing with order of delivery - Position::ZERO - // TODO: Better error to define that we were expecting a previous defined position - // return Err(InternalStorageError::CertificateNotFound( - // certificate.prev_id, - // )); - }; - - // Return from function as info - let source_subnet_stream_position = CertificateSourceStreamPosition { - subnet_id: certificate.source_subnet_id, - position: source_subnet_position, - }; - - // Adding the certificate to the stream - batch = batch.insert_batch( - &self.source_streams, - [( - CertificateSourceStreamPosition { - subnet_id: certificate.source_subnet_id, - position: source_subnet_position, - }, - certificate.id, - )], - )?; - - // Return list of new target stream positions of certificate that will be persisted - // Information is needed by sequencer/subnet contract to know from - // where to continue with streaming on restart - let mut target_subnet_stream_positions: HashMap = - HashMap::new(); - - // Adding certificate to target_streams - // TODO: Add expected position instead of calculating on the go - let mut targets = Vec::new(); - - for target_subnet_id in &certificate.target_subnets { - let target = match self - .target_streams - .prefix_iter(&TargetSourceListKey( - *target_subnet_id, - certificate.source_subnet_id, - ))? 
- .last() - { - Some((mut target_stream_position, _)) => { - target_stream_position.position = target_stream_position - .position - .increment() - .map_err(|error| { - InternalStorageError::PositionError( - error, - certificate.source_subnet_id.into(), - ) - })?; - target_stream_position - } - None => CertificateTargetStreamPosition::new( - *target_subnet_id, - certificate.source_subnet_id, - Position::ZERO, - ), - }; - - target_subnet_stream_positions.insert(*target_subnet_id, target); - - batch = batch.insert_batch( - &self.target_source_list, - [( - TargetSourceListKey(*target_subnet_id, certificate.source_subnet_id), - *target.position, - )], - )?; - - targets.push((target, certificate.id)); - } - - batch = batch.insert_batch(&self.target_streams, targets)?; - - batch.write()?; - - Ok(CertificatePositions { - targets: target_subnet_stream_positions, - source: source_subnet_stream_position, - }) - } - - async fn update( - &self, - _certificate_id: &CertificateId, - _status: crate::CertificateStatus, - ) -> Result<(), InternalStorageError> { - unimplemented!(); - } - - async fn get_source_heads( - &self, - subnets: Vec, - ) -> Result, InternalStorageError> { - let mut result: Vec = Vec::new(); - for source_subnet_id in subnets { - let (position, certificate_id) = self - .source_streams - .prefix_iter(&source_subnet_id)? 
- .last() - .map(|(source_stream_position, cert_id)| (source_stream_position.position, cert_id)) - .ok_or(InternalStorageError::MissingHeadForSubnet(source_subnet_id))?; - result.push(SourceHead { - position, - certificate_id, - subnet_id: source_subnet_id, - }); - } - Ok(result) - } - - async fn get_certificates( - &self, - certificate_ids: Vec, - ) -> Result, InternalStorageError> { - let mut result = Vec::new(); - - for certificate_id in certificate_ids { - result.push(self.get_certificate(certificate_id).await?); - } - - Ok(result) - } - - async fn get_certificate( - &self, - certificate_id: CertificateId, - ) -> Result { - let res = self.certificates.get(&certificate_id)?; - res.ok_or(InternalStorageError::CertificateNotFound(certificate_id)) - } - - async fn get_certificates_by_source( - &self, - source_subnet_id: SubnetId, - from: crate::Position, - limit: usize, - ) -> Result, InternalStorageError> { - Ok(self - .source_streams - .prefix_iter(&source_subnet_id)? - // TODO: Find a better way to convert u64 to usize - .skip(from.try_into().unwrap()) - .take(limit) - .map(|(_, certificate_id)| certificate_id) - .collect()) - } - - async fn get_certificates_by_target( - &self, - target_subnet_id: SubnetId, - source_subnet_id: SubnetId, - from: Position, - limit: usize, - ) -> Result, InternalStorageError> { - Ok(self - .target_streams - .prefix_iter(&(&target_subnet_id, &source_subnet_id))? - // TODO: Find a better way to convert u64 to usize - .skip(from.try_into().unwrap()) - .take(limit) - .map(|(_, certificate_id)| certificate_id) - .collect()) - } - - async fn get_pending_certificates( - &self, - ) -> Result, InternalStorageError> { - Ok(self.pending_certificates.iter()?.collect()) - } - async fn get_next_pending_certificate( - &self, - starting_at: Option, - ) -> Result, InternalStorageError> { - Ok(self - .pending_certificates - .iter()? 
- .nth(starting_at.map(|v| v + 1).unwrap_or(0))) - } - - async fn remove_pending_certificate(&self, index: u64) -> Result<(), InternalStorageError> { - self.pending_certificates.delete(&index) - } - - async fn get_target_stream_iterator( - &self, - target: SubnetId, - source: SubnetId, - position: Position, - ) -> Result< - ColumnIterator<'_, CertificateTargetStreamPosition, CertificateId>, - InternalStorageError, - > { - Ok(self.target_streams.prefix_iter_at( - &(&target, &source), - &CertificateTargetStreamPosition::new(target, source, position), - )?) - } - - async fn get_source_list_by_target( - &self, - target: SubnetId, - ) -> Result, InternalStorageError> { - Ok(self - .target_source_list - .prefix_iter(&target)? - .map(|(TargetSourceListKey(_, k), _)| k) - .collect()) - } -} diff --git a/crates/topos-tce-storage/src/validator/mod.rs b/crates/topos-tce-storage/src/validator/mod.rs index 5d5e63b3e..b3c7b9644 100644 --- a/crates/topos-tce-storage/src/validator/mod.rs +++ b/crates/topos-tce-storage/src/validator/mod.rs @@ -1,3 +1,19 @@ +//! Validator's context store and storage +//! +//! The [`ValidatorStore`] is responsible for managing the different data that are required by the +//! TCE network in order to broadcast certificates. It is composed of two main parts: +//! +//! - a [`FullNodeStore`] +//! - a [`ValidatorPendingTables`] +//! +//! ## Responsibilities +//! +//! This store is used in places where the [`FullNodeStore`] is not enough; it allows access to the +//! different pending pools and to manage them but also to access the [`FullNodeStore`] in order to +//! persist or update [`Certificate`] or `streams`. +//! +//! Pending pools and how they behave are described in the [`ValidatorPendingTables`] documentation. +//! use std::{ collections::HashMap, path::PathBuf, @@ -33,8 +49,11 @@ mod tables; /// The [`ValidatorStore`] is composed of a [`FullNodeStore`] and a [`ValidatorPendingTables`].
/// /// As the [`FullNodeStore`] is responsible of keeping and managing every data that are persistent, -/// the [`ValidatorStore`] is forwarding everything to it. The crucial point is that the -/// [`ValidatorStore`] is managing the different pending pool using the [`ValidatorPendingTables`]. +/// the [`ValidatorStore`] is delegating many of the [`WriteStore`] and [`ReadStore`] calls to it. +/// +/// The crucial point is that the [`ValidatorStore`] is managing the different pending pools using a [`ValidatorPendingTables`]. +/// +/// Pending pools and how they behave are described in the [`ValidatorPendingTables`] documentation. /// pub struct ValidatorStore { pub(crate) pending_tables: ValidatorPendingTables, @@ -42,6 +61,7 @@ pub struct ValidatorStore { } impl ValidatorStore { + /// Open a [`ValidatorStore`] at the given `path` and using the given [`FullNodeStore`] pub fn open( path: PathBuf, fullnode_store: Arc, diff --git a/crates/topos-tce-storage/src/validator/tables.rs b/crates/topos-tce-storage/src/validator/tables.rs index 9b89b98ef..efe5b04e1 100644 --- a/crates/topos-tce-storage/src/validator/tables.rs +++ b/crates/topos-tce-storage/src/validator/tables.rs @@ -18,19 +18,41 @@ use crate::{ PendingCertificateId, }; -/// Volatile and pending data +/// Volatile and pending data used by Validator +/// +/// It contains data that is not yet delivered. +/// +/// When a [`Certificate`] is received, it can either be added to the pending +/// pool or to the precedence pool. +/// +/// ## Pending pool +/// +/// The pending pool is used to store certificates that are ready to be validated and broadcast. +/// Meaning that the previous [`Certificate`] has been delivered and the [`Certificate`] is +/// ready to be broadcast. +/// +/// The ordering inside the pending pool is a FIFO queue, each [`Certificate`] in the pool gets +/// assigned to a unique [`PendingCertificateId`](type@crate::PendingCertificateId).
+/// +/// ## Precedence pool +/// +/// The precedence pool is used to store certificates that are not yet ready to be broadcast, +/// mostly waiting for the previous certificate to be delivered. However, the [`Certificate`] is +/// already validated. +/// +/// When a [`Certificate`] is delivered, the [`ValidatorStore`](struct@super::ValidatorStore) will +/// check for any [`Certificate`] in the precedence pool and if one is found, it is moved to the +/// pending pool, ready to be broadcast. +/// pub struct ValidatorPendingTables { pub(crate) next_pending_id: AtomicU64, - #[allow(unused)] - fetching_pool: BTreeSet, // Not sure to keep it pub(crate) pending_pool: DBColumn, pub(crate) pending_pool_index: DBColumn, pub(crate) precedence_pool: DBColumn, - #[allow(unused)] - expiration_tracker: (), // Unknown } impl ValidatorPendingTables { + /// Open the [`ValidatorPendingTables`] at the given path. pub fn open(mut path: PathBuf) -> Self { path.push("pending"); if !path.exists() { @@ -49,16 +71,15 @@ impl ValidatorPendingTables { Self { // TODO: Fetch it from the storage next_pending_id: AtomicU64::new(0), - fetching_pool: BTreeSet::new(), pending_pool: DBColumn::reopen(&db, cfs::PENDING_POOL), pending_pool_index: DBColumn::reopen(&db, cfs::PENDING_POOL_INDEX), precedence_pool: DBColumn::reopen(&db, cfs::PRECEDENCE_POOL), - expiration_tracker: (), } } } /// Data that shouldn't be purged at all. +// TODO: TP-774: Rename and move to FullNode domain pub struct ValidatorPerpetualTables { pub(crate) certificates: DBColumn, pub(crate) streams: DBColumn,