From 827e63a81a4da26395181ebfa1706e76d76df8a3 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 16 Dec 2024 16:12:08 +0800 Subject: [PATCH] switch to foreset CAR db --- src/db/memory.rs | 54 ++++++++++--------- src/tool/subcommands/api_cmd.rs | 4 +- .../api_cmd/generate_test_snapshot.rs | 38 +++++++++++-- src/tool/subcommands/api_cmd/test_snapshot.rs | 24 ++++++--- 4 files changed, 83 insertions(+), 37 deletions(-) diff --git a/src/db/memory.rs b/src/db/memory.rs index 2f2888aa5e9..b8d345123d9 100644 --- a/src/db/memory.rs +++ b/src/db/memory.rs @@ -1,18 +1,20 @@ // Copyright 2019-2024 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use super::{EthMappingsStore, SettingsStore}; +use super::{EthMappingsStore, SettingsStore, SettingsStoreExt}; +use crate::blocks::TipsetKey; use crate::cid_collections::CidHashSet; use crate::db::{GarbageCollectable, PersistentStore}; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use crate::rpc::eth::types::EthHash; +use crate::utils::db::car_stream::CarBlock; use crate::utils::multihash::prelude::*; use ahash::HashMap; +use anyhow::Context as _; use cid::Cid; use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; use parking_lot::RwLock; -use std::ops::Deref; #[derive(Debug, Default)] pub struct MemoryDB { @@ -23,29 +25,31 @@ pub struct MemoryDB { } impl MemoryDB { - pub fn serialize(&self) -> anyhow::Result> { - let blockchain_db = self.blockchain_db.read(); - let blockchain_persistent_db = self.blockchain_persistent_db.read(); - let settings_db = self.settings_db.read(); - let eth_mappings_db = self.eth_mappings_db.read(); - let tuple = ( - blockchain_db.deref(), - blockchain_persistent_db.deref(), - settings_db.deref(), - eth_mappings_db.deref(), - ); - Ok(fvm_ipld_encoding::to_vec(&tuple)?) - } - - pub fn deserialize_from(bytes: &[u8]) -> anyhow::Result { - let (blockchain_db, blockchain_persistent_db, settings_db, eth_mappings_db) = - fvm_ipld_encoding::from_slice(bytes)?; - Ok(Self { - blockchain_db: RwLock::new(blockchain_db), - blockchain_persistent_db: RwLock::new(blockchain_persistent_db), - settings_db: RwLock::new(settings_db), - eth_mappings_db: RwLock::new(eth_mappings_db), - }) + pub async fn export_forest_car( + &self, + writer: &mut W, + ) -> anyhow::Result<()> { + let roots = + SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY)? + .context("chain head is not tracked and cannot be exported")? + .into_cids(); + let blocks = { + let blockchain_db = self.blockchain_db.read(); + let blockchain_persistent_db = self.blockchain_persistent_db.read(); + blockchain_db + .iter() + .chain(blockchain_persistent_db.iter()) + .map(|(&cid, data)| { + anyhow::Ok(CarBlock { + cid, + data: data.clone(), + }) + }) + .collect_vec() + }; + let frames = + crate::db::car::forest::Encoder::compress_stream_default(futures::stream::iter(blocks)); + crate::db::car::forest::Encoder::write(writer, roots, frames).await } } diff --git a/src/tool/subcommands/api_cmd.rs b/src/tool/subcommands/api_cmd.rs index 3f8f36b2ea5..b4f02a39583 100644 --- a/src/tool/subcommands/api_cmd.rs +++ b/src/tool/subcommands/api_cmd.rs @@ -264,7 +264,9 @@ impl ApiCommands { { Ok(_) => { let snapshot = { - let db = tracking_db.tracker.serialize()?; + tracking_db.ensure_chain_head_is_tracked()?; + let mut db = vec![]; + tracking_db.export_forest_car(&mut db).await?; RpcTestSnapshot { name: test_dump.request.method_name.to_string(), params: test_dump.request.params, diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 1f9339fbef6..ed8ac0223f3 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -3,12 +3,13 @@ use super::*; use crate::{ + blocks::{CachingBlockHeader, TipsetKey}, chain::ChainStore, chain_sync::{network_context::SyncNetworkContext, SyncConfig, SyncStage}, daemon::db_util::load_all_forest_cars, db::{ db_engine::open_db, parity_db::ParityDb, EthMappingsStore, MemoryDB, SettingsStore, - CAR_DB_DIR_NAME, + SettingsStoreExt, CAR_DB_DIR_NAME, }, genesis::{get_network_name_from_genesis, read_genesis_header}, libp2p::{NetworkMessage, PeerManager}, @@ -128,16 +129,47 @@ async fn ctx( /// A [`Blockstore`] wrapper that tracks read operations to the inner [`Blockstore`] with an [`MemoryDB`] pub struct ReadOpsTrackingStore { inner: T, - pub tracker: Arc, + tracker: Arc, } -impl ReadOpsTrackingStore { +impl ReadOpsTrackingStore +where + T: Blockstore + SettingsStore, +{ pub fn new(inner: T) -> Self { Self { inner, tracker: Arc::new(Default::default()), } } + + pub fn ensure_chain_head_is_tracked(&self) -> anyhow::Result<()> { + if !self.is_chain_head_tracked()? { + let _ = + SettingsStoreExt::read_obj::(self, crate::db::setting_keys::HEAD_KEY)? + .context("HEAD_KEY not found")? + .into_cids() + .into_iter() + .map(|key| CachingBlockHeader::load(self, key)) + .collect::>>>()? + .map(Tipset::new) + .transpose()? + .context("failed to load tipset")?; + } + + Ok(()) + } + + fn is_chain_head_tracked(&self) -> anyhow::Result { + SettingsStore::exists(&self.tracker, crate::db::setting_keys::HEAD_KEY) + } + + pub async fn export_forest_car( + &self, + writer: &mut W, + ) -> anyhow::Result<()> { + self.tracker.export_forest_car(writer).await + } } impl Blockstore for ReadOpsTrackingStore { diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index c4896a9f21e..923b9d1f7ad 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -4,7 +4,10 @@ use crate::{ chain::ChainStore, chain_sync::{network_context::SyncNetworkContext, SyncConfig, SyncStage}, - db::MemoryDB, + db::{ + car::{AnyCar, ManyCar}, + MemoryDB, + }, genesis::{get_network_name_from_genesis, read_genesis_header}, libp2p::{NetworkMessage, PeerManager}, lotus_json::HasLotusJson, @@ -39,24 +42,29 @@ pub async fn run_test_from_snapshot(path: &Path) -> anyhow::Result<()> { } else { snapshot_bytes }; - let snapshot: RpcTestSnapshot = serde_json::from_slice(snapshot_bytes.as_slice())?; - let db = Arc::new(MemoryDB::deserialize_from(snapshot.db.as_slice())?); + let RpcTestSnapshot { + name: method_name, + params, + db: db_bytes, + response: expected_response, + } = serde_json::from_slice(snapshot_bytes.as_slice())?; + let db = Arc::new(ManyCar::new(MemoryDB::default()).with_read_only(AnyCar::new(db_bytes)?)?); let chain_config = Arc::new(ChainConfig::calibnet()); let (ctx, _, _) = ctx(db, chain_config).await?; - let params_raw = match serde_json::to_string(&snapshot.params)? { + let params_raw = match serde_json::to_string(¶ms)? { s if s.is_empty() => None, s => Some(s), }; macro_rules! run_test { ($ty:ty) => { - if snapshot.name.as_str() == <$ty>::NAME { + if method_name.as_str() == <$ty>::NAME { let params = <$ty>::parse_params(params_raw.clone(), ParamStructure::Either)?; let result = <$ty>::handle(ctx.clone(), params) .await .map_err(|e| e.to_string()) .and_then(|r| r.into_lotus_json_value().map_err(|e| e.to_string())); - assert_eq!(snapshot.response, result); + assert_eq!(expected_response, result); run = true; } }; @@ -70,10 +78,10 @@ pub async fn run_test_from_snapshot(path: &Path) -> anyhow::Result<()> { } async fn ctx( - db: Arc, + db: Arc>, chain_config: Arc, ) -> anyhow::Result<( - Arc>, + Arc>>, flume::Receiver, tokio::sync::mpsc::Receiver<()>, )> {