Skip to content

Commit

Permalink
Merge pull request #3347 from autonomys/optimize-block-finalization
Browse files Browse the repository at this point in the history
Optimize block finalization
  • Loading branch information
nazar-pc authored Jan 14, 2025
2 parents 6aa60d7 + e5a510a commit eed2c70
Showing 1 changed file with 54 additions and 35 deletions.
89 changes: 54 additions & 35 deletions crates/sc-consensus-subspace/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun};
use sc_client_api::{
AuxStore, Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, LockImportRun,
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -830,7 +832,7 @@ where

fn finalize_block<Block, Backend, Client>(
client: &Client,
telemetry: Option<TelemetryHandle>,
telemetry: Option<&TelemetryHandle>,
hash: Block::Hash,
number: NumberFor<Block>,
) where
Expand Down Expand Up @@ -916,6 +918,7 @@ where
+ HeaderBackend<Block>
+ LockImportRun<Block, Backend>
+ Finalizer<Block, Backend>
+ BlockchainEvents<Block>
+ AuxStore
+ Send
+ Sync
Expand Down Expand Up @@ -1055,19 +1058,67 @@ where
}
}

let max_segment_index_before = segment_headers_store.max_segment_index();
(best_archived_block_hash, best_archived_block_number) = archive_block(
&mut archiver,
segment_headers_store.clone(),
&*client,
&sync_oracle,
telemetry.clone(),
subspace_link.object_mapping_notification_sender.clone(),
subspace_link.archived_segment_notification_sender.clone(),
best_archived_block_hash,
block_number_to_archive,
create_object_mappings,
)
.await?;

let max_segment_index = segment_headers_store.max_segment_index();
if max_segment_index_before != max_segment_index {
let maybe_block_number_to_finalize = max_segment_index
// Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments
.and_then(|max_segment_index| {
max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
})
.and_then(|segment_index| {
segment_headers_store.get_segment_header(segment_index)
})
.map(|segment_header| segment_header.last_archived_block().number)
// Make sure not to finalize block number that does not yet exist (segment
// headers store may contain future blocks during initial sync)
.map(|block_number| block_number_to_archive.min(block_number.into()))
// Do not finalize blocks twice
.filter(|block_number| *block_number > client.info().finalized_number);

if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
{
let mut import_notification = client.every_import_notification_stream();

// Drop notification to drop acknowledgement and allow block import to
// proceed
drop(block_importing_notification);

while let Some(notification) = import_notification.next().await {
// Wait for importing block to finish importing
if notification.header.number() == &importing_block_number {
break;
}
}
}

// Block is not guaranteed to be present this deep if we have only synced recent
// blocks
if let Some(block_hash_to_finalize) =
client.block_hash(block_number_to_finalize)?
{
finalize_block(
&*client,
telemetry.as_ref(),
block_hash_to_finalize,
block_number_to_finalize,
);
}
}
}
}

Ok(())
Expand All @@ -1081,7 +1132,6 @@ async fn archive_block<Block, Backend, Client, AS, SO>(
segment_headers_store: SegmentHeadersStore<AS>,
client: &Client,
sync_oracle: &SubspaceSyncOracle<SO>,
telemetry: Option<TelemetryHandle>,
object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
best_archived_block_hash: Block::Hash,
Expand Down Expand Up @@ -1157,7 +1207,6 @@ where
encoded_block.len() as f32 / 1024.0
);

let mut new_segment_headers = Vec::new();
let block_outcome = archiver.add_block(
encoded_block,
block_object_mappings,
Expand All @@ -1175,36 +1224,6 @@ where

send_archived_segment_notification(&archived_segment_notification_sender, archived_segment)
.await;

new_segment_headers.push(segment_header);
}

if !new_segment_headers.is_empty() {
let maybe_block_number_to_finalize = segment_headers_store
.max_segment_index()
// Skip last `FINALIZATION_DEPTH_IN_SEGMENTS` archived segments
.and_then(|max_segment_index| {
max_segment_index.checked_sub(FINALIZATION_DEPTH_IN_SEGMENTS)
})
.and_then(|segment_index| segment_headers_store.get_segment_header(segment_index))
.map(|segment_header| segment_header.last_archived_block().number)
// Make sure not to finalize block number that does not yet exist (segment
// headers store may contain future blocks during initial sync)
.map(|block_number| block_number_to_archive.min(block_number.into()))
// Do not finalize blocks twice
.filter(|block_number| *block_number > client.info().finalized_number);

if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
// Block is not guaranteed to be present this deep if we have only synced recent blocks
if let Some(block_hash_to_finalize) = client.block_hash(block_number_to_finalize)? {
finalize_block(
client,
telemetry.clone(),
block_hash_to_finalize,
block_number_to_finalize,
);
}
}
}

Ok((block_hash_to_archive, block_number_to_archive))
Expand Down

0 comments on commit eed2c70

Please sign in to comment.