Skip to content

Commit

Permalink
ref: improve worker draining logic
Browse files Browse the repository at this point in the history
  • Loading branch information
niklaslong committed Jan 27, 2025
1 parent 670effe commit 78db712
Showing 1 changed file with 109 additions and 79 deletions.
188 changes: 109 additions & 79 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,102 +488,103 @@ impl<N: Network> Primary<N> {
return Ok(());
}

// Determined the required number of transmissions per worker.
let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;

// Initialize the map of transmissions.
let mut transmissions: IndexMap<_, _> = Default::default();
// Keeps track of the number of transmissions included thus far. The
// transmissions index is only updated in batches, this counter is more granular.
let mut num_transmissions_included = 0usize;
// Track the total execution costs of the batch proposal as it is being constructed.
let mut proposal_cost = 0u64;
// Take the transmissions from the workers.
'outer: for worker in self.workers.iter() {
// Initialize a tracker for included transmissions for the current worker.
let mut num_transmissions_included_for_worker = 0;
let mut worker_transmissions = worker.transmissions().into_iter();

// Check the transactions for inclusion in the batch proposal.
while num_transmissions_included_for_worker < num_transmissions_per_worker {
let Some((id, transmission)) = worker_transmissions.next() else { break };

// Check if the ledger already contains the transmission.
if self.ledger.contains_transmission(&id).unwrap_or(true) {
trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
continue;
}
// Note: worker draining and transaction inclusion needs to be thought
// through carefully when there is more than one worker. The fairness
// provided by one worker (FIFO) is no longer guaranteed with multiple workers.
debug_assert_eq!(MAX_WORKERS, 1);
let worker_transmissions = self.workers.iter().flat_map(|worker| worker.transmissions());
// Check the transactions for inclusion in the batch proposal.
// while num_transmissions_included_for_worker < num_transmissions_per_worker {
for (id, transmission) in worker_transmissions {
if transmissions.len() == BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
break;
}

// Check if the storage already contain the transmission.
// Note: We do not skip if this is the first transmission in the proposal, to ensure that
// the primary does not propose a batch with no transmissions.
if num_transmissions_included != 0 && self.storage.contains_transmission(id) {
trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
continue;
}
// Check if the ledger already contains the transmission.
if self.ledger.contains_transmission(&id).unwrap_or(true) {
trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
continue;
}

// Check the transmission is still valid.
match (id, transmission.clone()) {
(TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
// Ensure the checksum matches.
match solution.to_checksum::<N>() {
Ok(solution_checksum) if solution_checksum == checksum => (),
_ => {
trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
continue;
}
}
// Check if the solution is still valid.
if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
// Check if the storage already contain the transmission.
// Note: We do not skip if this is the first transmission in the proposal, to ensure that
// the primary does not propose a batch with no transmissions.
if !transmissions.is_empty() && self.storage.contains_transmission(id) {
trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
continue;
}

// Check the transmission is still valid.
match (id, transmission.clone()) {
(TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
// Ensure the checksum matches.
match solution.to_checksum::<N>() {
Ok(solution_checksum) if solution_checksum == checksum => (),
_ => {
trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
continue;
}
}
(TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
// Ensure the checksum matches.
match transaction.to_checksum::<N>() {
Ok(transaction_checksum) if transaction_checksum == checksum => (),
_ => {
trace!(
"Proposing - Skipping transaction '{}' - Checksum mismatch",
fmt_id(transaction_id)
);
continue;
}
}
// Check if the transaction is still valid.
// TODO: check if clone is cheap, otherwise fix.
if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
// Check if the solution is still valid.
if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
continue;
}
}
(TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
// Ensure the checksum matches.
match transaction.to_checksum::<N>() {
Ok(transaction_checksum) if transaction_checksum == checksum => (),
_ => {
trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
continue;
}
}
// Check if the transaction is still valid.
// TODO: check if clone is cheap, otherwise fix.
if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
continue;
}

// Ensure the transaction doesn't bring the proposal above the spend limit.
match self.ledger.compute_cost(transaction_id, transaction) {
Ok(cost) if proposal_cost + cost <= N::BATCH_SPEND_LIMIT => proposal_cost += cost,
_ => {
trace!(
"Proposing - Skipping transaction '{}' - Batch spend limit surpassed",
fmt_id(transaction_id)
);
break 'outer;
}
// Ensure the transaction doesn't bring the proposal above the spend limit.
match self.ledger.compute_cost(transaction_id, transaction) {
Ok(cost) if proposal_cost + cost <= N::BATCH_SPEND_LIMIT => proposal_cost += cost,
_ => {
trace!(
"Proposing - Skipping transaction '{}' - Batch spend limit surpassed",
fmt_id(transaction_id)
);
break;
}
}
// Note: We explicitly forbid including ratifications,
// as the protocol currently does not support ratifications.
(TransmissionID::Ratification, Transmission::Ratification) => continue,
// All other combinations are clearly invalid.
_ => continue,
}

num_transmissions_included += 1;
num_transmissions_included_for_worker += 1;
// Note: We explicitly forbid including ratifications,
// as the protocol currently does not support ratifications.
(TransmissionID::Ratification, Transmission::Ratification) => continue,
// All other combinations are clearly invalid.
_ => continue,
}

// Drain the selected transactions from the worker and insert them into the batch proposal.
for (id, transmission) in worker.drain(num_transmissions_included_for_worker) {
transmissions.insert(id, transmission);
// If the transmission is valid, insert it into the proposal's transmission list.
transmissions.insert(id, transmission);
}

// Drain the transmissions from each worker.
let transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
let mut remaining = transmissions.len();
for worker in self.workers().iter() {
let to_drain = remaining.min(transmissions_per_worker);
// No need to keep the drained items as they have already been added to the proposal.
worker.drain(to_drain).for_each(std::mem::drop);
// Update the counter and check if we have drained enough.
remaining -= to_drain;
if remaining == 0 {
break;
}
}

Expand Down Expand Up @@ -2036,6 +2037,35 @@ mod tests {
}
}

#[tokio::test]
async fn test_propose_batch_over_spend_limit() {
let mut rng = TestRng::default();
let (primary, _) = primary_without_handlers(&mut rng).await;

// Check there is no batch currently proposed.
assert!(primary.proposed_batch.read().is_none());
// Check the workers are empty.
primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

// Generate a solution and a transaction.
let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

// At 10 credits per execution, 10 transactions should max out a batch, add a few more.
for _i in 0..15 {
let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
// Store it on one of the workers.
primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
}

// Try to propose a batch again. This time, it should succeed.
assert!(primary.propose_batch().await.is_ok());
// Expect 10/15 transactions to be included in the proposal, along with the solution.
assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 11);
// Check the transactions were correctly drained from the workers (15 + 1 - 11).
assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 5);
}

#[tokio::test]
async fn test_propose_batch() {
let mut rng = TestRng::default();
Expand Down

0 comments on commit 78db712

Please sign in to comment.