diff --git a/Cargo.lock b/Cargo.lock index baa49ad263..f0da77b308 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,7 +510,7 @@ dependencies = [ [[package]] name = "bee-storage" -version = "0.9.0" +version = "0.12.0" dependencies = [ "packable", "serde", @@ -519,7 +519,7 @@ dependencies = [ [[package]] name = "bee-storage-memory" -version = "0.1.0" +version = "0.4.0" dependencies = [ "bee-ledger", "bee-message", @@ -533,14 +533,14 @@ dependencies = [ [[package]] name = "bee-storage-null" -version = "0.1.0" +version = "0.3.0" dependencies = [ "bee-storage", ] [[package]] name = "bee-storage-rocksdb" -version = "0.5.0" +version = "0.8.0" dependencies = [ "bee-ledger", "bee-message", @@ -550,6 +550,7 @@ dependencies = [ "bee-test", "num_cpus", "packable", + "parking_lot 0.12.0", "rocksdb", "serde", "thiserror", @@ -557,7 +558,7 @@ dependencies = [ [[package]] name = "bee-storage-sled" -version = "0.4.0" +version = "0.7.0" dependencies = [ "bee-ledger", "bee-message", @@ -574,7 +575,7 @@ dependencies = [ [[package]] name = "bee-storage-test" -version = "0.3.0" +version = "0.6.0" dependencies = [ "bee-ledger", "bee-message", diff --git a/bee-api/bee-rest-api/Cargo.toml b/bee-api/bee-rest-api/Cargo.toml index cdb5af214d..0ce72be5e9 100644 --- a/bee-api/bee-rest-api/Cargo.toml +++ b/bee-api/bee-rest-api/Cargo.toml @@ -23,7 +23,7 @@ bee-message = { version = "0.2.0", path = "../../bee-message", default-features bee-pow = { version = "0.2.0", path = "../../bee-pow", default-features = false } bee-protocol = { version = "0.2.2", path = "../../bee-protocol", default-features = false, optional = true } bee-runtime = { version = "0.1.1-alpha", path = "../../bee-runtime", default-features = false, optional = true } -bee-storage = { version = "0.9.0", path = "../../bee-storage/bee-storage", default-features = false, optional = true } +bee-storage = { version = "0.12.0", path = "../../bee-storage/bee-storage", default-features = false, optional = true } bee-tangle = { version = "0.3.0", path = "../../bee-tangle", default-features = false, optional = true } async-trait = { version = "0.1.51", default-features = false, optional = true } diff --git a/bee-api/bee-rest-api/src/endpoints/mod.rs b/bee-api/bee-rest-api/src/endpoints/mod.rs index a8be81fc2a..d5d43e41eb 100644 --- a/bee-api/bee-rest-api/src/endpoints/mod.rs +++ b/bee-api/bee-rest-api/src/endpoints/mod.rs @@ -35,7 +35,7 @@ pub(crate) type Bech32Hrp = String; pub(crate) const CONFIRMED_THRESHOLD: u32 = 5; -pub async fn init_full_node( +pub fn init_full_node( rest_api_config: RestApiConfig, protocol_config: ProtocolConfig, network_id: NetworkId, @@ -105,7 +105,7 @@ where requested_messages, consensus_worker, ) - .recover(handle_rejection); + .recover(|err| async { handle_rejection(err) }); let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(rest_api_config.bind_socket_addr(), async { @@ -121,7 +121,7 @@ where } } -async fn handle_rejection(err: Rejection) -> Result { +fn handle_rejection(err: Rejection) -> Result { let (http_code, err_code, reason) = match err.find() { // handle custom rejections Some(CustomRejection::Forbidden) => (StatusCode::FORBIDDEN, "403", "access forbidden"), @@ -149,7 +149,7 @@ async fn handle_rejection(err: Rejection) -> Result { )) } -pub async fn init_entry_node(rest_api_config: RestApiConfig, node_builder: N::Builder) -> N::Builder +pub fn init_entry_node(rest_api_config: RestApiConfig, node_builder: N::Builder) -> N::Builder where N::Backend: StorageBackend, { @@ -170,7 +170,9 @@ where node.spawn::(|shutdown| async move { info!("Running."); - let health = warp::path("health").map(|| StatusCode::OK).recover(handle_rejection); + let health = warp::path("health") + .map(|| StatusCode::OK) + .recover(|err| async { handle_rejection(err) }); let (_, server) = warp::serve(health).bind_with_graceful_shutdown(config.bind_socket_addr(), async { shutdown.await.ok(); diff --git a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/info.rs b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/info.rs index 97feb314b2..458675b34f 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/info.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/info.rs @@ -49,11 +49,23 @@ pub(crate) fn filter( .and(with_protocol_config(protocol_config)) .and(with_node_info(node_info)) .and(with_peer_manager(peer_manager)) - .and_then(info) + .and_then( + |tangle, network_id, bech32_hrp, rest_api_config, protocol_config, node_info, peer_manager| async { + info( + tangle, + network_id, + bech32_hrp, + rest_api_config, + protocol_config, + node_info, + peer_manager, + ) + }, + ) .boxed() } -pub(crate) async fn info( +pub(crate) fn info( tangle: ResourceHandle>, network_id: NetworkId, bech32_hrp: Bech32Hrp, @@ -65,7 +77,6 @@ pub(crate) async fn info( let latest_milestone_index = tangle.get_latest_milestone_index(); let latest_milestone_timestamp = tangle .get_milestone(latest_milestone_index) - .await .map(|m| m.timestamp()) .unwrap_or_default(); @@ -73,7 +84,7 @@ pub(crate) async fn info( name: node_info.name.clone(), version: node_info.version.clone(), status: StatusResponse { - is_healthy: health::is_healthy(&tangle, &peer_manager).await, + is_healthy: health::is_healthy(&tangle, &peer_manager), latest_milestone_timestamp, latest_milestone_index: *latest_milestone_index, confirmed_milestone_index: *tangle.get_confirmed_milestone_index(), diff --git a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message.rs b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message.rs index ebf26bf8a2..da880e660d 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message.rs @@ -32,15 +32,15 @@ pub(crate) fn filter( .and(warp::get()) .and(has_permission(ROUTE_MESSAGE, public_routes, allowed_ips)) .and(with_tangle(tangle)) - .and_then(message) + .and_then(|message_id, tangle| async move { message(message_id, tangle) }) .boxed() } -pub(crate) async fn message( +pub(crate) fn message( message_id: MessageId, tangle: ResourceHandle>, ) -> Result { - match tangle.get(&message_id).await.map(|m| (*m).clone()) { + match tangle.get(&message_id) { Some(message) => Ok(warp::reply::json(&MessageResponse(MessageDto::from(&message)))), None => Err(reject::custom(CustomRejection::NotFound( "can not find message".to_string(), diff --git a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_children.rs b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_children.rs index e38e6cb585..f9be95bf6f 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_children.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_children.rs @@ -33,15 +33,15 @@ pub(crate) fn filter( .and(warp::get()) .and(has_permission(ROUTE_MESSAGE_CHILDREN, public_routes, allowed_ips)) .and(with_tangle(tangle)) - .and_then(message_children) + .and_then(|message_id, tangle| async move { message_children(message_id, tangle) }) .boxed() } -pub async fn message_children( +pub fn message_children( message_id: MessageId, tangle: ResourceHandle>, ) -> Result { - let mut children = Vec::from_iter(tangle.get_children(&message_id).await.unwrap_or_default()); + let mut children = Vec::from_iter(tangle.get_children(&message_id).unwrap_or_default()); let count = children.len(); let max_results = 1000; children.truncate(max_results); diff --git a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_metadata.rs b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_metadata.rs index 6d66526d7d..536d2c4b5b 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_metadata.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_metadata.rs @@ -33,11 +33,11 @@ pub(crate) fn filter( .and(warp::get()) .and(has_permission(ROUTE_MESSAGE_METADATA, public_routes, allowed_ips)) .and(with_tangle(tangle)) - .and_then(message_metadata) + .and_then(|message_id, tangle| async move { message_metadata(message_id, tangle) }) .boxed() } -pub(crate) async fn message_metadata( +pub(crate) fn message_metadata( message_id: MessageId, tangle: ResourceHandle>, ) -> Result { @@ -47,11 +47,8 @@ pub(crate) async fn message_metadata( ))); } - match tangle.get(&message_id).await.map(|m| (*m).clone()) { - Some(message) => { - // existing message <=> existing metadata, therefore unwrap() is safe - let metadata = tangle.get_metadata(&message_id).await.unwrap(); - + match tangle.get_message_and_metadata(&message_id) { + Some((message, metadata)) => { // TODO: access constants from URTS let ymrsi_delta = 8; let omrsi_delta = 13; @@ -112,13 +109,17 @@ pub(crate) async fn message_metadata( conflict_reason = None; let cmi = *tangle.get_confirmed_milestone_index(); + // unwrap() of OMRSI/YMRSI is safe since message is solid - if (cmi - *metadata.omrsi().unwrap().index()) > below_max_depth { + let (omrsi, ymrsi) = metadata + .omrsi_and_ymrsi() + .map(|(o, y)| (*o.index(), *y.index())) + .unwrap(); + + if (cmi - omrsi) > below_max_depth { should_promote = Some(false); should_reattach = Some(true); - } else if (cmi - *metadata.ymrsi().unwrap().index()) > ymrsi_delta - || (cmi - *metadata.omrsi().unwrap().index()) > omrsi_delta - { + } else if (cmi - ymrsi) > ymrsi_delta || (cmi - omrsi) > omrsi_delta { should_promote = Some(true); should_reattach = Some(false); } else { diff --git a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_raw.rs b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_raw.rs index 3a639fa9fb..24c02623e6 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_raw.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/message_raw.rs @@ -31,15 +31,15 @@ pub(crate) fn filter( .and(warp::get()) .and(has_permission(ROUTE_MESSAGE_RAW, public_routes, allowed_ips)) .and(with_tangle(tangle)) - .and_then(message_raw) + .and_then(|message_id, tangle| async move { message_raw(message_id, tangle) }) .boxed() } -pub async fn message_raw( +pub fn message_raw( message_id: MessageId, tangle: ResourceHandle>, ) -> Result { - match tangle.get(&message_id).await.map(|m| (*m).clone()) { + match tangle.get(&message_id) { Some(message) => Ok(Response::builder() .header("Content-Type", "application/octet-stream") .body(message.pack_to_vec())), diff --git a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/milestone.rs b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/milestone.rs index 6db75ad7da..8f6a860703 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/milestone.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/milestone.rs @@ -32,27 +32,22 @@ pub(crate) fn filter( .and(warp::get()) .and(has_permission(ROUTE_MILESTONE, public_routes, allowed_ips)) .and(with_tangle(tangle)) - .and_then(milestone) + .and_then(|milestone_index, tangle| async move { milestone(milestone_index, tangle) }) .boxed() } -pub(crate) async fn milestone( +pub(crate) fn milestone( milestone_index: MilestoneIndex, tangle: ResourceHandle>, ) -> Result { - match tangle.get_milestone_message_id(milestone_index).await { - Some(message_id) => match tangle.get_metadata(&message_id).await { - Some(metadata) => Ok(warp::reply::json(&MilestoneResponse { - milestone_index: *milestone_index, - message_id: message_id.to_string(), - timestamp: metadata.arrival_timestamp(), - })), - None => Err(reject::custom(CustomRejection::NotFound( - "can not find metadata for milestone".to_string(), - ))), - }, + match tangle.get_milestone(milestone_index) { + Some(milestone) => Ok(warp::reply::json(&MilestoneResponse { + milestone_index: *milestone_index, + message_id: milestone.message_id().to_string(), + timestamp: milestone.timestamp(), + })), None => Err(reject::custom(CustomRejection::NotFound( - "can not find milestone".to_string(), + "cannot find milestone".to_string(), ))), } } diff --git a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/remove_peer.rs b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/remove_peer.rs index b66c09b282..71debc4e07 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/remove_peer.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/remove_peer.rs @@ -28,11 +28,11 @@ pub(crate) fn filter( .and(warp::delete()) .and(has_permission(ROUTE_REMOVE_PEER, public_routes, allowed_ips)) .and(with_network_command_sender(network_command_sender)) - .and_then(remove_peer) + .and_then(|peer_id, network_controller| async move { remove_peer(peer_id, network_controller) }) .boxed() } -pub(crate) async fn remove_peer( +pub(crate) fn remove_peer( peer_id: PeerId, network_controller: ResourceHandle, ) -> Result { diff --git a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/submit_message.rs b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/submit_message.rs index 81f09be116..83815684c5 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/submit_message.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/submit_message.rs @@ -156,7 +156,7 @@ pub(crate) async fn submit_message( if parsed_nonce == 0 { None } else { Some(parsed_nonce) } }; - let message = build_message(parents, payload, nonce, rest_api_config, protocol_config).await?; + let message = build_message(parents, payload, nonce, rest_api_config, protocol_config)?; let message_id = forward_to_message_submitter(message.pack_to_vec(), message_submitter).await?; Ok(warp::reply::with_status( @@ -167,7 +167,7 @@ pub(crate) async fn submit_message( )) } -pub(crate) async fn build_message( +pub(crate) fn build_message( parents: Vec, payload: Option, nonce: Option, diff --git a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/transaction_included_message.rs b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/transaction_included_message.rs index 5c661e32f3..bd7ce12d7d 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/api/v2/transaction_included_message.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/api/v2/transaction_included_message.rs @@ -43,11 +43,13 @@ pub(crate) fn filter( )) .and(with_storage(storage)) .and(with_tangle(tangle)) - .and_then(transaction_included_message) + .and_then(|transaction_id, storage, tangle| async move { + transaction_included_message(transaction_id, storage, tangle) + }) .boxed() } -pub(crate) async fn transaction_included_message( +pub(crate) fn transaction_included_message( transaction_id: TransactionId, storage: ResourceHandle, tangle: ResourceHandle>, @@ -60,7 +62,7 @@ pub(crate) async fn transaction_included_message( "Can not fetch from storage".to_string(), )) })? { - Some(output) => message::message(*output.message_id(), tangle).await, + Some(output) => message::message(*output.message_id(), tangle), None => Err(reject::custom(CustomRejection::NotFound( "Can not find output".to_string(), ))), diff --git a/bee-api/bee-rest-api/src/endpoints/routes/health.rs b/bee-api/bee-rest-api/src/endpoints/routes/health.rs index 0c6b92e0c5..6905f44c69 100644 --- a/bee-api/bee-rest-api/src/endpoints/routes/health.rs +++ b/bee-api/bee-rest-api/src/endpoints/routes/health.rs @@ -37,22 +37,22 @@ pub(crate) fn filter( .and(has_permission(ROUTE_HEALTH, public_routes, allowed_ips)) .and(with_tangle(tangle)) .and(with_peer_manager(peer_manager)) - .and_then(health) + .and_then(|tangle, peer_manager| async move { health(tangle, peer_manager) }) .boxed() } -pub(crate) async fn health( +pub(crate) fn health( tangle: ResourceHandle>, peer_manager: ResourceHandle, ) -> Result { - if is_healthy(&tangle, &peer_manager).await { + if is_healthy(&tangle, &peer_manager) { Ok(StatusCode::OK) } else { Ok(StatusCode::SERVICE_UNAVAILABLE) } } -pub async fn is_healthy(tangle: &Tangle, peer_manager: &PeerManager) -> bool { +pub fn is_healthy(tangle: &Tangle, peer_manager: &PeerManager) -> bool { if !tangle.is_confirmed_threshold(HEALTH_CONFIRMED_THRESHOLD) { return false; } @@ -61,7 +61,7 @@ pub async fn is_healthy(tangle: &Tangle, peer_manager: &Pe return false; } - match tangle.get_milestone(tangle.get_latest_milestone_index()).await { + match tangle.get_milestone(tangle.get_latest_milestone_index()) { Some(milestone) => { (SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/bee-api/bee-rest-api/src/types/responses.rs b/bee-api/bee-rest-api/src/types/responses.rs index 1a9b70dbf1..2fc66e704e 100644 --- a/bee-api/bee-rest-api/src/types/responses.rs +++ b/bee-api/bee-rest-api/src/types/responses.rs @@ -204,7 +204,7 @@ pub struct MilestoneResponse { pub milestone_index: u32, #[serde(rename = "messageId")] pub message_id: String, - pub timestamp: u64, + pub timestamp: u32, } impl BodyInner for MilestoneResponse {} diff --git a/bee-ledger/Cargo.toml b/bee-ledger/Cargo.toml index 0e4504d20a..aaab8c9d29 100644 --- a/bee-ledger/Cargo.toml +++ b/bee-ledger/Cargo.toml @@ -19,7 +19,7 @@ rustdoc-args = [ "--cfg", "doc_cfg" ] [dependencies] bee-message = { version = "0.2.0", path = "../bee-message", default-features = false, features = [ "std" ] } bee-runtime = { version = "0.1.1-alpha", path = "../bee-runtime", default-features = false, optional = true } -bee-storage = { version = "0.9.0", path = "../bee-storage/bee-storage", default-features = false, optional = true } +bee-storage = { version = "0.12.0", path = "../bee-storage/bee-storage", default-features = false, optional = true } bee-tangle = { version = "0.3.0", path = "../bee-tangle", default-features = false, optional = true } async-trait = { version = "0.1.51", default-features = false, optional = true } diff --git a/bee-ledger/src/workers/consensus/white_flag.rs b/bee-ledger/src/workers/consensus/white_flag.rs index 07d6a74a95..de79cda0a8 100644 --- a/bee-ledger/src/workers/consensus/white_flag.rs +++ b/bee-ledger/src/workers/consensus/white_flag.rs @@ -170,12 +170,7 @@ async fn traverse_past_cone( let mut visited = HashSet::new(); while let Some(message_id) = message_ids.last() { - if let Some((message, meta)) = tangle - .get_vertex(message_id) - .await - .as_ref() - .and_then(|v| v.message_and_metadata().cloned()) - { + if let Some((message, meta)) = tangle.get_message_and_metadata(message_id) { if meta.flags().is_referenced() { visited.insert(*message_id); message_ids.pop(); diff --git a/bee-ledger/src/workers/consensus/worker.rs b/bee-ledger/src/workers/consensus/worker.rs index 4605638b48..50ddb03161 100644 --- a/bee-ledger/src/workers/consensus/worker.rs +++ b/bee-ledger/src/workers/consensus/worker.rs @@ -55,7 +55,7 @@ pub struct ConsensusWorker { pub tx: mpsc::UnboundedSender, } -pub(crate) async fn migration_from_milestone( +pub(crate) fn migration_from_milestone( milestone_index: MilestoneIndex, milestone_id: MilestoneId, receipt: &ReceiptMilestoneOption, @@ -83,7 +83,6 @@ where { let message = tangle .get(&message_id) - .await .ok_or(Error::MilestoneMessageNotFound(message_id))?; let milestone = match message.payload() { @@ -159,15 +158,12 @@ where *receipt_migrated_at = *receipt_migrated_at + 1; } - Some( - migration_from_milestone( - milestone.essence().index(), - milestone_id, - receipt, - storage::fetch_unspent_treasury_output(storage)?, - ) - .await?, - ) + Some(migration_from_milestone( + milestone.essence().index(), + milestone_id, + receipt, + storage::fetch_unspent_treasury_output(storage)?, + )?) } else { None }; @@ -184,36 +180,30 @@ where tangle.update_confirmed_milestone_index(milestone.essence().index()); for message_id in metadata.excluded_no_transaction_messages.iter() { - tangle - .update_metadata(message_id, |message_metadata| { - message_metadata.set_conflict(ConflictReason::None); - message_metadata.reference(milestone.essence().timestamp()); - }) - .await; + tangle.update_metadata(message_id, |message_metadata| { + message_metadata.set_conflict(ConflictReason::None); + message_metadata.reference(milestone.essence().timestamp()); + }); bus.dispatch(MessageReferenced { message_id: *message_id, }); } for (message_id, conflict) in metadata.excluded_conflicting_messages.iter() { - tangle - .update_metadata(message_id, |message_metadata| { - message_metadata.set_conflict(*conflict); - message_metadata.reference(milestone.essence().timestamp()); - }) - .await; + tangle.update_metadata(message_id, |message_metadata| { + message_metadata.set_conflict(*conflict); + message_metadata.reference(milestone.essence().timestamp()); + }); bus.dispatch(MessageReferenced { message_id: *message_id, }); } for message_id in metadata.included_messages.iter() { - tangle - .update_metadata(message_id, |message_metadata| { - message_metadata.set_conflict(ConflictReason::None); - message_metadata.reference(milestone.essence().timestamp()); - }) - .await; + tangle.update_metadata(message_id, |message_metadata| { + message_metadata.set_conflict(ConflictReason::None); + message_metadata.reference(milestone.essence().timestamp()); + }); bus.dispatch(MessageReferenced { message_id: *message_id, }); diff --git a/bee-ledger/src/workers/pruning/batch.rs b/bee-ledger/src/workers/pruning/batch.rs index b4b7b6b72b..695ec6cb1a 100644 --- a/bee-ledger/src/workers/pruning/batch.rs +++ b/bee-ledger/src/workers/pruning/batch.rs @@ -37,7 +37,7 @@ pub struct Edge { pub to_child: MessageId, } -pub async fn prune_confirmed_data( +pub fn prune_confirmed_data( tangle: &Tangle, storage: &S, batch: &mut S::Batch, @@ -207,7 +207,7 @@ pub async fn prune_confirmed_data( Ok((new_seps, metrics)) } -pub async fn prune_unconfirmed_data( +pub fn prune_unconfirmed_data( storage: &S, batch: &mut S::Batch, prune_index: MilestoneIndex, @@ -318,7 +318,7 @@ pub async fn prune_unconfirmed_data( Ok(metrics) } -pub async fn prune_milestone_data( +pub fn prune_milestone_data( storage: &S, batch: &mut S::Batch, prune_index: MilestoneIndex, @@ -326,12 +326,12 @@ pub async fn prune_milestone_data( ) -> Result { let mut metrics = MilestoneDataPruningMetrics::default(); - prune_milestone(storage, batch, prune_index).await?; + prune_milestone(storage, batch, prune_index)?; - prune_output_diff(storage, batch, prune_index).await?; + prune_output_diff(storage, batch, prune_index)?; if should_prune_receipts { - metrics.receipts = prune_receipts(storage, batch, prune_index).await?; + metrics.receipts = prune_receipts(storage, batch, prune_index)?; } Ok(metrics) @@ -359,22 +359,14 @@ fn prune_edge( Ok(()) } -async fn prune_milestone( - storage: &S, - batch: &mut S::Batch, - index: MilestoneIndex, -) -> Result<(), Error> { +fn prune_milestone(storage: &S, batch: &mut S::Batch, index: MilestoneIndex) -> Result<(), Error> { Batch::::batch_delete(storage, batch, &index) .map_err(|e| Error::Storage(Box::new(e)))?; Ok(()) } -async fn prune_output_diff( - storage: &S, - batch: &mut S::Batch, - index: MilestoneIndex, -) -> Result<(), Error> { +fn prune_output_diff(storage: &S, batch: &mut S::Batch, index: MilestoneIndex) -> Result<(), Error> { if let Some(output_diff) = Fetch::::fetch(storage, &index).map_err(|e| Error::Storage(Box::new(e)))? { @@ -396,11 +388,7 @@ async fn prune_output_diff( Ok(()) } -async fn prune_receipts( - storage: &S, - batch: &mut S::Batch, - index: MilestoneIndex, -) -> Result { +fn prune_receipts(storage: &S, batch: &mut S::Batch, index: MilestoneIndex) -> Result { let receipts = Fetch::>::fetch(storage, &index) .map_err(|e| Error::Storage(Box::new(e)))? // Fine since Fetch of a Vec<_> always returns Some(Vec<_>). @@ -419,11 +407,7 @@ async fn prune_receipts( // TODO: consider using this instead of 'truncate' #[allow(dead_code)] -async fn prune_seps( - storage: &S, - batch: &mut S::Batch, - seps: &[SolidEntryPoint], -) -> Result { +fn prune_seps(storage: &S, batch: &mut S::Batch, seps: &[SolidEntryPoint]) -> Result { let mut num = 0; for sep in seps { Batch::::batch_delete(storage, batch, sep) diff --git a/bee-ledger/src/workers/pruning/prune.rs b/bee-ledger/src/workers/pruning/prune.rs index 0d1b172ab1..4825ecb337 100644 --- a/bee-ledger/src/workers/pruning/prune.rs +++ b/bee-ledger/src/workers/pruning/prune.rs @@ -77,7 +77,7 @@ pub async fn prune( // NOTE: This is the most costly thing during pruning, because it has to perform a past-cone traversal. let batch_confirmed_data = Instant::now(); let (mut new_seps, confirmed_data_metrics) = - batch::prune_confirmed_data(tangle, storage, &mut batch, index, &curr_seps).await?; + batch::prune_confirmed_data(tangle, storage, &mut batch, index, &curr_seps)?; timings.batch_confirmed_data = batch_confirmed_data.elapsed(); metrics.new_seps = new_seps.len(); @@ -127,15 +127,14 @@ pub async fn prune( tangle.update_entry_point_index(index); let batch_milestones = Instant::now(); - let milestone_data_metrics = - batch::prune_milestone_data(storage, &mut batch, index, config.prune_receipts()).await?; + let milestone_data_metrics = batch::prune_milestone_data(storage, &mut batch, index, config.prune_receipts())?; timings.batch_milestone_data = batch_milestones.elapsed(); metrics.receipts = milestone_data_metrics.receipts; // Add unconfirmed data to the delete batch. let batch_unconfirmed_data = Instant::now(); - let unconfirmed_data_metrics = batch::prune_unconfirmed_data(storage, &mut batch, index).await?; + let unconfirmed_data_metrics = batch::prune_unconfirmed_data(storage, &mut batch, index)?; timings.batch_unconfirmed_data = batch_unconfirmed_data.elapsed(); metrics.messages += unconfirmed_data_metrics.prunable_messages; diff --git a/bee-ledger/src/workers/snapshot/import.rs b/bee-ledger/src/workers/snapshot/import.rs index 4e90cdd29a..4d996e3f03 100644 --- a/bee-ledger/src/workers/snapshot/import.rs +++ b/bee-ledger/src/workers/snapshot/import.rs @@ -79,7 +79,7 @@ fn import_outputs, B: StorageBackend>( Ok(()) } -async fn import_milestone_diffs, B: StorageBackend>( +fn import_milestone_diffs, B: StorageBackend>( unpacker: &mut U, storage: &B, milestone_diff_count: u64, @@ -104,15 +104,12 @@ async fn import_milestone_diffs, B: StorageB .ok_or(Error::Snapshot(SnapshotError::MissingConsumedTreasury))? .clone(); - Some( - migration_from_milestone( - index, - diff.milestone().id(), - receipt, - TreasuryOutput::new(consumed_treasury.0, consumed_treasury.1), - ) - .await?, - ) + Some(migration_from_milestone( + index, + diff.milestone().id(), + receipt, + TreasuryOutput::new(consumed_treasury.0, consumed_treasury.1), + )?) } else { None }; @@ -145,7 +142,7 @@ fn check_header(header: &SnapshotHeader, kind: SnapshotKind, network_id: u64) -> } } -async fn import_full_snapshot(storage: &B, path: &Path, network_id: u64) -> Result<(), Error> { +fn import_full_snapshot(storage: &B, path: &Path, network_id: u64) -> Result<(), Error> { info!("Importing full snapshot file {}...", &path.to_string_lossy()); let mut unpacker = IoUnpacker::new(snapshot_reader(path)?); @@ -190,7 +187,7 @@ async fn import_full_snapshot(storage: &B, path: &Path, netwo import_solid_entry_points(&mut unpacker, storage, full_header.sep_count(), header.sep_index())?; import_outputs(&mut unpacker, storage, full_header.output_count())?; - import_milestone_diffs(&mut unpacker, storage, full_header.milestone_diff_count()).await?; + import_milestone_diffs(&mut unpacker, storage, full_header.milestone_diff_count())?; if unpacker.into_inner().bytes().next().is_some() { return Err(Error::Snapshot(SnapshotError::RemainingBytes)); @@ -209,7 +206,7 @@ async fn import_full_snapshot(storage: &B, path: &Path, netwo Ok(()) } -async fn import_delta_snapshot(storage: &B, path: &Path, network_id: u64) -> Result<(), Error> { +fn import_delta_snapshot(storage: &B, path: &Path, network_id: u64) -> Result<(), Error> { info!("Importing delta snapshot file {}...", &path.to_string_lossy()); let mut unpacker = IoUnpacker::new(snapshot_reader(path)?); @@ -245,7 +242,7 @@ async fn import_delta_snapshot(storage: &B, path: &Path, netw )?; import_solid_entry_points(&mut unpacker, storage, delta_header.sep_count(), header.sep_index())?; - import_milestone_diffs(&mut unpacker, storage, delta_header.milestone_diff_count()).await?; + import_milestone_diffs(&mut unpacker, storage, delta_header.milestone_diff_count())?; if unpacker.into_inner().bytes().next().is_some() { return Err(Error::Snapshot(SnapshotError::RemainingBytes)); @@ -283,11 +280,11 @@ pub(crate) async fn import_snapshots( .await?; } - import_full_snapshot(storage, config.full_path(), network_id).await?; + import_full_snapshot(storage, config.full_path(), network_id)?; if let Some(delta_path) = config.delta_path() { if delta_path.exists() { - import_delta_snapshot(storage, delta_path, network_id).await?; + import_delta_snapshot(storage, delta_path, network_id)?; } } diff --git a/bee-node/bee-node/Cargo.toml b/bee-node/bee-node/Cargo.toml index 67ca5628ce..ea6205fb97 100644 --- a/bee-node/bee-node/Cargo.toml +++ b/bee-node/bee-node/Cargo.toml @@ -22,10 +22,10 @@ bee-plugin-version-checker = { version = "0.1.0", path = "../bee-plugin/bee-plug bee-protocol = { version = "0.2.2", path = "../../bee-protocol", default-features = false, features = [ "workers" ] } bee-rest-api = { version = "0.2.0", path = "../../bee-api/bee-rest-api", default-features = false, features = [ "endpoints", "peer" ] } bee-runtime = { version = "0.1.1-alpha", path = "../../bee-runtime", default-features = false } -bee-storage = { version = "0.9.0", path = "../../bee-storage/bee-storage", default-features = false } -bee-storage-null = { version = "0.1.0", path = "../../bee-storage/bee-storage-null", default-features = false } -bee-storage-rocksdb = { version = "0.5.0", path = "../../bee-storage/bee-storage-rocksdb", default-features = false, optional = true } -bee-storage-sled = { version = "0.4.0", path = "../../bee-storage/bee-storage-sled", default-features = false, optional = true } +bee-storage = { version = "0.12.0", path = "../../bee-storage/bee-storage", default-features = false } +bee-storage-null = { version = "0.3.0", path = "../../bee-storage/bee-storage-null", default-features = false } +bee-storage-rocksdb = { version = "0.8.0", path = "../../bee-storage/bee-storage-rocksdb", default-features = false, optional = true } +bee-storage-sled = { version = "0.7.0", path = "../../bee-storage/bee-storage-sled", default-features = false, optional = true } bee-tangle = { version = "0.3.0", path = "../../bee-tangle", default-features = false } anymap = { version = "0.12.1", default-features = false } diff --git a/bee-node/bee-node/config.alphanet.json b/bee-node/bee-node/config.alphanet.json index 6a27ac42d8..51621e0249 100644 --- a/bee-node/bee-node/config.alphanet.json +++ b/bee-node/bee-node/config.alphanet.json @@ -133,8 +133,7 @@ "path": "./storage/alphanet/tangle" }, "tangle": { - "belowMaxDepth": 15, - "numPartitions": 16 + "belowMaxDepth": 15 }, "mqtt": { "address": "tcp://localhost:1883" diff --git a/bee-node/bee-node/config.alphanet.toml b/bee-node/bee-node/config.alphanet.toml index baad155903..d06ea03f93 100644 --- a/bee-node/bee-node/config.alphanet.toml +++ b/bee-node/bee-node/config.alphanet.toml @@ -124,7 +124,6 @@ path = "./storage/alphanet/tangle" [tangle] below_max_depth = 15 -num_partitions = 16 [mqtt] address = "tcp://localhost:1883" diff --git a/bee-node/bee-node/src/entrynode/builder.rs b/bee-node/bee-node/src/entrynode/builder.rs index 455ffddfa8..203fa906b0 100644 --- a/bee-node/bee-node/src/entrynode/builder.rs +++ b/bee-node/bee-node/src/entrynode/builder.rs @@ -137,7 +137,7 @@ impl NodeBuilder for EntryNodeBuilder { let (autopeering_rx, builder) = initialize_autopeering(builder).await?; // Initialize the API. - let builder = initialize_api(builder).await; + let builder = initialize_api(builder); // Start the version checker. let builder = builder.with_worker::(); @@ -266,16 +266,14 @@ fn create_local_autopeering_entity(keypair: Keypair, config: &EntryNodeConfig) - } /// Initializes the API. -async fn initialize_api(builder: EntryNodeBuilder) -> EntryNodeBuilder { +fn initialize_api(builder: EntryNodeBuilder) -> EntryNodeBuilder { log::info!("Initializing REST API..."); let config = builder.config(); let rest_api_cfg = config.rest_api.clone(); - let builder = bee_rest_api::endpoints::init_entry_node::(rest_api_cfg, builder).await; - - builder + bee_rest_api::endpoints::init_entry_node::(rest_api_cfg, builder) } #[derive(Clone)] diff --git a/bee-node/bee-node/src/fullnode/builder.rs b/bee-node/bee-node/src/fullnode/builder.rs index ad683459c5..245e1110f6 100644 --- a/bee-node/bee-node/src/fullnode/builder.rs +++ b/bee-node/bee-node/src/fullnode/builder.rs @@ -148,7 +148,7 @@ impl NodeBuilder> for FullNodeBuilder { let (autopeering_rx, builder) = initialize_autopeering(builder).await?; let builder = initialize_ledger(builder); let builder = initialize_protocol(builder, gossip_rx, autopeering_rx); - let builder = initialize_api(builder).await; + let builder = initialize_api(builder); let builder = initialize_tangle(builder); // Start the version checker. @@ -372,7 +372,7 @@ fn create_local_autopeering_entity( } /// Initializes the API. -async fn initialize_api(builder: FullNodeBuilder) -> FullNodeBuilder { +fn initialize_api(builder: FullNodeBuilder) -> FullNodeBuilder { log::info!("Initializing REST API..."); let config = builder.config(); @@ -387,11 +387,7 @@ async fn initialize_api(builder: FullNodeBuilder) -> F let rest_api_cfg = config.rest_api.clone(); let protocol_cfg = config.protocol.clone(); - let builder = - bee_rest_api::endpoints::init_full_node::>(rest_api_cfg, protocol_cfg, network_id, hrp, builder) - .await; - - builder + bee_rest_api::endpoints::init_full_node::>(rest_api_cfg, protocol_cfg, network_id, hrp, builder) } /// Initializes the Tangle. diff --git a/bee-node/bee-plugin/bee-plugin-dashboard/Cargo.toml b/bee-node/bee-plugin/bee-plugin-dashboard/Cargo.toml index 686db07fc8..d03a64cf2c 100644 --- a/bee-node/bee-plugin/bee-plugin-dashboard/Cargo.toml +++ b/bee-node/bee-plugin/bee-plugin-dashboard/Cargo.toml @@ -16,7 +16,7 @@ bee-message = { version = "0.2.0", path = "../../../bee-message", default-featur bee-protocol = { version = "0.2.0", path = "../../../bee-protocol", default-features = false, features = [ "workers" ] } bee-rest-api = { version = "0.2.0", path = "../../../bee-api/bee-rest-api", default-features = false, features = [ "endpoints", "peer" ] } bee-runtime = { version = "0.1.1-alpha", path = "../../../bee-runtime", default-features = false } -bee-storage = { version = "0.9.0", path = "../../../bee-storage/bee-storage", default-features = false } +bee-storage = { version = "0.12.0", path = "../../../bee-storage/bee-storage", default-features = false } bee-tangle = { version = "0.3.0", path = "../../../bee-tangle", default-features = false } async-trait = { version = "0.1.51", default-features = false } diff --git a/bee-node/bee-plugin/bee-plugin-dashboard/src/auth.rs b/bee-node/bee-plugin/bee-plugin-dashboard/src/auth.rs index 59d35bab1a..a77ef1bf35 100644 --- a/bee-node/bee-plugin/bee-plugin-dashboard/src/auth.rs +++ b/bee-node/bee-plugin/bee-plugin-dashboard/src/auth.rs @@ -15,11 +15,7 @@ pub struct AuthResponse { pub jwt: String, } -pub(crate) async fn auth( - node_id: String, - config: DashboardAuthConfig, - body: JsonValue, -) -> Result { +pub(crate) fn auth(node_id: String, config: DashboardAuthConfig, body: JsonValue) -> Result { let jwt_json = &body["jwt"]; if !jwt_json.is_null() { diff --git a/bee-node/bee-plugin/bee-plugin-dashboard/src/routes.rs b/bee-node/bee-plugin/bee-plugin-dashboard/src/routes.rs index 8e17030093..b995ed8c81 100644 --- a/bee-node/bee-plugin/bee-plugin-dashboard/src/routes.rs +++ b/bee-node/bee-plugin/bee-plugin-dashboard/src/routes.rs @@ -27,11 +27,11 @@ use crate::{ const BEARER: &str = "Bearer "; -async fn serve_index() -> Result { +fn serve_index() -> Result { serve_asset("index.html") } -async fn serve_full_path(path: FullPath) -> Result { +fn serve_full_path(path: FullPath) -> Result { serve_asset(&path.as_str()[1..]) } @@ -50,25 +50,27 @@ fn serve_asset(path: &str) -> Result { } pub(crate) fn index_filter() -> impl Filter + Clone { - warp::path::end().and_then(serve_index) + warp::path::end().and_then(|| async move { serve_index() }) } pub(crate) fn asset_routes() -> impl Filter + Clone { warp::path("branding") .and(warp::path::full()) - .and_then(serve_full_path) - .or(warp::path("static").and(warp::path::full()).and_then(serve_full_path)) + .and_then(|path| async move { serve_full_path(path) }) + .or(warp::path("static") + .and(warp::path::full()) + .and_then(|path| async move { serve_full_path(path) })) } pub(crate) fn page_routes() -> impl Filter + Clone { warp::path!("analytics" / ..) - .and_then(serve_index) - .or(warp::path!("dashboard" / ..).and_then(serve_index)) - .or(warp::path!("explorer" / ..).and_then(serve_index)) - .or(warp::path!("login" / ..).and_then(serve_index)) - .or(warp::path!("peers" / ..).and_then(serve_index)) - .or(warp::path!("settings" / ..).and_then(serve_index)) - .or(warp::path!("visualizer" / ..).and_then(serve_index)) + .and_then(|| async { serve_index() }) + .or(warp::path!("dashboard" / ..).and_then(|| async { serve_index() })) + .or(warp::path!("explorer" / ..).and_then(|| async { serve_index() })) + .or(warp::path!("login" / ..).and_then(|| async { serve_index() })) + .or(warp::path!("peers" / ..).and_then(|| async { serve_index() })) + .or(warp::path!("settings" / ..).and_then(|| async { serve_index() })) + .or(warp::path!("visualizer" / ..).and_then(|| async { serve_index() })) } pub(crate) fn ws_routes( @@ -183,7 +185,7 @@ pub(crate) fn auth_route( .and(node_id_filter) .and(auth_config_filter) .and(warp::body::json()) - .and_then(auth) + .and_then(|node_id, config, body| async move { auth(node_id, config, body) }) } pub(crate) fn routes( diff --git a/bee-node/bee-plugin/bee-plugin-dashboard/src/workers/node_status.rs b/bee-node/bee-plugin/bee-plugin-dashboard/src/workers/node_status.rs index c72136fddf..6497852220 100644 --- a/bee-node/bee-plugin/bee-plugin-dashboard/src/workers/node_status.rs +++ b/bee-node/bee-plugin/bee-plugin-dashboard/src/workers/node_status.rs @@ -53,7 +53,7 @@ pub(crate) fn node_status_worker( let public_node_status = PublicNodeStatus { snapshot_index: *tangle.get_snapshot_index(), pruning_index: *tangle.get_pruning_index(), - is_healthy: is_healthy(&tangle, &peer_manager).await, + is_healthy: is_healthy(&tangle, &peer_manager), is_synced: tangle.is_synced(), }; diff --git a/bee-protocol/Cargo.toml b/bee-protocol/Cargo.toml index 030f5779c9..928c96452f 100644 --- a/bee-protocol/Cargo.toml +++ b/bee-protocol/Cargo.toml @@ -23,7 +23,7 @@ bee-ledger = { version = "0.7.0", path = "../bee-ledger", default-features = fal bee-message = { version = "0.2.0", path = "../bee-message", default-features = false, features = [ "serde" ] } bee-pow = { version = "0.2.0", path = "../bee-pow", default-features = false } bee-runtime = { version = "0.1.1-alpha", path = "../bee-runtime", default-features = false, optional = true } -bee-storage = { version = "0.9.0", path = "../bee-storage/bee-storage", default-features = false, optional = true } +bee-storage = { version = "0.12.0", path = "../bee-storage/bee-storage", default-features = false, optional = true } bee-tangle = { version = "0.3.0", path = "../bee-tangle", default-features = false, optional = true } async-channel = { version = "1.6.1", default-features = false, optional = true } diff --git a/bee-protocol/src/workers/index_updater.rs b/bee-protocol/src/workers/index_updater.rs index 28398b0062..f2a427f7cf 100644 --- a/bee-protocol/src/workers/index_updater.rs +++ b/bee-protocol/src/workers/index_updater.rs @@ -73,7 +73,6 @@ where async fn process(tangle: &Tangle, milestone: Milestone, index: MilestoneIndex) { if let Some(parents) = tangle .get(milestone.message_id()) - .await .map(|message| message.parents().to_vec()) { // Update the past cone of this milestone by setting its milestone index, and return them. @@ -82,7 +81,7 @@ async fn process(tangle: &Tangle, milestone: Milestone, in // Note: For tip-selection only the most recent tangle is relevent. That means that during synchronization we do // not need to update xMRSI values or tip scores before (LATEST_MILESTONE_INDEX - BELOW_MAX_DEPTH). if index > tangle.get_latest_milestone_index() - tangle.config().below_max_depth() { - update_future_cone(tangle, roots).await; + update_future_cone(tangle, roots); // Update tip pool after all values got updated. tangle.update_tip_scores().await; @@ -107,7 +106,6 @@ async fn update_past_cone( || tangle.is_solid_entry_point(&parent_id).await || tangle .get_metadata(&parent_id) - .await // TODO: I don't think unwrapping here is safe. Investigate! .unwrap() .milestone_index() @@ -116,17 +114,14 @@ async fn update_past_cone( continue; } - tangle - .update_metadata(&parent_id, |metadata| { - metadata.set_milestone_index(index); - // TODO: That was fine in a synchronous scenario, where this algo had the newest information, but - // probably isn't the case in the now asynchronous scenario. Investigate! - metadata.set_omrsi(IndexId::new(index, parent_id)); - metadata.set_ymrsi(IndexId::new(index, parent_id)); - }) - .await; - - if let Some(parent) = tangle.get(&parent_id).await { + tangle.update_metadata(&parent_id, |metadata| { + metadata.set_milestone_index(index); + + let index = IndexId::new(index, parent_id); + metadata.set_omrsi_and_ymrsi(index, index); + }); + + if let Some(parent) = tangle.get(&parent_id) { parents.extend_from_slice(parent.parents()) } @@ -145,69 +140,56 @@ async fn update_past_cone( // NOTE: Once a milestone comes in we have to walk the future cones of the root transactions and update their OMRSI and // YMRSI; during that time we need to block the propagator, otherwise it will propagate outdated data. -async fn update_future_cone(tangle: &Tangle, roots: HashSet) { +fn update_future_cone(tangle: &Tangle, roots: HashSet) { let mut to_process = roots.into_iter().collect::>(); let mut processed = HashSet::new(); while let Some(parent_id) = to_process.pop() { - if let Some(children) = tangle.get_children(&parent_id).await { + if let Some(children) = tangle.get_children(&parent_id) { // Unwrap is safe with very high probability. - let (parent_omrsi, parent_ymrsi) = tangle - .get_metadata(&parent_id) - .await - .map(|md| (md.omrsi(), md.ymrsi())) - .unwrap(); + let parent_omrsi_and_ymrsi = tangle.get_metadata(&parent_id).map(|md| md.omrsi_and_ymrsi()).unwrap(); // TODO: investigate data race // Skip vertices with unset omrsi/ymrsi - let (parent_omrsi, parent_ymrsi) = { - if parent_omrsi.is_none() || parent_ymrsi.is_none() { - continue; - } else { - (parent_omrsi.unwrap(), parent_ymrsi.unwrap()) - } - }; - - // We can update the OMRSI/YMRSI of those children that inherited the value from the current parent. - for child in &children { - if let Some(child_metadata) = tangle.get_metadata(child).await { - // We can ignore children that are already confirmed - // TODO: resolve ambiguity between `is_confirmed()` and `milestone_index().is_some()` - // if child_metadata.flags().is_confirmed() { - if child_metadata.milestone_index().is_some() { - continue; - } - - // If the childs OMRSI was previously inherited from the current parent, update it. - if let Some(child_omrsi) = child_metadata.omrsi() { - if child_omrsi.id().eq(&parent_id) { - tangle - .update_metadata(child, |md| { - md.set_omrsi(IndexId::new(parent_omrsi.index(), parent_id)); - }) - .await; - } - } - - // If the childs YMRSI was previously inherited from the current parent, update it. - if let Some(child_ymrsi) = child_metadata.ymrsi() { - if child_ymrsi.id().eq(&parent_id) { - tangle - .update_metadata(child, |md| { - md.set_ymrsi(IndexId::new(parent_ymrsi.index(), parent_id)); - }) - .await; + match parent_omrsi_and_ymrsi { + None => continue, + Some((parent_omrsi, parent_ymrsi)) => { + // We can update the OMRSI/YMRSI of those children that inherited the value from the current parent. + for child in &children { + let continue_walk = tangle + .update_metadata(child, |child_metadata| { + // We can ignore children that are already confirmed + // TODO: resolve ambiguity between `is_confirmed()` and `milestone_index().is_some()` + // if child_metadata.flags().is_confirmed() { + if child_metadata.milestone_index().is_some() { + return false; + } + + // If the childs OMRSI and YMRSI was previously inherited from the current parent, + // update it. + child_metadata.update_omrsi_and_ymrsi(|child_omrsi, child_ymrsi| { + if child_omrsi.id() == parent_id { + *child_omrsi = IndexId::new(parent_omrsi.index(), parent_id); + } + + if child_ymrsi.id() == parent_id { + *child_ymrsi = IndexId::new(parent_ymrsi.index(), parent_id); + } + }); + + true + }) + .unwrap_or_default(); + + // Continue the future walk for that child, if we haven't landed on it earlier already. + if continue_walk && !processed.contains(child) { + to_process.push(*child); } } - // Continue the future walk for that child, if we haven't landed on it earlier already. - if !processed.contains(child) { - to_process.push(*child); - } + processed.insert(parent_id); } } - - processed.insert(parent_id); } } diff --git a/bee-protocol/src/workers/message/payload/milestone.rs b/bee-protocol/src/workers/message/payload/milestone.rs index 7a9d094a8a..3239bc6853 100644 --- a/bee-protocol/src/workers/message/payload/milestone.rs +++ b/bee-protocol/src/workers/message/payload/milestone.rs @@ -13,7 +13,7 @@ use bee_message::{ Message, MessageId, }; use bee_runtime::{event::Bus, node::Node, shutdown_stream::ShutdownStream, worker::Worker}; -use bee_tangle::{event::LatestMilestoneChanged, MessageRef, Tangle, TangleWorker}; +use bee_tangle::{event::LatestMilestoneChanged, Tangle, TangleWorker}; use futures::{future::FutureExt, stream::StreamExt}; use log::{debug, error, info}; use tokio::sync::mpsc; @@ -36,7 +36,7 @@ pub(crate) enum Error { pub(crate) struct MilestonePayloadWorkerEvent { pub(crate) message_id: MessageId, - pub(crate) message: MessageRef, + pub(crate) message: Message, } pub(crate) struct MilestonePayloadWorker { @@ -67,10 +67,10 @@ fn validate( } #[allow(clippy::too_many_arguments)] -async fn process( +fn process( tangle: &Tangle, message_id: MessageId, - message: MessageRef, + message: Message, peer_manager: &PeerManager, metrics: &NodeMetrics, requested_milestones: &RequestedMilestones, @@ -89,7 +89,7 @@ async fn process( match validate(message_id, &message, milestone, key_manager) { Ok(milestone) => { - tangle.add_milestone(index, milestone.clone()).await; + tangle.add_milestone(index, milestone.clone()); if index > tangle.get_latest_milestone_index() { info!("New milestone {} {}.", index, milestone.message_id()); tangle.update_latest_milestone_index(index); @@ -165,8 +165,7 @@ where &milestone_solidifier, &key_manager, &bus, - ) - .await; + ); } // Before the worker completely stops, the receiver needs to be drained for milestone payloads to be @@ -186,8 +185,7 @@ where &milestone_solidifier, &key_manager, &bus, - ) - .await; + ); count += 1; } diff --git a/bee-protocol/src/workers/message/payload/mod.rs b/bee-protocol/src/workers/message/payload/mod.rs index a1da876e56..89a5599d1f 100644 --- a/bee-protocol/src/workers/message/payload/mod.rs +++ b/bee-protocol/src/workers/message/payload/mod.rs @@ -8,9 +8,8 @@ mod transaction; use std::{any::TypeId, convert::Infallible}; use async_trait::async_trait; -use bee_message::{payload::Payload, MessageId}; +use bee_message::{payload::Payload, Message, MessageId}; use bee_runtime::{node::Node, shutdown_stream::ShutdownStream, worker::Worker}; -use bee_tangle::MessageRef; use futures::{future::FutureExt, stream::StreamExt}; use log::{debug, error, info}; use tokio::sync::mpsc; @@ -25,16 +24,16 @@ use crate::workers::storage::StorageBackend; pub(crate) struct PayloadWorkerEvent { pub(crate) message_id: MessageId, - pub(crate) message: MessageRef, + pub(crate) message: Message, } pub(crate) struct PayloadWorker { pub(crate) tx: mpsc::UnboundedSender, } -async fn process( +fn process( message_id: MessageId, - message: MessageRef, + message: Message, transaction_payload_worker: &mpsc::UnboundedSender, milestone_payload_worker: &mpsc::UnboundedSender, tagged_data_payload_worker: &mpsc::UnboundedSender, @@ -104,8 +103,7 @@ where &transaction_payload_worker, &milestone_payload_worker, &tagged_data_payload_worker, - ) - .await; + ); } // Before the worker completely stops, the receiver needs to be drained for payloads to be analysed. @@ -121,8 +119,7 @@ where &transaction_payload_worker, &milestone_payload_worker, &tagged_data_payload_worker, - ) - .await; + ); count += 1; } diff --git a/bee-protocol/src/workers/message/payload/transaction.rs b/bee-protocol/src/workers/message/payload/transaction.rs index ccb90e9bd7..5f19a05301 100644 --- a/bee-protocol/src/workers/message/payload/transaction.rs +++ b/bee-protocol/src/workers/message/payload/transaction.rs @@ -6,10 +6,9 @@ use std::{any::TypeId, convert::Infallible}; use async_trait::async_trait; use bee_message::{ payload::{transaction::TransactionEssence, Payload}, - MessageId, + Message, MessageId, }; use bee_runtime::{node::Node, shutdown_stream::ShutdownStream, worker::Worker}; -use bee_tangle::MessageRef; use futures::{future::FutureExt, stream::StreamExt}; use log::{debug, error, info}; use tokio::sync::mpsc; @@ -22,16 +21,16 @@ use crate::{ pub(crate) struct TransactionPayloadWorkerEvent { pub(crate) message_id: MessageId, - pub(crate) message: MessageRef, + pub(crate) message: Message, } pub(crate) struct TransactionPayloadWorker { pub(crate) tx: mpsc::UnboundedSender, } -async fn process( +fn process( message_id: MessageId, - message: MessageRef, + message: Message, tagged_data_payload_worker: &mpsc::UnboundedSender, metrics: &NodeMetrics, ) { @@ -84,7 +83,7 @@ where let mut receiver = ShutdownStream::new(shutdown, UnboundedReceiverStream::new(rx)); while let Some(TransactionPayloadWorkerEvent { message_id, message }) = receiver.next().await { - process(message_id, message, &tagged_data_payload_worker, &metrics).await; + process(message_id, message, &tagged_data_payload_worker, &metrics); } // Before the worker completely stops, the receiver needs to be drained for transaction payloads to be @@ -95,7 +94,7 @@ where while let Some(Some(TransactionPayloadWorkerEvent { message_id, message })) = receiver.next().now_or_never() { - process(message_id, message, &tagged_data_payload_worker, &metrics).await; + process(message_id, message, &tagged_data_payload_worker, &metrics); count += 1; } diff --git a/bee-protocol/src/workers/message/processor.rs b/bee-protocol/src/workers/message/processor.rs index b7c6db0ce6..9115a7df59 100644 --- a/bee-protocol/src/workers/message/processor.rs +++ b/bee-protocol/src/workers/message/processor.rs @@ -180,11 +180,8 @@ where } let message_id = message.id(); - let metadata = MessageMetadata::arrived(); - let message = if let Some(message) = tangle.insert(message, message_id, metadata).await { - message - } else { + if tangle.contains(&message_id) { metrics.known_messages_inc(); if let Some(ref peer_id) = from { peer_manager @@ -194,7 +191,17 @@ where .unwrap_or_default(); } continue 'next_event; - }; + } else { + let metadata = MessageMetadata::arrived(); + // There is no data race here even if the `Message` and + // `MessageMetadata` are inserted between the call to `tangle.contains` + // and here because: + // - Both `Message`s are the same because they have the same hash. + // - `MessageMetadata` is not overwritten. + // - Some extra code is executing due to not calling `continue` but + // this does not create inconsistencies. + tangle.insert(&message, &message_id, &metadata); + } // Send the propagation event ASAP to allow the propagator to do its thing if let Err(e) = propagator.send(PropagatorWorkerEvent(message_id)) { @@ -232,13 +239,9 @@ where } }; - if payload_worker - .send(PayloadWorkerEvent { - message_id, - message: message.clone(), - }) - .is_err() - { + let parent_message_ids = message.parents().to_vec(); + + if payload_worker.send(PayloadWorkerEvent { message_id, message }).is_err() { error!("Sending message {} to payload worker failed.", message_id); } @@ -249,7 +252,7 @@ where // TODO: boolean values are false at this point in time? trigger event from another location? bus.dispatch(VertexCreated { message_id, - parent_message_ids: message.parents().to_vec(), + parent_message_ids, is_solid: false, is_referenced: false, is_conflicting: false, diff --git a/bee-protocol/src/workers/peer/mod.rs b/bee-protocol/src/workers/peer/mod.rs index a876f39494..d935716da9 100644 --- a/bee-protocol/src/workers/peer/mod.rs +++ b/bee-protocol/src/workers/peer/mod.rs @@ -90,8 +90,7 @@ impl PeerWorker { &self.milestone_requester, &*requested_milestones, Some(*self.peer.id()), - ) - .await; + ); // TODO is this needed ? let tangle = tangle.into_weak(); diff --git a/bee-protocol/src/workers/propagator.rs b/bee-protocol/src/workers/propagator.rs index e0bd007148..31a8b89062 100644 --- a/bee-protocol/src/workers/propagator.rs +++ b/bee-protocol/src/workers/propagator.rs @@ -37,7 +37,7 @@ async fn propagate( continue 'outer; } - if let Some(message) = tangle.get(message_id).await { + if let Some(message) = tangle.get(message_id) { // If one of the parents is not yet solid, we skip the current message. for parent in message.parents().iter() { if !tangle.is_solid_message(parent).await { @@ -64,12 +64,10 @@ async fn propagate( // SAFETY: 'unwrap' is safe, see explanation above. None => tangle .get_metadata(parent) - .await .map(|parent_md| { - ( - parent_md.omrsi().expect("solid msg with unset omrsi"), - parent_md.ymrsi().expect("solid msg with unset ymrsi"), - ) + parent_md + .omrsi_and_ymrsi() + .expect("solid msg with unset omrsi and ymrsi") }) .unwrap(), }; @@ -90,16 +88,14 @@ async fn propagate( if metadata.flags().is_milestone() { metadata.milestone_index() } else { - metadata.set_omrsi(*child_omrsi); - metadata.set_ymrsi(*child_ymrsi); + metadata.set_omrsi_and_ymrsi(*child_omrsi, *child_ymrsi); None } }) - .await .expect("Failed to fetch metadata."); // Try to propagate as far as possible into the future. - if let Some(msg_children) = tangle.get_children(message_id).await { + if let Some(msg_children) = tangle.get_children(message_id) { for child in msg_children { children.push(child); } diff --git a/bee-protocol/src/workers/requester/message.rs b/bee-protocol/src/workers/requester/message.rs index 13b92d4a28..941a1676a4 100644 --- a/bee-protocol/src/workers/requester/message.rs +++ b/bee-protocol/src/workers/requester/message.rs @@ -39,7 +39,7 @@ pub async fn request_message( message_id: MessageId, index: MilestoneIndex, ) { - if !tangle.contains(&message_id).await + if !tangle.contains(&message_id) && !tangle.is_solid_entry_point(&message_id).await && !requested_messages.contains(&message_id) { @@ -136,7 +136,7 @@ fn process_request_unchecked( } } -async fn retry_requests( +fn retry_requests( requested_messages: &RequestedMessages, peer_manager: &PeerManager, metrics: &NodeMetrics, @@ -162,7 +162,7 @@ async fn retry_requests( } for (message_id, index) in to_retry { - if tangle.contains(&message_id).await { + if tangle.contains(&message_id) { requested_messages.remove(&message_id); } else { process_request_unchecked(message_id, index, peer_manager, metrics); @@ -229,7 +229,7 @@ where let mut ticker = ShutdownStream::new(shutdown, IntervalStream::new(interval(RETRY_INTERVAL))); while ticker.next().await.is_some() { - retry_requests(&requested_messages, &peer_manager, &metrics, &tangle).await; + retry_requests(&requested_messages, &peer_manager, &metrics, &tangle); } info!("Retryer stopped."); diff --git a/bee-protocol/src/workers/requester/milestone.rs b/bee-protocol/src/workers/requester/milestone.rs index 5580466797..2fc2c578bd 100644 --- a/bee-protocol/src/workers/requester/milestone.rs +++ b/bee-protocol/src/workers/requester/milestone.rs @@ -30,27 +30,27 @@ use crate::{ const RETRY_INTERVAL: Duration = Duration::from_millis(2500); -pub(crate) async fn request_milestone( +pub(crate) fn request_milestone( tangle: &Tangle, milestone_requester: &mpsc::UnboundedSender, requested_milestones: &RequestedMilestones, index: MilestoneIndex, to: Option, ) { - if !requested_milestones.contains(&index) && !tangle.contains_milestone(index).await { + if !requested_milestones.contains(&index) && !tangle.contains_milestone(index) { if let Err(e) = milestone_requester.send(MilestoneRequesterWorkerEvent(index, to)) { warn!("Requesting milestone failed: {}.", e); } } } -pub(crate) async fn request_latest_milestone( +pub(crate) fn request_latest_milestone( tangle: &Tangle, milestone_requester: &mpsc::UnboundedSender, requested_milestones: &RequestedMilestones, to: Option, ) { - request_milestone(tangle, milestone_requester, requested_milestones, MilestoneIndex(0), to).await + request_milestone(tangle, milestone_requester, requested_milestones, MilestoneIndex(0), to) } #[derive(Default)] @@ -86,7 +86,7 @@ pub(crate) struct MilestoneRequesterWorker { pub(crate) tx: mpsc::UnboundedSender, } -async fn process_request( +fn process_request( index: MilestoneIndex, peer_id: Option, peer_manager: &PeerManager, @@ -128,7 +128,7 @@ fn process_request_unchecked( } } -async fn retry_requests( +fn retry_requests( requested_milestones: &RequestedMilestones, peer_manager: &PeerManager, metrics: &NodeMetrics, @@ -154,7 +154,7 @@ async fn retry_requests( } for index in to_retry { - if tangle.contains_milestone(index).await { + if tangle.contains_milestone(index) { requested_milestones.remove(&index); } else { process_request_unchecked(index, None, peer_manager, metrics); @@ -200,9 +200,9 @@ where let mut receiver = ShutdownStream::new(shutdown, UnboundedReceiverStream::new(rx)); while let Some(MilestoneRequesterWorkerEvent(index, peer_id)) = receiver.next().await { - if !tangle.contains_milestone(index).await { + if !tangle.contains_milestone(index) { debug!("Requesting milestone {}.", *index); - process_request(index, peer_id, &peer_manager, &metrics, &requested_milestones).await; + process_request(index, peer_id, &peer_manager, &metrics, &requested_milestones); } } @@ -220,7 +220,7 @@ where let mut ticker = ShutdownStream::new(shutdown, IntervalStream::new(interval(RETRY_INTERVAL))); while ticker.next().await.is_some() { - retry_requests(&requested_milestones, &peer_manager, &metrics, &tangle).await; + retry_requests(&requested_milestones, &peer_manager, &metrics, &tangle); } info!("Retryer stopped."); diff --git a/bee-protocol/src/workers/responder/message.rs b/bee-protocol/src/workers/responder/message.rs index 25d72468fd..b1a7565cff 100644 --- a/bee-protocol/src/workers/responder/message.rs +++ b/bee-protocol/src/workers/responder/message.rs @@ -63,7 +63,7 @@ where let mut receiver = ShutdownStream::new(shutdown, UnboundedReceiverStream::new(rx)); while let Some(MessageResponderWorkerEvent { peer_id, request }) = receiver.next().await { - if let Some(message) = tangle.get(&request.message_id).await { + if let Some(message) = tangle.get(&request.message_id) { Sender::::send( &MessagePacket::new(message.pack_to_vec()), &peer_id, diff --git a/bee-protocol/src/workers/responder/milestone.rs b/bee-protocol/src/workers/responder/milestone.rs index edc1bfc8b1..99a7faba69 100644 --- a/bee-protocol/src/workers/responder/milestone.rs +++ b/bee-protocol/src/workers/responder/milestone.rs @@ -69,7 +69,7 @@ where request.index.into() }; - if let Some(message) = tangle.get_milestone_message(index).await { + if let Some(message) = tangle.get_milestone_message(index) { Sender::::send( &MessagePacket::new(message.pack_to_vec()), &peer_id, diff --git a/bee-protocol/src/workers/solidifier.rs b/bee-protocol/src/workers/solidifier.rs index 69211ab5c6..000f37e484 100644 --- a/bee-protocol/src/workers/solidifier.rs +++ b/bee-protocol/src/workers/solidifier.rs @@ -47,12 +47,11 @@ async fn heavy_solidification( traversal::visit_parents_depth_first( tangle, target_id, - |id, _, metadata| !metadata.flags().is_solid() && !requested_messages.contains(&id), + |id, _, metadata| !metadata.flags().is_solid() && !requested_messages.contains(id), |_, _, _| {}, |_, _, _| {}, |missing_id| missing.push(*missing_id), - ) - .await; + ); let missing_len = missing.len(); @@ -146,13 +145,13 @@ where // Request all milestones within a range. while next <= cmp::min(smi + MilestoneIndex(milestone_sync_count), lmi) { - request_milestone(&tangle, &milestone_requester, &*requested_milestones, next, None).await; + request_milestone(&tangle, &milestone_requester, &*requested_milestones, next, None); next = next + MilestoneIndex(1); } if index < next { - if let Some(message_id) = tangle.get_milestone_message_id(index).await { - if let Some(message) = tangle.get(&message_id).await { + if let Some(message_id) = tangle.get_milestone_message_id(index) { + if let Some(message) = tangle.get(&message_id) { debug!( "Light solidification of milestone {} {} in [{};{}].", index, @@ -174,7 +173,7 @@ where let mut target = smi + MilestoneIndex(1); while target <= lmi { - if let Some(id) = tangle.get_milestone_message_id(target).await { + if let Some(id) = tangle.get_milestone_message_id(target) { if tangle.is_solid_message(&id).await { solidify( &tangle, diff --git a/bee-runtime/Cargo.toml b/bee-runtime/Cargo.toml index d6f17fca6c..2e2abf38ba 100644 --- a/bee-runtime/Cargo.toml +++ b/bee-runtime/Cargo.toml @@ -17,7 +17,7 @@ all-features = true rustdoc-args = [ "--cfg", "doc_cfg" ] [dependencies] -bee-storage = { version = "0.9.0", path = "../bee-storage/bee-storage", default-features = false } +bee-storage = { version = "0.12.0", path = "../bee-storage/bee-storage", default-features = false } async-trait = { version = "0.1.51", default-features = false } dashmap = { version = "4.0.2", default-features = false } diff --git a/bee-storage/bee-storage-memory/CHANGELOG.md b/bee-storage/bee-storage-memory/CHANGELOG.md index 44feed92e2..7a2fe477fc 100644 --- a/bee-storage/bee-storage-memory/CHANGELOG.md +++ b/bee-storage/bee-storage-memory/CHANGELOG.md @@ -19,13 +19,29 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.2.0 - 2022-XX-XX +## 0.4.0 - 2022-XX-XX ### Removed - All `Balance` operations; - All `PaddedIndex` operations; +## 0.3.0 - 2022-03-17 + +### Added + +- Implementation of `InsertStrict` for `Storage`; + +### Removed + +- Implementation of `Insert` for `Storage`; + +## 0.2.0 - 2022-03-11 + +### Added + +- Implementation of `Update` for `Storage`; + ## 0.1.0 - 2021-10-21 ### Added diff --git a/bee-storage/bee-storage-memory/Cargo.toml b/bee-storage/bee-storage-memory/Cargo.toml index 642ecafc0b..9c8537b064 100644 --- a/bee-storage/bee-storage-memory/Cargo.toml +++ b/bee-storage/bee-storage-memory/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-storage-memory" -version = "0.1.0" +version = "0.4.0" authors = [ "IOTA Stiftung" ] edition = "2021" description = "An in-memory bee-storage implementation" @@ -19,7 +19,7 @@ rustdoc-args = [ "--cfg", "doc_cfg" ] [dependencies] bee-ledger = { version = "0.7.0", path = "../../bee-ledger", default-features = false } bee-message = { version = "0.2.0", path = "../../bee-message", default-features = false } -bee-storage = { version = "0.9.0", path = "../bee-storage", default-features = false } +bee-storage = { version = "0.12.0", path = "../bee-storage", default-features = false } bee-tangle = { version = "0.3.0", path = "../../bee-tangle", default-features = false } serde = { version = "1.0.130", default-features = false, features = [ "derive" ] } diff --git a/bee-storage/bee-storage-memory/src/access/insert.rs b/bee-storage/bee-storage-memory/src/access/insert.rs index ec44be33d7..106054d282 100644 --- a/bee-storage/bee-storage-memory/src/access/insert.rs +++ b/bee-storage/bee-storage-memory/src/access/insert.rs @@ -13,7 +13,11 @@ use bee_message::{ output::OutputId, Message, MessageId, }; -use bee_storage::{access::Insert, backend::StorageBackend, system::System}; +use bee_storage::{ + access::{Insert, InsertStrict}, + backend::StorageBackend, + system::System, +}; use bee_tangle::{ metadata::MessageMetadata, solid_entry_point::SolidEntryPoint, unreferenced_message::UnreferencedMessage, }; @@ -34,7 +38,6 @@ macro_rules! impl_insert { impl_insert!(u8, System, system); impl_insert!(MessageId, Message, message_id_to_message); -impl_insert!(MessageId, MessageMetadata, message_id_to_metadata); impl_insert!((MessageId, MessageId), (), message_id_to_message_id); impl_insert!(OutputId, CreatedOutput, output_id_to_created_output); impl_insert!(OutputId, ConsumedOutput, output_id_to_consumed_output); @@ -52,3 +55,17 @@ impl_insert!( ); impl_insert!((MilestoneIndex, Receipt), (), milestone_index_to_receipt); impl_insert!((bool, TreasuryOutput), (), spent_to_treasury_output); + +impl InsertStrict for Storage { + fn insert_strict(&self, k: &MessageId, v: &MessageMetadata) -> Result<(), ::Error> { + let mut guard = self.inner.write()?; + + if !guard.message_id_to_metadata.exist(k) { + guard.message_id_to_metadata.insert(k, v); + } + + drop(guard); + + Ok(()) + } +} diff --git a/bee-storage/bee-storage-memory/src/access/mod.rs b/bee-storage/bee-storage-memory/src/access/mod.rs index bc317bbd9f..cd4bd0652b 100644 --- a/bee-storage/bee-storage-memory/src/access/mod.rs +++ b/bee-storage/bee-storage-memory/src/access/mod.rs @@ -11,3 +11,4 @@ pub mod insert; pub mod iter; pub mod multi_fetch; pub mod truncate; +pub mod update; diff --git a/bee-storage/bee-storage-memory/src/access/update.rs b/bee-storage/bee-storage-memory/src/access/update.rs new file mode 100644 index 0000000000..61b74c83fd --- /dev/null +++ b/bee-storage/bee-storage-memory/src/access/update.rs @@ -0,0 +1,22 @@ +// Copyright 2021-2022 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +//! Update access operations. + +use bee_message::MessageId; +use bee_storage::{access::Update, backend::StorageBackend}; +use bee_tangle::metadata::MessageMetadata; + +use crate::storage::Storage; + +macro_rules! impl_update { + ($key:ty, $value:ty, $field:ident) => { + impl Update<$key, $value> for Storage { + fn update(&self, k: &$key, f: impl FnMut(&mut $value)) -> Result<(), ::Error> { + Ok(self.inner.write()?.$field.update(k, f)) + } + } + }; +} + +impl_update!(MessageId, MessageMetadata, message_id_to_metadata); diff --git a/bee-storage/bee-storage-memory/src/table.rs b/bee-storage/bee-storage-memory/src/table.rs index 3c5409d5e3..f8d4b1e0aa 100644 --- a/bee-storage/bee-storage-memory/src/table.rs +++ b/bee-storage/bee-storage-memory/src/table.rs @@ -47,6 +47,10 @@ impl Table { self.inner.clear(); } + pub(crate) fn update(&mut self, k: &K, f: impl FnOnce(&mut V)) { + self.inner.get_mut(k).map(f); + } + pub(crate) fn iter(&self) -> TableIter { self.inner.clone().into_iter().map(Ok) } diff --git a/bee-storage/bee-storage-null/CHANGELOG.md b/bee-storage/bee-storage-null/CHANGELOG.md index 1ef96d8cce..77ec0e8a99 100644 --- a/bee-storage/bee-storage-null/CHANGELOG.md +++ b/bee-storage/bee-storage-null/CHANGELOG.md @@ -19,7 +19,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.1.0 - 2021-10-11 +## 0.3.0 - 2022-03-17 + +### Added + +- Implementation of `InsertStrict` for `Storage`; + +## 0.2.0 - 2022-03-11 + +### Added + +- Implementation of `Update` for `Storage`; + +## 0.1.0 - 2021-10-11 ### Added diff --git a/bee-storage/bee-storage-null/Cargo.toml b/bee-storage/bee-storage-null/Cargo.toml index 3ae5d0273b..dabf9eca3c 100644 --- a/bee-storage/bee-storage-null/Cargo.toml +++ b/bee-storage/bee-storage-null/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-storage-null" -version = "0.1.0" +version = "0.3.0" authors = [ "IOTA Stiftung" ] edition = "2021" description = "A bee-storage implementation with no backend, for testing purposes" @@ -17,4 +17,4 @@ all-features = true rustdoc-args = [ "--cfg", "doc_cfg" ] [dependencies] -bee-storage = { version = "0.9.0", path = "../bee-storage", default-features = false } +bee-storage = { version = "0.12.0", path = "../bee-storage", default-features = false } diff --git a/bee-storage/bee-storage-null/src/access/insert.rs b/bee-storage/bee-storage-null/src/access/insert.rs index a062cb11ed..a3209681fd 100644 --- a/bee-storage/bee-storage-null/src/access/insert.rs +++ b/bee-storage/bee-storage-null/src/access/insert.rs @@ -1,7 +1,7 @@ // Copyright 2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use bee_storage::access::Insert; +use bee_storage::access::{Insert, InsertStrict}; use crate::Storage; @@ -10,3 +10,9 @@ impl Insert for Storage { Ok(()) } } + +impl InsertStrict for Storage { + fn insert_strict(&self, _key: &K, _value: &V) -> Result<(), Self::Error> { + Ok(()) + } +} diff --git a/bee-storage/bee-storage-null/src/access/mod.rs b/bee-storage/bee-storage-null/src/access/mod.rs index 6584036235..2e4da23a81 100644 --- a/bee-storage/bee-storage-null/src/access/mod.rs +++ b/bee-storage/bee-storage-null/src/access/mod.rs @@ -9,3 +9,4 @@ pub mod insert; pub mod iter; pub mod multi_fetch; pub mod truncate; +pub mod update; diff --git a/bee-storage/bee-storage-null/src/access/update.rs b/bee-storage/bee-storage-null/src/access/update.rs new file mode 100644 index 0000000000..23d4bda025 --- /dev/null +++ b/bee-storage/bee-storage-null/src/access/update.rs @@ -0,0 +1,12 @@ +// Copyright 2021-2022 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use bee_storage::access::Update; + +use crate::Storage; + +impl Update for Storage { + fn update(&self, _key: &K, _f: impl FnMut(&mut V)) -> Result<(), Self::Error> { + Ok(()) + } +} diff --git a/bee-storage/bee-storage-rocksdb/CHANGELOG.md b/bee-storage/bee-storage-rocksdb/CHANGELOG.md index b458ce7cb3..bb8244c895 100644 --- a/bee-storage/bee-storage-rocksdb/CHANGELOG.md +++ b/bee-storage/bee-storage-rocksdb/CHANGELOG.md @@ -19,18 +19,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.7.0 - 2022-XX-XX +## 0.8.0 - 2022-XX-XX + +### Changed + +- Use `packable` instead of `bee-common::packable` to serialize to and deserialize from storage. ### Removed - All `Balance` operations; - All `PaddedIndex` operations; -## 0.6.0 - 2022-XX-XX +## 0.7.0 - 2022-03-17 -### Changed +### Added -- Use `packable` instead of `bee-common::packable` to serialize to and deserialize from storage. +- Implementation of `InsertStrict` for `Storage`; + +### Removed + +- Implementation of `Insert` for `Storage`; + +## 0.6.0 - 2022-03-11 + +### Added + +- Implementation of `Update` for `Storage`; ## 0.5.0 - 2021-06-15 diff --git a/bee-storage/bee-storage-rocksdb/Cargo.toml b/bee-storage/bee-storage-rocksdb/Cargo.toml index df0d238004..9abb744d7a 100644 --- a/bee-storage/bee-storage-rocksdb/Cargo.toml +++ b/bee-storage/bee-storage-rocksdb/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-storage-rocksdb" -version = "0.5.0" +version = "0.8.0" authors = [ "IOTA Stiftung" ] edition = "2021" description = "A bee-storage implementation for the RocksDB backend" @@ -19,11 +19,12 @@ rustdoc-args = [ "--cfg", "doc_cfg" ] [dependencies] bee-ledger = { version = "0.7.0", path = "../../bee-ledger", default-features = false } bee-message = { version = "0.2.0", path = "../../bee-message", default-features = false } -bee-storage = { version = "0.9.0", path = "../bee-storage", default-features = false } +bee-storage = { version = "0.12.0", path = "../bee-storage", default-features = false } bee-tangle = { version = "0.3.0", path = "../../bee-tangle", default-features = false } num_cpus = { version = "1.13.0", default-features = false } packable = { version = "0.3.1", default-features = false, features = [ "serde" ] } +parking_lot = { version = "0.12.0", default-features = false } rocksdb = { version = "0.18.0", default-features = false } serde = { version = "1.0.130", default-features = false, features = [ "derive" ] } thiserror = { version = "1.0.30", default-features = false } diff --git a/bee-storage/bee-storage-rocksdb/src/access/batch.rs b/bee-storage/bee-storage-rocksdb/src/access/batch.rs index 10d05b8fe3..f2c1ffc227 100644 --- a/bee-storage/bee-storage-rocksdb/src/access/batch.rs +++ b/bee-storage/bee-storage-rocksdb/src/access/batch.rs @@ -25,6 +25,7 @@ use crate::{ #[derive(Default)] pub struct StorageBatch { + should_lock: bool, inner: WriteBatch, key_buf: Vec, value_buf: Vec, @@ -37,8 +38,13 @@ impl BatchBuilder for Storage { let mut write_options = WriteOptions::default(); write_options.set_sync(false); write_options.disable_wal(!durability); + + let guard = batch.should_lock.then(|| self.locks.message_id_to_metadata.read()); + self.inner.write_opt(batch.inner, &write_options)?; + drop(guard); + Ok(()) } } @@ -81,6 +87,8 @@ impl Batch for Storage { message_id: &MessageId, metadata: &MessageMetadata, ) -> Result<(), ::Error> { + batch.should_lock = true; + batch.value_buf.clear(); // Packing to bytes can't fail. metadata.pack(&mut batch.value_buf).unwrap(); @@ -97,6 +105,8 @@ impl Batch for Storage { batch: &mut Self::Batch, message_id: &MessageId, ) -> Result<(), ::Error> { + batch.should_lock = true; + batch .inner .delete_cf(self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?, message_id); diff --git a/bee-storage/bee-storage-rocksdb/src/access/delete.rs b/bee-storage/bee-storage-rocksdb/src/access/delete.rs index c8fddedf86..104868e14c 100644 --- a/bee-storage/bee-storage-rocksdb/src/access/delete.rs +++ b/bee-storage/bee-storage-rocksdb/src/access/delete.rs @@ -33,9 +33,13 @@ impl Delete for Storage { impl Delete for Storage { fn delete(&self, message_id: &MessageId) -> Result<(), ::Error> { + let guard = self.locks.message_id_to_metadata.read(); + self.inner .delete_cf(self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?, message_id)?; + drop(guard); + Ok(()) } } diff --git a/bee-storage/bee-storage-rocksdb/src/access/exist.rs b/bee-storage/bee-storage-rocksdb/src/access/exist.rs index efd15d36c9..1d9d22a06a 100644 --- a/bee-storage/bee-storage-rocksdb/src/access/exist.rs +++ b/bee-storage/bee-storage-rocksdb/src/access/exist.rs @@ -26,17 +26,23 @@ impl Exist for Storage { fn exist(&self, message_id: &MessageId) -> Result::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE)?, message_id)? + .get_pinned_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE)?, message_id)? .is_some()) } } impl Exist for Storage { fn exist(&self, message_id: &MessageId) -> Result::Error> { - Ok(self + let guard = self.locks.message_id_to_metadata.read(); + + let exists = self .inner - .get_cf(self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?, message_id)? - .is_some()) + .get_pinned_cf(self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?, message_id)? + .is_some(); + + drop(guard); + + Ok(exists) } } @@ -47,7 +53,7 @@ impl Exist<(MessageId, MessageId), ()> for Storage { Ok(self .inner - .get_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE_ID)?, key)? + .get_pinned_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE_ID)?, key)? .is_some()) } } @@ -56,7 +62,7 @@ impl Exist for Storage { fn exist(&self, output_id: &OutputId) -> Result::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_OUTPUT_ID_TO_CREATED_OUTPUT)?, output_id.pack_to_vec())? + .get_pinned_cf(self.cf_handle(CF_OUTPUT_ID_TO_CREATED_OUTPUT)?, output_id.pack_to_vec())? .is_some()) } } @@ -65,7 +71,7 @@ impl Exist for Storage { fn exist(&self, output_id: &OutputId) -> Result::Error> { Ok(self .inner - .get_cf( + .get_pinned_cf( self.cf_handle(CF_OUTPUT_ID_TO_CONSUMED_OUTPUT)?, output_id.pack_to_vec(), )? @@ -77,7 +83,7 @@ impl Exist for Storage { fn exist(&self, unspent: &Unspent) -> Result::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_OUTPUT_ID_UNSPENT)?, unspent.pack_to_vec())? + .get_pinned_cf(self.cf_handle(CF_OUTPUT_ID_UNSPENT)?, unspent.pack_to_vec())? .is_some()) } } @@ -92,14 +98,17 @@ impl Exist<(Ed25519Address, OutputId), ()> for Storage { Ok(self .inner - .get_cf(self.cf_handle(CF_ED25519_ADDRESS_TO_OUTPUT_ID)?, key)? + .get_pinned_cf(self.cf_handle(CF_ED25519_ADDRESS_TO_OUTPUT_ID)?, key)? .is_some()) } } impl Exist<(), LedgerIndex> for Storage { fn exist(&self, (): &()) -> Result::Error> { - Ok(self.inner.get_cf(self.cf_handle(CF_LEDGER_INDEX)?, [0x00u8])?.is_some()) + Ok(self + .inner + .get_pinned_cf(self.cf_handle(CF_LEDGER_INDEX)?, [0x00u8])? + .is_some()) } } @@ -107,7 +116,7 @@ impl Exist for Storage { fn exist(&self, index: &MilestoneIndex) -> Result::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_MILESTONE)?, index.pack_to_vec())? + .get_pinned_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_MILESTONE)?, index.pack_to_vec())? .is_some()) } } @@ -116,7 +125,7 @@ impl Exist<(), SnapshotInfo> for Storage { fn exist(&self, (): &()) -> Result::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_SNAPSHOT_INFO)?, [0x00u8])? + .get_pinned_cf(self.cf_handle(CF_SNAPSHOT_INFO)?, [0x00u8])? .is_some()) } } @@ -125,7 +134,7 @@ impl Exist for Storage { fn exist(&self, sep: &SolidEntryPoint) -> Result::Error> { Ok(self .inner - .get_cf( + .get_pinned_cf( self.cf_handle(CF_SOLID_ENTRY_POINT_TO_MILESTONE_INDEX)?, sep.pack_to_vec(), )? @@ -137,7 +146,7 @@ impl Exist for Storage { fn exist(&self, index: &MilestoneIndex) -> Result::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_OUTPUT_DIFF)?, index.pack_to_vec())? + .get_pinned_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_OUTPUT_DIFF)?, index.pack_to_vec())? .is_some()) } } @@ -152,7 +161,7 @@ impl Exist<(MilestoneIndex, UnreferencedMessage), ()> for Storage { Ok(self .inner - .get_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_UNREFERENCED_MESSAGE)?, key)? + .get_pinned_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_UNREFERENCED_MESSAGE)?, key)? .is_some()) } } @@ -164,7 +173,7 @@ impl Exist<(MilestoneIndex, Receipt), ()> for Storage { Ok(self .inner - .get_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_RECEIPT)?, key)? + .get_pinned_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_RECEIPT)?, key)? .is_some()) } } @@ -176,7 +185,7 @@ impl Exist<(bool, TreasuryOutput), ()> for Storage { Ok(self .inner - .get_cf(self.cf_handle(CF_SPENT_TO_TREASURY_OUTPUT)?, key)? + .get_pinned_cf(self.cf_handle(CF_SPENT_TO_TREASURY_OUTPUT)?, key)? .is_some()) } } diff --git a/bee-storage/bee-storage-rocksdb/src/access/fetch.rs b/bee-storage/bee-storage-rocksdb/src/access/fetch.rs index 1b4a75cdc1..d50ac5786d 100644 --- a/bee-storage/bee-storage-rocksdb/src/access/fetch.rs +++ b/bee-storage/bee-storage-rocksdb/src/access/fetch.rs @@ -25,9 +25,9 @@ impl Fetch for Storage { fn fetch(&self, key: &u8) -> Result, ::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_SYSTEM)?, [*key])? + .get_pinned_cf(self.cf_handle(CF_SYSTEM)?, [*key])? // Unpacking from storage is fine. - .map(|v| System::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| System::unpack_unverified(&mut &*v).unwrap())) } } @@ -35,19 +35,25 @@ impl Fetch for Storage { fn fetch(&self, message_id: &MessageId) -> Result, ::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE)?, message_id)? + .get_pinned_cf(self.cf_handle(CF_MESSAGE_ID_TO_MESSAGE)?, message_id)? // Unpacking from storage is fine. - .map(|v| Message::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| Message::unpack_unverified(&mut &*v).unwrap())) } } impl Fetch for Storage { fn fetch(&self, message_id: &MessageId) -> Result, ::Error> { - Ok(self + let guard = self.locks.message_id_to_metadata.read(); + + let metadata = self .inner - .get_cf(self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?, message_id)? + .get_pinned_cf(self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?, message_id)? // Unpacking from storage is fine. - .map(|v| MessageMetadata::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| MessageMetadata::unpack_unverified(&mut &*v).unwrap()); + + drop(guard); + + Ok(metadata) } } @@ -72,9 +78,9 @@ impl Fetch for Storage { fn fetch(&self, output_id: &OutputId) -> Result, ::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_OUTPUT_ID_TO_CREATED_OUTPUT)?, output_id.pack_to_vec())? + .get_pinned_cf(self.cf_handle(CF_OUTPUT_ID_TO_CREATED_OUTPUT)?, output_id.pack_to_vec())? // Unpacking from storage is fine. - .map(|v| CreatedOutput::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| CreatedOutput::unpack_unverified(&mut &*v).unwrap())) } } @@ -82,12 +88,12 @@ impl Fetch for Storage { fn fetch(&self, output_id: &OutputId) -> Result, ::Error> { Ok(self .inner - .get_cf( + .get_pinned_cf( self.cf_handle(CF_OUTPUT_ID_TO_CONSUMED_OUTPUT)?, output_id.pack_to_vec(), )? // Unpacking from storage is fine. - .map(|v| ConsumedOutput::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| ConsumedOutput::unpack_unverified(&mut &*v).unwrap())) } } @@ -111,9 +117,9 @@ impl Fetch<(), LedgerIndex> for Storage { fn fetch(&self, (): &()) -> Result, ::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_LEDGER_INDEX)?, [0x00u8])? + .get_pinned_cf(self.cf_handle(CF_LEDGER_INDEX)?, [0x00u8])? // Unpacking from storage is fine. - .map(|v| LedgerIndex::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| LedgerIndex::unpack_unverified(&mut &*v).unwrap())) } } @@ -121,9 +127,9 @@ impl Fetch for Storage { fn fetch(&self, index: &MilestoneIndex) -> Result, ::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_MILESTONE)?, index.pack_to_vec())? + .get_pinned_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_MILESTONE)?, index.pack_to_vec())? // Unpacking from storage is fine. - .map(|v| Milestone::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| Milestone::unpack_unverified(&mut &*v).unwrap())) } } @@ -131,9 +137,9 @@ impl Fetch<(), SnapshotInfo> for Storage { fn fetch(&self, (): &()) -> Result, ::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_SNAPSHOT_INFO)?, [0x00u8])? + .get_pinned_cf(self.cf_handle(CF_SNAPSHOT_INFO)?, [0x00u8])? // Unpacking from storage is fine. - .map(|v| SnapshotInfo::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| SnapshotInfo::unpack_unverified(&mut &*v).unwrap())) } } @@ -141,9 +147,9 @@ impl Fetch for Storage { fn fetch(&self, sep: &SolidEntryPoint) -> Result, ::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_SOLID_ENTRY_POINT_TO_MILESTONE_INDEX)?, sep.as_ref())? + .get_pinned_cf(self.cf_handle(CF_SOLID_ENTRY_POINT_TO_MILESTONE_INDEX)?, sep.as_ref())? // Unpacking from storage is fine. - .map(|v| MilestoneIndex::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| MilestoneIndex::unpack_unverified(&mut &*v).unwrap())) } } @@ -151,9 +157,9 @@ impl Fetch for Storage { fn fetch(&self, index: &MilestoneIndex) -> Result, ::Error> { Ok(self .inner - .get_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_OUTPUT_DIFF)?, index.pack_to_vec())? + .get_pinned_cf(self.cf_handle(CF_MILESTONE_INDEX_TO_OUTPUT_DIFF)?, index.pack_to_vec())? // Unpacking from storage is fine. - .map(|v| OutputDiff::unpack_unverified(&mut v.as_slice()).unwrap())) + .map(|v| OutputDiff::unpack_unverified(&mut &*v).unwrap())) } } diff --git a/bee-storage/bee-storage-rocksdb/src/access/insert.rs b/bee-storage/bee-storage-rocksdb/src/access/insert.rs index d90016bd7c..15206179fe 100644 --- a/bee-storage/bee-storage-rocksdb/src/access/insert.rs +++ b/bee-storage/bee-storage-rocksdb/src/access/insert.rs @@ -11,7 +11,10 @@ use bee_message::{ output::OutputId, Message, MessageId, }; -use bee_storage::{access::Insert, system::System}; +use bee_storage::{ + access::{Insert, InsertStrict}, + system::System, +}; use bee_tangle::{ metadata::MessageMetadata, solid_entry_point::SolidEntryPoint, unreferenced_message::UnreferencedMessage, }; @@ -43,18 +46,22 @@ impl Insert for Storage { } } -impl Insert for Storage { - fn insert( +impl InsertStrict for Storage { + fn insert_strict( &self, message_id: &MessageId, metadata: &MessageMetadata, ) -> Result<(), ::Error> { - self.inner.put_cf( + let guard = self.locks.message_id_to_metadata.read(); + + self.inner.merge_cf( self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?, message_id, metadata.pack_to_vec(), )?; + drop(guard); + Ok(()) } } diff --git a/bee-storage/bee-storage-rocksdb/src/access/iter.rs b/bee-storage/bee-storage-rocksdb/src/access/iter.rs index cba992c792..e7f3ac845f 100644 --- a/bee-storage/bee-storage-rocksdb/src/access/iter.rs +++ b/bee-storage/bee-storage-rocksdb/src/access/iter.rs @@ -17,6 +17,7 @@ use bee_tangle::{ metadata::MessageMetadata, solid_entry_point::SolidEntryPoint, unreferenced_message::UnreferencedMessage, }; use packable::PackableExt; +use parking_lot::RwLockReadGuard; use rocksdb::{DBIterator, IteratorMode}; use crate::{ @@ -27,13 +28,15 @@ use crate::{ pub struct StorageIterator<'a, K, V> { inner: DBIterator<'a>, marker: PhantomData<(K, V)>, + _guard: Option>, } impl<'a, K, V> StorageIterator<'a, K, V> { - fn new(inner: DBIterator<'a>) -> Self { + fn new(inner: DBIterator<'a>, guard: Option>) -> Self { StorageIterator:: { inner, marker: PhantomData, + _guard: guard, } } } @@ -46,6 +49,7 @@ macro_rules! impl_iter { fn iter(&'a self) -> Result::Error> { Ok(StorageIterator::new( self.inner.iterator_cf(self.cf_handle($cf)?, IteratorMode::Start), + None, )) } } @@ -271,7 +275,6 @@ impl<'a> StorageIterator<'a, (bool, TreasuryOutput), ()> { impl_iter!(u8, System, CF_SYSTEM); impl_iter!(MessageId, Message, CF_MESSAGE_ID_TO_MESSAGE); -impl_iter!(MessageId, MessageMetadata, CF_MESSAGE_ID_TO_METADATA); impl_iter!((MessageId, MessageId), (), CF_MESSAGE_ID_TO_MESSAGE_ID); impl_iter!(OutputId, CreatedOutput, CF_OUTPUT_ID_TO_CREATED_OUTPUT); impl_iter!(OutputId, ConsumedOutput, CF_OUTPUT_ID_TO_CONSUMED_OUTPUT); @@ -289,3 +292,34 @@ impl_iter!( ); impl_iter!((MilestoneIndex, Receipt), (), CF_MILESTONE_INDEX_TO_RECEIPT); impl_iter!((bool, TreasuryOutput), (), CF_SPENT_TO_TREASURY_OUTPUT); + +impl<'a> AsIterator<'a, MessageId, MessageMetadata> for Storage { + type AsIter = StorageIterator<'a, MessageId, MessageMetadata>; + + fn iter(&'a self) -> Result::Error> { + Ok(StorageIterator::new( + self.inner + .iterator_cf(self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?, IteratorMode::Start), + Some(self.locks.message_id_to_metadata.read()), + )) + } +} + +/// An iterator over all key-value pairs of a column family. +impl<'a> Iterator for StorageIterator<'a, MessageId, MessageMetadata> { + type Item = Result<(MessageId, MessageMetadata), ::Error>; + + fn next(&mut self) -> Option { + self.inner + .next() + .map(|(key, value)| Ok(Self::unpack_key_value(&key, &value))) + + // inner.status()?; + // + // if inner.valid() { + // Poll::Ready(item) + // } else { + // Poll::Ready(None) + // } + } +} diff --git a/bee-storage/bee-storage-rocksdb/src/access/mod.rs b/bee-storage/bee-storage-rocksdb/src/access/mod.rs index 8112eed97e..2cb058e010 100644 --- a/bee-storage/bee-storage-rocksdb/src/access/mod.rs +++ b/bee-storage/bee-storage-rocksdb/src/access/mod.rs @@ -9,3 +9,4 @@ pub mod insert; pub mod iter; pub mod multi_fetch; pub mod truncate; +pub mod update; diff --git a/bee-storage/bee-storage-rocksdb/src/access/multi_fetch.rs b/bee-storage/bee-storage-rocksdb/src/access/multi_fetch.rs index ebf3144285..c1bd4e6ea0 100644 --- a/bee-storage/bee-storage-rocksdb/src/access/multi_fetch.rs +++ b/bee-storage/bee-storage-rocksdb/src/access/multi_fetch.rs @@ -12,18 +12,20 @@ use bee_message::{ use bee_storage::{access::MultiFetch, system::System}; use bee_tangle::{metadata::MessageMetadata, solid_entry_point::SolidEntryPoint}; use packable::{Packable, PackableExt}; +use parking_lot::RwLockReadGuard; use crate::{ column_families::*, storage::{Storage, StorageBackend}, }; -pub struct MultiIter { +pub struct MultiIter<'a, V, E> { iter: IntoIter>, rocksdb::Error>>, marker: PhantomData<(V, E)>, + _guard: Option>, } -impl> Iterator for MultiIter { +impl<'a, V: Packable, E: From> Iterator for MultiIter<'a, V, E> { type Item = Result, E>; fn next(&mut self) -> Option { @@ -39,7 +41,7 @@ impl> Iterator for MultiIter { macro_rules! impl_multi_fetch { ($key:ty, $value:ty, $cf:expr) => { impl<'a> MultiFetch<'a, $key, $value> for Storage { - type Iter = MultiIter<$value, ::Error>; + type Iter = MultiIter<'a, $value, ::Error>; fn multi_fetch(&'a self, keys: &[$key]) -> Result::Error> { let cf = self.cf_handle($cf)?; @@ -50,6 +52,7 @@ macro_rules! impl_multi_fetch { .multi_get_cf(keys.iter().map(|k| (cf, k.pack_to_vec()))) .into_iter(), marker: PhantomData, + _guard: None, }) } } @@ -58,9 +61,25 @@ macro_rules! impl_multi_fetch { impl_multi_fetch!(u8, System, CF_SYSTEM); impl_multi_fetch!(MessageId, Message, CF_MESSAGE_ID_TO_MESSAGE); -impl_multi_fetch!(MessageId, MessageMetadata, CF_MESSAGE_ID_TO_METADATA); impl_multi_fetch!(OutputId, CreatedOutput, CF_OUTPUT_ID_TO_CREATED_OUTPUT); impl_multi_fetch!(OutputId, ConsumedOutput, CF_OUTPUT_ID_TO_CONSUMED_OUTPUT); impl_multi_fetch!(MilestoneIndex, Milestone, CF_MILESTONE_INDEX_TO_MILESTONE); impl_multi_fetch!(SolidEntryPoint, MilestoneIndex, CF_SOLID_ENTRY_POINT_TO_MILESTONE_INDEX); impl_multi_fetch!(MilestoneIndex, OutputDiff, CF_MILESTONE_INDEX_TO_OUTPUT_DIFF); + +impl<'a> MultiFetch<'a, MessageId, MessageMetadata> for Storage { + type Iter = MultiIter<'a, MessageMetadata, ::Error>; + + fn multi_fetch(&'a self, keys: &[MessageId]) -> Result::Error> { + let cf = self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?; + + Ok(MultiIter { + iter: self + .inner + .multi_get_cf(keys.iter().map(|k| (cf, k.pack_to_vec()))) + .into_iter(), + marker: PhantomData, + _guard: Some(self.locks.message_id_to_metadata.read()), + }) + } +} diff --git a/bee-storage/bee-storage-rocksdb/src/access/truncate.rs b/bee-storage/bee-storage-rocksdb/src/access/truncate.rs index 0d3b93f40c..64816abfac 100644 --- a/bee-storage/bee-storage-rocksdb/src/access/truncate.rs +++ b/bee-storage/bee-storage-rocksdb/src/access/truncate.rs @@ -20,50 +20,46 @@ use crate::{ storage::{Storage, StorageBackend}, }; -fn truncate(storage: &Storage, cf_str: &'static str) -> Result<(), ::Error> { - let cf_handle = storage.cf_handle(cf_str)?; - - let mut iter = storage.inner.raw_iterator_cf(cf_handle); - - // Seek to the first key. - iter.seek_to_first(); - // Grab the first key if it exists. - let first = if let Some(first) = iter.key() { - first.to_vec() - } else { - // There are no keys to remove. - return Ok(()); - }; - - iter.seek_to_last(); - // Grab the last key if it exists. - let last = if let Some(last) = iter.key() { - let mut last = last.to_vec(); - // `delete_range_cf` excludes the last key in the range so a byte is added to be sure the last key is included. - last.push(u8::MAX); - last - } else { - // There are no keys to remove. - return Ok(()); - }; - - storage.inner.delete_range_cf(cf_handle, first, last)?; - - Ok(()) -} - macro_rules! impl_truncate { ($key:ty, $value:ty, $cf:expr) => { impl Truncate<$key, $value> for Storage { fn truncate(&self) -> Result<(), ::Error> { - truncate(self, $cf) + let cf_handle = self.cf_handle($cf)?; + + let mut iter = self.inner.raw_iterator_cf(cf_handle); + + // Seek to the first key. + iter.seek_to_first(); + // Grab the first key if it exists. + let first = if let Some(first) = iter.key() { + first.to_vec() + } else { + // There are no keys to remove. + return Ok(()); + }; + + iter.seek_to_last(); + // Grab the last key if it exists. + let last = if let Some(last) = iter.key() { + let mut last = last.to_vec(); + // `delete_range_cf` excludes the last key in the range so a byte is added to be sure the last key + // is included. + last.push(u8::MAX); + last + } else { + // There are no keys to remove. + return Ok(()); + }; + + self.inner.delete_range_cf(cf_handle, first, last)?; + + Ok(()) } } }; } impl_truncate!(MessageId, Message, CF_MESSAGE_ID_TO_MESSAGE); -impl_truncate!(MessageId, MessageMetadata, CF_MESSAGE_ID_TO_METADATA); impl_truncate!((MessageId, MessageId), (), CF_MESSAGE_ID_TO_MESSAGE_ID); impl_truncate!(OutputId, CreatedOutput, CF_OUTPUT_ID_TO_CREATED_OUTPUT); impl_truncate!(OutputId, ConsumedOutput, CF_OUTPUT_ID_TO_CONSUMED_OUTPUT); @@ -81,3 +77,42 @@ impl_truncate!( ); impl_truncate!((MilestoneIndex, Receipt), (), CF_MILESTONE_INDEX_TO_RECEIPT); impl_truncate!((bool, TreasuryOutput), (), CF_SPENT_TO_TREASURY_OUTPUT); + +impl Truncate for Storage { + fn truncate(&self) -> Result<(), ::Error> { + let guard = self.locks.message_id_to_metadata.read(); + + let cf_handle = self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?; + + let mut iter = self.inner.raw_iterator_cf(cf_handle); + + // Seek to the first key. + iter.seek_to_first(); + // Grab the first key if it exists. + let first = if let Some(first) = iter.key() { + first.to_vec() + } else { + // There are no keys to remove. + return Ok(()); + }; + + iter.seek_to_last(); + // Grab the last key if it exists. + let last = if let Some(last) = iter.key() { + let mut last = last.to_vec(); + // `delete_range_cf` excludes the last key in the range so a byte is added to be sure the last key is + // included. + last.push(u8::MAX); + last + } else { + // There are no keys to remove. + return Ok(()); + }; + + self.inner.delete_range_cf(cf_handle, first, last)?; + + drop(guard); + + Ok(()) + } +} diff --git a/bee-storage/bee-storage-rocksdb/src/access/update.rs b/bee-storage/bee-storage-rocksdb/src/access/update.rs new file mode 100644 index 0000000000..7a2db61450 --- /dev/null +++ b/bee-storage/bee-storage-rocksdb/src/access/update.rs @@ -0,0 +1,30 @@ +// Copyright 2020-2022 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use bee_message::MessageId; +use bee_storage::access::Update; +use bee_tangle::metadata::MessageMetadata; +use packable::PackableExt; + +use crate::{column_families::*, storage::Storage}; + +impl Update for Storage { + fn update(&self, message_id: &MessageId, mut f: impl FnMut(&mut MessageMetadata)) -> Result<(), Self::Error> { + let cf_handle = self.cf_handle(CF_MESSAGE_ID_TO_METADATA)?; + + let guard = self.locks.message_id_to_metadata.write(); + + if let Some(v) = self.inner.get_pinned_cf(cf_handle, message_id)? { + // Unpacking from storage is fine. + let mut metadata = MessageMetadata::unpack_unverified(&mut &*v).unwrap(); + + f(&mut metadata); + + self.inner.put_cf(cf_handle, message_id, metadata.pack_to_vec())?; + } + + drop(guard); + + Ok(()) + } +} diff --git a/bee-storage/bee-storage-rocksdb/src/storage.rs b/bee-storage/bee-storage-rocksdb/src/storage.rs index 0fef51d02b..555be3c595 100644 --- a/bee-storage/bee-storage-rocksdb/src/storage.rs +++ b/bee-storage/bee-storage-rocksdb/src/storage.rs @@ -7,9 +7,10 @@ pub use bee_storage::{ backend::StorageBackend, system::{StorageHealth, StorageVersion, System, SYSTEM_HEALTH_KEY, SYSTEM_VERSION_KEY}, }; +use parking_lot::RwLock; use rocksdb::{ - ColumnFamily, ColumnFamilyDescriptor, DBCompactionStyle, DBCompressionType, Env, FlushOptions, Options, - SliceTransform, DB, + ColumnFamily, ColumnFamilyDescriptor, DBCompactionStyle, DBCompressionType, Env, FlushOptions, MergeOperands, + Options, SliceTransform, DB, }; use super::{ @@ -20,9 +21,14 @@ use super::{ pub(crate) const STORAGE_VERSION: StorageVersion = StorageVersion(10); +pub struct Locks { + pub(crate) message_id_to_metadata: RwLock<()>, +} + pub struct Storage { pub(crate) config: StorageConfig, pub(crate) inner: DB, + pub(crate) locks: Locks, } impl Storage { @@ -31,7 +37,14 @@ impl Storage { let cf_message_id_to_message = ColumnFamilyDescriptor::new(CF_MESSAGE_ID_TO_MESSAGE, Options::default()); - let cf_message_id_to_metadata = ColumnFamilyDescriptor::new(CF_MESSAGE_ID_TO_METADATA, Options::default()); + fn keep_current(_key: &[u8], existing_val: Option<&[u8]>, operands: &MergeOperands) -> Option> { + // Keep the existing value, if the value does not exist, take the first operand + // instead. + existing_val.or_else(|| operands.into_iter().next()).map(|v| v.to_vec()) + } + let mut options = Options::default(); + options.set_merge_operator_associative("keep current", keep_current); + let cf_message_id_to_metadata = ColumnFamilyDescriptor::new(CF_MESSAGE_ID_TO_METADATA, options); let mut options = Options::default(); options.set_prefix_extractor(SliceTransform::create_fixed_prefix(MessageId::LENGTH)); @@ -141,6 +154,9 @@ impl Storage { Ok(Storage { config: config.storage, inner: db, + locks: Locks { + message_id_to_metadata: RwLock::new(()), + }, }) } diff --git a/bee-storage/bee-storage-sled/CHANGELOG.md b/bee-storage/bee-storage-sled/CHANGELOG.md index b2619658cc..a56903165d 100644 --- a/bee-storage/bee-storage-sled/CHANGELOG.md +++ b/bee-storage/bee-storage-sled/CHANGELOG.md @@ -19,18 +19,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.6.0 - 2022-XX-XX +## 0.7.0 - 2022-XX-XX + +### Changed + +- Use `packable` instead of `bee-common::packable` to serialize to and deserialize from storage. ### Removed - All `Balance` operations; - All `PaddedIndex` operations; -## 0.5.0 - 2022-XX-XX +## 0.6.0 - 2022-03-17 -### Changed +### Added -- Use `packable` instead of `bee-common::packable` to serialize to and deserialize from storage. +- Implementation of `InsertStrict` for `Storage`; + +### Removed + +- Implementation of `Insert` for `Storage`; + +## 0.5.0 - 2022-03-11 + +### Added + +- Implementation of `Update` for `Storage`; ## 0.4.0 - 2021-06-15 diff --git a/bee-storage/bee-storage-sled/Cargo.toml b/bee-storage/bee-storage-sled/Cargo.toml index 6872da2caf..3551097bc6 100644 --- a/bee-storage/bee-storage-sled/Cargo.toml +++ b/bee-storage/bee-storage-sled/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-storage-sled" -version = "0.4.0" +version = "0.7.0" authors = [ "IOTA Stiftung" ] edition = "2021" description = "A bee-storage implementation for the Sled backend" @@ -19,7 +19,7 @@ rustdoc-args = [ "--cfg", "doc_cfg" ] [dependencies] bee-ledger = { version = "0.7.0", path = "../../bee-ledger", default-features = false } bee-message = { version = "0.2.0", path = "../../bee-message", default-features = false } -bee-storage = { version = "0.9.0", path = "../bee-storage", default-features = false } +bee-storage = { version = "0.12.0", path = "../bee-storage", default-features = false } bee-tangle = { version = "0.3.0", path = "../../bee-tangle", default-features = false } num_cpus = { version = "1.13.0", default-features = false } diff --git a/bee-storage/bee-storage-sled/src/access/insert.rs b/bee-storage/bee-storage-sled/src/access/insert.rs index fbd22fe135..07ad7c21c6 100644 --- a/bee-storage/bee-storage-sled/src/access/insert.rs +++ b/bee-storage/bee-storage-sled/src/access/insert.rs @@ -13,7 +13,11 @@ use bee_message::{ output::OutputId, Message, MessageId, }; -use bee_storage::{access::Insert, backend::StorageBackend, system::System}; +use bee_storage::{ + access::{Insert, InsertStrict}, + backend::StorageBackend, + system::System, +}; use bee_tangle::{ metadata::MessageMetadata, solid_entry_point::SolidEntryPoint, unreferenced_message::UnreferencedMessage, }; @@ -39,15 +43,19 @@ impl Insert for Storage { } } -impl Insert for Storage { - fn insert( +impl InsertStrict for Storage { + fn insert_strict( &self, message_id: &MessageId, metadata: &MessageMetadata, ) -> Result<(), ::Error> { self.inner .open_tree(TREE_MESSAGE_ID_TO_METADATA)? - .insert(message_id, metadata.pack_to_vec())?; + .update_and_fetch(message_id, |old_metadata| { + old_metadata + .map(|b| b.to_vec()) + .or_else(|| Some(metadata.pack_to_vec())) + })?; Ok(()) } diff --git a/bee-storage/bee-storage-sled/src/access/mod.rs b/bee-storage/bee-storage-sled/src/access/mod.rs index bc317bbd9f..cd4bd0652b 100644 --- a/bee-storage/bee-storage-sled/src/access/mod.rs +++ b/bee-storage/bee-storage-sled/src/access/mod.rs @@ -11,3 +11,4 @@ pub mod insert; pub mod iter; pub mod multi_fetch; pub mod truncate; +pub mod update; diff --git a/bee-storage/bee-storage-sled/src/access/update.rs b/bee-storage/bee-storage-sled/src/access/update.rs new file mode 100644 index 0000000000..3b107ff4d1 --- /dev/null +++ b/bee-storage/bee-storage-sled/src/access/update.rs @@ -0,0 +1,28 @@ +// Copyright 2021-2022 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +//! Insert access operations. + +use bee_message::MessageId; +use bee_storage::access::Update; +use bee_tangle::metadata::MessageMetadata; +use packable::PackableExt; + +use crate::{storage::Storage, trees::*}; + +impl Update for Storage { + fn update(&self, message_id: &MessageId, mut f: impl FnMut(&mut MessageMetadata)) -> Result<(), Self::Error> { + self.inner + .open_tree(TREE_MESSAGE_ID_TO_METADATA)? + .fetch_and_update(message_id, move |opt_bytes| { + opt_bytes.map(|mut bytes| { + // Unpacking from storage is fine. + let mut metadata = MessageMetadata::unpack_unverified(&mut bytes).unwrap(); + f(&mut metadata); + metadata.pack_to_vec() + }) + })?; + + Ok(()) + } +} diff --git a/bee-storage/bee-storage-test/CHANGELOG.md b/bee-storage/bee-storage-test/CHANGELOG.md index 0ed8ac37bf..c3e49bd79c 100644 --- a/bee-storage/bee-storage-test/CHANGELOG.md +++ b/bee-storage/bee-storage-test/CHANGELOG.md @@ -19,17 +19,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.5.0 - 2022-XX-XX +## 0.6.0 - 2022-XX-XX + +### Changed + +- Use `packable` instead of `bee-common::packable` to serialize to and deserialize from storage. ### Removed - All `Balance` operations; -## 0.4.0 - 2022-XX-XX +## 0.5.0 - 2022-03-17 -### Changed +### Added -- Use `packable` instead of `bee-common::packable` to serialize to and deserialize from storage. +- Tests for the `InsertStrict` access trait; + +## 0.4.0 - 2022-03-11 + +### Added + +- Tests for the `Update` access trait; ## 0.3.0 - 2021-06-10 diff --git a/bee-storage/bee-storage-test/Cargo.toml b/bee-storage/bee-storage-test/Cargo.toml index 731384b377..ca65cb12fa 100644 --- a/bee-storage/bee-storage-test/Cargo.toml +++ b/bee-storage/bee-storage-test/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-storage-test" -version = "0.3.0" +version = "0.6.0" authors = [ "IOTA Stiftung" ] edition = "2021" description = "A crate to test storage implementation in a generic way" @@ -19,7 +19,7 @@ rustdoc-args = [ "--cfg", "doc_cfg" ] [dependencies] bee-ledger = { version = "0.7.0", path = "../../bee-ledger", default-features = false } bee-message = { version = "0.2.0", path = "../../bee-message", default-features = false } -bee-storage = { version = "0.9.0", path = "../bee-storage", default-features = false } +bee-storage = { version = "0.12.0", path = "../bee-storage", default-features = false } bee-tangle = { version = "0.3.0", path = "../../bee-tangle", default-features = false } bee-test = { version = "0.1.0", path = "../../bee-test", default-features = false } diff --git a/bee-storage/bee-storage-test/src/message_id_to_message.rs b/bee-storage/bee-storage-test/src/message_id_to_message.rs index 0324603487..0273a07998 100644 --- a/bee-storage/bee-storage-test/src/message_id_to_message.rs +++ b/bee-storage/bee-storage-test/src/message_id_to_message.rs @@ -53,6 +53,16 @@ pub fn message_id_to_message_access(storage: &B) { Insert::::insert(storage, &message_id, &message).unwrap(); + let message = rand_message(); + Insert::::insert(storage, &message_id, &message).unwrap(); + assert_eq!( + Fetch::::fetch(storage, &message_id) + .unwrap() + .as_ref(), + Some(&message), + "insert should overwrite" + ); + assert!(Exist::::exist(storage, &message_id).unwrap()); assert_eq!( Fetch::::fetch(storage, &message_id) diff --git a/bee-storage/bee-storage-test/src/message_id_to_metadata.rs b/bee-storage/bee-storage-test/src/message_id_to_metadata.rs index 118e65f45c..7450a1bc4b 100644 --- a/bee-storage/bee-storage-test/src/message_id_to_metadata.rs +++ b/bee-storage/bee-storage-test/src/message_id_to_metadata.rs @@ -1,9 +1,9 @@ // Copyright 2020-2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use bee_message::MessageId; +use bee_message::{milestone::MilestoneIndex, MessageId}; use bee_storage::{ - access::{AsIterator, Batch, BatchBuilder, Delete, Exist, Fetch, Insert, MultiFetch, Truncate}, + access::{AsIterator, Batch, BatchBuilder, Delete, Exist, Fetch, InsertStrict, MultiFetch, Truncate, Update}, backend, }; use bee_tangle::metadata::MessageMetadata; @@ -14,12 +14,13 @@ pub trait StorageBackend: + Exist + Fetch + for<'a> MultiFetch<'a, MessageId, MessageMetadata> - + Insert + + InsertStrict + Delete + BatchBuilder + Batch + for<'a> AsIterator<'a, MessageId, MessageMetadata> + Truncate + + Update { } @@ -28,12 +29,13 @@ impl StorageBackend for T where + Exist + Fetch + for<'a> MultiFetch<'a, MessageId, MessageMetadata> - + Insert + + InsertStrict + Delete + BatchBuilder + Batch + for<'a> AsIterator<'a, MessageId, MessageMetadata> + Truncate + + Update { } @@ -52,21 +54,54 @@ pub fn message_id_to_metadata_access(storage: &B) { assert_eq!(results.len(), 1); assert!(matches!(results.get(0), Some(Ok(None)))); - Insert::::insert(storage, &message_id, &metadata).unwrap(); - + InsertStrict::::insert_strict(storage, &message_id, &metadata).unwrap(); assert!(Exist::::exist(storage, &message_id).unwrap()); + + // calling `insert_strict` with the same `MessageId` but a different `MessageMetadata` should + // not overwrite the old value. + { + let index = metadata.milestone_index().map_or(0, |i| *i + 1); + let mut metadata = metadata; + metadata.set_milestone_index(MilestoneIndex(index)); + + InsertStrict::::insert_strict(storage, &message_id, &metadata).unwrap(); + } assert_eq!( Fetch::::fetch(storage, &message_id) .unwrap() .unwrap(), - metadata + metadata, + "`InsertStrict` should not overwrite" ); + let results = MultiFetch::::multi_fetch(storage, &[message_id]) .unwrap() .collect::>(); assert_eq!(results.len(), 1); assert!(matches!(results.get(0), Some(Ok(Some(v))) if v == &metadata)); + let milestone_index = { + let index = Fetch::::fetch(storage, &message_id) + .unwrap() + .unwrap() + .milestone_index(); + + MilestoneIndex(index.map_or(0, |i| i.wrapping_add(1))) + }; + + Update::::update(storage, &message_id, |metadata: &mut MessageMetadata| { + metadata.set_milestone_index(milestone_index); + }) + .unwrap(); + + assert_eq!( + Fetch::::fetch(storage, &message_id) + .unwrap() + .unwrap() + .milestone_index(), + Some(milestone_index), + ); + Delete::::delete(storage, &message_id).unwrap(); assert!(!Exist::::exist(storage, &message_id).unwrap()); @@ -75,6 +110,7 @@ pub fn message_id_to_metadata_access(storage: &B) { .unwrap() .is_none() ); + let results = MultiFetch::::multi_fetch(storage, &[message_id]) .unwrap() .collect::>(); @@ -87,7 +123,7 @@ pub fn message_id_to_metadata_access(storage: &B) { for _ in 0..10 { let (message_id, metadata) = (rand_message_id(), rand_message_metadata()); - Insert::::insert(storage, &message_id, &metadata).unwrap(); + InsertStrict::::insert_strict(storage, &message_id, &metadata).unwrap(); Batch::::batch_delete(storage, &mut batch, &message_id).unwrap(); message_ids.push(message_id); metadatas.push((message_id, None)); diff --git a/bee-storage/bee-storage/CHANGELOG.md b/bee-storage/bee-storage/CHANGELOG.md index 5e3da02133..f81c907cc2 100644 --- a/bee-storage/bee-storage/CHANGELOG.md +++ b/bee-storage/bee-storage/CHANGELOG.md @@ -19,12 +19,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.10.0 - 2022-XX-XX +## 0.12.0 - 2022-XX-XX ### Changed - Use `packable` instead of `bee-common::packable` to serialize and deserialize system types; +## 0.11.0 - 2022-03-17 + +### Added + +- `InsertStrict` access trait; + +## 0.10.0 - 2022-03-11 + +### Added + +- `Update` access trait; + ## 0.9.0 - 2021-06-15 ### Changed diff --git a/bee-storage/bee-storage/Cargo.toml b/bee-storage/bee-storage/Cargo.toml index bee8f38a47..a522019220 100644 --- a/bee-storage/bee-storage/Cargo.toml +++ b/bee-storage/bee-storage/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-storage" -version = "0.9.0" +version = "0.12.0" authors = [ "IOTA Stiftung" ] edition = "2021" description = "A general purpose storage backend crate with key value abstraction API" diff --git a/bee-storage/bee-storage/src/access/insert.rs b/bee-storage/bee-storage/src/access/insert.rs index 9a66ba9255..ceb37578f0 100644 --- a/bee-storage/bee-storage/src/access/insert.rs +++ b/bee-storage/bee-storage/src/access/insert.rs @@ -6,6 +6,14 @@ use crate::backend::StorageBackend; /// `Insert` trait extends the `StorageBackend` with `insert` operation for the (key: K, value: V) pair; /// therefore, it should be explicitly implemented for the corresponding `StorageBackend`. pub trait Insert: StorageBackend { - /// Inserts the (K, V) pair in the storage. + /// Inserts the (K, V) pair in the storage overwriting the value if it already exists. fn insert(&self, key: &K, value: &V) -> Result<(), Self::Error>; } + +/// `InsertStrict` trait extends the `StorageBackend` with `insert_strict` operation for the +/// (key: K, value: V) pair; therefore, it should be explicitly implemented for the corresponding +/// `StorageBackend`. +pub trait InsertStrict: StorageBackend { + /// Inserts the (K, V) pair in the storage without overwriting the value if it already exists. + fn insert_strict(&self, key: &K, value: &V) -> Result<(), Self::Error>; +} diff --git a/bee-storage/bee-storage/src/access/mod.rs b/bee-storage/bee-storage/src/access/mod.rs index 0f31cf9918..4182d13cac 100644 --- a/bee-storage/bee-storage/src/access/mod.rs +++ b/bee-storage/bee-storage/src/access/mod.rs @@ -20,14 +20,17 @@ mod iter; mod multi_fetch; /// Holds the contract for truncate access operations. mod truncate; +/// Holds the contract for update access operations. +mod update; pub use self::{ batch::{Batch, BatchBuilder}, delete::Delete, exist::Exist, fetch::Fetch, - insert::Insert, + insert::{Insert, InsertStrict}, iter::AsIterator, multi_fetch::MultiFetch, truncate::Truncate, + update::Update, }; diff --git a/bee-storage/bee-storage/src/access/update.rs b/bee-storage/bee-storage/src/access/update.rs new file mode 100644 index 0000000000..e456550e5d --- /dev/null +++ b/bee-storage/bee-storage/src/access/update.rs @@ -0,0 +1,11 @@ +// Copyright 2020-2022 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use crate::backend::StorageBackend; + +/// `Update` trait extends the `StorageBackend` with `update` operation for the (key: K, value: V) pair; +/// therefore, it should be explicitly implemented for the corresponding `StorageBackend`. +pub trait Update: StorageBackend { + /// Fetches the value for the key `K` and updates it using `f`. + fn update(&self, key: &K, f: impl FnMut(&mut V)) -> Result<(), Self::Error>; +} diff --git a/bee-tangle/CHANGELOG.md b/bee-tangle/CHANGELOG.md index 0bff2cacbe..ffd3240e98 100644 --- a/bee-tangle/CHANGELOG.md +++ b/bee-tangle/CHANGELOG.md @@ -29,7 +29,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `ConflictReason` moved to `bee-message`; -## 0.3.0 - 2022-XX-XX +## 0.3.0 - 2022-03-17 + +### Changed + +- Grouped `omrsi` and `ymrsi`; +- Made a lot of methods/functions sync; + +### Removed + +- `TangleConfig::{num_partitions, max_eviction_retries}`; +- Cache mechanism; ### Fixed diff --git a/bee-tangle/Cargo.toml b/bee-tangle/Cargo.toml index 1c29805e55..6e5394dff3 100644 --- a/bee-tangle/Cargo.toml +++ b/bee-tangle/Cargo.toml @@ -19,7 +19,7 @@ rustdoc-args = [ "--cfg", "doc_cfg" ] [dependencies] bee-message = { version = "0.2.0", path = "../bee-message", default-features = false, features = [ "serde" ] } bee-runtime = { version = "0.1.1-alpha", path = "../bee-runtime", default-features = false } -bee-storage = { version = "0.9.0", path = "../bee-storage/bee-storage", default-features = false } +bee-storage = { version = "0.12.0", path = "../bee-storage/bee-storage", default-features = false } async-trait = { version = "0.1.51", default-features = false } bitflags = { version = "1.2.1", default-features = false } @@ -35,7 +35,7 @@ tokio = { version = "1.17.0", default-features = false, features = [ "sync", "ti tokio-stream = { version = "0.1.7", default-features = false, features = [ "time" ] } [dev-dependencies] -bee-storage-null = { version = "0.1.0", path = "../bee-storage/bee-storage-null", default-features = false } +bee-storage-null = { version = "0.3.0", path = "../bee-storage/bee-storage-null", default-features = false } bee-test = { path = "../bee-test", default-features = false } criterion = { version = "0.3.5", default-features = false, features = [ "async_tokio" ] } diff --git a/bee-tangle/benches/tangle_bench.rs b/bee-tangle/benches/tangle_bench.rs index d2fcff42a6..aab4376935 100644 --- a/bee-tangle/benches/tangle_bench.rs +++ b/bee-tangle/benches/tangle_bench.rs @@ -8,7 +8,6 @@ use bee_tangle::{config::TangleConfig, metadata::MessageMetadata, Tangle}; use bee_test::rand::{message::rand_message, metadata::rand_message_metadata, number::rand_number}; use criterion::*; use rand::seq::SliceRandom; -use tokio::runtime::Runtime; fn random_input() -> (Message, MessageId, MessageMetadata) { let message = rand_message(); @@ -17,29 +16,22 @@ fn random_input() -> (Message, MessageId, MessageMetadata) { (message, id, rand_message_metadata()) } -async fn insert(tangle: &Tangle, message: Message, id: MessageId, metadata: MessageMetadata) { - tangle.insert(message, id, metadata).await; -} - -async fn update_metadata(tangle: &Tangle, id: &MessageId, timestamp: u32) { - tangle - .update_metadata(id, |metadata| { - metadata.set_conflict(ConflictReason::InputUtxoAlreadySpent); - metadata.reference(timestamp); - }) - .await; +fn update_metadata(tangle: &Tangle, id: &MessageId, timestamp: u32) { + tangle.update_metadata(id, |metadata| { + metadata.set_conflict(ConflictReason::InputUtxoAlreadySpent); + metadata.reference(timestamp); + }); } fn insert_bench(c: &mut Criterion) { let storage = ResourceHandle::::new(NullStorage); let config = TangleConfig::build().finish(); let tangle = Tangle::new(config, storage); - let rt = Runtime::new().unwrap(); c.bench_function("insert", |b| { - b.to_async(&rt).iter_batched( + b.iter_batched( random_input, - |(message, id, metadata)| insert(&tangle, message, id, metadata), + |(message, id, metadata)| tangle.insert(&message, &id, &metadata), BatchSize::SmallInput, ); }); @@ -49,18 +41,17 @@ fn update_metadata_bench(c: &mut Criterion) { let storage = ResourceHandle::::new(NullStorage); let config = TangleConfig::build().finish(); let tangle = Tangle::new(config, storage); - let rt = Runtime::new().unwrap(); let data = (0..1000).map(|_| random_input()); let mut ids = vec![]; for (message, id, metadata) in data { - rt.block_on(async { tangle.insert(message, id, metadata).await }); + tangle.insert(&message, &id, &metadata); ids.push(id); } c.bench_function("update_metadata", |b| { - b.to_async(&rt).iter_batched( + b.iter_batched( || (ids.choose(&mut rand::thread_rng()).unwrap(), rand_number::()), |(id, timestamp)| update_metadata(&tangle, id, timestamp), BatchSize::SmallInput, diff --git a/bee-tangle/src/config.rs b/bee-tangle/src/config.rs index 9a8665b46e..f0fe79c948 100644 --- a/bee-tangle/src/config.rs +++ b/bee-tangle/src/config.rs @@ -1,14 +1,9 @@ // Copyright 2020-2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::num::NonZeroUsize; - use serde::Deserialize; const DEFAULT_BELOW_MAX_DEPTH: u32 = 15; -// SAFETY: initialised with a non-zero value. -const DEFAULT_NUM_PARTITIONS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(16) }; -const DEFAULT_MAX_EVICTION_RETRIES: usize = 10; /// A builder type for a tangle configuration. #[derive(Default, Deserialize, PartialEq)] @@ -16,10 +11,6 @@ const DEFAULT_MAX_EVICTION_RETRIES: usize = 10; pub struct TangleConfigBuilder { #[serde(alias = "belowMaxDepth")] below_max_depth: Option, - #[serde(alias = "numPartitions")] - num_partitions: Option, - #[serde(alias = "maxEvictionRetries")] - max_eviction_retries: Option, } impl TangleConfigBuilder { @@ -33,8 +24,6 @@ impl TangleConfigBuilder { pub fn finish(self) -> TangleConfig { TangleConfig { below_max_depth: self.below_max_depth.unwrap_or(DEFAULT_BELOW_MAX_DEPTH), - num_partitions: self.num_partitions.unwrap_or(DEFAULT_NUM_PARTITIONS), - max_eviction_retries: self.max_eviction_retries.unwrap_or(DEFAULT_MAX_EVICTION_RETRIES), } } } @@ -43,8 +32,6 @@ impl TangleConfigBuilder { #[derive(Clone)] pub struct TangleConfig { below_max_depth: u32, - num_partitions: NonZeroUsize, - max_eviction_retries: usize, } impl TangleConfig { @@ -57,14 +44,4 @@ impl TangleConfig { pub fn below_max_depth(&self) -> u32 { self.below_max_depth } - - /// Get the value of `num_partitions`. - pub fn num_partitions(&self) -> NonZeroUsize { - self.num_partitions - } - - /// Get the value of `max_eviction_retries`. - pub fn max_eviction_retries(&self) -> usize { - self.max_eviction_retries - } } diff --git a/bee-tangle/src/lib.rs b/bee-tangle/src/lib.rs index bcd43071a1..1ae375b66a 100644 --- a/bee-tangle/src/lib.rs +++ b/bee-tangle/src/lib.rs @@ -31,29 +31,10 @@ pub mod unreferenced_message; /// The URTS tips pool. pub mod urts; -mod vec_set; -mod vertex; -mod vertices; - -use std::{ops::Deref, sync::Arc}; - -use bee_message::Message; use bee_runtime::node::{Node, NodeBuilder}; +use self::tip_pool_cleaner_worker::TipPoolCleanerWorker; pub use self::{tangle::Tangle, tangle_worker::TangleWorker}; -use self::{tip_pool_cleaner_worker::TipPoolCleanerWorker, vec_set::VecSet}; - -/// A thread-safe reference to a `Message`. -#[derive(Clone)] -pub struct MessageRef(pub(crate) Arc); - -impl Deref for MessageRef { - type Target = Message; - - fn deref(&self) -> &Self::Target { - &*self.0 - } -} /// Initiate the tangle on top of the given node builder. pub fn init(tangle_config: &config::TangleConfig, node_builder: N::Builder) -> N::Builder diff --git a/bee-tangle/src/metadata.rs b/bee-tangle/src/metadata.rs index aa7414c538..ee162d26d8 100644 --- a/bee-tangle/src/metadata.rs +++ b/bee-tangle/src/metadata.rs @@ -28,9 +28,7 @@ pub struct MessageMetadata { solidification_timestamp: u64, reference_timestamp: u32, #[packable(unpack_error_with = MessageMetadataError::OptionIndexId)] - omrsi: Option, - #[packable(unpack_error_with = MessageMetadataError::OptionIndexId)] - ymrsi: Option, + omrsi_and_ymrsi: Option<(IndexId, IndexId)>, #[packable(unpack_error_with = MessageMetadataError::Conflict)] conflict: ConflictReason, } @@ -44,8 +42,7 @@ impl MessageMetadata { arrival_timestamp: u64, solidification_timestamp: u64, reference_timestamp: u32, - omrsi: Option, - ymrsi: Option, + omrsi_and_ymrsi: Option<(IndexId, IndexId)>, conflict: ConflictReason, ) -> Self { Self { @@ -54,8 +51,7 @@ impl MessageMetadata { arrival_timestamp, solidification_timestamp, reference_timestamp, - omrsi, - ymrsi, + omrsi_and_ymrsi, conflict, } } @@ -101,24 +97,22 @@ impl MessageMetadata { self.solidification_timestamp } - /// Get the oldest message root snapshot index of this message. - pub fn omrsi(&self) -> Option { - self.omrsi - } - - /// Set the oldest message root snapshot index of this message. - pub fn set_omrsi(&mut self, omrsi: IndexId) { - self.omrsi = Some(omrsi); + /// Get the oldest and youngest message root snapshot index of this message. + pub fn omrsi_and_ymrsi(&self) -> Option<(IndexId, IndexId)> { + self.omrsi_and_ymrsi } - /// Get the youngest message root snapshot index of this message. - pub fn ymrsi(&self) -> Option { - self.ymrsi + /// Set the oldest and youngest message root snapshot index of this message. + pub fn set_omrsi_and_ymrsi(&mut self, omrsi: IndexId, ymrsi: IndexId) { + self.omrsi_and_ymrsi = Some((omrsi, ymrsi)); } - /// Set the youngest message root snapshot index of this message. - pub fn set_ymrsi(&mut self, ymrsi: IndexId) { - self.ymrsi = Some(ymrsi); + /// Update the oldest and youngest message root snapshot index of this message if they have + /// been set already. + pub fn update_omrsi_and_ymrsi(&mut self, f: impl FnOnce(&mut IndexId, &mut IndexId)) { + if let Some((omrsi, ymrsi)) = self.omrsi_and_ymrsi.as_mut() { + f(omrsi, ymrsi); + } } /// Get the reference timestamp (seconds from the unix epoch) of this message. diff --git a/bee-tangle/src/storage.rs b/bee-tangle/src/storage.rs index 1a872feb5c..3fdbb901e6 100644 --- a/bee-tangle/src/storage.rs +++ b/bee-tangle/src/storage.rs @@ -6,7 +6,7 @@ use bee_message::{ Message, MessageId, }; use bee_storage::{ - access::{Fetch, Insert}, + access::{Exist, Fetch, Insert, InsertStrict, Update}, backend, }; @@ -16,27 +16,33 @@ use crate::{metadata::MessageMetadata, solid_entry_point::SolidEntryPoint}; pub trait StorageBackend: backend::StorageBackend + Insert - + Insert + Insert<(MessageId, MessageId), ()> + Insert + Insert + + InsertStrict + + Exist + + Exist + Fetch + Fetch + Fetch> + Fetch + + Update { } impl StorageBackend for T where T: backend::StorageBackend + Insert - + Insert + Insert<(MessageId, MessageId), ()> + Insert + Insert + + InsertStrict + + Exist + + Exist + Fetch + Fetch + Fetch> + Fetch + + Update { } diff --git a/bee-tangle/src/tangle.rs b/bee-tangle/src/tangle.rs index d2182d735c..4597df0320 100644 --- a/bee-tangle/src/tangle.rs +++ b/bee-tangle/src/tangle.rs @@ -1,11 +1,7 @@ // Copyright 2020-2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::{ - marker::PhantomData, - ops::{Deref, DerefMut}, - sync::atomic::{AtomicU32, AtomicUsize, Ordering}, -}; +use std::sync::atomic::{AtomicU32, Ordering}; use bee_message::{ milestone::{Milestone, MilestoneIndex}, @@ -13,7 +9,7 @@ use bee_message::{ }; use bee_runtime::resource::ResourceHandle; use hashbrown::HashMap; -use log::info; +use log::warn; use ref_cast::RefCast; use tokio::sync::Mutex; @@ -23,23 +19,15 @@ use crate::{ solid_entry_point::SolidEntryPoint, storage::StorageBackend, urts::UrtsTipPool, - vertex::Vertex, - vertices::Vertices, - MessageRef, }; -const DEFAULT_CACHE_LEN: usize = 100_000; -const CACHE_THRESHOLD_FACTOR: f64 = 0.1; const SYNCED_THRESHOLD: u32 = 2; const CONFIRMED_THRESHOLD: u32 = 2; /// A Tangle wrapper designed to encapsulate milestone state. pub struct Tangle { config: TangleConfig, - vertices: Vertices, - max_len: AtomicUsize, storage: ResourceHandle, - milestones: Mutex>, solid_entry_points: Mutex>, latest_milestone_index: AtomicU32, solid_milestone_index: AtomicU32, @@ -54,10 +42,7 @@ impl Tangle { /// Create a new `Tangle` instance with the given configuration and storage handle. pub fn new(config: TangleConfig, storage: ResourceHandle) -> Self { Self { - vertices: Vertices::new(config.num_partitions()), - max_len: AtomicUsize::new(DEFAULT_CACHE_LEN), storage, - milestones: Default::default(), solid_entry_points: Default::default(), latest_milestone_index: Default::default(), solid_milestone_index: Default::default(), @@ -80,111 +65,60 @@ impl Tangle { &self.config } - /// Insert a message into the tangle. - pub async fn insert( - &self, - message: Message, - message_id: MessageId, - metadata: MessageMetadata, - ) -> Option { - let exists = self.pull_message(&message_id, true).await; - - let msg = self.insert_inner(message_id, message.clone(), metadata, !exists).await; - - self.vertices - .get_mut(&message_id) - .await - .expect("Just-inserted message is missing") - .allow_eviction(); - - if msg.is_some() { - // Write parents to DB - for &parent in message.parents().iter() { - self.storage - .insert(&(parent, message_id), &()) - .unwrap_or_else(|e| info!("Failed to update approvers for message {:?}", e)); - } - - // Insert into backend using hooks - self.storage_insert(message_id, message, metadata) - .unwrap_or_else(|e| info!("Failed to insert message {:?}", e)); - } + /// Insert a message into the tangle without overwriting its metadata if it already exists. + pub fn insert(&self, message: &Message, message_id: &MessageId, metadata: &MessageMetadata) { + self.storage + .insert(message_id, message) + .ok() + .and_then(|()| { + self.storage.insert_strict(message_id, metadata).ok()?; - msg + let message_id = *message_id; + for &parent in message.parents().iter() { + self.storage + .insert(&(parent, message_id), &()) + .unwrap_or_else(|e| warn!("Failed to update approvers for message {:?}", e)); + } + Some(()) + }) + .unwrap_or_default() } /// Add a milestone to the tangle. - pub async fn add_milestone(&self, idx: MilestoneIndex, milestone: Milestone) { - // TODO: only insert if vacant + pub fn add_milestone(&self, idx: MilestoneIndex, milestone: Milestone) { + let index = IndexId::new(idx, *milestone.message_id()); + self.update_metadata(milestone.message_id(), |metadata| { metadata.flags_mut().set_milestone(true); metadata.set_milestone_index(idx); - metadata.set_omrsi(IndexId::new(idx, *milestone.message_id())); - metadata.set_ymrsi(IndexId::new(idx, *milestone.message_id())); - }) - .await; - self.storage() + metadata.set_omrsi_and_ymrsi(index, index); + }); + self.storage .insert(&idx, &milestone) - .unwrap_or_else(|e| info!("Failed to insert message {:?}", e)); - self.milestones.lock().await.insert(idx, milestone); - } - - /// Remove a milestone from the tangle. - pub async fn remove_milestone(&self, index: MilestoneIndex) { - self.milestones.lock().await.remove(&index); - } - - async fn pull_milestone(&self, idx: MilestoneIndex) -> Option { - if let Some(milestone) = self.storage().fetch(&idx).unwrap_or_else(|e| { - info!("Failed to insert message {:?}", e); - None - }) { - let message_id = *self - .milestones - .lock() - .await - .entry(idx) - .or_insert(milestone) - .message_id(); - - Some(message_id) - } else { - None - } + .unwrap_or_else(|e| warn!("Failed to insert milestone message {:?}", e)); } /// Get the milestone from the tangle that corresponds to the given milestone index. - pub async fn get_milestone(&self, index: MilestoneIndex) -> Option { - self.milestones.lock().await.get(&index).cloned() + pub fn get_milestone(&self, index: MilestoneIndex) -> Option { + self.storage.fetch(&index).unwrap_or_else(|e| { + warn!("Failed to fetch milestone {:?}", e); + None + }) } /// Get the message associated with the given milestone index from the tangle. - pub async fn get_milestone_message(&self, index: MilestoneIndex) -> Option { - // TODO: use combinator instead of match - match self.get_milestone_message_id(index).await { - None => None, - Some(ref hash) => self.get(hash).await, - } + pub fn get_milestone_message(&self, index: MilestoneIndex) -> Option { + self.get_milestone_message_id(index).and_then(|hash| self.get(&hash)) } /// Get the message ID associated with the given milestone index from the tangle. - pub async fn get_milestone_message_id(&self, index: MilestoneIndex) -> Option { - let message_id = self.milestones.lock().await.get(&index).map(|m| *m.message_id()); - - // TODO: use combinator instead of match - match message_id { - Some(m) => Some(m), - None => Some(self.pull_milestone(index).await?), - } + pub fn get_milestone_message_id(&self, index: MilestoneIndex) -> Option { + self.get_milestone(index).map(|m| *m.message_id()) } /// Return whether the tangle contains the given milestone index. - pub async fn contains_milestone(&self, idx: MilestoneIndex) -> bool { - // Not using `||` as its first operand would keep the lock alive causing a deadlock with its second operand. - if self.milestones.lock().await.contains_key(&idx) { - return true; - } - self.pull_milestone(idx).await.is_some() + pub fn contains_milestone(&self, index: MilestoneIndex) -> bool { + self.storage.exist(&index).unwrap_or_default() } /// Get the index of the latest milestone. @@ -206,12 +140,6 @@ impl Tangle { /// Update the latest solid milestone index. pub fn update_solid_milestone_index(&self, new_index: MilestoneIndex) { self.solid_milestone_index.store(*new_index, Ordering::Relaxed); - - // TODO: Formalise this a little better - let new_len = ((1000.0 + self.get_sync_threshold() as f32 * 500.0) as usize) - .min(DEFAULT_CACHE_LEN) - .max(8192); - self.resize(new_len); } /// Get the latest confirmed milestone index. @@ -334,31 +262,19 @@ impl Tangle { true } else { self.get_metadata(id) - .await .map(|metadata| metadata.flags().is_solid()) .unwrap_or(false) } } - /// Get the oldest milestone root snapshot index. - pub async fn omrsi(&self, id: &MessageId) -> Option { + /// Get the oldest and youngest milestone root snapshot index. + pub async fn omrsi_and_ymrsi(&self, id: &MessageId) -> Option<(IndexId, IndexId)> { match self.solid_entry_points.lock().await.get(SolidEntryPoint::ref_cast(id)) { - Some(sep) => Some(IndexId::new(*sep, *id)), - None => match self.get_metadata(id).await { - Some(metadata) => metadata.omrsi(), - None => None, - }, - } - } - - /// Get the youngest milestone root snapshot index. - pub async fn ymrsi(&self, id: &MessageId) -> Option { - match self.solid_entry_points.lock().await.get(SolidEntryPoint::ref_cast(id)) { - Some(sep) => Some(IndexId::new(*sep, *id)), - None => match self.get_metadata(id).await { - Some(metadata) => metadata.ymrsi(), - None => None, - }, + Some(sep) => { + let index = IndexId::new(*sep, *id); + Some((index, index)) + } + None => self.get_metadata(id).and_then(|metadata| metadata.omrsi_and_ymrsi()), } } @@ -387,252 +303,46 @@ impl Tangle { self.tip_pool.lock().await.non_lazy_tips().len() } - /// Change the maximum number of entries to store in the cache. - fn resize(&self, len: usize) { - self.max_len.store(len, Ordering::Relaxed); - } - - /// Return a reference to the storage used by this tangle. - fn storage(&self) -> &B { - &self.storage - } - - async fn insert_inner( - &self, - message_id: MessageId, - message: Message, - metadata: MessageMetadata, - prevent_eviction: bool, - ) -> Option { - let mut vertex = self.vertices.get_mut_or_empty(message_id).await; - - if prevent_eviction { - vertex.prevent_eviction(); - } - - let msg = if vertex.message().is_some() { - drop(vertex); - None - } else { - let parents = message.parents().clone(); - - vertex.insert_message_and_metadata(message, metadata); - let msg = vertex.message().cloned(); - drop(vertex); - - // Insert children for parents - for &parent in parents.iter() { - self.vertices.get_mut_or_empty(parent).await.add_child(message_id); - } - - msg - }; - - self.perform_eviction().await; - - msg - } - - async fn get_inner(&self, message_id: &MessageId) -> Option + '_> { - self.vertices.get_mut(message_id).await - } - /// Get the data of a vertex associated with the given `message_id`. - async fn get_with(&self, message_id: &MessageId, f: impl FnOnce(&mut Vertex) -> R) -> Option { - let exists = self.pull_message(message_id, true).await; - - self.get_inner(message_id).await.map(|mut v| { - if exists { - v.allow_eviction(); - } - f(v.deref_mut()) - }) + pub fn get(&self, message_id: &MessageId) -> Option { + self.storage.fetch(message_id).unwrap_or_default() } - /// Get the data of a vertex associated with the given `message_id`. - pub async fn get(&self, message_id: &MessageId) -> Option { - self.get_with(message_id, |v| v.message().cloned()).await.flatten() - } + /// Get the data and metadata of a vertex associated with the given `message_id`. + pub fn get_message_and_metadata(&self, message_id: &MessageId) -> Option<(Message, MessageMetadata)> { + let msg = self.storage.fetch(message_id).unwrap_or_default()?; + let meta = self.storage.fetch(message_id).unwrap_or_default()?; - async fn contains_inner(&self, message_id: &MessageId) -> bool { - self.vertices - .get(message_id) - .await - .map_or(false, |v| v.message().is_some()) + Some((msg, meta)) } /// Returns whether the message is stored in the Tangle. - pub async fn contains(&self, message_id: &MessageId) -> bool { - self.contains_inner(message_id).await || self.pull_message(message_id, false).await + pub fn contains(&self, message_id: &MessageId) -> bool { + self.storage.exist(message_id).unwrap_or_default() } /// Get the metadata of a vertex associated with the given `message_id`. - pub async fn get_metadata(&self, message_id: &MessageId) -> Option { - self.get_with(message_id, |v| v.metadata().cloned()).await.flatten() - } - - /// Get the metadata of a vertex associated with the given `message_id`. - pub async fn get_vertex(&self, message_id: &MessageId) -> Option + '_> { - let exists = self.pull_message(message_id, true).await; - - self.get_inner(message_id).await.map(|mut v| { - if exists { - v.allow_eviction(); - } - v - }) + pub fn get_metadata(&self, message_id: &MessageId) -> Option { + self.storage.fetch(message_id).unwrap_or_default() } /// Updates the metadata of a vertex. - pub async fn update_metadata(&self, message_id: &MessageId, update: Update) -> Option - where - Update: FnOnce(&mut MessageMetadata) -> R, - { - let exists = self.pull_message(message_id, true).await; - - if let Some(mut vertex) = self.vertices.get_mut(message_id).await { - if exists { - vertex.allow_eviction(); - } - let r = vertex.metadata_mut().map(update); - - if let Some((msg, meta)) = vertex.message_and_metadata() { - let (msg, meta) = ((&**msg).clone(), *meta); - - self.storage_insert(*message_id, msg, meta) - .unwrap_or_else(|e| info!("Failed to update metadata for message {:?}", e)); - - drop(vertex); - } - - r - } else { - None - } - } - - async fn children_inner(&self, message_id: &MessageId) -> Option> + '_> { - struct Wrapper<'a> { - children: Vec, - phantom: PhantomData<&'a ()>, - } - - impl<'a> Deref for Wrapper<'a> { - type Target = Vec; - - fn deref(&self) -> &Self::Target { - &self.children - } - } - - let vertex = self - .vertices - .get(message_id) - .await - // Skip approver lists that are not exhaustive - .filter(|v| v.children_exhaustive()); - - let children = match vertex { - Some(vertex) => { - let children = vertex.children().to_vec(); - drop(vertex); - children - } - None => { - drop(vertex); - let to_insert = match self.storage.fetch(message_id) { - Err(e) => { - info!("Failed to update approvers for message message {:?}", e); - Vec::new() - } - Ok(None) => Vec::new(), - Ok(Some(approvers)) => approvers, - }; - - let mut vertex = self.vertices.get_mut_or_empty(*message_id).await; - - // We've just fetched approvers from the database, so we have all the information available to us now. - // Therefore, the approvers list is exhaustive (i.e: it contains all knowledge we have). - vertex.set_exhaustive(); - - for child in to_insert { - vertex.add_child(child); - } + pub fn update_metadata( + &self, + message_id: &MessageId, + update: impl FnOnce(&mut MessageMetadata) -> R + Copy, + ) -> Option { + let mut output = None; - let children = vertex.children().to_vec(); - drop(vertex); - children - } - }; + self.storage + .update(message_id, |metadata| output = Some(update(metadata))) + .unwrap_or_default(); - Some(Wrapper { - children, - phantom: PhantomData, - }) + output } /// Returns the children of a vertex, if we know about them. - pub async fn get_children(&self, message_id: &MessageId) -> Option> { - // Effectively atomic - self.children_inner(message_id).await.map(|approvers| approvers.clone()) - } - - // Attempts to pull the message from the storage, returns true if successful. - async fn pull_message(&self, message_id: &MessageId, prevent_eviction: bool) -> bool { - let contains_now = if prevent_eviction { - self.vertices.get_mut(message_id).await.map_or(false, |mut v| { - if v.message().is_some() { - v.prevent_eviction(); - true - } else { - false - } - }) - } else { - self.contains_inner(message_id).await - }; - - // If the tangle already contains the message, do no more work - if contains_now { - true - } else if let Ok(Some((msg, metadata))) = self.storage_get(message_id) { - self.insert_inner(*message_id, msg, metadata, prevent_eviction).await; - - true - } else { - false - } - } - - async fn perform_eviction(&self) { - let max_len = self.max_len.load(Ordering::Relaxed); - let max_eviction_retries = self.config.max_eviction_retries(); - - if self.vertices.len() > max_len { - while self.vertices.len() > ((1.0 - CACHE_THRESHOLD_FACTOR) * max_len as f64) as usize { - if self.vertices.pop_random(max_eviction_retries).await.is_none() { - log::warn!( - "could not perform cache eviction after {} attempts", - max_eviction_retries - ); - - break; - } - } - } - } - - fn storage_get(&self, id: &MessageId) -> Result, B::Error> { - let msg = self.storage.fetch(id)?; - let meta = self.storage.fetch(id)?; - - Ok(msg.zip(meta)) - } - - fn storage_insert(&self, id: MessageId, tx: Message, metadata: MessageMetadata) -> Result<(), B::Error> { - self.storage.insert(&id, &tx)?; - self.storage.insert(&id, &metadata)?; - - Ok(()) + pub fn get_children(&self, message_id: &MessageId) -> Option> { + self.storage.fetch(message_id).unwrap_or_default() } } diff --git a/bee-tangle/src/traversal.rs b/bee-tangle/src/traversal.rs index 5c350f290d..e5b3631edd 100644 --- a/bee-tangle/src/traversal.rs +++ b/bee-tangle/src/traversal.rs @@ -7,15 +7,15 @@ use std::collections::HashSet; -use bee_message::MessageId; +use bee_message::{Message, MessageId}; -use crate::{metadata::MessageMetadata, storage::StorageBackend, tangle::Tangle, MessageRef}; +use crate::{metadata::MessageMetadata, storage::StorageBackend, tangle::Tangle}; /// A Tangle walker that - given a starting vertex - visits all of its ancestors that are connected through /// either the *parent1* or the *parent2* edge. The walk continues as long as the visited vertices match a certain /// condition. For each visited vertex customized logic can be applied depending on the availability of the /// vertex. Each traversed vertex provides read access to its associated data and metadata. -pub async fn visit_parents_depth_first( +pub fn visit_parents_depth_first( tangle: &Tangle, root: MessageId, matches: Match, @@ -23,9 +23,9 @@ pub async fn visit_parents_depth_first bool, - Apply: FnMut(&MessageId, &MessageRef, &MessageMetadata), - ElseApply: FnMut(&MessageId, &MessageRef, &MessageMetadata), + Match: Fn(&MessageId, &Message, &MessageMetadata) -> bool, + Apply: FnMut(&MessageId, &Message, &MessageMetadata), + ElseApply: FnMut(&MessageId, &Message, &MessageMetadata), MissingApply: FnMut(&MessageId), { let mut parents = vec![root]; @@ -33,14 +33,10 @@ pub async fn visit_parents_depth_first { - if matches(message_id, msg.clone(), meta) { + if matches(&message_id, &msg, &meta) { apply(&message_id, &msg, &meta); parents.extend_from_slice(msg.parents()); diff --git a/bee-tangle/src/urts.rs b/bee-tangle/src/urts.rs index 26a157f34f..218623bb64 100644 --- a/bee-tangle/src/urts.rs +++ b/bee-tangle/src/urts.rs @@ -141,7 +141,7 @@ impl UrtsTipPool { async fn tip_score(&self, tangle: &Tangle, message_id: &MessageId) -> Score { // in case the tip was pruned by the node, consider tip as lazy - if !tangle.contains(message_id).await { + if !tangle.contains(message_id) { Score::Lazy } else { let smi = *tangle.get_solid_milestone_index(); @@ -149,8 +149,11 @@ impl UrtsTipPool { // The tip pool only works with solid tips. Therefore, all tips added to the pool can be considered to // solid. The solid flag will be set together with omrsi and ymrsi values. Therefore, when a // message is solid, omrsi and ymrsi values are available. Therefore, unwrapping here is fine. - let omrsi = *tangle.omrsi(message_id).await.unwrap().index(); - let ymrsi = *tangle.ymrsi(message_id).await.unwrap().index(); + let (omrsi, ymrsi) = tangle + .omrsi_and_ymrsi(message_id) + .await + .map(|(o, y)| (*o.index(), *y.index())) + .unwrap(); if smi > ymrsi + YMRSI_DELTA || smi > omrsi + self.below_max_depth { Score::Lazy diff --git a/bee-tangle/src/vec_set.rs b/bee-tangle/src/vec_set.rs deleted file mode 100644 index 057e54e83a..0000000000 --- a/bee-tangle/src/vec_set.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2020-2021 IOTA Stiftung -// SPDX-License-Identifier: Apache-2.0 - -use std::{cmp::Eq, ops::Deref}; - -#[derive(Clone)] -pub struct VecSet { - items: Vec, -} - -impl Default for VecSet { - fn default() -> Self { - Self { items: Vec::default() } - } -} - -impl VecSet { - pub fn insert(&mut self, item: T) -> bool - where - T: Eq, - { - if self.items.contains(&item) { - false - } else { - self.items.push(item); - true - } - } -} - -impl Deref for VecSet { - type Target = [T]; - - fn deref(&self) -> &Self::Target { - &self.items - } -} diff --git a/bee-test/src/rand/metadata.rs b/bee-test/src/rand/metadata.rs index ec4e6e66da..4404f3b1de 100644 --- a/bee-test/src/rand/metadata.rs +++ b/bee-test/src/rand/metadata.rs @@ -26,8 +26,10 @@ pub fn rand_message_metadata() -> MessageMetadata { rand_number(), rand_number(), rand_number(), - rand_option(IndexId::new(rand_milestone_index(), rand_message_id())), - rand_option(IndexId::new(rand_milestone_index(), rand_message_id())), + rand_option(( + IndexId::new(rand_milestone_index(), rand_message_id()), + IndexId::new(rand_milestone_index(), rand_message_id()), + )), rand_conflict_reason(), ) }