diff --git a/Cargo.lock b/Cargo.lock index 7c4a0e414b..7bac9a65e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -430,15 +430,18 @@ dependencies = [ "futures 0.3.21", "hashbrown", "hex", + "humanize-rs", "iota-crypto", "log", "ref-cast", "reqwest", "serde", + "serde_json", "thiserror", "time-helper", "tokio", "tokio-stream", + "toml", "trace-tools", "tracing", "url", @@ -1943,6 +1946,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humanize-rs" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "016b02deb8b0c415d8d56a6f0ab265e50c22df61194e37f9be75ed3a722de8a6" + [[package]] name = "humantime" version = "2.1.0" diff --git a/bee-ledger/Cargo.toml b/bee-ledger/Cargo.toml index 286744bd13..cba80ce85b 100644 --- a/bee-ledger/Cargo.toml +++ b/bee-ledger/Cargo.toml @@ -23,6 +23,7 @@ digest = { version = "0.9.0", default-features = false, optional = true } futures = { version = "0.3.17", default-features = false, optional = true } hashbrown = { version = "0.11.2", default-features = false, optional = true } hex = { version = "0.4.3", default-features = false, optional = true } +humanize-rs = { version = "0.1.5", default-features = false, optional = true } iota-crypto = { version = "0.10.0", default-features = false, features = [ "blake2b" ], optional = true } log = { version = "0.4.14", default-features = false, optional = true } ref-cast = { version = "1.0.6", default-features = false, optional = true } @@ -36,6 +37,10 @@ trace-tools = { version = "0.3.0", default-features = false, optional = true } tracing = { version = "0.1.29", default-features = false, optional = true } url = { version = "2.2.2", default-features = false, optional = true } +[dev-dependencies] +serde_json = { version = "1.0.68", default-features = false, features = [ "std" ] } +toml = { version = "0.5.8", default-features = false } + [features] workers = [ "bee-runtime", @@ -47,6 +52,7 @@ workers = [ "futures", "hashbrown", "hex", + "humanize-rs", "iota-crypto", "log", "ref-cast", diff --git a/bee-ledger/src/workers/consensus/worker.rs b/bee-ledger/src/workers/consensus/worker.rs index 10d57b6ce3..a337c9f0a7 100644 --- a/bee-ledger/src/workers/consensus/worker.rs +++ b/bee-ledger/src/workers/consensus/worker.rs @@ -24,7 +24,11 @@ use crate::{ consensus::{metadata::WhiteFlagMetadata, state::validate_ledger_state, white_flag}, error::Error, event::{MessageReferenced, MilestoneConfirmed, OutputConsumed, OutputCreated}, - pruning::{condition::should_prune, config::PruningConfig, prune}, + pruning::{ + condition::{should_prune, PruningTask}, + config::PruningConfig, + prune, + }, snapshot::{condition::should_snapshot, config::SnapshotConfig, worker::SnapshotWorker}, storage::{self, StorageBackend}, }, @@ -279,28 +283,28 @@ where let bmd = tangle.config().below_max_depth(); let snapshot_depth_min = bmd + EXTRA_SNAPSHOT_DEPTH; - let snapshot_depth = if snapshot_config.depth() < snapshot_depth_min { + let snapshot_depth = if snapshot_config.snapshotting().depth() < snapshot_depth_min { warn!( "Configuration value for \"snapshot.depth\" is too low ({}), value changed to {}.", - snapshot_config.depth(), + snapshot_config.snapshotting().depth(), snapshot_depth_min ); snapshot_depth_min } else { - snapshot_config.depth() + snapshot_config.snapshotting().depth() }; let snapshot_pruning_delta = bmd + EXTRA_PRUNING_DEPTH; - let pruning_delay_min = snapshot_depth 
+ snapshot_pruning_delta; - let pruning_delay = if pruning_config.delay() < pruning_delay_min { + let milestones_to_keep_min = snapshot_depth + snapshot_pruning_delta; + let milestones_to_keep = if pruning_config.milestones().max_milestones_to_keep() < milestones_to_keep_min { warn!( - "Configuration value for \"pruning.delay\" is too low ({}), value changed to {}.", - pruning_config.delay(), - pruning_delay_min + "Configuration value for \"max_milestones_to_keep\" is too low ({}), value changed to {}.", + pruning_config.milestones().max_milestones_to_keep(), + milestones_to_keep_min ); - pruning_delay_min + milestones_to_keep_min } else { - pruning_config.delay() + pruning_config.milestones().max_milestones_to_keep() }; // Unwrap is fine because ledger index was already in storage or just added by the snapshot worker. @@ -341,24 +345,46 @@ where // } } Err(reason) => { - debug!("Snapshotting skipped: {:?}", reason); + debug!("Snapshotting skipped: {reason}"); } } - match should_prune(&tangle, ledger_index, pruning_delay, &pruning_config) { - Ok((start_index, target_index)) => { - if let Err(e) = - prune::prune(&tangle, &storage, &bus, start_index, target_index, &pruning_config) - .await - { - error!("Pruning failed: {:?}.", e); - // TODO: consider initiating an emergency (but graceful) shutdown instead of just - // panicking. - panic!("Aborting due to an unexpected pruning error."); + match should_prune(&tangle, &storage, ledger_index, milestones_to_keep, &pruning_config) { + Ok(pruning_task) => match pruning_task { + PruningTask::ByIndexRange { + start_index, + target_index, + } => { + if let Err(e) = prune::prune_by_range( + &tangle, + &storage, + &bus, + start_index, + target_index, + &pruning_config, + ) + .await + { + error!("Pruning milestone range failed: {e}."); + } } - } + PruningTask::ByDbSize { num_bytes_to_prune } => { + if let Err(e) = prune::prune_by_size( + &tangle, + &storage, + &bus, + ledger_index, + num_bytes_to_prune, + &pruning_config, + ) + .await + { + error!("Pruning by storage size failed: {e}."); + } + } + }, Err(reason) => { - debug!("Pruning skipped: {:?}", reason); + debug!("Pruning skipped: {reason}"); } } } diff --git a/bee-ledger/src/workers/error.rs b/bee-ledger/src/workers/error.rs index 3df86cbca4..e28e721083 100644 --- a/bee-ledger/src/workers/error.rs +++ b/bee-ledger/src/workers/error.rs @@ -7,76 +7,57 @@ use bee_message::{address::Address, milestone::MilestoneIndex, Error as MessageE use crate::{ types::{Balance, Error as TypesError, Unspent}, - workers::snapshot::error::Error as SnapshotError, + workers::{pruning::error::PruningError, snapshot::error::Error as SnapshotError}, }; /// Errors occurring during ledger workers operations. #[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] pub enum Error { - /// Snapshot error. #[error("Snapshot error: {0}")] Snapshot(#[from] SnapshotError), - /// Types error. + #[error("Pruning error: {0}")] + Pruning(#[from] PruningError), #[error("Types error: {0}")] Types(#[from] TypesError), - /// Message error. #[error("Message error: {0}")] Message(#[from] MessageError), - /// Missing message in the past cone of the milestone #[error("Message {0} is missing in the past cone of the milestone")] MissingMessage(MessageId), - /// Unsupported input kind. #[error("Unsupported input kind: {0}")] UnsupportedInputKind(u8), - /// Unsupported output kind. #[error("Unsupported output kind: {0}")] UnsupportedOutputKind(u8), - /// Unsupported payload kind. 
#[error("Unsupported payload kind: {0}")] UnsupportedPayloadKind(u32), - /// Milestone message not found. #[error("Milestone message not found: {0}")] MilestoneMessageNotFound(MessageId), - /// Message payload is not a milestone #[error("Message payload is not a milestone")] NoMilestonePayload, - /// Non contiguous milestones. #[error("Non contiguous milestones: tried to confirm {0} on top of {1}")] NonContiguousMilestones(u32, u32), - /// Merkle proof mismatch. #[error("Merkle proof mismatch on milestone {0}: computed {1} != provided {2}")] MerkleProofMismatch(MilestoneIndex, String, String), - /// Invalid messages count. #[error("Invalid messages count: referenced ({0}) != no transaction ({1}) + conflicting ({2}) + included ({3})")] InvalidMessagesCount(usize, usize, usize, usize), - /// Invalid ledger unspent state. #[error("Invalid ledger unspent state: {0}")] InvalidLedgerUnspentState(u64), - /// Invalid ledger balance state. #[error("Invalid ledger balance state: {0}")] InvalidLedgerBalanceState(u64), - /// Invalid ledger dust state. #[error("Invalid ledger dust state: {0:?} {1:?}")] InvalidLedgerDustState(Address, Balance), - /// Consumed amount overflow. #[error("Consumed amount overflow: {0}.")] ConsumedAmountOverflow(u128), - /// Created amount overflow. #[error("Created amount overflow: {0}.")] CreatedAmountOverflow(u128), - /// Ledger state overflow. #[error("Ledger state overflow: {0}")] LedgerStateOverflow(u128), - /// Non zero balance diff sum. #[error("Non zero balance diff sum: {0}.")] NonZeroBalanceDiffSum(i64), - /// Decreasing receipt migrated at index. #[error("Decreasing receipt migrated at index: {0} < {1}")] DecreasingReceiptMigratedAtIndex(MilestoneIndex, MilestoneIndex), - /// Missing unspent output. #[error("Missing unspent output {0}")] MissingUnspentOutput(Unspent), - /// Storage backend error. #[error("Storage backend error: {0}")] Storage(Box), } diff --git a/bee-ledger/src/workers/pruning/batch.rs b/bee-ledger/src/workers/pruning/batch.rs index b95262713c..ab09d95d1b 100644 --- a/bee-ledger/src/workers/pruning/batch.rs +++ b/bee-ledger/src/workers/pruning/batch.rs @@ -3,6 +3,7 @@ use std::collections::VecDeque; +use bee_common::packable::Packable; use bee_message::{ milestone::{Milestone, MilestoneIndex}, output::OutputId, @@ -24,29 +25,31 @@ use crate::{ types::{ConsumedOutput, CreatedOutput, OutputDiff, Receipt}, workers::{ pruning::{ - error::Error, + error::PruningError, metrics::{ConfirmedDataPruningMetrics, MilestoneDataPruningMetrics, UnconfirmedDataPruningMetrics}, }, storage::StorageBackend, }, }; -pub type Messages = HashSet; -pub type ApproverCache = HashMap; -pub type Seps = HashMap; +pub(crate) type Messages = HashSet; +pub(crate) type ApproverCache = HashMap; +pub(crate) type Seps = HashMap; +pub(crate) type ByteLength = usize; #[derive(Eq, PartialEq, Hash)] -pub struct Edge { - pub from_parent: MessageId, - pub to_child: MessageId, +pub(crate) struct Edge { + pub(crate) from_parent: MessageId, + pub(crate) to_child: MessageId, } -pub fn batch_prunable_confirmed_data( +pub(crate) fn batch_prunable_confirmed_data( storage: &S, batch: &mut S::Batch, prune_index: MilestoneIndex, current_seps: &Seps, -) -> Result<(Seps, ConfirmedDataPruningMetrics), Error> { + by_size: bool, +) -> Result<(Seps, ConfirmedDataPruningMetrics, ByteLength), PruningError> { // We keep a list of already visited messages. let mut visited = Messages::with_capacity(512); // We keep a cache of approvers to prevent fetch the same data from the storage more than once. 
@@ -55,12 +58,18 @@ pub fn batch_prunable_confirmed_data(
let mut new_seps = Seps::with_capacity(512);
// We collect stats during the traversal, and return them as a result of this function.
let mut metrics = ConfirmedDataPruningMetrics::default();
+ // We count the number of bytes pruned from the storage.
+ let mut byte_length = 0usize;
// Get the `MessageId` of the milestone we are about to prune from the storage.
- let prune_id = *Fetch::::fetch(storage, &prune_index)
- .map_err(|e| Error::Storage(Box::new(e)))?
- .ok_or(Error::MissingMilestone(prune_index))?
- .message_id();
+ let milestone_to_prune = Fetch::::fetch(storage, &prune_index)
+ .map_err(PruningError::storage)?
+ .ok_or(PruningError::MissingMilestone(prune_index))?;
+
+ byte_length += prune_index.packed_len();
+ byte_length += milestone_to_prune.packed_len();
+
+ let prune_id = *milestone_to_prune.message_id();
// Breadth-first traversal will increase our chances of sorting out redundant messages without querying the storage.
let mut to_visit: VecDeque<_> = vec![prune_id].into_iter().collect();
@@ -80,8 +89,8 @@ pub fn batch_prunable_confirmed_data(
// Get the `Message` for `message_id`.
let msg = match Fetch::::fetch(storage, &message_id)
- .map_err(|e| Error::Storage(Box::new(e)))?
- .ok_or(Error::MissingMessage(message_id))
+ .map_err(PruningError::storage)?
+ .ok_or(PruningError::MissingMessage(message_id))
{
Ok(msg) => msg,
Err(e) => {
@@ -105,14 +114,16 @@ pub fn batch_prunable_confirmed_data(
if let Some(indexation) = unwrap_indexation(payload) {
let padded_index = indexation.padded_index();
- prune_indexation_data(storage, batch, &(padded_index, message_id))?;
+ byte_length += prune_indexation_data(storage, batch, &(padded_index, message_id), by_size)?;
+
metrics.prunable_indexations += 1;
}
// Delete its edges.
let parents = msg.parents();
for parent_id in parents.iter() {
- prune_edge(storage, batch, &(*parent_id, message_id))?;
+ byte_length += prune_edge(storage, batch, &(*parent_id, message_id), by_size)?;
+
metrics.prunable_edges += 1;
}
@@ -123,7 +134,7 @@ pub fn batch_prunable_confirmed_data(
visited.insert(message_id);
// Delete its associated data.
- prune_message_and_metadata(storage, batch, &message_id)?;
+ byte_length += prune_message_and_metadata(storage, batch, &message_id, by_size)?;
// ---
// Everything that follows is required to decide whether this message's id should be kept as a solid entry
@@ -134,8 +145,8 @@
// Fetch its approvers from the storage.
let approvers = Fetch::>::fetch(storage, &message_id)
- .map_err(|e| Error::Storage(Box::new(e)))?
- .ok_or(Error::MissingApprovers(message_id))?;
+ .map_err(PruningError::storage)?
+ .ok_or(PruningError::MissingApprovers(message_id))?;
// We can safely skip messages whose approvers are all part of the currently pruned cone. If we are lucky
// (chances are better with the chosen breadth-first traversal) we've already seen all of its approvers.
@@ -163,8 +174,8 @@
metrics.approver_cache_miss += 1;
let unvisited_md = Fetch::::fetch(storage, &unvisited_id)
- .map_err(|e| Error::Storage(Box::new(e)))?
- .ok_or(Error::MissingMetadata(unvisited_id))?;
+ .map_err(PruningError::storage)?
+ .ok_or(PruningError::MissingMetadata(unvisited_id))?;
// A non-existing milestone index means that a message remained unconfirmed and therefore is negligible.
This can be expressed by assigning the @@ -197,18 +208,20 @@ pub fn batch_prunable_confirmed_data( metrics.prunable_messages = visited.len(); metrics.new_seps = new_seps.len(); - Ok((new_seps, metrics)) + Ok((new_seps, metrics, byte_length)) } -pub fn batch_prunable_unconfirmed_data( +pub(crate) fn batch_prunable_unconfirmed_data( storage: &S, batch: &mut S::Batch, prune_index: MilestoneIndex, -) -> Result { + by_size: bool, +) -> Result<(ByteLength, UnconfirmedDataPruningMetrics), PruningError> { + let mut byte_length = 0usize; let mut metrics = UnconfirmedDataPruningMetrics::default(); let unconf_msgs = match Fetch::>::fetch(storage, &prune_index) - .map_err(|e| Error::Storage(Box::new(e)))? + .map_err(PruningError::storage)? { Some(unconf_msgs) => { if unconf_msgs.is_empty() { @@ -226,9 +239,7 @@ pub fn batch_prunable_unconfirmed_data( // TODO: consider using `MultiFetch` 'next_unconf_msg: for unconf_msg_id in unconf_msgs.iter().map(|unconf_msg| unconf_msg.message_id()) { - match Fetch::::fetch(storage, unconf_msg_id) - .map_err(|e| Error::Storage(Box::new(e)))? - { + match Fetch::::fetch(storage, unconf_msg_id).map_err(PruningError::storage)? { Some(msg_meta) => { if msg_meta.flags().is_referenced() { metrics.were_confirmed += 1; @@ -244,13 +255,13 @@ pub fn batch_prunable_unconfirmed_data( } // Delete those messages that remained unconfirmed. - match Fetch::::fetch(storage, unconf_msg_id).map_err(|e| Error::Storage(Box::new(e)))? { + match Fetch::::fetch(storage, unconf_msg_id).map_err(PruningError::storage)? { Some(msg) => { let payload = msg.payload().as_ref(); let parents = msg.parents(); // Add message data to the delete batch. - prune_message_and_metadata(storage, batch, unconf_msg_id)?; + byte_length += prune_message_and_metadata(storage, batch, unconf_msg_id, by_size)?; log::trace!("Pruned unconfirmed msg {} at {}.", unconf_msg_id, prune_index); @@ -259,14 +270,14 @@ pub fn batch_prunable_unconfirmed_data( let message_id = *unconf_msg_id; // Add prunable indexations to the delete batch. - prune_indexation_data(storage, batch, &(padded_index, message_id))?; + byte_length += prune_indexation_data(storage, batch, &(padded_index, message_id), by_size)?; metrics.prunable_indexations += 1; } // Add prunable edges to the delete batch. 
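// Note: an edge is keyed by a (parent, child) pair, so a single unconfirmed message
// contributes up to `parents.len()` edge deletions to the batch.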
for parent in parents.iter() { - prune_edge(storage, batch, &(*parent, *unconf_msg_id))?; + byte_length += prune_edge(storage, batch, &(*parent, *unconf_msg_id), by_size)?; metrics.prunable_edges += 1; } @@ -277,115 +288,218 @@ pub fn batch_prunable_unconfirmed_data( } } - Batch::<(MilestoneIndex, UnreferencedMessage), ()>::batch_delete( - storage, - batch, - &(prune_index, (*unconf_msg_id).into()), - ) - .map_err(|e| Error::Storage(Box::new(e)))?; + byte_length += prune_unreferenced_message(storage, batch, prune_index, (*unconf_msg_id).into(), by_size)?; metrics.prunable_messages += 1; } - Ok(metrics) + Ok((byte_length, metrics)) } -pub fn prune_milestone_data( +pub(crate) fn prune_milestone_data( storage: &S, batch: &mut S::Batch, prune_index: MilestoneIndex, should_prune_receipts: bool, -) -> Result { + by_size: bool, +) -> Result<(ByteLength, MilestoneDataPruningMetrics), PruningError> { + let mut byte_length = 0usize; let mut metrics = MilestoneDataPruningMetrics::default(); - prune_milestone(storage, batch, prune_index)?; - - prune_output_diff(storage, batch, prune_index)?; + byte_length += prune_milestone(storage, batch, prune_index, by_size)?; + byte_length += prune_output_diff(storage, batch, prune_index, by_size)?; if should_prune_receipts { - metrics.receipts = prune_receipts(storage, batch, prune_index)?; + let (num_receipts, num_bytes) = prune_receipts(storage, batch, prune_index, by_size)?; + + metrics.receipts = num_receipts; + byte_length += num_bytes; } - Ok(metrics) + Ok((byte_length, metrics)) } fn prune_message_and_metadata( storage: &S, batch: &mut S::Batch, message_id: &MessageId, -) -> Result<(), Error> { - Batch::::batch_delete(storage, batch, message_id).map_err(|e| Error::Storage(Box::new(e)))?; - Batch::::batch_delete(storage, batch, message_id) - .map_err(|e| Error::Storage(Box::new(e)))?; + by_size: bool, +) -> Result { + let mut byte_length = 0usize; + + if by_size { + // Panic: we know the message is in the database, because the caller already made sure of that. + let msg = Fetch::::fetch(storage, message_id) + .map_err(PruningError::storage)? + .unwrap(); + byte_length += msg.packed_len(); + + // Panic: we know the message metadata is in the database for the same reason as above. + let md = Fetch::::fetch(storage, message_id) + .map_err(PruningError::storage)? 
+ .unwrap(); + byte_length += md.packed_len(); + } + + Batch::::batch_delete(storage, batch, message_id).map_err(PruningError::storage)?; - Ok(()) + Batch::::batch_delete(storage, batch, message_id).map_err(PruningError::storage)?; + + Ok(byte_length) } fn prune_edge( storage: &S, batch: &mut S::Batch, edge: &(MessageId, MessageId), -) -> Result<(), Error> { - Batch::<(MessageId, MessageId), ()>::batch_delete(storage, batch, edge).map_err(|e| Error::Storage(Box::new(e)))?; + by_size: bool, +) -> Result { + let mut byte_length = 0usize; + + if by_size { + byte_length += edge.packed_len(); + } + + Batch::<(MessageId, MessageId), ()>::batch_delete(storage, batch, edge).map_err(PruningError::storage)?; - Ok(()) + Ok(byte_length) } fn prune_indexation_data( storage: &S, batch: &mut S::Batch, index_message_id: &(PaddedIndex, MessageId), -) -> Result<(), Error> { + by_size: bool, +) -> Result { + let mut byte_length = 0usize; + + if by_size { + byte_length += index_message_id.0.packed_len() + index_message_id.1.packed_len(); + } + Batch::<(PaddedIndex, MessageId), ()>::batch_delete(storage, batch, index_message_id) - .map_err(|e| Error::Storage(Box::new(e)))?; + .map_err(PruningError::storage)?; - Ok(()) + Ok(byte_length) } -fn prune_milestone(storage: &S, batch: &mut S::Batch, index: MilestoneIndex) -> Result<(), Error> { - Batch::::batch_delete(storage, batch, &index) - .map_err(|e| Error::Storage(Box::new(e)))?; +fn prune_unreferenced_message( + storage: &S, + batch: &mut S::Batch, + prune_index: MilestoneIndex, + unreferenced_message: UnreferencedMessage, + by_size: bool, +) -> Result { + let mut byte_length = 0usize; + + if by_size { + byte_length += prune_index.packed_len() + unreferenced_message.packed_len(); + } + + Batch::<(MilestoneIndex, UnreferencedMessage), ()>::batch_delete( + storage, + batch, + &(prune_index, unreferenced_message), + ) + .map_err(PruningError::storage)?; - Ok(()) + Ok(byte_length) } -fn prune_output_diff(storage: &S, batch: &mut S::Batch, index: MilestoneIndex) -> Result<(), Error> { +fn prune_milestone( + storage: &S, + batch: &mut S::Batch, + index: MilestoneIndex, + by_size: bool, +) -> Result { + let mut byte_length = 0usize; + + if by_size { + // Panic: we know the milestone is in the database. + let ms = Fetch::::fetch(storage, &index) + .map_err(PruningError::storage)? + .unwrap(); + byte_length += ms.packed_len(); + } + + Batch::::batch_delete(storage, batch, &index).map_err(PruningError::storage)?; + + Ok(byte_length) +} + +fn prune_output_diff( + storage: &S, + batch: &mut S::Batch, + index: MilestoneIndex, + by_size: bool, +) -> Result { + let mut byte_length = 0usize; + if let Some(output_diff) = - Fetch::::fetch(storage, &index).map_err(|e| Error::Storage(Box::new(e)))? + Fetch::::fetch(storage, &index).map_err(PruningError::storage)? { - for consumed_output in output_diff.consumed_outputs() { - Batch::::batch_delete(storage, batch, consumed_output) - .map_err(|e| Error::Storage(Box::new(e)))?; - Batch::::batch_delete(storage, batch, consumed_output) - .map_err(|e| Error::Storage(Box::new(e)))?; - } + byte_length += index.packed_len(); + byte_length += output_diff.packed_len(); + + for consumed_output_id in output_diff.consumed_outputs() { + if by_size { + // Panic: we know the output is in this database table. + let consumed_output = Fetch::::fetch(storage, consumed_output_id) + .map_err(PruningError::storage)? 
+ .unwrap(); + + byte_length += consumed_output_id.packed_len(); + byte_length += consumed_output.packed_len(); + + // Panic: we know the output is in this database table. + let created_output = Fetch::::fetch(storage, consumed_output_id) + .map_err(PruningError::storage)? + .unwrap(); + + byte_length += consumed_output_id.packed_len(); + byte_length += created_output.packed_len(); + } + + Batch::::batch_delete(storage, batch, consumed_output_id) + .map_err(PruningError::storage)?; - if let Some(_treasury_diff) = output_diff.treasury_diff() { - // TODO + Batch::::batch_delete(storage, batch, consumed_output_id) + .map_err(PruningError::storage)?; } } - Batch::::batch_delete(storage, batch, &index) - .map_err(|e| Error::Storage(Box::new(e)))?; + Batch::::batch_delete(storage, batch, &index).map_err(PruningError::storage)?; - Ok(()) + Ok(byte_length) } -fn prune_receipts(storage: &S, batch: &mut S::Batch, index: MilestoneIndex) -> Result { +fn prune_receipts( + storage: &S, + batch: &mut S::Batch, + index: MilestoneIndex, + by_size: bool, +) -> Result<(usize, ByteLength), PruningError> { + let mut byte_length = 0usize; + + // Panic: Fine since Fetch of a Vec<_> always returns Some(Vec<_>). let receipts = Fetch::>::fetch(storage, &index) - .map_err(|e| Error::Storage(Box::new(e)))? - // Fine since Fetch of a Vec<_> always returns Some(Vec<_>). + .map_err(PruningError::storage)? .unwrap(); let mut num = 0; for receipt in receipts.into_iter() { + if by_size { + byte_length += index.packed_len(); + byte_length += receipt.packed_len(); + } + Batch::<(MilestoneIndex, Receipt), ()>::batch_delete(storage, batch, &(index, receipt)) - .map_err(|e| Error::Storage(Box::new(e)))?; + .map_err(PruningError::storage)?; num += 1; } - Ok(num) + Ok((num, byte_length)) } fn unwrap_indexation(payload: Option<&Payload>) -> Option<&IndexationPayload> { @@ -408,13 +522,16 @@ fn unwrap_indexation(payload: Option<&Payload>) -> Option<&IndexationPayload> { } } -// TODO: consider using this instead of 'truncate' +// TODO: consider using this instead of 'truncate'. #[allow(dead_code)] -fn prune_seps(storage: &S, batch: &mut S::Batch, seps: &[SolidEntryPoint]) -> Result { +fn prune_seps( + storage: &S, + batch: &mut S::Batch, + seps: &[SolidEntryPoint], +) -> Result { let mut num = 0; for sep in seps { - Batch::::batch_delete(storage, batch, sep) - .map_err(|e| Error::Storage(Box::new(e)))?; + Batch::::batch_delete(storage, batch, sep).map_err(PruningError::storage)?; num += 1; } diff --git a/bee-ledger/src/workers/pruning/condition.rs b/bee-ledger/src/workers/pruning/condition.rs index bae415e9b8..9ede5f9ba3 100644 --- a/bee-ledger/src/workers/pruning/condition.rs +++ b/bee-ledger/src/workers/pruning/condition.rs @@ -1,6 +1,13 @@ // Copyright 2020-2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 +// use std::time::Duration; + +use std::{ + sync::atomic::{AtomicU64, Ordering}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + use bee_message::milestone::MilestoneIndex; use bee_tangle::{storage::StorageBackend, Tangle}; @@ -8,42 +15,104 @@ use crate::{types::LedgerIndex, workers::pruning::config::PruningConfig}; const PRUNING_BATCH_SIZE_MAX: u32 = 200; +static PREVIOUS_PRUNING_BY_SIZE: AtomicU64 = AtomicU64::new(0); + /// Reasons for skipping pruning. -#[derive(Debug)] -pub enum PruningSkipReason { - /// Pruning is disabled in the config. +#[derive(Debug, thiserror::Error)] +pub(crate) enum PruningSkipReason { + #[error("disabled")] Disabled, - /// Not enough data yet to be pruned. 
- BelowThreshold { reached_in: u32 },
+ #[error("ledger index < target index threshold")]
+ BelowTargetIndexThreshold,
+ #[error("size metric not supported by the storage layer")]
+ SizeMetricUnsupported,
+ #[error("size metric currently unavailable")]
+ SizeMetricUnavailable,
+ #[error("current size < target size threshold")]
+ BelowTargetSizeThreshold,
+ #[error("cooldown")]
+ Cooldown,
+}
+
+pub(crate) enum PruningTask {
+ ByIndexRange {
+ start_index: MilestoneIndex,
+ target_index: MilestoneIndex,
+ },
+ ByDbSize {
+ num_bytes_to_prune: usize,
+ },
}
-pub(crate) fn should_prune(
- tangle: &Tangle,
+pub(crate) fn should_prune(
+ tangle: &Tangle,
+ storage: &S,
ledger_index: LedgerIndex,
- pruning_delay: u32,
- config: &PruningConfig,
-) -> Result<(MilestoneIndex, MilestoneIndex), PruningSkipReason> {
- if config.disabled() {
+ milestones_to_keep: u32,
+ pruning_config: &PruningConfig,
+) -> Result<PruningTask, PruningSkipReason> {
+ if !pruning_config.milestones().enabled() && !pruning_config.db_size().enabled() {
return Err(PruningSkipReason::Disabled);
}
- let pruning_index = *tangle.get_pruning_index() + 1;
- let pruning_threshold = pruning_index + pruning_delay;
+ let pruning_by_size = if pruning_config.db_size().enabled() {
+ let prev = Duration::from_secs(PREVIOUS_PRUNING_BY_SIZE.load(Ordering::Relaxed));
+ // Panic: `duration_since` only fails if the system clock is set before the Unix epoch.
+ let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
+
+ if now < prev + pruning_config.db_size().cooldown_time() {
+ Err(PruningSkipReason::Cooldown)
+ } else {
+ storage
+ .size()
+ .map_err(|_| PruningSkipReason::SizeMetricUnsupported)
+ .and_then(|actual_size| {
+ let actual_size = actual_size.ok_or(PruningSkipReason::SizeMetricUnavailable)?;
+ let threshold_size = pruning_config.db_size().target_size();
+
+ log::debug!("Storage size: actual {actual_size} threshold {threshold_size}");
+
+ let excess_size = actual_size
+ .checked_sub(threshold_size)
+ .ok_or(PruningSkipReason::BelowTargetSizeThreshold)?;
+
+ let num_bytes_to_prune = excess_size
+ + (pruning_config.db_size().threshold_percentage() as f64 / 100.0 * threshold_size as f64)
+ as usize;
+
+ log::debug!("Num bytes to prune: {num_bytes_to_prune}");
+
+ // Store the time we issued a pruning-by-size.
+ PREVIOUS_PRUNING_BY_SIZE.store(now.as_secs(), Ordering::Relaxed);
+
+ Ok(PruningTask::ByDbSize { num_bytes_to_prune })
+ })
+ }
+ } else {
+ Err(PruningSkipReason::Disabled)
+ };
+
+ if pruning_by_size.is_err() && pruning_config.milestones().enabled() {
+ let pruning_index = *tangle.get_pruning_index() + 1;
+ let target_index_threshold = pruning_index + milestones_to_keep;
+
+ if *ledger_index < target_index_threshold {
+ Err(PruningSkipReason::BelowTargetIndexThreshold)
+ } else {
+ // Panic: cannot underflow due to ledger_index >= target_index_threshold = pruning_index +
+ // milestones_to_keep.
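+ // Hypothetical example: with a ledger index of 1000 and 200 milestones to keep, the
+ // target is milestone 800; the start is the first not-yet-pruned milestone, and at most
+ // PRUNING_BATCH_SIZE_MAX milestones are pruned in one run.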
+ let target_pruning_index = *ledger_index - milestones_to_keep; - if *ledger_index < pruning_threshold { - Err(PruningSkipReason::BelowThreshold { - reached_in: pruning_threshold - *ledger_index, - }) + Ok(PruningTask::ByIndexRange { + start_index: pruning_index.into(), + target_index: if target_pruning_index > pruning_index + PRUNING_BATCH_SIZE_MAX { + (pruning_index + PRUNING_BATCH_SIZE_MAX).into() + } else { + target_pruning_index.into() + }, + }) + } } else { - let target_pruning_index = *ledger_index - pruning_delay; - - Ok(( - pruning_index.into(), - if target_pruning_index > pruning_index + PRUNING_BATCH_SIZE_MAX { - (pruning_index + PRUNING_BATCH_SIZE_MAX).into() - } else { - target_pruning_index.into() - }, - )) + pruning_by_size } } diff --git a/bee-ledger/src/workers/pruning/config.rs b/bee-ledger/src/workers/pruning/config.rs index 5aff3a7419..1e184ad2cc 100644 --- a/bee-ledger/src/workers/pruning/config.rs +++ b/bee-ledger/src/workers/pruning/config.rs @@ -3,20 +3,28 @@ //! Module containing pruning configuration. +use std::time::Duration; + +use humanize_rs::{bytes, duration}; use serde::Deserialize; -const DEFAULT_ENABLED: bool = true; -const DEFAULT_DELAY: u32 = 60480; -const DEFAULT_PRUNE_RECEIPTS: bool = false; +const PRUNING_MILESTONES_ENABLED_DEFAULT: bool = true; +const PRUNING_SIZE_ENABLED_DEFAULT: bool = true; +const PRUNING_RECEIPTS_ENABLED_DEFAULT: bool = false; +const MAX_MILESTONES_TO_KEEP_DEFAULT: u32 = 60480; +pub(crate) const MILESTONES_TO_KEEP_MIN: u32 = 50; +const THRESHOLD_PERCENTAGE_DEFAULT: f32 = 10.0; +const COOLDOWN_TIME_DEFAULT: &str = "5m"; +const TARGET_SIZE_DEFAULT: &str = "30Gb"; /// Builder for a [`PruningConfig`]. -#[derive(Default, Deserialize, PartialEq)] +#[derive(Default, Debug, Deserialize, PartialEq)] #[must_use] pub struct PruningConfigBuilder { - enabled: Option, - delay: Option, - #[serde(alias = "pruneReceipts")] - prune_receipts: Option, + milestones: Option, + #[serde(rename = "size")] + db_size: Option, + receipts: Option, } impl PruningConfigBuilder { @@ -25,21 +33,21 @@ impl PruningConfigBuilder { Self::default() } - /// Enables pruning. - pub fn enabled(mut self, enabled: bool) -> Self { - self.enabled.replace(enabled); + /// Sets the [`PruningMilestonesConfigBuilder`]. + pub fn milestones(mut self, builder: PruningMilestonesConfigBuilder) -> Self { + self.milestones.replace(builder); self } - /// Sets the pruning delay. - pub fn delay(mut self, delay: u32) -> Self { - self.delay.replace(delay); + /// Sets the [`PruningDbSizeConfigBuilder`]. + pub fn db_size(mut self, builder: PruningDbSizeConfigBuilder) -> Self { + self.db_size.replace(builder); self } - /// Sets whether receipts should be pruned as well. - pub fn prune_receipts(mut self, prune_receipts: bool) -> Self { - self.prune_receipts.replace(prune_receipts); + /// Sets the [`PruningReceiptsConfigBuilder`]. + pub fn receipts(mut self, builder: PruningReceiptsConfigBuilder) -> Self { + self.receipts.replace(builder); self } @@ -47,19 +55,135 @@ impl PruningConfigBuilder { #[must_use] pub fn finish(self) -> PruningConfig { PruningConfig { - enabled: self.enabled.unwrap_or(DEFAULT_ENABLED), - delay: self.delay.unwrap_or(DEFAULT_DELAY), - prune_receipts: self.prune_receipts.unwrap_or(DEFAULT_PRUNE_RECEIPTS), + milestones: self.milestones.unwrap_or_default().finish(), + db_size: self.db_size.unwrap_or_default().finish(), + receipts: self.receipts.unwrap_or_default().finish(), + } + } +} + +/// Builder for a [`PruningMilestonesConfig`]. 
+#[derive(Default, Debug, Deserialize, PartialEq)] +#[must_use] +pub struct PruningMilestonesConfigBuilder { + enabled: Option, + #[serde(alias = "maxMilestonesToKeep")] + max_milestones_to_keep: Option, +} + +impl PruningMilestonesConfigBuilder { + /// Sets whether pruning based on milestone indexes is enabled. + pub fn enabled(mut self, enabled: bool) -> Self { + self.enabled.replace(enabled); + self + } + + /// Sets how many milestones to hold available in the storage. + pub fn max_milestones_to_keep(mut self, max_milestones_to_keep: u32) -> Self { + let max_milestones_to_keep = max_milestones_to_keep.max(MILESTONES_TO_KEEP_MIN); + self.max_milestones_to_keep.replace(max_milestones_to_keep); + self + } + + /// Finishes this builder into a [`PruningMilestonesConfig`]. + #[must_use] + pub fn finish(self) -> PruningMilestonesConfig { + PruningMilestonesConfig { + enabled: self.enabled.unwrap_or(PRUNING_MILESTONES_ENABLED_DEFAULT), + max_milestones_to_keep: self.max_milestones_to_keep.unwrap_or(MAX_MILESTONES_TO_KEEP_DEFAULT), + } + } +} + +/// Builder for a [`PruningDbSizeConfig`]. +#[derive(Default, Debug, Deserialize, PartialEq)] +#[must_use] +pub struct PruningDbSizeConfigBuilder { + enabled: Option, + #[serde(alias = "targetSize")] + target_size: Option, + #[serde(alias = "thresholdPercentage")] + threshold_percentage: Option, + #[serde(alias = "cooldownTime")] + cooldown_time: Option, +} + +impl PruningDbSizeConfigBuilder { + /// Sets whether pruning based on storage size is enabled. + pub fn enabled(mut self, enabled: bool) -> Self { + self.enabled.replace(enabled); + self + } + + /// Sets the target size (i.e. the maximum size) of the database. + pub fn target_size(mut self, target_size: String) -> Self { + self.target_size.replace(target_size); + self + } + + /// Sets the percentage of the target size that is pruned from the database. + pub fn threshold_percentage(mut self, threshold_percentage: f32) -> Self { + self.threshold_percentage.replace(threshold_percentage); + self + } + + /// Sets the cooldown time (i.e. the sleep interval) between two subsequent pruning-by-size events. + pub fn cooldown_time(mut self, cooldown_time: String) -> Self { + self.cooldown_time.replace(cooldown_time); + self + } + + /// Finishes this builder into a [`PruningDbSizeConfig`]. + #[must_use] + pub fn finish(self) -> PruningDbSizeConfig { + let target_size = self.target_size.unwrap_or_else(|| TARGET_SIZE_DEFAULT.to_string()); + let target_size = target_size + .parse::() + .expect("parse human-readable pruning target size") + .size(); + + let cooldown_time = self.cooldown_time.unwrap_or_else(|| COOLDOWN_TIME_DEFAULT.to_string()); + let cooldown_time = + duration::parse(cooldown_time.as_ref()).expect("parse human-readable pruning cooldown time"); + + PruningDbSizeConfig { + enabled: self.enabled.unwrap_or(PRUNING_SIZE_ENABLED_DEFAULT), + target_size, + threshold_percentage: self.threshold_percentage.unwrap_or(THRESHOLD_PERCENTAGE_DEFAULT), + cooldown_time, + } + } +} + +/// Builder for a [`PruningReceiptsConfig`]. +#[derive(Default, Debug, Deserialize, PartialEq)] +#[must_use] +pub struct PruningReceiptsConfigBuilder { + enabled: Option, +} + +impl PruningReceiptsConfigBuilder { + /// Sets whether receipts will be pruned. + pub fn enabled(mut self, enabled: bool) -> Self { + self.enabled.replace(enabled); + self + } + + /// Finishes this builder into a [`PruningReceiptsConfig`]. 
+ #[must_use]
+ pub fn finish(self) -> PruningReceiptsConfig {
+ PruningReceiptsConfig {
+ enabled: self.enabled.unwrap_or(PRUNING_RECEIPTS_ENABLED_DEFAULT),
}
}
}

/// The pruning configuration.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct PruningConfig {
- enabled: bool,
- delay: u32,
- prune_receipts: bool,
+ milestones: PruningMilestonesConfig,
+ db_size: PruningDbSizeConfig,
+ receipts: PruningReceiptsConfig,
}
impl PruningConfig {
@@ -68,23 +192,182 @@ impl PruningConfig {
PruningConfigBuilder::new()
}
- /// Returns whether pruning is enabled.
+ /// Returns the [`PruningMilestonesConfig`].
+ #[inline(always)]
+ pub fn milestones(&self) -> &PruningMilestonesConfig {
+ &self.milestones
+ }
+
+ /// Returns the [`PruningDbSizeConfig`].
+ #[inline(always)]
+ pub fn db_size(&self) -> &PruningDbSizeConfig {
+ &self.db_size
+ }
+
+ /// Returns the [`PruningReceiptsConfig`].
+ #[inline(always)]
+ pub fn receipts(&self) -> &PruningReceiptsConfig {
+ &self.receipts
+ }
+}
+
+/// The config associated with milestone-index-based pruning.
+#[derive(Clone, Debug)]
+pub struct PruningMilestonesConfig {
+ enabled: bool,
+ max_milestones_to_keep: u32,
+}
+
+impl PruningMilestonesConfig {
+ /// Returns whether pruning based on milestone indexes is enabled.
+ pub fn enabled(&self) -> bool {
+ self.enabled
+ }
+
+ /// Returns the maximum number of milestones to hold available in the storage.
+ pub fn max_milestones_to_keep(&self) -> u32 {
+ self.max_milestones_to_keep
+ }
+}
+
+/// The config associated with storage-size-based pruning.
+#[derive(Clone, Debug)]
+pub struct PruningDbSizeConfig {
+ enabled: bool,
+ target_size: usize,
+ threshold_percentage: f32,
+ cooldown_time: Duration,
+}
+
+impl PruningDbSizeConfig {
+ /// Returns whether pruning based on a target storage size is enabled.
+ pub fn enabled(&self) -> bool {
+ self.enabled
+ }
+
+ /// Returns the target size of the database.
+ pub fn target_size(&self) -> usize {
+ self.target_size
+ }
+
+ /// Returns the percentage by which the database gets reduced when the target size is reached.
+ pub fn threshold_percentage(&self) -> f32 {
+ self.threshold_percentage
+ }
+
+ /// Returns the cooldown time between two pruning-by-database-size events.
+ pub fn cooldown_time(&self) -> Duration {
+ self.cooldown_time
+ }
+}
+
+/// The config associated with pruning receipts.
+#[cfg_attr(test, derive(Eq, PartialEq))]
+#[derive(Clone, Debug)]
+pub struct PruningReceiptsConfig {
+ enabled: bool,
+}
+
+impl PruningReceiptsConfig {
+ /// Returns whether pruning receipts is enabled.
pub fn enabled(&self) -> bool {
self.enabled
}
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[derive(Debug)]
+ struct NodeConfig {
+ pruning: PruningConfig,
+ }
+
+ #[derive(Default, Debug, Deserialize)]
+ #[must_use]
+ struct NodeConfigBuilder {
+ pruning: Option,
+ }
+
+ impl NodeConfigBuilder {
+ fn finish(self) -> NodeConfig {
+ NodeConfig {
+ pruning: self.pruning.unwrap().finish(),
+ }
+ }
+ }
- /// Returns whether pruning is disabled.
- pub fn disabled(&self) -> bool { - !self.enabled + fn create_config_from_json_str() -> PruningConfig { + let config_json_str = r#" + { + "pruning": { + "milestones": { + "enabled": false, + "maxMilestonesToKeep": 200 + }, + "size": { + "enabled": false, + "targetSize": "500MB", + "thresholdPercentage": 20.0, + "cooldownTime": "1m" + }, + "receipts": { + "enabled": true + } + } + }"#; + + let node_config = serde_json::from_str::(config_json_str) + .expect("error deserializing json config str") + .finish(); + + node_config.pruning } - /// Returns the pruning delay. - pub fn delay(&self) -> u32 { - self.delay + fn create_config_from_toml_str() -> PruningConfig { + let config_toml_str = r#" + [pruning] + [pruning.milestones] + enabled = false + max_milestones_to_keep = 200 + [pruning.size] + enabled = false + target_size = "500MB" + threshold_percentage = 20.0 + cooldown_time = "1m" + [pruning.receipts] + enabled = true + "#; + + let node_config_builder = + toml::from_str::(config_toml_str).expect("error deserializing toml config str"); + + println!("{:?}", node_config_builder); + let node_config = node_config_builder.finish(); + + node_config.pruning } - /// Returns whether [`Receipt`](crate::types::Receipt)s are pruned. - pub fn prune_receipts(&self) -> bool { - self.prune_receipts + #[test] + fn deserialize_json_and_toml_repr_into_same_config() { + let json_config = create_config_from_json_str(); + let toml_config = create_config_from_toml_str(); + + assert!(!json_config.milestones().enabled()); + assert_eq!(json_config.milestones().max_milestones_to_keep(), 200); + assert_eq!(json_config.db_size().target_size(), 500000000); + assert_eq!(json_config.db_size().threshold_percentage(), 20.0); + assert_eq!(json_config.db_size().cooldown_time(), Duration::from_secs(60)); + assert!(!json_config.db_size().enabled()); + assert!(json_config.receipts().enabled()); + + assert!(!toml_config.milestones().enabled()); + assert_eq!(toml_config.milestones().max_milestones_to_keep(), 200); + assert_eq!(toml_config.db_size().target_size(), 500000000); + assert_eq!(toml_config.db_size().threshold_percentage(), 20.0); + assert_eq!(toml_config.db_size().cooldown_time(), Duration::from_secs(60)); + assert!(!toml_config.db_size().enabled()); + assert!(toml_config.receipts().enabled()); } } diff --git a/bee-ledger/src/workers/pruning/error.rs b/bee-ledger/src/workers/pruning/error.rs index ad6cf01047..f422669366 100644 --- a/bee-ledger/src/workers/pruning/error.rs +++ b/bee-ledger/src/workers/pruning/error.rs @@ -4,7 +4,7 @@ use bee_message::{milestone::MilestoneIndex, MessageId}; #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum PruningError { #[error("pruning target index {selected} below minimum {minimum}")] InvalidTargetIndex { selected: MilestoneIndex, @@ -23,3 +23,9 @@ pub enum Error { #[error("storage operation failed due to: {0:?}")] Storage(Box), } + +impl PruningError { + pub(crate) fn storage(e: impl std::error::Error + Send + 'static) -> Self { + Self::Storage(Box::new(e)) + } +} diff --git a/bee-ledger/src/workers/pruning/metrics.rs b/bee-ledger/src/workers/pruning/metrics.rs index c74b3f7e9d..8c3ea9e957 100644 --- a/bee-ledger/src/workers/pruning/metrics.rs +++ b/bee-ledger/src/workers/pruning/metrics.rs @@ -4,58 +4,59 @@ use std::time::Duration; #[derive(Debug, Default)] -pub struct PruningMetrics { - pub curr_seps: usize, - pub new_seps: usize, - pub kept_seps: usize, - pub next_seps: usize, - pub messages: usize, - pub edges: usize, - pub indexations: usize, - pub output_diffs: bool, 
- pub receipts: usize, +pub(crate) struct PruningMetrics { + pub(crate) curr_seps: usize, + pub(crate) new_seps: usize, + pub(crate) kept_seps: usize, + pub(crate) next_seps: usize, + pub(crate) messages: usize, + pub(crate) edges: usize, + pub(crate) indexations: usize, + // TODO + // pub(crate) output_diffs: bool, + pub(crate) receipts: usize, } #[derive(Debug, Default)] -pub struct ConfirmedDataPruningMetrics { - pub msg_already_visited: usize, - pub references_sep: usize, - pub approver_cache_miss: usize, - pub approver_cache_hit: usize, - pub all_approvers_visited: usize, - pub not_all_approvers_visited: usize, - pub found_seps: usize, - pub prunable_messages: usize, - pub prunable_edges: usize, - pub prunable_indexations: usize, - pub new_seps: usize, +pub(crate) struct ConfirmedDataPruningMetrics { + pub(crate) msg_already_visited: usize, + pub(crate) references_sep: usize, + pub(crate) approver_cache_miss: usize, + pub(crate) approver_cache_hit: usize, + pub(crate) all_approvers_visited: usize, + pub(crate) not_all_approvers_visited: usize, + pub(crate) found_seps: usize, + pub(crate) prunable_messages: usize, + pub(crate) prunable_edges: usize, + pub(crate) prunable_indexations: usize, + pub(crate) new_seps: usize, } #[derive(Debug, Default)] -pub struct UnconfirmedDataPruningMetrics { - pub none_received: bool, - pub prunable_messages: usize, - pub prunable_edges: usize, - pub prunable_indexations: usize, - pub already_pruned: usize, - pub were_confirmed: usize, +pub(crate) struct UnconfirmedDataPruningMetrics { + pub(crate) none_received: bool, + pub(crate) prunable_messages: usize, + pub(crate) prunable_edges: usize, + pub(crate) prunable_indexations: usize, + pub(crate) already_pruned: usize, + pub(crate) were_confirmed: usize, } #[derive(Debug, Default)] -pub struct MilestoneDataPruningMetrics { - pub receipts: usize, +pub(crate) struct MilestoneDataPruningMetrics { + pub(crate) receipts: usize, } #[derive(Debug, Default)] -pub struct Timings { - pub full_prune: Duration, - pub get_curr_seps: Duration, - pub filter_curr_seps: Duration, - pub replace_seps: Duration, - pub batch_confirmed_data: Duration, - pub batch_unconfirmed_data: Duration, - pub batch_milestone_data: Duration, - pub batch_new_seps: Duration, - pub truncate_curr_seps: Duration, - pub batch_commit: Duration, +pub(crate) struct Timings { + pub(crate) full_prune: Duration, + pub(crate) get_curr_seps: Duration, + pub(crate) filter_curr_seps: Duration, + pub(crate) replace_seps: Duration, + pub(crate) batch_confirmed_data: Duration, + pub(crate) batch_unconfirmed_data: Duration, + pub(crate) batch_milestone_data: Duration, + pub(crate) batch_new_seps: Duration, + pub(crate) truncate_curr_seps: Duration, + pub(crate) batch_commit: Duration, } diff --git a/bee-ledger/src/workers/pruning/mod.rs b/bee-ledger/src/workers/pruning/mod.rs index ce004ff587..cecb48b704 100644 --- a/bee-ledger/src/workers/pruning/mod.rs +++ b/bee-ledger/src/workers/pruning/mod.rs @@ -4,10 +4,10 @@ //! Module that contains the pruning logic. 
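+//!
+//! Pruning runs in one of two modes (see `condition::should_prune`): over an explicit
+//! milestone index range (`prune::prune_by_range`), or against a byte budget when the
+//! database has grown beyond its configured target size (`prune::prune_by_size`).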
mod batch; -mod error; mod metrics; pub(crate) mod condition; +pub(crate) mod error; pub(crate) mod prune; pub mod config; diff --git a/bee-ledger/src/workers/pruning/prune.rs b/bee-ledger/src/workers/pruning/prune.rs index 9df7da2d46..4b33314f72 100644 --- a/bee-ledger/src/workers/pruning/prune.rs +++ b/bee-ledger/src/workers/pruning/prune.rs @@ -10,17 +10,19 @@ use bee_message::milestone::MilestoneIndex; use bee_runtime::event::Bus; use bee_storage::access::{Batch, Truncate}; use bee_tangle::{solid_entry_point::SolidEntryPoint, Tangle}; -use log::{debug, info}; - -use crate::workers::{ - event::PrunedIndex, - pruning::{ - batch, - config::PruningConfig, - error::Error, - metrics::{PruningMetrics, Timings}, + +use crate::{ + types::LedgerIndex, + workers::{ + event::PrunedIndex, + pruning::{ + batch, + config::{PruningConfig, MILESTONES_TO_KEEP_MIN}, + error::PruningError, + metrics::{PruningMetrics, Timings}, + }, + storage::{self, StorageBackend}, }, - storage::{self, StorageBackend}, }; const KEEP_INITIAL_SNAPSHOT_SEPS: usize = 50; @@ -29,172 +31,230 @@ static NUM_PRUNINGS: AtomicUsize = AtomicUsize::new(0); /// Performs pruning of data from `start_index` to `target_index`. #[cfg_attr(feature = "trace", trace_tools::observe)] -pub async fn prune( +pub async fn prune_by_range( tangle: &Tangle, storage: &S, bus: &Bus<'_>, start_index: MilestoneIndex, target_index: MilestoneIndex, config: &PruningConfig, -) -> Result<(), Error> { +) -> Result<(), PruningError> { let mut timings = Timings::default(); let mut metrics = PruningMetrics::default(); if target_index < start_index { - return Err(Error::InvalidTargetIndex { + return Err(PruningError::InvalidTargetIndex { selected: target_index, minimum: start_index, }); } if start_index != target_index { - info!( + log::info!( "Pruning from milestone {} to milestone {}...", - start_index, target_index + start_index, + target_index ); } for index in *start_index..=*target_index { - let index = MilestoneIndex(index); - - debug!("Pruning milestone {}...", index); - - // - - // Measurement of the full pruning step. - let full_prune = Instant::now(); - - // Get the current set of SEPs. - let get_curr_seps = Instant::now(); - let mut curr_seps = tangle.get_solid_entry_points().await; - timings.get_curr_seps = get_curr_seps.elapsed(); - - metrics.curr_seps = curr_seps.len(); - - // Start a batch to make changes to the storage in a single atomic step. - let mut batch = S::batch_begin(); - - // Add confirmed data to the delete batch. - // NOTE: This is the most costly thing during pruning, because it has to perform a past-cone traversal. - let batch_confirmed_data = Instant::now(); - let (mut new_seps, confirmed_data_metrics) = - batch::batch_prunable_confirmed_data(storage, &mut batch, index, &curr_seps)?; - timings.batch_confirmed_data = batch_confirmed_data.elapsed(); - - metrics.new_seps = new_seps.len(); - metrics.messages = confirmed_data_metrics.prunable_messages; - metrics.edges = confirmed_data_metrics.prunable_edges; - metrics.indexations = confirmed_data_metrics.prunable_indexations; - - // Keep still relevant SEPs. - // - // Note: - // Currently Bee is reliant on the snapshot file generated by Hornet, which stores the confirmation index - // of an SEP along with it. It then keeps it long enough to be (pretty) sure the coordinator would reject a - // message directly referencing it. 
In Bee, however, we wanted to try a different approach, which doesn't - // trust the Coordinator's tip selection, and stores the highest confirmation index of any of its direct - // approvers instead. - // - // For the first X milestones we keep the initial SEP set (from the snapshot file) around, after that, we keep - // only the necessary SEPs (the ones that will be referenced in future prunings). - let filter_curr_seps = Instant::now(); - if NUM_PRUNINGS.fetch_add(1, Ordering::Relaxed) >= KEEP_INITIAL_SNAPSHOT_SEPS { - curr_seps.retain(|_, v| **v > *index); - } - timings.filter_curr_seps = filter_curr_seps.elapsed(); + prune_milestone(index, tangle, storage, bus, &mut timings, &mut metrics, config, false).await?; + } - metrics.kept_seps = curr_seps.len(); + if start_index == target_index { + log::info!("Pruned milestone {}.", start_index); + } else { + log::info!("Pruned from milestone {} to milestone {}.", start_index, target_index); + } - // Create the union of both sets: - new_seps.extend(curr_seps); + Ok(()) +} - let num_next_seps = new_seps.len(); +/// Performs pruning of data until a certain size is reached. +#[cfg_attr(feature = "trace", trace_tools::observe)] +pub async fn prune_by_size( + tangle: &Tangle, + storage: &S, + bus: &Bus<'_>, + ledger_index: LedgerIndex, + num_bytes_to_prune: usize, + config: &PruningConfig, +) -> Result<(), PruningError> { + let mut timings = Timings::default(); + let mut metrics = PruningMetrics::default(); + let mut num_pruned_bytes = 0; - metrics.next_seps = num_next_seps; + while num_pruned_bytes < num_bytes_to_prune { + let index = *tangle.get_pruning_index() + 1; - // Write the new set of SEPs to the storage. - let batch_new_seps = Instant::now(); - for (new_sep, index) in &new_seps { - Batch::::batch_insert(storage, &mut batch, new_sep, index) - .map_err(|e| Error::Storage(Box::new(e)))?; + if *ledger_index < index + MILESTONES_TO_KEEP_MIN { + log::debug!("Minimum number of milestones to keep reached."); + break; } - timings.batch_new_seps = batch_new_seps.elapsed(); - - // Replace the old set of SEPs with the new one. - let replace_seps = Instant::now(); - tangle.replace_solid_entry_points(new_seps).await; - timings.replace_seps = replace_seps.elapsed(); - - // Update entry point index - tangle.update_entry_point_index(index); - - let batch_milestones = Instant::now(); - let milestone_data_metrics = batch::prune_milestone_data(storage, &mut batch, index, config.prune_receipts())?; - timings.batch_milestone_data = batch_milestones.elapsed(); - - metrics.receipts = milestone_data_metrics.receipts; - - // Add unconfirmed data to the delete batch. - let batch_unconfirmed_data = Instant::now(); - let unconfirmed_data_metrics = batch::batch_prunable_unconfirmed_data(storage, &mut batch, index)?; - timings.batch_unconfirmed_data = batch_unconfirmed_data.elapsed(); - - metrics.messages += unconfirmed_data_metrics.prunable_messages; - metrics.edges += unconfirmed_data_metrics.prunable_edges; - metrics.indexations += unconfirmed_data_metrics.prunable_indexations; - - // Remove old SEPs from the storage. - // - // **WARNING**: This operation must come before the batch is committed! - // - // TODO: consider batching deletes rather than using Truncate. Is one faster than the other? Do we care if its - // atomic or not? - let truncate_old_seps = Instant::now(); - Truncate::::truncate(storage).expect("truncating solid entry points failed"); - timings.truncate_curr_seps = truncate_old_seps.elapsed(); - - // Execute the batch operation. 
- let batch_commit = Instant::now(); - storage - .batch_commit(batch, true) - .map_err(|e| Error::Storage(Box::new(e)))?; - timings.batch_commit = batch_commit.elapsed(); - - // Update the pruning index. - tangle.update_pruning_index(index); - - // Write the updated snapshot info to the storage. - let timestamp = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("error creating timestamp") - .as_secs(); - let mut snapshot_info = storage::fetch_snapshot_info(storage) - .map_err(|e| Error::Storage(Box::new(e)))? - .ok_or(Error::MissingSnapshotInfo)?; - snapshot_info.update_pruning_index(index); - snapshot_info.update_timestamp(timestamp); - storage::insert_snapshot_info(storage, &snapshot_info).map_err(|e| Error::Storage(Box::new(e)))?; - - timings.full_prune = full_prune.elapsed(); - - debug!("{:?}.", metrics); - debug!("{:?}", confirmed_data_metrics); - debug!("{:?}", unconfirmed_data_metrics); - debug!("{:?}.", timings); - debug!( - "Entry point index now at {} with {} solid entry points..", - index, num_next_seps - ); - debug!("Pruned milestone {}.", index); - bus.dispatch(PrunedIndex { index }); + num_pruned_bytes += + prune_milestone(index, tangle, storage, bus, &mut timings, &mut metrics, config, true).await?; + + log::debug!("Pruned {num_pruned_bytes}/{num_bytes_to_prune} bytes."); } - if start_index == target_index { - info!("Pruned milestone {}.", start_index); + Ok(()) +} + +/// Prunes a single milestone. +#[allow(clippy::too_many_arguments)] +async fn prune_milestone( + index: u32, + tangle: &Tangle, + storage: &S, + bus: &Bus<'_>, + timings: &mut Timings, + metrics: &mut PruningMetrics, + config: &PruningConfig, + by_size: bool, +) -> Result { + let mut byte_length = 0usize; + let index = MilestoneIndex(index); + + log::debug!("Pruning milestone {}...", index); + + // Measurement of the full pruning step. + let full_prune = Instant::now(); + + // Get the current set of SEPs. + let get_curr_seps = Instant::now(); + let mut curr_seps = tangle.get_solid_entry_points().await; + timings.get_curr_seps = get_curr_seps.elapsed(); + + metrics.curr_seps = curr_seps.len(); + + // Start a batch to make changes to the storage in a single atomic step. + let mut batch = S::batch_begin(); + + // Add confirmed data to the delete batch. + // NOTE: This is the most costly thing during pruning, because it has to perform a past-cone traversal. + let batch_confirmed_data = Instant::now(); + let (mut new_seps, confirmed_data_metrics, num_bytes) = + batch::batch_prunable_confirmed_data(storage, &mut batch, index, &curr_seps, by_size)?; + timings.batch_confirmed_data = batch_confirmed_data.elapsed(); + + byte_length += num_bytes; + + metrics.new_seps = new_seps.len(); + metrics.messages = confirmed_data_metrics.prunable_messages; + metrics.edges = confirmed_data_metrics.prunable_edges; + metrics.indexations = confirmed_data_metrics.prunable_indexations; + + // Keep still relevant SEPs. + // + // Note: + // Currently Bee is reliant on the snapshot file generated by Hornet, which stores the confirmation index + // of an SEP along with it. It then keeps it long enough to be (pretty) sure the coordinator would reject a + // message directly referencing it. In Bee, however, we wanted to try a different approach, which doesn't + // trust the Coordinator's tip selection, and stores the highest confirmation index of any of its direct + // approvers instead. 
+ //
+ // For the first X milestones we keep the initial SEP set (from the snapshot file) around; after that, we keep
+ // only the necessary SEPs (the ones that will be referenced in future prunings).
+ let filter_curr_seps = Instant::now();
+ if NUM_PRUNINGS.fetch_add(1, Ordering::Relaxed) >= KEEP_INITIAL_SNAPSHOT_SEPS {
+ curr_seps.retain(|_, v| **v > *index);
+ }
+ timings.filter_curr_seps = filter_curr_seps.elapsed();
+
+ metrics.kept_seps = curr_seps.len();
+
+ // Create the union of both sets:
+ new_seps.extend(curr_seps);
+
+ let num_next_seps = new_seps.len();
+
+ metrics.next_seps = num_next_seps;
+
+ // Write the new set of SEPs to the storage.
+ let batch_new_seps = Instant::now();
+ for (new_sep, index) in &new_seps {
+ Batch::::batch_insert(storage, &mut batch, new_sep, index)
+ .map_err(PruningError::storage)?;
+ }
+ timings.batch_new_seps = batch_new_seps.elapsed();
+
+ // Replace the old set of SEPs with the new one.
+ let replace_seps = Instant::now();
+ tangle.replace_solid_entry_points(new_seps).await;
+ timings.replace_seps = replace_seps.elapsed();
+
+ // Update entry point index
+ tangle.update_entry_point_index(index);
+
+ let batch_milestones = Instant::now();
+ let (num_bytes, milestone_data_metrics) =
+ batch::prune_milestone_data(storage, &mut batch, index, config.receipts().enabled(), by_size)?;
+ timings.batch_milestone_data = batch_milestones.elapsed();
+
+ byte_length += num_bytes;
+ metrics.receipts = milestone_data_metrics.receipts;
+
+ // Add unconfirmed data to the delete batch.
+ let batch_unconfirmed_data = Instant::now();
+ let (num_bytes, unconfirmed_data_metrics) =
+ batch::batch_prunable_unconfirmed_data(storage, &mut batch, index, by_size)?;
+ timings.batch_unconfirmed_data = batch_unconfirmed_data.elapsed();
+
+ byte_length += num_bytes;
+
+ metrics.messages += unconfirmed_data_metrics.prunable_messages;
+ metrics.edges += unconfirmed_data_metrics.prunable_edges;
+ metrics.indexations += unconfirmed_data_metrics.prunable_indexations;
+
+ // Remove old SEPs from the storage.
+ //
+ // **WARNING**: This operation must come before the batch is committed!
+ //
+ // TODO: consider batching deletes rather than using Truncate. Is one faster than the other? Do we care if it's
+ // atomic or not?
+ let truncate_old_seps = Instant::now();
+ Truncate::::truncate(storage).expect("truncating solid entry points failed");
+ timings.truncate_curr_seps = truncate_old_seps.elapsed();
+
+ // Execute the batch operation.
+ let batch_commit = Instant::now();
+ storage.batch_commit(batch, false).map_err(PruningError::storage)?;
+ timings.batch_commit = batch_commit.elapsed();
+
+ // Update the pruning index.
+ tangle.update_pruning_index(index);
+
+ // Write the updated snapshot info to the storage.
+ let timestamp = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)
+ .expect("error creating timestamp")
+ .as_secs();
+ let mut snapshot_info = storage::fetch_snapshot_info(storage)
+ .map_err(PruningError::storage)?
+        .ok_or(PruningError::MissingSnapshotInfo)?;
+    snapshot_info.update_pruning_index(index);
+    snapshot_info.update_timestamp(timestamp);
+    storage::insert_snapshot_info(storage, &snapshot_info).map_err(PruningError::storage)?;
+
+    timings.full_prune = full_prune.elapsed();
+
+    log::debug!("{:?}", metrics);
+    log::debug!("{:?}", confirmed_data_metrics);
+    log::debug!("{:?}", unconfirmed_data_metrics);
+    log::debug!("{:?}", timings);
+    log::debug!(
+        "Entry point index now at {} with {} solid entry points.",
+        index,
+        num_next_seps
+    );
+    if by_size {
+        log::debug!("Pruned milestone {}: {} bytes.", index, byte_length);
     } else {
-        info!("Pruned from milestone {} to milestone {}.", start_index, target_index);
+        log::debug!("Pruned milestone {}.", index);
     }
-    Ok(())
+    bus.dispatch(PrunedIndex { index });
+
+    Ok(byte_length)
 }
diff --git a/bee-ledger/src/workers/snapshot/condition.rs b/bee-ledger/src/workers/snapshot/condition.rs
index ac55e643f6..1cc2186fb9 100644
--- a/bee-ledger/src/workers/snapshot/condition.rs
+++ b/bee-ledger/src/workers/snapshot/condition.rs
@@ -6,12 +6,14 @@ use bee_tangle::{storage::StorageBackend, Tangle};
 use crate::{types::LedgerIndex, workers::snapshot::config::SnapshotConfig};
 
 /// Reasons for skipping snapshotting.
-#[derive(Debug)]
-pub enum SnapshottingSkipReason {
-    /// Not enough data yet to create a snapshot.
-    BelowThreshold { reached_in: u32 },
-    /// Snapshotting is deferred to a later milestone.
-    Deferred { next_in: u32 },
+#[derive(Debug, thiserror::Error)]
+pub(crate) enum SnapshottingSkipReason {
+    #[error("disabled")]
+    Disabled,
+    #[error("ledger index < snapshotting depth")]
+    BelowDepth,
+    #[error("ledger index < next snapshot index {next_snapshot_index}")]
+    BelowNextSnapshotIndex { next_snapshot_index: u32 },
 }
 
 pub(crate) fn should_snapshot(
@@ -20,21 +22,24 @@
     snapshot_depth: u32,
     snapshot_config: &SnapshotConfig,
 ) -> Result<(), SnapshottingSkipReason> {
+    if !snapshot_config.snapshotting().enabled() {
+        return Err(SnapshottingSkipReason::Disabled);
+    }
+
+    // Get the index of the last snapshot.
     let snapshot_index = *tangle.get_snapshot_index();
 
     let snapshot_interval = if tangle.is_synced() {
-        snapshot_config.interval_synced()
+        snapshot_config.snapshotting().interval_synced()
     } else {
-        snapshot_config.interval_unsynced()
+        snapshot_config.snapshotting().interval_unsynced()
     };
 
-    if *ledger_index < snapshot_depth {
-        Err(SnapshottingSkipReason::BelowThreshold {
-            reached_in: snapshot_depth - *ledger_index,
-        })
+    if *ledger_index < snapshot_index + snapshot_depth {
+        Err(SnapshottingSkipReason::BelowDepth)
     } else if *ledger_index < snapshot_index + snapshot_interval {
-        Err(SnapshottingSkipReason::Deferred {
-            next_in: (snapshot_index + snapshot_interval) - *ledger_index,
+        Err(SnapshottingSkipReason::BelowNextSnapshotIndex {
+            next_snapshot_index: snapshot_index + snapshot_interval,
         })
     } else {
         Ok(())
diff --git a/bee-ledger/src/workers/snapshot/config.rs b/bee-ledger/src/workers/snapshot/config.rs
index 0927ed4051..815416a755 100644
--- a/bee-ledger/src/workers/snapshot/config.rs
+++ b/bee-ledger/src/workers/snapshot/config.rs
@@ -8,14 +8,15 @@ use std::path::{Path, PathBuf};
 use serde::Deserialize;
 use url::Url;
 
-const DEFAULT_FULL_PATH: &str = "./snapshots/mainnet/latest-full_snapshot.bin";
-const DEFAULT_DOWNLOAD_URLS: Vec<DownloadUrls> = Vec::new();
+const DEFAULT_ENABLED: bool = false;
 const DEFAULT_DEPTH: u32 = 50;
 const DEFAULT_INTERVAL_SYNCED: u32 = 50;
 const DEFAULT_INTERVAL_UNSYNCED: u32 = 1000;
+const DEFAULT_DOWNLOAD_URLS: Vec<DownloadUrls> = Vec::new();
+const DEFAULT_FULL_PATH: &str = "./snapshots/mainnet/latest-full_snapshot.bin";
 
 /// Contains URLs to download the full and delta snapshot files.
-#[derive(Clone, Deserialize, PartialEq)]
+#[derive(Clone, Debug, Deserialize, PartialEq)]
 pub struct DownloadUrls {
     full: Url,
     delta: Url,
@@ -33,8 +34,57 @@ impl DownloadUrls {
     }
 }
 
-/// Builder for a `SnapshotConfig`.
-#[derive(Default, Deserialize, PartialEq)]
+/// Builder for a [`SnapshottingConfig`], which is part of the [`SnapshotConfig`].
+#[derive(Default, Debug, Deserialize, PartialEq)]
+#[must_use]
+pub struct SnapshottingConfigBuilder {
+    enabled: Option<bool>,
+    depth: Option<u32>,
+    #[serde(alias = "intervalSynced")]
+    interval_synced: Option<u32>,
+    #[serde(alias = "intervalUnsynced")]
+    interval_unsynced: Option<u32>,
+}
+
+impl SnapshottingConfigBuilder {
+    /// Sets whether snapshotting is enabled.
+    pub fn enabled(mut self, enabled: bool) -> Self {
+        self.enabled.replace(enabled);
+        self
+    }
+
+    /// Sets the snapshotting depth.
+    pub fn depth(mut self, depth: u32) -> Self {
+        self.depth.replace(depth);
+        self
+    }
+
+    /// Sets the snapshotting interval for a synced node.
+    pub fn interval_synced(mut self, interval_synced: u32) -> Self {
+        self.interval_synced.replace(interval_synced);
+        self
+    }
+
+    /// Sets the snapshotting interval for an unsynced node.
+    pub fn interval_unsynced(mut self, interval_unsynced: u32) -> Self {
+        self.interval_unsynced.replace(interval_unsynced);
+        self
+    }
+
+    /// Produces a [`SnapshottingConfig`] from this builder.
+    #[must_use]
+    pub fn finish(self) -> SnapshottingConfig {
+        SnapshottingConfig {
+            enabled: self.enabled.unwrap_or(DEFAULT_ENABLED),
+            depth: self.depth.unwrap_or(DEFAULT_DEPTH),
+            interval_synced: self.interval_synced.unwrap_or(DEFAULT_INTERVAL_SYNCED),
+            interval_unsynced: self.interval_unsynced.unwrap_or(DEFAULT_INTERVAL_UNSYNCED),
+        }
+    }
+}
+
+/// Builder for a [`SnapshotConfig`] that can also be deserialized from some source.
+#[derive(Default, Debug, Deserialize, PartialEq)]
 #[must_use]
 pub struct SnapshotConfigBuilder {
     #[serde(alias = "fullPath")]
@@ -43,56 +93,41 @@ pub struct SnapshotConfigBuilder {
     delta_path: Option<PathBuf>,
     #[serde(alias = "downloadUrls")]
     download_urls: Option<Vec<DownloadUrls>>,
-    depth: Option<u32>,
-    #[serde(alias = "intervalSynced")]
-    interval_synced: Option<u32>,
-    #[serde(alias = "intervalUnsynced")]
-    interval_unsynced: Option<u32>,
+    #[serde(alias = "create")]
+    snapshotting: Option<SnapshottingConfigBuilder>,
 }
 
 impl SnapshotConfigBuilder {
-    /// Creates a new `SnapshotConfigBuilder`.
+    /// Creates a new builder.
     pub fn new() -> Self {
         Self::default()
     }
 
-    /// Sets the full path of the `SnapshotConfigBuilder`.
+    /// Sets the path of full snapshots.
     pub fn full_path(mut self, full_path: PathBuf) -> Self {
         self.full_path.replace(full_path);
         self
     }
 
-    /// Sets the delta path of the `SnapshotConfigBuilder`.
+    /// Sets the path of delta snapshots.
     pub fn delta_path(mut self, delta_path: PathBuf) -> Self {
         self.delta_path.replace(delta_path);
         self
     }
 
-    /// Sets the download URLs of the `SnapshotConfigBuilder`.
+    /// Sets the URLs for downloading remotely produced snapshots.
     pub fn download_urls(mut self, download_urls: Vec<DownloadUrls>) -> Self {
         self.download_urls.replace(download_urls);
         self
     }
 
-    /// Sets the depth of the `SnapshotConfigBuilder`.
-    pub fn depth(mut self, depth: u32) -> Self {
-        self.depth.replace(depth);
-        self
-    }
-
-    /// Sets the synced interval of the `SnapshotConfigBuilder`.
-    pub fn interval_synced(mut self, interval_synced: u32) -> Self {
-        self.interval_synced.replace(interval_synced);
-        self
-    }
-
-    /// Sets the unsynced interval of the `SnapshotConfigBuilder`.
-    pub fn interval_unsynced(mut self, interval_unsynced: u32) -> Self {
-        self.interval_unsynced.replace(interval_unsynced);
+    /// Sets the [`SnapshottingConfigBuilder`].
+    pub fn snapshotting(mut self, builder: SnapshottingConfigBuilder) -> Self {
+        self.snapshotting.replace(builder);
         self
     }
 
-    /// Finishes the `SnapshotConfigBuilder` into a `SnapshotConfig`.
+    /// Produces a [`SnapshotConfig`] from this builder.
     #[must_use]
     pub fn finish(self) -> SnapshotConfig {
         SnapshotConfig {
@@ -101,22 +136,18 @@ impl SnapshotConfigBuilder {
                 .unwrap_or_else(|| PathBuf::from(DEFAULT_FULL_PATH.to_string())),
             delta_path: self.delta_path,
             download_urls: self.download_urls.unwrap_or(DEFAULT_DOWNLOAD_URLS),
-            depth: self.depth.unwrap_or(DEFAULT_DEPTH),
-            interval_synced: self.interval_synced.unwrap_or(DEFAULT_INTERVAL_SYNCED),
-            interval_unsynced: self.interval_unsynced.unwrap_or(DEFAULT_INTERVAL_UNSYNCED),
+            snapshotting: self.snapshotting.unwrap_or_default().finish(),
         }
     }
 }
 
-/// A snapshot configuration.
-#[derive(Clone)]
+/// The configuration of downloading and creating snapshots.
+#[derive(Clone, Debug)]
 pub struct SnapshotConfig {
     full_path: PathBuf,
     delta_path: Option<PathBuf>,
     download_urls: Vec<DownloadUrls>,
-    depth: u32,
-    interval_synced: u32,
-    interval_unsynced: u32,
+    snapshotting: SnapshottingConfig,
 }
 
 impl SnapshotConfig {
@@ -140,17 +171,38 @@ impl SnapshotConfig {
         &self.download_urls
     }
 
-    /// Returns the depth of the `SnapshotConfig`.
+    /// Returns the [`SnapshottingConfig`].
+    pub fn snapshotting(&self) -> &SnapshottingConfig {
+        &self.snapshotting
+    }
+}
+
+/// The configuration for creating snapshots.
+#[derive(Clone, Debug)]
+pub struct SnapshottingConfig {
+    enabled: bool,
+    depth: u32,
+    interval_synced: u32,
+    interval_unsynced: u32,
+}
+
+impl SnapshottingConfig {
+    /// Returns whether snapshotting is enabled.
+    pub fn enabled(&self) -> bool {
+        self.enabled
+    }
+
+    /// Returns the snapshot depth.
     pub fn depth(&self) -> u32 {
         self.depth
     }
 
-    /// Returns the synced interval of the `SnapshotConfig`.
+    /// Returns the snapshot interval for a synced node.
     pub fn interval_synced(&self) -> u32 {
         self.interval_synced
     }
 
-    /// Returns the unsynced interval of the `SnapshotConfig`.
+    /// Returns the snapshot interval for an unsynced node.
     pub fn interval_unsynced(&self) -> u32 {
         self.interval_unsynced
     }
diff --git a/bee-ledger/src/workers/storage.rs b/bee-ledger/src/workers/storage.rs
index 32987b1905..fc53c94212 100644
--- a/bee-ledger/src/workers/storage.rs
+++ b/bee-ledger/src/workers/storage.rs
@@ -51,6 +51,7 @@ pub trait StorageBackend:
     + Exist
     + Fetch<(), SnapshotInfo>
     + Fetch
+    + Fetch
     + Fetch<(), LedgerIndex>
     + Fetch
     + Fetch>
@@ -92,6 +93,7 @@ impl<T> StorageBackend for T where
     + Exist
     + Fetch<(), SnapshotInfo>
     + Fetch
+    + Fetch
     + Fetch<(), LedgerIndex>
     + Fetch
     + Fetch>
diff --git a/bee-node/bee-node/config.chrysalis-devnet.json b/bee-node/bee-node/config.chrysalis-devnet.json
index 568730c5ee..62708824f4 100644
--- a/bee-node/bee-node/config.chrysalis-devnet.json
+++ b/bee-node/bee-node/config.chrysalis-devnet.json
@@ -24,8 +24,7 @@
     "reconnectIntervalSecs": 30,
     "maxUnknownPeers": 4,
     "maxDiscoveredPeers": 8,
-    "peering": {
-    }
+    "peering": {}
   },
   "autopeering": {
     "enabled": false,
@@ -81,9 +80,6 @@
     "whiteFlagSolidificationTimeout": 2
   },
   "snapshot": {
-    "depth": 50,
-    "intervalSynced": 50,
-    "intervalUnsynced": 1000,
     "fullPath": "./snapshots/devnet/full_snapshot.bin",
     "deltaPath": "./snapshots/devnet/delta_snapshot.bin",
     "downloadUrls": [
@@ -91,12 +87,28 @@
         "full": "http://dbfiles.chrysalis-devnet.iota.cafe/snapshots/hornet/latest-full_snapshot.bin",
         "delta": "http://dbfiles.chrysalis-devnet.iota.cafe/snapshots/hornet/latest-delta_snapshot.bin"
       }
-    ]
+    ],
+    "create": {
+      "enabled": false,
+      "depth": 50,
+      "intervalSynced": 50,
+      "intervalUnsynced": 1000
+    }
   },
   "pruning": {
-    "enabled": true,
-    "delay": 60480,
-    "pruneReceipts": false
+    "milestones": {
+      "enabled": true,
+      "maxMilestonesToKeep": 60480
+    },
+    "size": {
+      "enabled": true,
+      "targetSize": "30GB",
+      "thresholdPercentage": 10.0,
+      "cooldownTime": "5m"
+    },
+    "receipts": {
+      "enabled": false
+    }
   },
   "storage": {
     "path": "./storage/devnet/tangle"
diff --git a/bee-node/bee-node/config.chrysalis-devnet.toml b/bee-node/bee-node/config.chrysalis-devnet.toml
index 96b4977c83..61db8d582c 100644
--- a/bee-node/bee-node/config.chrysalis-devnet.toml
+++ b/bee-node/bee-node/config.chrysalis-devnet.toml
@@ -76,19 +76,28 @@ feature_proof_of_work = true
 white_flag_solidification_timeout = 2
 
 [snapshot]
-depth = 50
-interval_synced = 50
-interval_unsynced = 1000
 full_path = "./snapshots/devnet/full_snapshot.bin"
 delta_path = "./snapshots/devnet/delta_snapshot.bin"
 [[snapshot.download_urls]]
 full = "http://dbfiles.chrysalis-devnet.iota.cafe/snapshots/hornet/latest-full_snapshot.bin"
 delta = "http://dbfiles.chrysalis-devnet.iota.cafe/snapshots/hornet/latest-delta_snapshot.bin"
+[snapshot.create]
+enabled = false
+depth = 50
+interval_synced = 50
+interval_unsynced = 1000
 
 [pruning]
-enabled = true
-delay = 60480
-prune_receipts = false
+[pruning.milestones]
+enabled = true
+max_milestones_to_keep = 60480
+[pruning.size]
+enabled = true
+target_size = "30GB"
+threshold_percentage = 10.0
+cooldown_time = "5m"
+[pruning.receipts]
+enabled = false
 
 [storage]
 path = "./storage/devnet/tangle"
diff --git a/bee-node/bee-node/config.chrysalis-mainnet.json b/bee-node/bee-node/config.chrysalis-mainnet.json
index edf72c2733..5d2993cb56 100644
--- a/bee-node/bee-node/config.chrysalis-mainnet.json
+++ b/bee-node/bee-node/config.chrysalis-mainnet.json
@@ -24,8 +24,7 @@
     "reconnectIntervalSecs": 30,
     "maxUnknownPeers": 4,
     "maxDiscoveredPeers": 8,
-    "peering": {
-    }
+    "peering": {}
   },
   "autopeering": {
     "enabled": false,
@@ -116,9 +115,6 @@
     "whiteFlagSolidificationTimeout": 2
   },
   "snapshot": {
-    "depth": 50,
-    "intervalSynced": 50,
-    "intervalUnsynced": 1000,
     "fullPath": "./snapshots/mainnet/full_snapshot.bin",
     "deltaPath": "./snapshots/mainnet/delta_snapshot.bin",
     "downloadUrls": [
@@ -130,12 +126,28 @@
         "full": "https://cdn.tanglebay.com/snapshots/mainnet/full_snapshot.bin",
         "delta": "https://cdn.tanglebay.com/snapshots/mainnet/delta_snapshot.bin"
       }
-    ]
+    ],
+    "create": {
+      "enabled": false,
+      "depth": 50,
+      "intervalSynced": 50,
+      "intervalUnsynced": 1000
+    }
   },
   "pruning": {
-    "enabled": true,
-    "delay": 60480,
-    "pruneReceipts": false
+    "milestones": {
+      "enabled": true,
+      "maxMilestonesToKeep": 60480
+    },
+    "size": {
+      "enabled": true,
+      "targetSize": "30GB",
+      "thresholdPercentage": 10.0,
+      "cooldownTime": "5m"
+    },
+    "receipts": {
+      "enabled": false
+    }
   },
   "storage": {
     "path": "./storage/mainnet/tangle"
diff --git a/bee-node/bee-node/config.chrysalis-mainnet.toml b/bee-node/bee-node/config.chrysalis-mainnet.toml
index 7441bae064..69213631cd 100644
--- a/bee-node/bee-node/config.chrysalis-mainnet.toml
+++ b/bee-node/bee-node/config.chrysalis-mainnet.toml
@@ -106,22 +106,31 @@ feature_proof_of_work = true
 white_flag_solidification_timeout = 2
 
 [snapshot]
+full_path = "./snapshots/mainnet/full_snapshot.bin"
+delta_path = "./snapshots/mainnet/delta_snapshot.bin"
+[[snapshot.download_urls]]
+full = "https://chrysalis-dbfiles.iota.org/snapshots/hornet/latest-full_snapshot.bin"
+delta = "https://chrysalis-dbfiles.iota.org/snapshots/hornet/latest-delta_snapshot.bin"
+[[snapshot.download_urls]]
+full = "https://cdn.tanglebay.com/snapshots/mainnet/full_snapshot.bin"
+delta = "https://cdn.tanglebay.com/snapshots/mainnet/delta_snapshot.bin"
+[snapshot.create]
+enabled = false
 depth = 50
 interval_synced = 50
 interval_unsynced = 1000
-full_path = "./snapshots/mainnet/full_snapshot.bin"
-delta_path = "./snapshots/mainnet/delta_snapshot.bin"
-[[snapshot.download_urls]]
-full = "https://chrysalis-dbfiles.iota.org/snapshots/hornet/latest-full_snapshot.bin"
-delta = "https://chrysalis-dbfiles.iota.org/snapshots/hornet/latest-delta_snapshot.bin"
-[[snapshot.download_urls]]
-full = "https://cdn.tanglebay.com/snapshots/mainnet/full_snapshot.bin"
-delta = "https://cdn.tanglebay.com/snapshots/mainnet/delta_snapshot.bin"
 
 [pruning]
-enabled = true
-delay = 60480
-prune_receipts = false
+[pruning.milestones]
+enabled = true
+max_milestones_to_keep = 60480
+[pruning.size]
+enabled = true
+target_size = "30GB"
+threshold_percentage = 10.0
+cooldown_time = "5m"
+[pruning.receipts]
+enabled = false
 
 [storage]
 path = "./storage/mainnet/tangle"
diff --git a/bee-storage/bee-storage-rocksdb/src/storage.rs b/bee-storage/bee-storage-rocksdb/src/storage.rs
index 15805606a1..73b6d2946b 100644
--- a/bee-storage/bee-storage-rocksdb/src/storage.rs
+++ b/bee-storage/bee-storage-rocksdb/src/storage.rs
@@ -212,9 +212,7 @@ impl StorageBackend for Storage {
     }
 
     fn size(&self) -> Result<Option<usize>, Self::Error> {
-        Ok(Some(
-            self.inner.live_files()?.iter().fold(0, |acc, file| acc + file.size),
-        ))
+        Ok(Some(self.inner.live_files()?.iter().map(|file| file.size).sum()))
     }
 
     fn get_health(&self) -> Result<Option<StorageHealth>, Self::Error> {
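
Note on the new `[pruning.size]` settings: `target_size = "30GB"` and `cooldown_time = "5m"` are human-readable strings, which is what the new `humanize-rs` dependency in `bee-ledger` is for. Below is a minimal sketch of how such values can be parsed with `humanize-rs` 0.1.5 and the `toml` dev-dependency; `SizeConfig`, `parse_bytes`, and `parse_duration` are hypothetical names for illustration, not identifiers from this patch.

```rust
// Sketch only: parsing "30GB"/"5m"-style values like those in `[pruning.size]`.
// `SizeConfig`, `parse_bytes`, and `parse_duration` are assumptions, not the
// patch's actual types.
use std::time::Duration;

use humanize_rs::{bytes::Bytes, duration};
use serde::{de::Error as _, Deserialize, Deserializer};

#[derive(Debug, Deserialize)]
struct SizeConfig {
    enabled: bool,
    #[serde(alias = "targetSize", deserialize_with = "parse_bytes")]
    target_size: usize,
    #[serde(alias = "thresholdPercentage")]
    threshold_percentage: f32,
    #[serde(alias = "cooldownTime", deserialize_with = "parse_duration")]
    cooldown_time: Duration,
}

/// Parses a human-readable size such as "30GB" into a number of bytes.
fn parse_bytes<'de, D: Deserializer<'de>>(d: D) -> Result<usize, D::Error> {
    let s = String::deserialize(d)?;
    s.parse::<Bytes>()
        .map(|bytes| bytes.size())
        .map_err(|e| D::Error::custom(format!("invalid byte size: {:?}", e)))
}

/// Parses a human-readable duration such as "5m" into a `Duration`.
fn parse_duration<'de, D: Deserializer<'de>>(d: D) -> Result<Duration, D::Error> {
    let s = String::deserialize(d)?;
    duration::parse(&s).map_err(|e| D::Error::custom(format!("invalid duration: {:?}", e)))
}

fn main() {
    // Mirrors the `[pruning.size]` table from the TOML configs above.
    let config: SizeConfig = toml::from_str(
        r#"
            enabled = true
            target_size = "30GB"
            threshold_percentage = 10.0
            cooldown_time = "5m"
        "#,
    )
    .unwrap();

    // A 30 GB target with a 10% threshold corresponds to roughly 3 GB of slack.
    let slack = (config.target_size as f64 * f64::from(config.threshold_percentage) / 100.0) as u64;
    println!("target = {} bytes, slack ~ {} bytes", config.target_size, slack);
    assert_eq!(config.cooldown_time, Duration::from_secs(5 * 60));
}
```

The byte counts that `prune_milestone` now returns feed exactly this kind of size budget: the worker accumulates them and reports progress via the `Pruned {num_pruned_bytes}/{num_bytes_to_prune} bytes.` debug line until enough data has been deleted.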