From 78db712c7ea5a294cadda65a446325f8f3594d88 Mon Sep 17 00:00:00 2001 From: Niklas Long Date: Mon, 27 Jan 2025 17:08:23 +0100 Subject: [PATCH] ref: improve worker draining logic --- node/bft/src/primary.rs | 188 +++++++++++++++++++++++----------------- 1 file changed, 109 insertions(+), 79 deletions(-) diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 604322260c..4d16ab6256 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -488,102 +488,103 @@ impl Primary { return Ok(()); } - // Determined the required number of transmissions per worker. - let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize; - // Initialize the map of transmissions. let mut transmissions: IndexMap<_, _> = Default::default(); - // Keeps track of the number of transmissions included thus far. The - // transmissions index is only updated in batches, this counter is more granular. - let mut num_transmissions_included = 0usize; // Track the total execution costs of the batch proposal as it is being constructed. let mut proposal_cost = 0u64; - // Take the transmissions from the workers. - 'outer: for worker in self.workers.iter() { - // Initialize a tracker for included transmissions for the current worker. - let mut num_transmissions_included_for_worker = 0; - let mut worker_transmissions = worker.transmissions().into_iter(); - - // Check the transactions for inclusion in the batch proposal. - while num_transmissions_included_for_worker < num_transmissions_per_worker { - let Some((id, transmission)) = worker_transmissions.next() else { break }; - - // Check if the ledger already contains the transmission. - if self.ledger.contains_transmission(&id).unwrap_or(true) { - trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); - continue; - } + // Note: worker draining and transaction inclusion need to be thought + // through carefully when there is more than one worker. 
The fairness + // provided by one worker (FIFO) is no longer guaranteed with multiple workers. + debug_assert_eq!(MAX_WORKERS, 1); + let worker_transmissions = self.workers.iter().flat_map(|worker| worker.transmissions()); + // Check the transactions for inclusion in the batch proposal. + // while num_transmissions_included_for_worker < num_transmissions_per_worker { + for (id, transmission) in worker_transmissions { + if transmissions.len() == BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH { + break; + } - // Check if the storage already contain the transmission. - // Note: We do not skip if this is the first transmission in the proposal, to ensure that - // the primary does not propose a batch with no transmissions. - if num_transmissions_included != 0 && self.storage.contains_transmission(id) { - trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id)); - continue; - } + // Check if the ledger already contains the transmission. + if self.ledger.contains_transmission(&id).unwrap_or(true) { + trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); + continue; + } - // Check the transmission is still valid. - match (id, transmission.clone()) { - (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => { - // Ensure the checksum matches. - match solution.to_checksum::<N>() { - Ok(solution_checksum) if solution_checksum == checksum => (), - _ => { - trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id)); - continue; - } - } - // Check if the solution is still valid. - if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await { - trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id)); + // Check if the storage already contains the transmission. + // Note: We do not skip if this is the first transmission in the proposal, to ensure that + // the primary does not propose a batch with no transmissions. 
+ if !transmissions.is_empty() && self.storage.contains_transmission(id) { + trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id)); + continue; + } + + // Check the transmission is still valid. + match (id, transmission.clone()) { + (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => { + // Ensure the checksum matches. + match solution.to_checksum::<N>() { + Ok(solution_checksum) if solution_checksum == checksum => (), + _ => { + trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id)); continue; } } - (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => { - // Ensure the checksum matches. - match transaction.to_checksum::<N>() { - Ok(transaction_checksum) if transaction_checksum == checksum => (), - _ => { - trace!( - "Proposing - Skipping transaction '{}' - Checksum mismatch", - fmt_id(transaction_id) - ); - continue; - } - } - // Check if the transaction is still valid. - // TODO: check if clone is cheap, otherwise fix. - if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await { - trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id)); + // Check if the solution is still valid. + if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await { + trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id)); + continue; + } + } + (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => { + // Ensure the checksum matches. + match transaction.to_checksum::<N>() { + Ok(transaction_checksum) if transaction_checksum == checksum => (), + _ => { + trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id)); continue; } + } + // Check if the transaction is still valid. + // TODO: check if clone is cheap, otherwise fix. 
+ if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await { + trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id)); + continue; + } - // Ensure the transaction doesn't bring the proposal above the spend limit. - match self.ledger.compute_cost(transaction_id, transaction) { - Ok(cost) if proposal_cost + cost <= N::BATCH_SPEND_LIMIT => proposal_cost += cost, - _ => { - trace!( - "Proposing - Skipping transaction '{}' - Batch spend limit surpassed", - fmt_id(transaction_id) - ); - break 'outer; - } + // Ensure the transaction doesn't bring the proposal above the spend limit. + match self.ledger.compute_cost(transaction_id, transaction) { + Ok(cost) if proposal_cost + cost <= N::BATCH_SPEND_LIMIT => proposal_cost += cost, + _ => { + trace!( + "Proposing - Skipping transaction '{}' - Batch spend limit surpassed", + fmt_id(transaction_id) + ); + break; } } - // Note: We explicitly forbid including ratifications, - // as the protocol currently does not support ratifications. - (TransmissionID::Ratification, Transmission::Ratification) => continue, - // All other combinations are clearly invalid. - _ => continue, } - - num_transmissions_included += 1; - num_transmissions_included_for_worker += 1; + // Note: We explicitly forbid including ratifications, + // as the protocol currently does not support ratifications. + (TransmissionID::Ratification, Transmission::Ratification) => continue, + // All other combinations are clearly invalid. + _ => continue, } - // Drain the selected transactions from the worker and insert them into the batch proposal. - for (id, transmission) in worker.drain(num_transmissions_included_for_worker) { - transmissions.insert(id, transmission); + // If the transmission is valid, insert it into the proposal's transmission list. + transmissions.insert(id, transmission); + } + + // Drain the transmissions from each worker. 
+ let transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize; + let mut remaining = transmissions.len(); + for worker in self.workers().iter() { + let to_drain = remaining.min(transmissions_per_worker); + // No need to keep the drained items as they have already been added to the proposal. + worker.drain(to_drain).for_each(std::mem::drop); + // Update the counter and check if we have drained enough. + remaining -= to_drain; + if remaining == 0 { + break; } } @@ -2036,6 +2037,35 @@ mod tests { } } + #[tokio::test] + async fn test_propose_batch_over_spend_limit() { + let mut rng = TestRng::default(); + let (primary, _) = primary_without_handlers(&mut rng).await; + + // Check there is no batch currently proposed. + assert!(primary.proposed_batch.read().is_none()); + // Check the workers are empty. + primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty())); + + // Generate a solution and a transaction. + let (solution_id, solution) = sample_unconfirmed_solution(&mut rng); + primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap(); + + // At 10 credits per execution, 10 transactions should max out a batch, add a few more. + for _i in 0..15 { + let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng); + // Store it on one of the workers. + primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap(); + } + + // Propose a batch. It should succeed. + assert!(primary.propose_batch().await.is_ok()); + // Expect 10/15 transactions to be included in the proposal, along with the solution. + assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 11); + // Check the transactions were correctly drained from the workers (15 + 1 - 11). 
+ assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 5); + } + #[tokio::test] async fn test_propose_batch() { let mut rng = TestRng::default();