Skip to content

Commit

Permalink
logic mostly complete
Browse files Browse the repository at this point in the history
  • Loading branch information
jking-aus committed Oct 2, 2024
2 parents b833c17 + ec8b2bb commit c25d8f8
Showing 1 changed file with 111 additions and 83 deletions.
194 changes: 111 additions & 83 deletions anchor/qbft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{debug, warn};
use tracing::{debug, error, warn};

mod config;
mod error;

#[cfg(test)]
mod tests;
// #[cfg(test)]
// mod tests;

type ValidationId = usize;
type Round = usize;
Expand All @@ -23,16 +23,16 @@ type MessageKey = Round;
pub struct Qbft<F, D>
where
F: LeaderFunction + Clone,
D: Debug + Default + Clone + Eq + Hash,
D: Debug + Clone + Eq + Hash,
{
config: Config<F>,
instance_height: usize,
current_round: usize,
data: D,
data: Option<D>,
/// ID used for tracking validation of messages
current_validation_id: usize,
_current_validation_id: usize,
/// Hashmap of validations that have been sent to the processor
inflight_validations: HashMap<ValidationId, ValidationMessage<D>>, // TODO: Potentially unbounded
_inflight_validations: HashMap<ValidationId, ValidationMessage<D>>, // TODO: Potentially unbounded
/// The messages received this round that we have collected to reach quorum
prepare_messages: HashMap<Round, HashMap<InstanceId, PrepareMessage<D>>>,
commit_messages: HashMap<Round, HashMap<InstanceId, CommitMessage<D>>>,
Expand All @@ -50,7 +50,7 @@ where
// Messages that can be received from the message_in channel
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum InMessage<D: Debug + Default + Clone + Eq + Hash> {
pub enum InMessage<D: Debug + Clone + Eq + Hash> {
/// A request for data to form consensus on if we are the leader.
RecvData(GetData<D>),
/// A PROPOSE message to be sent on the network.
Expand All @@ -66,7 +66,7 @@ pub enum InMessage<D: Debug + Default + Clone + Eq + Hash> {
/// Messages that may be sent to the message_out channel from the instance to the client processor
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum OutMessage<D: Debug + Default + Clone + Eq + Hash> {
pub enum OutMessage<D: Debug + Clone + Eq + Hash> {
/// A request for data to form consensus on if we are the leader.
GetData(GetData<D>),
/// A PROPOSE message to be sent on the network.
Expand All @@ -84,7 +84,7 @@ pub enum OutMessage<D: Debug + Default + Clone + Eq + Hash> {
#[allow(dead_code)]
#[derive(Debug, Clone)]

pub struct RoundChange<D: Debug + Default + Clone + Eq + Hash> {
pub struct RoundChange<D: Debug + Clone + Eq + Hash> {
instance_id: usize,
instance_height: usize,
round: usize,
Expand All @@ -93,16 +93,15 @@ pub struct RoundChange<D: Debug + Default + Clone + Eq + Hash> {

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct GetData<D: Debug + Default + Clone + Eq + Hash> {
pub struct GetData<D: Debug + Clone + Eq + Hash> {
instance_id: usize,
instance_height: usize,
round: usize,
value: D,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct ProposeMessage<D: Debug + Default + Clone + Eq + Hash> {
pub struct ProposeMessage<D: Debug + Clone + Eq + Hash> {
instance_id: usize,
instance_height: usize,
round: usize,
Expand All @@ -111,7 +110,7 @@ pub struct ProposeMessage<D: Debug + Default + Clone + Eq + Hash> {

#[allow(dead_code)]
#[derive(Debug, Clone, Default)]
pub struct PrepareMessage<D: Debug + Default + Clone + Eq + Hash> {
pub struct PrepareMessage<D: Debug + Clone + Eq + Hash> {
instance_id: usize,
instance_height: usize,
round: usize,
Expand All @@ -120,7 +119,7 @@ pub struct PrepareMessage<D: Debug + Default + Clone + Eq + Hash> {

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct CommitMessage<D: Debug + Default + Clone + Eq + Hash> {
pub struct CommitMessage<D: Debug + Clone + Eq + Hash> {
instance_id: usize,
instance_height: usize,
round: usize,
Expand All @@ -129,7 +128,7 @@ pub struct CommitMessage<D: Debug + Default + Clone + Eq + Hash> {

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct ValidationMessage<D: Debug + Default + Clone + Eq + Hash> {
pub struct ValidationMessage<D: Debug + Clone + Eq + Hash> {
instance_id: ValidationId,
instance_height: usize,
round: usize,
Expand Down Expand Up @@ -172,7 +171,7 @@ pub enum Completed<D> {
impl<F, D> Qbft<F, D>
where
F: LeaderFunction + Clone,
D: Debug + Default + Clone + Eq + Hash + Hash + Eq,
D: Debug + Clone + Eq + Hash + Hash + Eq,
{
pub fn new(
config: Config<F>,
Expand All @@ -189,9 +188,9 @@ where
current_round: config.round,
instance_height: config.instance_height,
config,
current_validation_id: 0,
data: D::default(),
inflight_validations: HashMap::with_capacity(100),
_current_validation_id: 0,
_inflight_validations: HashMap::with_capacity(100),
data: None,
prepare_messages: HashMap::with_capacity(estimated_map_size),
commit_messages: HashMap::with_capacity(estimated_map_size),
round_change_messages: HashMap::with_capacity(estimated_map_size),
Expand Down Expand Up @@ -227,7 +226,6 @@ where

None => { }// Channel is closed
//_ => {}
// TODO: FILL THESE IN
}
}
_ = round_end.tick() => {
Expand All @@ -245,7 +243,7 @@ where
}

fn set_data(&mut self, data: D) {
self.data = data;
self.data = Some(data);
}

fn instance_id(&self) -> usize {
Expand All @@ -255,8 +253,8 @@ where
self.config.committee_members.clone()
}

fn validate_data(&self, _data: D) -> bool {
true
fn validate_data(&self, data: D) -> Option<D> {
Some(data)
}

fn send_message(&mut self, message: OutMessage<D>) {
Expand All @@ -283,58 +281,94 @@ where
) {
debug!("ID{}: believes they are the leader", self.instance_id());

self.send_message(OutMessage::GetData(GetData {
instance_id: self.instance_id(),
instance_height: self.instance_height,
round: self.current_round,
value: self.data.clone(),
}));
if let Some(data) = self.data.as_ref() {
self.send_message(OutMessage::GetData(GetData {
instance_id: self.instance_id(),
round: self.current_round,
value: data.clone(),
}));
} else {
error!("Attempted to send empty data");
}
};
}

/// Received data in order to come to consensus on.
///
/// This function validates the data and then sends a proposal to our peers.
fn received_data(&mut self, message: GetData<D>) {
// Check that we are the leader to make sure this is a timely response, for whilst we are
// still the leader
if self.config.leader_fn.leader_function(
self.instance_id(),
self.current_round,
self.instance_height,
self.config.committee_size,
) && self.instance_height == message.instance_height
&& self.validate_data(message.value.clone())
)
// I dont think we need instance height
// here
{
self.set_data(message.value.clone());
self.send_proposal();
self.send_prepare(message.value.clone());
};
if let Some(data) = self.validate_data(message.value) {
self.set_data(data.clone());
self.send_proposal(data.clone());
self.send_prepare(data);
}
}
}

fn send_proposal(&mut self) {
/// Comment
fn send_proposal(&mut self, data: D) {
self.send_message(OutMessage::Propose(ProposeMessage {
instance_id: self.instance_id(),
instance_height: self.instance_height,
round: self.current_round,
value: self.data.clone(),
value: data,
}));
}

fn send_prepare(&mut self, data: D) {
let _ = self.message_out.send(OutMessage::Prepare(PrepareMessage {
let _ = self.send_message(OutMessage::Prepare(PrepareMessage {
instance_id: self.instance_id(),
instance_height: self.instance_height,
round: self.current_round,
value: data.clone(),
}));
//And store a prepare locally
let instance_id = self.instance_id();
let operator_id = self.instance_id();
self.prepare_messages
.entry(self.current_round)
.or_default()
.insert(
instance_id,
operator_id,
PrepareMessage {
instance_id,
instance_id: operator_id,
instance_height: self.instance_height,
round: self.current_round,
value: data.clone(),
value: data,
},
);
}

// TODO: DO this
fn send_commit(&mut self, data: D) {
let _ = self.send_message(OutMessage::Prepare(PrepareMessage {
instance_id: self.instance_id(),
instance_height: self.instance_height,
round: self.current_round,
value: data.clone(),
}));
//And store a prepare locally
let operator_id = self.instance_id();
self.prepare_messages
.entry(self.current_round)
.or_default()
.insert(
operator_id,
PrepareMessage {
instance_id: operator_id,
instance_height: self.instance_height,
round: self.current_round,
value: data,
},
);
}
Expand All @@ -343,23 +377,26 @@ where
/// 1. Check the proposer is valid and who we expect
/// 2. Check that the proposal is valid and we agree on the value
fn received_propose(&mut self, propose_message: ProposeMessage<D>) {
//Check if proposal is from the leader we expect
// Check if proposal is from the leader we expect
if self.config.leader_fn.leader_function(
propose_message.instance_id,
self.current_round,
self.instance_height,
self.config.committee_size,
) {
) && self
.committee_members()
.contains(&propose_message.instance_id)
{
let self_instance_id = self.instance_id();
debug!(
"ID {}: Proposal is from round leader with ID {}",
self_instance_id, propose_message.instance_id,
);
// Validate the proposal with a local function that is is passed in frm the config
// similar to the leaderfunction for now return bool -> true
if self.validate_data(propose_message.value.clone()) {
if let Some(data) = self.validate_data(propose_message.value) {
// If of valid type, send prepare
self.send_prepare(propose_message.value)
self.send_prepare(data)
}
}
}
Expand All @@ -368,11 +405,11 @@ where
/// If we have reached quorum then send a commit
/// Otherwise store the prepare and wait for quorum.
fn received_prepare(&mut self, prepare_message: PrepareMessage<D>) {
// Check if the prepare message is from the committee
// Check if the prepare message is from the committee and the data is valid
if self
.committee_members()
.contains(&prepare_message.instance_id)
&& self.validate_data(prepare_message.value.clone())
&& self.validate_data(prepare_message.value.clone()).is_some()
{
//TODO: check the prepare message contains correct struct of data

Expand All @@ -390,34 +427,20 @@ where
);
}

// Check unique prepare messages for >= quorum
if self.prepare_messages.len() >= self.config.quorum_size {
// TODO: Kingy -> Look at this

/*
if self.prepare_messages.get(prepare_message.round).map(|sub_hashmap| sub_hashmap.len() >= self.config.quorum_size).unwrap_or(false) {
if k.0 == self.current_round
{
if let Some(messages) = self.prepare_messages.get(&k){
this_round_prepares.push(messages);
}
}
}
*/
if let Some(round_messages) = self.prepare_messages.get(&prepare_message.round) {
// Check the quorum size
if round_messages.len() >= self.config.quorum_size {
let counter = round_messages.values().fold(
HashMap::<D, usize>::new(),
|mut counter, message| {
*counter.entry(message.value.clone()).or_default() += 1; // Use reference to message.value
counter
},
);
let max_value = counter.iter().max_by_key(|&(_, &v)| v); //.map(|(k, v)| (k, v));
match max_value {
Some((key, value)) => debug!("We have a winner {:?} {:?} ", key, value),
None => warn!("Something is very wrong"),
// If we have stored round messages
if let Some(round_messages) = self.prepare_messages.get(&prepare_message.round) {
// Check the quorum size
if round_messages.len() >= self.config.quorum_size {
let counter = round_messages.values().fold(
HashMap::<D, usize>::new(),
|mut counter, message| {
*counter.entry(message.value.clone()).or_default() += 1;
counter
},
);
if let Some((data, count)) = counter.iter().max_by_key(|&(_, &v)| v) {
if *count >= self.config.quorum_size {
self.send_commit(data.clone());
}
}
}
Expand All @@ -428,12 +451,17 @@ where
if let Some(messages) = self.prepare_messages.get(&self.current_round) {
// SEND commit
if messages.len() >= self.config.quorum_size {
let _ = self.message_out.send(OutMessage::Commit(CommitMessage {
instance_id: self.instance_id(),
instance_height: self.instance_height,
round: self.current_round,
value: self.data.clone(),
}));
if let Some(data) = self.data.as_ref() {
let _ = self.message_out.send(OutMessage::Commit(CommitMessage {
instance_id: self.instance_id(),
instance_height: self.instance_height,
round: self.current_round,
value: data.clone(),
}));
} else {
// TODO: Change pattern so we don't emit this.
error!("Attempted to send empty data");
}
}
}
}
Expand Down

0 comments on commit c25d8f8

Please sign in to comment.