Skip to content

Commit

Permalink
leverage digest 2
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Feb 19, 2024
1 parent de1a386 commit f1fed82
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 100 deletions.
82 changes: 35 additions & 47 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::collections::HashSet;

use tokio::time::Instant;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, VersionedValue};
use crate::{serialize::*, Version};
use crate::{ChitchatId, VersionedValue};

/// A delta is the message we send to another node to update it.
///
Expand Down Expand Up @@ -32,7 +32,7 @@ impl Delta {
let node_deltas = self.node_deltas.iter().flat_map(|node_delta| {
std::iter::once(DeltaOpRef::Node {
chitchat_id: &node_delta.chitchat_id,
heartbeat: node_delta.heartbeat,
last_gc_version: node_delta.last_gc_version,
})
.chain(node_delta.key_values.iter().map(|(key, versioned_value)| {
DeltaOpRef::KeyValue {
Expand All @@ -49,7 +49,7 @@ enum DeltaOp {
NodeToReset(ChitchatId),
Node {
chitchat_id: ChitchatId,
heartbeat: Heartbeat,
last_gc_version: Version,
},
KeyValue {
key: String,
Expand All @@ -61,7 +61,7 @@ enum DeltaOpRef<'a> {
NodeToReset(&'a ChitchatId),
Node {
chitchat_id: &'a ChitchatId,
heartbeat: Heartbeat,
last_gc_version: Version,
},
KeyValue {
key: &'a str,
Expand Down Expand Up @@ -108,10 +108,10 @@ impl Deserializable for DeltaOp {
}
DeltaOpTag::Node => {
let chitchat_id = ChitchatId::deserialize(buf)?;
let heartbeat = Heartbeat::deserialize(buf)?;
let last_gc_version = Version::deserialize(buf)?;
Ok(DeltaOp::Node {
chitchat_id,
heartbeat,
last_gc_version,
})
}
DeltaOpTag::KeyValue => {
Expand Down Expand Up @@ -139,10 +139,10 @@ impl DeltaOp {
match self {
DeltaOp::Node {
chitchat_id,
heartbeat,
last_gc_version,
} => DeltaOpRef::Node {
chitchat_id,
heartbeat: *heartbeat,
last_gc_version: *last_gc_version,
},
DeltaOp::KeyValue {
key,
Expand Down Expand Up @@ -171,11 +171,11 @@ impl<'a> Serializable for DeltaOpRef<'a> {
match self {
Self::Node {
chitchat_id,
heartbeat,
last_gc_version,
} => {
buf.push(DeltaOpTag::Node.into());
chitchat_id.serialize(buf);
heartbeat.serialize(buf);
last_gc_version.serialize(buf);
}
Self::KeyValue {
key,
Expand All @@ -198,8 +198,8 @@ impl<'a> Serializable for DeltaOpRef<'a> {
1 + match self {
Self::Node {
chitchat_id,
heartbeat,
} => chitchat_id.serialized_len() + heartbeat.serialized_len(),
last_gc_version,
} => chitchat_id.serialized_len() + last_gc_version.serialized_len(),
Self::KeyValue {
key,
versioned_value,
Expand Down Expand Up @@ -252,14 +252,14 @@ impl Delta {
.sum()
}

pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) {
pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) {
assert!(!self
.node_deltas
.iter()
.any(|node_delta| { node_delta.chitchat_id == chitchat_id }));
self.node_deltas.push(NodeDelta {
chitchat_id,
heartbeat,
last_gc_version,
key_values: Vec::new(),
});
}
Expand Down Expand Up @@ -306,8 +306,7 @@ impl Delta {
#[derive(Debug, Eq, PartialEq, serde::Serialize)]
pub(crate) struct NodeDelta {
pub chitchat_id: ChitchatId,
// REMOVE ME
pub heartbeat: Heartbeat,
pub last_gc_version: Version,
pub key_values: Vec<(String, VersionedValue)>,
}

Expand Down Expand Up @@ -336,14 +335,14 @@ impl DeltaBuilder {
match op {
DeltaOp::Node {
chitchat_id,
heartbeat,
last_gc_version
} => {
self.flush();
anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id));
self.existing_nodes.insert(chitchat_id.clone());
self.current_node_delta = Some(NodeDelta {
chitchat_id,
heartbeat,
last_gc_version,
key_values: Vec::new(),
});
}
Expand Down Expand Up @@ -442,10 +441,10 @@ impl DeltaSerializer {
}

/// Returns false if the node could not be added because the payload would exceed the mtu.
pub fn try_add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) -> bool {
pub fn try_add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) -> bool {
let new_node_op = DeltaOp::Node {
chitchat_id,
heartbeat,
last_gc_version,
};
self.try_add_op(new_node_op)
}
Expand All @@ -472,9 +471,8 @@ mod tests {

// ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len().
let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node).
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0u64));

// +23 bytes: 2 bytes (key length) + 5 bytes (key) + 7 bytes (values) + 8 bytes (version) +
// 1 bytes (empty tombstone).
Expand All @@ -498,9 +496,8 @@ mod tests {
));

let node2 = ChitchatId::for_local_test(10_002);
let heartbeat = Heartbeat(0);
// +37 bytes
assert!(delta_writer.try_add_node(node2, heartbeat));
assert!(delta_writer.try_add_node(node2, 0));

// +23 bytes.
assert!(delta_writer.try_add_kv(
Expand Down Expand Up @@ -530,9 +527,8 @@ mod tests {

// ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len().
let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 27 bytes (node) + 2bytes (block length)
assert!(delta_writer.try_add_node(node1, heartbeat));
// +37 bytes = 8 bytes (last gc version) + 27 bytes (node) + 2bytes (block length)
assert!(delta_writer.try_add_node(node1, 0));

// +24 bytes (kv + op tag)
assert!(delta_writer.try_add_kv(
Expand All @@ -555,9 +551,8 @@ mod tests {
));

let node2 = ChitchatId::for_local_test(10_002);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node).
assert!(delta_writer.try_add_node(node2, heartbeat));
// +37 bytes = 8 bytes (last gc version) + 2 bytes (empty node delta) + 27 bytes (node).
assert!(delta_writer.try_add_node(node2, 0));
test_aux_delta_writer(delta_writer, 80);
}

Expand All @@ -577,11 +572,10 @@ mod tests {
assert!(delta_writer.try_add_node_to_reset(ChitchatId::for_local_test(10_000)));

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);

// +8 bytes (heartbeat) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic new
// +8 bytes (last gc version) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic new
// block) = 71
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0u64));

// +23 bytes (kv) + 1 (op tag)
// = 95
Expand All @@ -605,10 +599,9 @@ mod tests {
));

let node2 = ChitchatId::for_local_test(10_002);
let heartbeat = Heartbeat(0);
// +8 bytes (heartbeat) + 27 bytes (ChitchatId) + 1 byte (op tag)
// +8 bytes (last gc version) + 27 bytes (ChitchatId) + 1 byte (op tag)
// = 155
assert!(delta_writer.try_add_node(node2, heartbeat));
assert!(delta_writer.try_add_node(node2, 0u64));
// The block got compressed.
test_aux_delta_writer(delta_writer, 85);
}
Expand All @@ -619,9 +612,8 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(100);

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId).
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0));

// +23 bytes.
assert!(delta_writer.try_add_kv(
Expand All @@ -643,9 +635,8 @@ mod tests {
));

let node2 = ChitchatId::for_local_test(10_002);
let heartbeat = Heartbeat(0);
// +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId).
assert!(!delta_writer.try_add_node(node2, heartbeat));
assert!(!delta_writer.try_add_node(node2, 0u64));

// The block got compressed.
test_aux_delta_writer(delta_writer, 72);
Expand All @@ -657,9 +648,8 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(100);

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
// +37 bytes.
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0u64));

// +23 bytes.
assert!(delta_writer.try_add_kv(
Expand Down Expand Up @@ -693,11 +683,10 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(100);

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);

// + 3 bytes (block tag) + 35 bytes (node) + 1 byte (op tag)
// = 40
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0u64));

// +23 bytes (kv) + 1 (op tag) + 3 bytes (pessimistic block tag)
// = 67
Expand Down Expand Up @@ -729,8 +718,7 @@ mod tests {
let mut delta_writer = DeltaSerializer::with_mtu(62);

let node1 = ChitchatId::for_local_test(10_001);
let heartbeat = Heartbeat(0);
assert!(delta_writer.try_add_node(node1, heartbeat));
assert!(delta_writer.try_add_node(node1, 0u64));

assert!(delta_writer.try_add_kv(
"key11",
Expand Down
6 changes: 3 additions & 3 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Chitchat {

/// Digest contains important information about the list of members in
/// the cluster.
fn process_digest(&mut self, digest: &Digest) {
fn report_heartbeats_in_digest(&mut self, digest: &Digest) {
for (chitchat_id, node_digest) in &digest.node_digests {
self.report_heartbeat(chitchat_id, node_digest.heartbeat);
}
Expand All @@ -117,7 +117,7 @@ impl Chitchat {
);
return Some(ChitchatMessage::BadCluster);
}
self.process_digest(&digest);
self.report_heartbeats_in_digest(&digest);
let scheduled_for_deletion: HashSet<_> =
self.scheduled_for_deletion_nodes().collect();
let self_digest = self.compute_digest(&scheduled_for_deletion);
Expand All @@ -133,7 +133,7 @@ impl Chitchat {
})
}
ChitchatMessage::SynAck { digest, delta } => {
self.process_digest(&digest);
self.report_heartbeats_in_digest(&digest);
self.process_delta(delta);
let scheduled_for_deletion =
self.scheduled_for_deletion_nodes().collect::<HashSet<_>>();
Expand Down
4 changes: 2 additions & 2 deletions chitchat/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ mod tests {
let mut delta = Delta::default();
let node = ChitchatId::for_local_test(10_001);
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), Heartbeat(0));
delta.add_node(node.clone(), 0u64);
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);
Expand All @@ -186,7 +186,7 @@ mod tests {
let mut delta = Delta::default();
let node = ChitchatId::for_local_test(10_001);
// +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat).
delta.add_node(node.clone(), Heartbeat(0));
delta.add_node(node.clone(), 0u64);
// +29 bytes.
delta.add_kv(&node, "key", "value", 0, true);
delta.set_serialized_len(62);
Expand Down
11 changes: 8 additions & 3 deletions chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,12 @@ impl Server {
.dead_nodes()
.map(|chitchat_id| chitchat_id.gossip_advertise_addr)
.collect::<HashSet<_>>();
let seed_nodes: HashSet<SocketAddr> = chitchat_guard.seed_nodes();
let seed_nodes: HashSet<SocketAddr> = chitchat_guard.seed_nodes()
.into_iter()
.filter(|addr| {
*addr != chitchat_guard.self_chitchat_id().gossip_advertise_addr
})
.collect();
let (selected_nodes, random_dead_node_opt, random_seed_node_opt) = select_nodes_for_gossip(
&mut self.rng,
peer_nodes,
Expand Down Expand Up @@ -610,8 +615,8 @@ mod tests {
};

let node_delta = delta.get(&server_id).unwrap();
let heartbeat = node_delta.heartbeat;
assert_eq!(heartbeat, Heartbeat(5));
let heartbeat = node_delta.last_gc_version;
assert_eq!(heartbeat, 0u64);

server_handle.shutdown().await.unwrap();
}
Expand Down
Loading

0 comments on commit f1fed82

Please sign in to comment.