Skip to content

Commit

Permalink
fix: consolidate db initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelcr committed Sep 11, 2024
1 parent 5f42235 commit 044bec4
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 180 deletions.
44 changes: 20 additions & 24 deletions components/ordhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ use ordhook::chainhook_sdk::types::{BitcoinBlockData, TransactionIdentifier};
use ordhook::chainhook_sdk::utils::BlockHeights;
use ordhook::chainhook_sdk::utils::Context;
use ordhook::config::Config;
use ordhook::core::meta_protocols::brc20::db::{
brc20_new_rw_db_conn, get_brc20_operations_on_block,
};
use ordhook::core::meta_protocols::brc20::db::get_brc20_operations_on_block;
use ordhook::core::pipeline::download_and_pipeline_blocks;
use ordhook::core::pipeline::processors::block_archiving::start_block_archiving_processor;
use ordhook::core::pipeline::processors::start_inscription_indexing_processor;
Expand All @@ -32,17 +30,16 @@ use ordhook::db::blocks::{
use ordhook::db::cursor::BlockBytesCursor;
use ordhook::db::ordinals::{
find_all_inscriptions_in_block, find_all_transfers_in_block, find_inscription_with_id,
find_latest_inscription_block_height, get_default_ordhook_db_file_path,
open_readonly_ordhook_db_conn,
find_latest_inscription_block_height, get_default_ordinals_db_file_path, open_ordinals_db,
};
use ordhook::db::{delete_data_in_ordhook_db, open_readwrite_ordhook_dbs};
use ordhook::db::{drop_block_data_from_all_dbs, initialize_sqlite_dbs, open_all_dbs_rw};
use ordhook::download::download_archive_datasets_if_required;
use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use ordhook::service::observers::initialize_observers_db;
use ordhook::service::{start_observer_forwarding, Service};
use ordhook::utils::bitcoind::bitcoind_get_block_height;
use ordhook::utils::monitoring::PrometheusMonitoring;
use ordhook::{hex, initialize_databases, try_error, try_info, try_warn};
use ordhook::{hex, try_error, try_info, try_warn};
use reqwest::Client as HttpClient;
use std::collections::HashSet;
use std::io::{BufReader, Read};
Expand Down Expand Up @@ -598,12 +595,15 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut total_inscriptions = 0;
let mut total_transfers = 0;

let db_connections = initialize_databases(&config, ctx);
let db_connections = initialize_sqlite_dbs(&config, ctx);
while let Some(block_height) = block_range.pop_front() {
let inscriptions =
find_all_inscriptions_in_block(&block_height, &db_connections.ordhook, ctx);
let inscriptions = find_all_inscriptions_in_block(
&block_height,
&db_connections.ordinals,
ctx,
);
let locations =
find_all_transfers_in_block(&block_height, &db_connections.ordhook, ctx);
find_all_transfers_in_block(&block_height, &db_connections.ordinals, ctx);

let mut total_transfers_in_block = 0;

Expand Down Expand Up @@ -657,7 +657,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
}
if total_transfers == 0 && total_inscriptions == 0 {
let db_file_path =
get_default_ordhook_db_file_path(&config.expected_cache_path());
get_default_ordinals_db_file_path(&config.expected_cache_path());
try_warn!(ctx, "No data available. Check the validity of the range being scanned and the validity of your local database {}", db_file_path.display());
}
}
Expand All @@ -673,8 +673,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {

let _ = download_archive_datasets_if_required(&config, ctx).await;

let inscriptions_db_conn =
open_readonly_ordhook_db_conn(&config.expected_cache_path(), ctx)?;
let inscriptions_db_conn = open_ordinals_db(&config.expected_cache_path(), ctx)?;
let (inscription, block_height) =
match find_inscription_with_id(&cmd.inscription_id, &inscriptions_db_conn, ctx)? {
Some(entry) => entry,
Expand Down Expand Up @@ -742,10 +741,10 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
&cmd.config_path,
&None,
)?;
let db_connections = initialize_databases(&config, ctx);
let db_connections = initialize_sqlite_dbs(&config, ctx);

let last_known_block =
find_latest_inscription_block_height(&db_connections.ordhook, ctx)?;
find_latest_inscription_block_height(&db_connections.ordinals, ctx)?;
if last_known_block.is_none() {
open_blocks_db_with_retry(true, &config, ctx);
}
Expand Down Expand Up @@ -808,12 +807,12 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::New(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path, &None)?;
// Create DB
initialize_databases(&config, ctx);
initialize_sqlite_dbs(&config, ctx);
open_blocks_db_with_retry(true, &config, ctx);
}
Command::Db(OrdhookDbCommand::Sync(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path, &None)?;
initialize_databases(&config, ctx);
initialize_sqlite_dbs(&config, ctx);
let service = Service::new(config, ctx.clone());
service.update_state(None).await?;
}
Expand Down Expand Up @@ -928,16 +927,13 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
return Err("Deletion aborted".to_string());
}

let (blocks_db_rw, inscriptions_db_conn_rw) =
open_readwrite_ordhook_dbs(&config, &ctx)?;
let brc_20_db_conn_rw = brc20_new_rw_db_conn(&config, ctx);
let (blocks_db_rw, sqlite_dbs_rw) = open_all_dbs_rw(&config, &ctx)?;

delete_data_in_ordhook_db(
drop_block_data_from_all_dbs(
cmd.start_block,
cmd.end_block,
&inscriptions_db_conn_rw,
&blocks_db_rw,
&brc_20_db_conn_rw,
&sqlite_dbs_rw,
ctx,
)?;
info!(
Expand Down
10 changes: 5 additions & 5 deletions components/ordhook-core/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use crate::{
open_blocks_db_with_retry,
},
cursor::TransactionBytesCursor,
ordinals::{find_latest_inscription_block_height, open_readonly_ordhook_db_conn},
initialize_sqlite_dbs,
ordinals::{find_latest_inscription_block_height, open_ordinals_db},
},
initialize_databases,
utils::bitcoind::bitcoind_get_block_height,
};

Expand Down Expand Up @@ -116,7 +116,7 @@ pub fn compute_next_satpoint_data(

pub fn should_sync_rocks_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let blocks_db = open_blocks_db_with_retry(true, &config, &ctx);
let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn = open_ordinals_db(&config.expected_cache_path(), &ctx)?;
let last_compressed_block = find_last_block_inserted(&blocks_db) as u64;
let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?
{
Expand All @@ -140,10 +140,10 @@ pub fn should_sync_ordhook_db(
let mut start_block = find_last_block_inserted(&blocks_db) as u64;

if start_block == 0 {
let _ = initialize_databases(config, ctx);
let _ = initialize_sqlite_dbs(config, ctx);
}

let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn = open_ordinals_db(&config.expected_cache_path(), &ctx)?;

match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)? {
Some(height) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::{
cursor::TransactionBytesCursor,
ordinals::{
get_any_entry_in_ordinal_activities, get_latest_indexed_inscription_number,
open_readonly_ordhook_db_conn, open_readwrite_ordhook_db_conn,
open_ordinals_db, open_ordinals_db_rw,
},
},
service::write_brc20_block_operations,
Expand Down Expand Up @@ -78,11 +78,11 @@ pub fn start_inscription_indexing_processor(
let mut garbage_collect_nth_block = 0;

let mut inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), &ctx).unwrap();
open_ordinals_db_rw(&config.expected_cache_path(), &ctx).unwrap();
let mut empty_cycles = 0;

let inscriptions_db_conn =
open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx).unwrap();
open_ordinals_db(&config.expected_cache_path(), &ctx).unwrap();
let mut sequence_cursor = SequenceCursor::new(&inscriptions_db_conn);

let mut brc20_cache = brc20_new_cache(&config);
Expand Down Expand Up @@ -154,8 +154,7 @@ pub fn start_inscription_indexing_processor(

// Recreate sqlite db connection on a regular basis
inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), &ctx)
.unwrap();
open_ordinals_db_rw(&config.expected_cache_path(), &ctx).unwrap();
inscriptions_db_conn_rw.flush_prepared_statement_cache();
garbage_collect_nth_block = 0;
}
Expand Down Expand Up @@ -353,8 +352,10 @@ mod test {
meta_protocols::brc20::cache::brc20_new_cache, new_traversals_lazy_cache,
protocol::inscription_sequencing::SequenceCursor,
},
db::{blocks::open_blocks_db_with_retry, ordinals::open_readwrite_ordhook_db_conn},
drop_databases, initialize_databases,
db::{
blocks::open_blocks_db_with_retry, drop_sqlite_dbs, initialize_sqlite_dbs,
ordinals::open_ordinals_db_rw,
},
utils::{
monitoring::PrometheusMonitoring,
test_helpers::{new_test_block, new_test_reveal_tx},
Expand All @@ -369,15 +370,17 @@ mod test {
let config = Config::test_default();

// Create DBs
drop_databases(&config);
let db_conns = initialize_databases(&config, &ctx);
drop_sqlite_dbs(&config);
let db_conns = initialize_sqlite_dbs(&config, &ctx);
let _ = open_blocks_db_with_retry(true, &config, &ctx);

// Insert block into rocksdb

let mut next_blocks = vec![new_test_block(vec![new_test_reveal_tx()])];
let mut sequence_cursor = SequenceCursor::new(&db_conns.ordhook);
let mut sequence_cursor = SequenceCursor::new(&db_conns.ordinals);
let cache_l2 = Arc::new(new_traversals_lazy_cache(2048));
let mut inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), &ctx).expect("");
open_ordinals_db_rw(&config.expected_cache_path(), &ctx).expect("");

let results = process_blocks(
&mut next_blocks,
Expand Down Expand Up @@ -416,13 +419,13 @@ mod test {
let mut config = Config::mainnet_default();
config.storage.working_dir = "tmp".to_string();
config.meta_protocols.brc20 = true;
drop_databases(&config);
let mut db_conns = initialize_databases(&config, &ctx);
drop_sqlite_dbs(&config);
let mut db_conns = initialize_sqlite_dbs(&config, &ctx);
let mut next_blocks = vec![new_test_block(vec![new_test_reveal_tx()])];
let mut sequence_cursor = SequenceCursor::new(&db_conns.ordhook);
let mut sequence_cursor = SequenceCursor::new(&db_conns.ordinals);
let cache_l2 = Arc::new(new_traversals_lazy_cache(2048));
let mut inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), &ctx).expect("");
open_ordinals_db_rw(&config.expected_cache_path(), &ctx).expect("");
let mut brc20_cache = brc20_new_cache(&config);

let _ = process_blocks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
},
},
db::ordinals::{
insert_entries_from_block_in_inscriptions, open_readwrite_ordhook_db_conn,
insert_entries_from_block_in_inscriptions, open_ordinals_db_rw,
remove_entries_from_locations_at_block_height,
},
try_info, try_warn,
Expand All @@ -35,7 +35,7 @@ pub fn start_transfers_recomputing_processor(
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop")
.spawn(move || {
let mut inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), &ctx).unwrap();
open_ordinals_db_rw(&config.expected_cache_path(), &ctx).unwrap();
let mut empty_cycles = 0;

loop {
Expand Down
24 changes: 13 additions & 11 deletions components/ordhook-core/src/core/protocol/inscription_sequencing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,10 @@ mod test {
use crate::{
config::Config,
core::protocol::inscription_sequencing::SequenceCursor,
db::ordinals::update_sequence_metadata_with_block,
drop_databases, initialize_databases,
db::{
drop_sqlite_dbs, initialize_sqlite_dbs,
ordinals::update_sequence_metadata_with_block,
},
utils::test_helpers::{new_test_block, new_test_reveal_tx_with_operation},
};

Expand All @@ -992,14 +994,14 @@ mod test {
fn picks_next((block_height, cursed): (u64, bool)) -> (i64, i64) {
let ctx = Context::empty();
let config = Config::test_default();
drop_databases(&config);
let db_conns = initialize_databases(&config, &ctx);
drop_sqlite_dbs(&config);
let db_conns = initialize_sqlite_dbs(&config, &ctx);
let mut block = new_test_block(vec![new_test_reveal_tx_with_operation()]);
block.block_identifier.index = block_height;

// Pick next twice so we can test all cases.
update_sequence_metadata_with_block(&block, &db_conns.ordhook, &ctx);
let mut cursor = SequenceCursor::new(&db_conns.ordhook);
update_sequence_metadata_with_block(&block, &db_conns.ordinals, &ctx);
let mut cursor = SequenceCursor::new(&db_conns.ordinals);
let _ = cursor.pick_next(
cursed,
block.block_identifier.index + 1,
Expand All @@ -1009,7 +1011,7 @@ mod test {
cursor.increment(cursed, &ctx);

block.block_identifier.index = block.block_identifier.index + 1;
update_sequence_metadata_with_block(&block, &db_conns.ordhook, &ctx);
update_sequence_metadata_with_block(&block, &db_conns.ordinals, &ctx);
let next = cursor.pick_next(
cursed,
block.block_identifier.index + 1,
Expand All @@ -1024,11 +1026,11 @@ mod test {
fn resets_on_previous_block() {
let ctx = Context::empty();
let config = Config::test_default();
drop_databases(&config);
let db_conns = initialize_databases(&config, &ctx);
drop_sqlite_dbs(&config);
let db_conns = initialize_sqlite_dbs(&config, &ctx);
let block = new_test_block(vec![new_test_reveal_tx_with_operation()]);
update_sequence_metadata_with_block(&block, &db_conns.ordhook, &ctx);
let mut cursor = SequenceCursor::new(&db_conns.ordhook);
update_sequence_metadata_with_block(&block, &db_conns.ordinals, &ctx);
let mut cursor = SequenceCursor::new(&db_conns.ordinals);
let _ = cursor.pick_next(
false,
block.block_identifier.index + 1,
Expand Down
Loading

0 comments on commit 044bec4

Please sign in to comment.