Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip parallel peer manager actor pw handling #12684

Draft
wants to merge 35 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
db8f248
big state witness
stedfn Dec 2, 2024
b6dfeaa
Merge branch 'master' into stefan/big_witness
stedfn Dec 2, 2024
f3b6eec
.
stedfn Dec 2, 2024
7f7182a
disable compression
stedfn Dec 2, 2024
c5a4385
remove decompression
stedfn Dec 2, 2024
c86de69
padding with 0
stedfn Dec 2, 2024
e2e87f9
Merge branch 'master' into stefan/big_witness
stedfn Dec 5, 2024
2d3e514
0 padding but still create 30mb
stedfn Dec 5, 2024
17a0d38
3mb
stedfn Dec 9, 2024
6cfc22d
30 mb and don't copy state witness parts
stedfn Dec 10, 2024
af368a4
Merge branch 'master' into stefan/big_witness
stedfn Dec 10, 2024
e77eeea
revert back to copying the parts
stedfn Dec 10, 2024
079dcac
Merge branch 'stefan/big_witness' of github.com:near/nearcore into st…
stedfn Dec 10, 2024
8bbc5e7
remove copying
stedfn Dec 10, 2024
d54505c
add decoding metric
stedfn Dec 11, 2024
d16e32f
.
stedfn Dec 13, 2024
5fe8202
.
stedfn Dec 13, 2024
efc25d5
enable simd
stedfn Dec 13, 2024
80a6dff
spawn thread in handle_partial_encoded_state_witness
stedfn Dec 18, 2024
3613619
remove file
stedfn Dec 18, 2024
c12d23b
clippy
stedfn Dec 18, 2024
1de100c
modify handle_partial_encoded_state_witness_forward
stedfn Dec 19, 2024
3f63d03
.
stedfn Dec 20, 2024
7a95e80
.
stedfn Dec 20, 2024
4b8c670
fix
stedfn Dec 20, 2024
e148f7d
Merge branch 'master' into stefan/big_witness
stedfn Dec 20, 2024
dbba188
Merge branch 'stefan/big_witness' into stefan/improved_parallel_forknet
stedfn Dec 20, 2024
c74680e
Merge branch 'master' into stefan/improved_parallel
stedfn Jan 3, 2025
b5102e8
cleanup
stedfn Jan 3, 2025
53246b4
log error if store_partial_encoded_state_witness fails
stedfn Jan 3, 2025
a052e36
Merge branch 'stefan/improved_parallel' into stefan/improved_parallel…
stedfn Jan 3, 2025
e91146d
.
stedfn Jan 3, 2025
39240a0
.
stedfn Jan 3, 2025
35247ff
Merge branch 'stefan/improved_parallel_forknet' into stefan/improved_…
stedfn Jan 3, 2025
deca4f7
actix fix
stedfn Jan 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions chain/chain/src/stateless_validation/chunk_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ fn get_state_witness_block_range(
last_chunk_shard_id: ShardId,
}

let initial_prev_hash = *state_witness.chunk_header.prev_block_hash();
let initial_prev_hash = *state_witness.inner.chunk_header.prev_block_hash();
let initial_prev_block = store.get_block(&initial_prev_hash)?;
let initial_shard_layout =
epoch_manager.get_shard_layout_from_prev_block(&initial_prev_hash)?;
let initial_shard_id = state_witness.chunk_header.shard_id();
let initial_shard_id = state_witness.inner.chunk_header.shard_id();
// Check that shard id is present in current epoch.
// TODO: consider more proper way to validate this.
let _ = initial_shard_layout.get_shard_index(initial_shard_id)?;
Expand Down Expand Up @@ -333,8 +333,8 @@ pub fn pre_validate_chunk_state_witness(

// Ensure that the chunk header version is supported in this protocol version
let protocol_version =
epoch_manager.get_epoch_info(&state_witness.epoch_id)?.protocol_version();
state_witness.chunk_header.validate_version(protocol_version)?;
epoch_manager.get_epoch_info(&state_witness.inner.epoch_id)?.protocol_version();
state_witness.inner.chunk_header.validate_version(protocol_version)?;

// First, go back through the blockchain history to locate the last new chunk
// and last last new chunk for the shard.
Expand All @@ -348,19 +348,19 @@ pub fn pre_validate_chunk_state_witness(

let receipts_to_apply = validate_source_receipt_proofs(
epoch_manager,
&state_witness.source_receipt_proofs,
&state_witness.inner.source_receipt_proofs,
&blocks_after_last_last_chunk,
last_chunk_shard_layout,
last_chunk_shard_id,
)?;
let applied_receipts_hash = hash(&borsh::to_vec(receipts_to_apply.as_slice()).unwrap());
if applied_receipts_hash != state_witness.applied_receipts_hash {
if applied_receipts_hash != state_witness.inner.applied_receipts_hash {
return Err(Error::InvalidChunkStateWitness(format!(
"Receipts hash {:?} does not match expected receipts hash {:?}",
applied_receipts_hash, state_witness.applied_receipts_hash
applied_receipts_hash, state_witness.inner.applied_receipts_hash
)));
}
let (tx_root_from_state_witness, _) = merklize(&state_witness.transactions);
let (tx_root_from_state_witness, _) = merklize(&state_witness.inner.transactions);
let last_chunk_block = blocks_after_last_last_chunk.first().ok_or_else(|| {
Error::Other("blocks_after_last_last_chunk is empty, this should be impossible!".into())
})?;
Expand All @@ -374,15 +374,15 @@ pub fn pre_validate_chunk_state_witness(
}

let current_protocol_version =
epoch_manager.get_epoch_protocol_version(&state_witness.epoch_id)?;
epoch_manager.get_epoch_protocol_version(&state_witness.inner.epoch_id)?;
if !checked_feature!(
"protocol_feature_relaxed_chunk_validation",
RelaxedChunkValidation,
current_protocol_version
) {
let new_transactions = &state_witness.new_transactions;
let new_transactions = &state_witness.inner.new_transactions;
let (new_tx_root_from_state_witness, _) = merklize(&new_transactions);
let chunk_tx_root = state_witness.chunk_header.tx_root();
let chunk_tx_root = state_witness.inner.chunk_header.tx_root();
if new_tx_root_from_state_witness != chunk_tx_root {
return Err(Error::InvalidChunkStateWitness(format!(
"Witness new transactions root {:?} does not match chunk {:?}",
Expand All @@ -392,21 +392,21 @@ pub fn pre_validate_chunk_state_witness(
// Verify that all proposed transactions are valid.
if !new_transactions.is_empty() {
let transactions_validation_storage_config = RuntimeStorageConfig {
state_root: state_witness.chunk_header.prev_state_root(),
state_root: state_witness.inner.chunk_header.prev_state_root(),
use_flat_storage: true,
source: StorageDataSource::Recorded(PartialStorage {
nodes: state_witness.new_transactions_validation_state.clone(),
nodes: state_witness.inner.new_transactions_validation_state.clone(),
}),
state_patch: Default::default(),
};

match validate_prepared_transactions(
chain,
runtime_adapter,
&state_witness.chunk_header,
&state_witness.inner.chunk_header,
transactions_validation_storage_config,
&new_transactions,
&state_witness.transactions,
&state_witness.inner.transactions,
) {
Ok(result) => {
if result.transactions.len() != new_transactions.len() {
Expand Down Expand Up @@ -450,7 +450,7 @@ pub fn pre_validate_chunk_state_witness(
} else {
MainTransition::NewChunk(NewChunkData {
chunk_header: last_chunk_block.chunks().get(last_chunk_shard_index).unwrap().clone(),
transactions: state_witness.transactions.clone(),
transactions: state_witness.inner.transactions.clone(),
receipts: receipts_to_apply,
block: Chain::get_apply_chunk_block_context(
epoch_manager,
Expand All @@ -461,7 +461,7 @@ pub fn pre_validate_chunk_state_witness(
is_first_block_with_chunk_of_version: false,
storage_context: StorageContext {
storage_data_source: StorageDataSource::Recorded(PartialStorage {
nodes: state_witness.main_state_transition.base_state.clone(),
nodes: state_witness.inner.main_state_transition.base_state.clone(),
}),
state_patch: Default::default(),
},
Expand Down Expand Up @@ -596,13 +596,13 @@ pub fn validate_chunk_state_witness(
main_state_transition_cache: &MainStateTransitionCache,
) -> Result<(), Error> {
let _timer = crate::stateless_validation::metrics::CHUNK_STATE_WITNESS_VALIDATION_TIME
.with_label_values(&[&state_witness.chunk_header.shard_id().to_string()])
.with_label_values(&[&state_witness.inner.chunk_header.shard_id().to_string()])
.start_timer();
let span = tracing::debug_span!(target: "client", "validate_chunk_state_witness").entered();
let witness_shard_layout = epoch_manager.get_shard_layout(&state_witness.epoch_id)?;
let witness_chunk_shard_id = state_witness.chunk_header.shard_id();
let witness_shard_layout = epoch_manager.get_shard_layout(&state_witness.inner.epoch_id)?;
let witness_chunk_shard_id = state_witness.inner.chunk_header.shard_id();
let witness_chunk_shard_uid =
epoch_manager.shard_id_to_uid(witness_chunk_shard_id, &state_witness.epoch_id)?;
epoch_manager.shard_id_to_uid(witness_chunk_shard_id, &state_witness.inner.epoch_id)?;
let block_hash = pre_validation_output.main_transition_params.block_hash();
let epoch_id = epoch_manager.get_epoch_id(&block_hash)?;
let shard_id = pre_validation_output.main_transition_params.shard_id();
Expand Down Expand Up @@ -634,14 +634,14 @@ pub fn validate_chunk_state_witness(
}
(_, Some(result)) => (result.chunk_extra, result.outgoing_receipts),
};
if chunk_extra.state_root() != &state_witness.main_state_transition.post_state_root {
if chunk_extra.state_root() != &state_witness.inner.main_state_transition.post_state_root {
// This is an early check, it's not for correctness, only for better
// error reporting in case of an invalid state witness due to a bug.
// Only the final state root check against the chunk header is required.
return Err(Error::InvalidChunkStateWitness(format!(
"Post state root {:?} for main transition does not match expected post state root {:?}",
chunk_extra.state_root(),
state_witness.main_state_transition.post_state_root,
state_witness.inner.main_state_transition.post_state_root,
)));
}

Expand All @@ -654,7 +654,7 @@ pub fn validate_chunk_state_witness(
&mut outgoing_receipts,
protocol_version,
&witness_shard_layout,
state_witness.chunk_header.shard_id(),
state_witness.inner.chunk_header.shard_id(),
shard_id,
)?;
}
Expand All @@ -676,19 +676,19 @@ pub fn validate_chunk_state_witness(
}

if pre_validation_output.implicit_transition_params.len()
!= state_witness.implicit_transitions.len()
!= state_witness.inner.implicit_transitions.len()
{
return Err(Error::InvalidChunkStateWitness(format!(
"Implicit transitions count mismatch. Expected {}, found {}",
pre_validation_output.implicit_transition_params.len(),
state_witness.implicit_transitions.len(),
state_witness.inner.implicit_transitions.len(),
)));
}

for (implicit_transition_params, transition) in pre_validation_output
.implicit_transition_params
.into_iter()
.zip(state_witness.implicit_transitions.into_iter())
.zip(state_witness.inner.implicit_transitions.into_iter())
{
let (shard_uid, new_state_root) = match implicit_transition_params {
ImplicitTransitionParams::ApplyOldChunk(block, shard_uid) => {
Expand Down Expand Up @@ -744,7 +744,7 @@ pub fn validate_chunk_state_witness(
let (outgoing_receipts_root, _) = merklize(&outgoing_receipts_hashes);
validate_chunk_with_chunk_extra_and_receipts_root(
&chunk_extra,
&state_witness.chunk_header,
&state_witness.inner.chunk_header,
&outgoing_receipts_root,
)?;

Expand Down Expand Up @@ -778,9 +778,9 @@ impl Chain {
runtime_adapter: &dyn RuntimeAdapter,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<(), Error> {
let shard_id = witness.chunk_header.shard_id();
let height_created = witness.chunk_header.height_created();
let chunk_hash = witness.chunk_header.chunk_hash();
let shard_id = witness.inner.chunk_header.shard_id();
let height_created = witness.inner.chunk_header.height_created();
let chunk_hash = witness.inner.chunk_header.chunk_hash();
let parent_span = tracing::debug_span!(
target: "chain", "shadow_validate", ?shard_id, height_created);
let (encoded_witness, raw_witness_size) = {
Expand Down
10 changes: 5 additions & 5 deletions chain/chain/src/stateless_validation/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ fn record_witness_size_metrics_fallible(
encoded_size: usize,
witness: &ChunkStateWitness,
) -> Result<(), std::io::Error> {
let shard_id = witness.chunk_header.shard_id().to_string();
let shard_id = witness.inner.chunk_header.shard_id().to_string();
CHUNK_STATE_WITNESS_RAW_SIZE
.with_label_values(&[shard_id.as_str()])
.observe(decoded_size as f64);
Expand All @@ -188,16 +188,16 @@ fn record_witness_size_metrics_fallible(
.observe(encoded_size as f64);
CHUNK_STATE_WITNESS_MAIN_STATE_TRANSISTION_SIZE
.with_label_values(&[shard_id.as_str()])
.observe(borsh::to_vec(&witness.main_state_transition)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.main_state_transition)?.len() as f64);
CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_SIZE
.with_label_values(&[&shard_id.as_str()])
.observe(borsh::to_vec(&witness.new_transactions)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.new_transactions)?.len() as f64);
CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_STATE_SIZE
.with_label_values(&[&shard_id.as_str()])
.observe(borsh::to_vec(&witness.new_transactions_validation_state)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.new_transactions_validation_state)?.len() as f64);
CHUNK_STATE_WITNESS_SOURCE_RECEIPT_PROOFS_SIZE
.with_label_values(&[&shard_id.as_str()])
.observe(borsh::to_vec(&witness.source_receipt_proofs)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.source_receipt_proofs)?.len() as f64);
Ok(())
}

Expand Down
12 changes: 6 additions & 6 deletions chain/chain/src/store/latest_witnesses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ impl ChainStore {
let _span = tracing::info_span!(
target: "client",
"save_latest_chunk_state_witness",
witness_height = witness.chunk_header.height_created(),
witness_shard = ?witness.chunk_header.shard_id(),
witness_height = witness.inner.chunk_header.height_created(),
witness_shard = ?witness.inner.chunk_header.shard_id(),
)
.entered();

Expand Down Expand Up @@ -172,9 +172,9 @@ impl ChainStore {
let mut random_uuid = [0u8; 16];
OsRng.fill_bytes(&mut random_uuid);
let key = LatestWitnessesKey {
height: witness.chunk_header.height_created(),
shard_id: witness.chunk_header.shard_id().into(),
epoch_id: witness.epoch_id,
height: witness.inner.chunk_header.height_created(),
shard_id: witness.inner.chunk_header.shard_id().into(),
epoch_id: witness.inner.epoch_id,
witness_size: serialized_witness_size,
random_uuid,
};
Expand All @@ -195,7 +195,7 @@ impl ChainStore {

let store_commit_time = start_time.elapsed().saturating_sub(store_update_time);

let shard_id_str = witness.chunk_header.shard_id().to_string();
let shard_id_str = witness.inner.chunk_header.shard_id().to_string();
stateless_validation::metrics::SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME
.with_label_values(&[shard_id_str.as_str()])
.observe(store_update_time.as_secs_f64());
Expand Down
10 changes: 10 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,16 @@ pub(crate) static PARTIAL_WITNESS_ENCODE_TIME: LazyLock<HistogramVec> = LazyLock
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_DECODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_decode_time",
"State witness decoding time from the partial state witness parts in seconds",
&["shard_id"],
Some(linear_buckets(0.0, 0.005, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_TIME_TO_LAST_PART: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_time_to_last_part",
Expand Down
24 changes: 12 additions & 12 deletions chain/client/src/stateless_validation/chunk_validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ impl ChunkValidator {
processing_done_tracker: Option<ProcessingDoneTracker>,
signer: &Arc<ValidatorSigner>,
) -> Result<(), Error> {
let prev_block_hash = state_witness.chunk_header.prev_block_hash();
let shard_id = state_witness.chunk_header.shard_id();
let prev_block_hash = state_witness.inner.chunk_header.prev_block_hash();
let shard_id = state_witness.inner.chunk_header.shard_id();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?;
if epoch_id != state_witness.epoch_id {
if epoch_id != state_witness.inner.epoch_id {
return Err(Error::InvalidChunkStateWitness(format!(
"Invalid EpochId {:?} for previous block {}, expected {:?}",
state_witness.epoch_id, prev_block_hash, epoch_id
state_witness.inner.epoch_id, prev_block_hash, epoch_id
)));
}

Expand All @@ -92,7 +92,7 @@ impl ChunkValidator {
self.runtime_adapter.as_ref(),
)?;

let chunk_header = state_witness.chunk_header.clone();
let chunk_header = state_witness.inner.chunk_header.clone();
let network_sender = self.network_sender.clone();
let epoch_manager = self.epoch_manager.clone();

Expand Down Expand Up @@ -230,8 +230,8 @@ impl Client {
) -> Result<(), Error> {
tracing::debug!(
target: "client",
chunk_hash=?witness.chunk_header.chunk_hash(),
shard_id=?witness.chunk_header.shard_id(),
chunk_hash=?witness.inner.chunk_header.chunk_hash(),
shard_id=?witness.inner.chunk_header.shard_id(),
"process_chunk_state_witness",
);

Expand All @@ -252,7 +252,7 @@ impl Client {
self.chain.chain_store.save_latest_chunk_state_witness(&witness)?;
}

match self.chain.get_block(witness.chunk_header.prev_block_hash()) {
match self.chain.get_block(witness.inner.chunk_header.prev_block_hash()) {
Ok(block) => self.process_chunk_state_witness_with_prev_block(
witness,
&block,
Expand All @@ -273,7 +273,7 @@ impl Client {
// produced the witness. However some tests bypass PartialWitnessActor, thus when a chunk producer
// receives its own state witness, we log a warning instead of panicking.
// TODO: Make sure all tests run with "test_features" and panic for non-test builds.
if signer.validator_id() == &witness.chunk_producer {
if signer.validator_id() == &witness.inner.chunk_producer {
tracing::warn!(
"Validator {:?} received state witness from itself. Witness={:?}",
signer.validator_id(),
Expand All @@ -283,7 +283,7 @@ impl Client {
}
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitnessAck(
witness.chunk_producer.clone(),
witness.inner.chunk_producer.clone(),
ChunkStateWitnessAck::new(witness),
),
));
Expand All @@ -296,10 +296,10 @@ impl Client {
processing_done_tracker: Option<ProcessingDoneTracker>,
signer: &Arc<ValidatorSigner>,
) -> Result<(), Error> {
if witness.chunk_header.prev_block_hash() != prev_block.hash() {
if witness.inner.chunk_header.prev_block_hash() != prev_block.hash() {
return Err(Error::Other(format!(
"process_chunk_state_witness_with_prev_block - prev_block doesn't match ({} != {})",
witness.chunk_header.prev_block_hash(),
witness.inner.chunk_header.prev_block_hash(),
prev_block.hash()
)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Client {
witness: ChunkStateWitness,
witness_size: usize,
) -> Result<HandleOrphanWitnessOutcome, Error> {
let chunk_header = &witness.chunk_header;
let chunk_header = &witness.inner.chunk_header;
let witness_height = chunk_header.height_created();
let witness_shard = chunk_header.shard_id();

Expand Down Expand Up @@ -83,7 +83,7 @@ impl Client {
.orphan_witness_pool
.take_state_witnesses_waiting_for_block(new_block.hash());
for witness in ready_witnesses {
let header = &witness.chunk_header;
let header = &witness.inner.chunk_header;
tracing::debug!(
target: "client",
witness_height = header.height_created(),
Expand Down
Loading
Loading