From ec8b2bbc43a8c5d34c9682bd50f41a6f9a102e1e Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 2 Oct 2024 15:11:29 +1000 Subject: [PATCH] Clean up a little bit and fix compilation errors --- anchor/qbft/src/lib.rs | 198 +++++++++++++++++++++++------------------ 1 file changed, 112 insertions(+), 86 deletions(-) diff --git a/anchor/qbft/src/lib.rs b/anchor/qbft/src/lib.rs index b3620055..a6492e35 100644 --- a/anchor/qbft/src/lib.rs +++ b/anchor/qbft/src/lib.rs @@ -4,13 +4,13 @@ use std::collections::HashMap; use std::fmt::Debug; use std::hash::Hash; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tracing::{debug, warn}; +use tracing::{debug, error, warn}; mod config; mod error; -#[cfg(test)] -mod tests; +// #[cfg(test)] +// mod tests; type ValidationId = usize; type Round = usize; @@ -23,16 +23,16 @@ type MessageKey = Round; pub struct Qbft where F: LeaderFunction + Clone, - D: Debug + Default + Clone + Eq + Hash, + D: Debug + Clone + Eq + Hash, { config: Config, instance_height: usize, current_round: usize, - data: D, + data: Option, /// ID used for tracking validation of messages - current_validation_id: usize, + _current_validation_id: usize, /// Hashmap of validations that have been sent to the processor - inflight_validations: HashMap>, // TODO: Potentially unbounded + _inflight_validations: HashMap>, // TODO: Potentially unbounded /// The messages received this round that we have collected to reach quorum prepare_messages: HashMap>>, commit_messages: HashMap>>, @@ -50,7 +50,7 @@ where // Messages that can be received from the message_in channel #[allow(dead_code)] #[derive(Debug, Clone)] -pub enum InMessage { +pub enum InMessage { /// A request for data to form consensus on if we are the leader. RecvData(GetData), /// A PROPOSE message to be sent on the network. @@ -59,8 +59,6 @@ pub enum InMessage { Prepare(PrepareMessage), /// A commit message to be sent on the network. Commit(CommitMessage), - /// A validation request from the application to check if the message should be commited. - Validate(ValidationMessage), /// Round change message received from network RoundChange(RoundChange), } @@ -68,7 +66,7 @@ pub enum InMessage { /// Messages that may be sent to the message_out channel from the instance to the client processor #[allow(dead_code)] #[derive(Debug, Clone)] -pub enum OutMessage { +pub enum OutMessage { /// A request for data to form consensus on if we are the leader. GetData(GetData), /// A PROPOSE message to be sent on the network. @@ -88,7 +86,7 @@ pub enum OutMessage { #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct RoundChange { +pub struct RoundChange { instance_id: usize, instance_height: usize, round: usize, @@ -97,16 +95,15 @@ pub struct RoundChange { #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct GetData { +pub struct GetData { instance_id: usize, - instance_height: usize, round: usize, value: D, } #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct ProposeMessage { +pub struct ProposeMessage { instance_id: usize, instance_height: usize, round: usize, @@ -115,7 +112,7 @@ pub struct ProposeMessage { #[allow(dead_code)] #[derive(Debug, Clone, Default)] -pub struct PrepareMessage { +pub struct PrepareMessage { instance_id: usize, instance_height: usize, round: usize, @@ -124,7 +121,7 @@ pub struct PrepareMessage { #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct CommitMessage { +pub struct CommitMessage { instance_id: usize, instance_height: usize, round: usize, @@ -133,7 +130,7 @@ pub struct CommitMessage { #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct ValidationMessage { +pub struct ValidationMessage { instance_id: ValidationId, instance_height: usize, round: usize, @@ -176,7 +173,7 @@ pub enum Completed { impl Qbft where F: LeaderFunction + Clone, - D: Debug + Default + Clone + Eq + Hash + Hash + Eq, + D: Debug + Clone + Eq + Hash + Hash + Eq, { pub fn new( config: Config, @@ -193,9 +190,9 @@ where current_round: config.round, instance_height: config.instance_height, config, - current_validation_id: 0, - data: D::default(), - inflight_validations: HashMap::with_capacity(100), + _current_validation_id: 0, + _inflight_validations: HashMap::with_capacity(100), + data: None, prepare_messages: HashMap::with_capacity(estimated_map_size), commit_messages: HashMap::with_capacity(estimated_map_size), round_change_messages: HashMap::with_capacity(estimated_map_size), @@ -229,8 +226,7 @@ where // received_roundChange function Some(InMessage::RoundChange(round_change_message)) => self.received_round_change(round_change_message), - // None => { }// Channel is closed - _ => {} + None => { }// Channel is closed // TODO: FILL THESE IN } } @@ -249,7 +245,7 @@ where } fn set_data(&mut self, data: D) { - self.data = data; + self.data = Some(data); } fn instance_id(&self) -> usize { @@ -259,8 +255,8 @@ where self.config.committee_members.clone() } - fn validate_data(&self, _data: D) -> bool { - true + fn validate_data(&self, data: D) -> Option { + Some(data) } fn send_message(&mut self, message: OutMessage) { @@ -287,58 +283,94 @@ where ) { debug!("ID{}: believes they are the leader", self.instance_id()); - self.send_message(OutMessage::GetData(GetData { - instance_id: self.instance_id(), - instance_height: self.instance_height, - round: self.current_round, - value: self.data.clone(), - })); + if let Some(data) = self.data.as_ref() { + self.send_message(OutMessage::GetData(GetData { + instance_id: self.instance_id(), + round: self.current_round, + value: data.clone(), + })); + } else { + error!("Attempted to send empty data"); + } }; } + /// Received data in order to come to consensus on. + /// + /// This function validates the data and then sends a proposal to our peers. fn received_data(&mut self, message: GetData) { + // Check that we are the leader to make sure this is a timely response, for whilst we are + // still the leader if self.config.leader_fn.leader_function( self.instance_id(), self.current_round, self.instance_height, self.config.committee_size, - ) && self.instance_height == message.instance_height - && self.validate_data(message.value.clone()) + ) + // I dont think we need instance height + // here { - self.set_data(message.value.clone()); - self.send_proposal(); - self.send_prepare(message.value.clone()); - }; + if let Some(data) = self.validate_data(message.value) { + self.set_data(data.clone()); + self.send_proposal(data.clone()); + self.send_prepare(data); + } + } } - fn send_proposal(&mut self) { + /// Comment + fn send_proposal(&mut self, data: D) { self.send_message(OutMessage::Propose(ProposeMessage { instance_id: self.instance_id(), instance_height: self.instance_height, round: self.current_round, - value: self.data.clone(), + value: data, })); } fn send_prepare(&mut self, data: D) { - let _ = self.message_out.send(OutMessage::Prepare(PrepareMessage { + let _ = self.send_message(OutMessage::Prepare(PrepareMessage { instance_id: self.instance_id(), instance_height: self.instance_height, round: self.current_round, value: data.clone(), })); //And store a prepare locally - let instance_id = self.instance_id(); + let operator_id = self.instance_id(); self.prepare_messages .entry(self.current_round) .or_default() .insert( - instance_id, + operator_id, PrepareMessage { - instance_id, + instance_id: operator_id, instance_height: self.instance_height, round: self.current_round, - value: data.clone(), + value: data, + }, + ); + } + + // TODO: DO this + fn send_commit(&mut self, data: D) { + let _ = self.send_message(OutMessage::Prepare(PrepareMessage { + instance_id: self.instance_id(), + instance_height: self.instance_height, + round: self.current_round, + value: data.clone(), + })); + //And store a prepare locally + let operator_id = self.instance_id(); + self.prepare_messages + .entry(self.current_round) + .or_default() + .insert( + operator_id, + PrepareMessage { + instance_id: operator_id, + instance_height: self.instance_height, + round: self.current_round, + value: data, }, ); } @@ -347,13 +379,16 @@ where /// 1. Check the proposer is valid and who we expect /// 2. Check that the proposal is valid and we agree on the value fn received_propose(&mut self, propose_message: ProposeMessage) { - //Check if proposal is from the leader we expect + // Check if proposal is from the leader we expect if self.config.leader_fn.leader_function( propose_message.instance_id, self.current_round, self.instance_height, self.config.committee_size, - ) { + ) && self + .committee_members() + .contains(&propose_message.instance_id) + { let self_instance_id = self.instance_id(); debug!( "ID {}: Proposal is from round leader with ID {}", @@ -361,9 +396,9 @@ where ); // Validate the proposal with a local function that is is passed in frm the config // similar to the leaderfunction for now return bool -> true - if self.validate_data(propose_message.value.clone()) { + if let Some(data) = self.validate_data(propose_message.value) { // If of valid type, send prepare - self.send_prepare(propose_message.value) + self.send_prepare(data) } } } @@ -372,11 +407,11 @@ where /// If we have reached quorum then send a commit /// Otherwise store the prepare and wait for quorum. fn received_prepare(&mut self, prepare_message: PrepareMessage) { - // Check if the prepare message is from the committee + // Check if the prepare message is from the committee and the data is valid if self .committee_members() .contains(&prepare_message.instance_id) - && self.validate_data(prepare_message.value.clone()) + && self.validate_data(prepare_message.value.clone()).is_some() { //TODO: check the prepare message contains correct struct of data @@ -394,34 +429,20 @@ where ); } - // Check unique prepare messages for >= quorum - if self.prepare_messages.len() >= self.config.quorum_size { - // TODO: Kingy -> Look at this - - /* - if self.prepare_messages.get(prepare_message.round).map(|sub_hashmap| sub_hashmap.len() >= self.config.quorum_size).unwrap_or(false) { - if k.0 == self.current_round - { - if let Some(messages) = self.prepare_messages.get(&k){ - this_round_prepares.push(messages); - } - } - } - */ - if let Some(round_messages) = self.prepare_messages.get(&prepare_message.round) { - // Check the quorum size - if round_messages.len() >= self.config.quorum_size { - let counter = round_messages.values().fold( - HashMap::::new(), - |mut counter, message| { - *counter.entry(message.value.clone()).or_default() += 1; // Use reference to message.value - counter - }, - ); - let max_value = counter.iter().max_by_key(|&(_, &v)| v); //.map(|(k, v)| (k, v)); - match max_value { - Some((key, value)) => debug!("We have a winner {:?} {:?} ", key, value), - None => warn!("Something is very wrong"), + // If we have stored round messages + if let Some(round_messages) = self.prepare_messages.get(&prepare_message.round) { + // Check the quorum size + if round_messages.len() >= self.config.quorum_size { + let counter = round_messages.values().fold( + HashMap::::new(), + |mut counter, message| { + *counter.entry(message.value.clone()).or_default() += 1; + counter + }, + ); + if let Some((data, count)) = counter.iter().max_by_key(|&(_, &v)| v) { + if *count >= self.config.quorum_size { + self.send_commit(data.clone()); } } } @@ -432,12 +453,17 @@ where if let Some(messages) = self.prepare_messages.get(&self.current_round) { // SEND commit if messages.len() >= self.config.quorum_size { - let _ = self.message_out.send(OutMessage::Commit(CommitMessage { - instance_id: self.instance_id(), - instance_height: self.instance_height, - round: self.current_round, - value: self.data.clone(), - })); + if let Some(data) = self.data.as_ref() { + let _ = self.message_out.send(OutMessage::Commit(CommitMessage { + instance_id: self.instance_id(), + instance_height: self.instance_height, + round: self.current_round, + value: data.clone(), + })); + } else { + // TODO: Change pattern so we don't emit this. + error!("Attempted to send empty data"); + } } } }