fix: DOG behaviour
BastienFaivre committed Jan 29, 2025
1 parent 4f572ca commit 07b0a45
Showing 6 changed files with 62 additions and 127 deletions.
40 changes: 20 additions & 20 deletions dog/src/behaviour.rs
@@ -158,7 +158,7 @@ pub struct Behaviour<D = IdentityTransform> {
redundancy_interval: Delay,
redundancy_controller: Controller,
router: Router,
cache: DuplicateCache<TransactionId>,
cache: DuplicateCache<TransactionId, PeerId>,
metrics: Option<Metrics>,
}

@@ -239,7 +239,8 @@ where

tracing::trace!("Publishing transaction");

self.cache.insert(tx_id.clone());
self.cache
.insert(tx_id.clone(), self.publish_config.get_own_id());

if let Some(m) = self.metrics.as_mut() {
m.set_txs_cache_size(self.cache.len());
@@ -376,10 +377,8 @@ where
) -> bool {
tracing::debug!(transaction=%transaction_id, "Forwarding transaction");

// TODO: should remove peers that already received the transaction
// TODO: see RawTransaction TODOs
let recipient_peers = self.router.filter_valid_routes(
raw_transaction.from,
*propagation_source,
self.connected_peers
.keys()
.filter(|&peer| peer != propagation_source && peer != &raw_transaction.from)
@@ -477,7 +476,7 @@ where

// TODO: validate transaction if needed

if !self.cache.insert(tx_id.clone()) {
if !self.cache.insert(tx_id.clone(), *propagation_source) {
tracing::debug!(transaction=%tx_id, "Transaction already received, ignoring");

if let Some(m) = self.metrics.as_mut() {
@@ -492,12 +491,7 @@

tracing::debug!(peer=%propagation_source, "Sending HaveTx to peer");

if self.send_transaction(
*propagation_source,
RpcOut::HaveTx(HaveTx {
from: transaction.from,
}),
) {
if self.send_transaction(*propagation_source, RpcOut::HaveTx(HaveTx { tx_id })) {
self.router.register_have_tx_sent(*propagation_source);
self.redundancy_controller.block_have_tx();

@@ -539,11 +533,17 @@ where
}
}

fn handle_have_tx(&mut self, froms: Vec<PeerId>, propagation_source: &PeerId) {
tracing::debug!(peer=%propagation_source, "Disabling {} routes to peer", froms.len());
fn handle_have_tx(&mut self, tx_ids: Vec<TransactionId>, propagation_source: &PeerId) {
tracing::debug!(peer=%propagation_source, "Received HaveTx from peer with {} transaction ids", tx_ids.len());

for from in froms {
self.router.disable_route(from, *propagation_source);
for tx_id in tx_ids {
if let Some(source) = self.cache.get(&tx_id) {
if *source == *propagation_source || *source == self.publish_config.get_own_id() {
continue;
}
tracing::debug!(peer=%propagation_source, "Disabling route from {} to peer", source);
self.router.disable_route(*source, *propagation_source);
}
}

self.events
@@ -695,20 +695,20 @@ where
}

// Handle control messages
let mut have_tx_froms = Vec::new();
let mut have_tx_ids = Vec::new();
let mut reset_route = false;
for control_msg in rpc.control_msgs {
match control_msg {
ControlAction::HaveTx(have_tx) => {
have_tx_froms.push(have_tx.from);
have_tx_ids.push(have_tx.tx_id);
}
ControlAction::ResetRoute(_) => {
reset_route = true;
}
}
}
if !have_tx_froms.is_empty() {
self.handle_have_tx(have_tx_froms, &propagation_source);
if !have_tx_ids.is_empty() {
self.handle_have_tx(have_tx_ids, &propagation_source);
}
if reset_route {
self.handle_reset_route(&propagation_source);
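The core of the change in behaviour.rs is handle_have_tx: instead of trusting a `from` peer id carried inside the HaveTx message, the node now looks up in its own cache which peer delivered each transaction and only disables that route, skipping its own publications and routes whose source is the requester itself. The following stand-alone sketch is not part of the commit: PeerId and TransactionId are replaced by plain strings, and the hypothetical routes_to_disable helper stands in for the router/cache calls, purely to illustrate the decision logic.

use std::collections::HashMap;

// Sketch only: "cache" maps a transaction id to the peer it was first received
// from, mirroring DuplicateCache<TransactionId, PeerId> in the diff above.
fn routes_to_disable<'a>(
    cache: &HashMap<&'a str, &'a str>,
    own_id: &str,
    requester: &'a str,
    tx_ids: &[&'a str],
) -> Vec<(&'a str, &'a str)> {
    let mut disabled = Vec::new();
    for tx_id in tx_ids {
        if let Some(&source) = cache.get(tx_id) {
            // Never disable routes for our own publications, and never disable
            // a route whose recorded source is the requester itself.
            if source == requester || source == own_id {
                continue;
            }
            disabled.push((source, requester));
        }
    }
    disabled
}

fn main() {
    let cache = HashMap::from([("tx-1", "peer-A"), ("tx-2", "me"), ("tx-3", "peer-B")]);
    // peer-B reports that it already has all three transactions.
    let out = routes_to_disable(&cache, "me", "peer-B", &["tx-1", "tx-2", "tx-3"]);
    // Only peer-A -> peer-B gets disabled: tx-2 was published by us and tx-3
    // was delivered by peer-B in the first place.
    assert_eq!(out, vec![("peer-A", "peer-B")]);
}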
8 changes: 4 additions & 4 deletions dog/src/generated/dog/pb.rs
@@ -136,15 +136,15 @@ impl MessageWrite for ControlMessage {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct ControlHaveTx {
pub from: Vec<u8>,
pub tx_id: Vec<u8>,
}

impl<'a> MessageRead<'a> for ControlHaveTx {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.from = r.read_bytes(bytes)?.to_owned(),
Ok(10) => msg.tx_id = r.read_bytes(bytes)?.to_owned(),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
@@ -156,11 +156,11 @@ impl<'a> MessageRead<'a> for ControlHaveTx {
impl MessageWrite for ControlHaveTx {
fn get_size(&self) -> usize {
0
+ if self.from.is_empty() { 0 } else { 1 + sizeof_len((&self.from).len()) }
+ if self.tx_id.is_empty() { 0 } else { 1 + sizeof_len((&self.tx_id).len()) }
}

fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if !self.from.is_empty() { w.write_with_tag(10, |w| w.write_bytes(&**&self.from))?; }
if !self.tx_id.is_empty() { w.write_with_tag(10, |w| w.write_bytes(&**&self.tx_id))?; }
Ok(())
}
}
2 changes: 1 addition & 1 deletion dog/src/generated/rpc.proto
@@ -21,7 +21,7 @@ message ControlMessage {
}

message ControlHaveTx {
bytes from = 1;
bytes tx_id = 1;
}

message ControlResetRoute {}
50 changes: 28 additions & 22 deletions dog/src/time_cache.rs
@@ -1,55 +1,57 @@
use fnv::FnvHashMap;
use std::{collections::VecDeque, time::Duration};

use fnv::FnvHashSet;
use web_time::Instant;

struct ExpiringValues<Element> {
value: Element,
struct ExpiringEntry<K> {
key: K,
expiration: Instant,
}

pub(crate) struct DuplicateCache<T> {
pub(crate) struct DuplicateCache<K, V> {
/// Size of the cache.
len: usize,
/// Set of values in the cache.
values: FnvHashSet<T>,
/// List of values in order of expiration.
list: VecDeque<ExpiringValues<T>>,
/// Map of keys to values in the cache.
values: FnvHashMap<K, V>,
/// List of keys in order of expiration.
list: VecDeque<ExpiringEntry<K>>,
/// The time values remain in the cache.
ttl: Duration,
}

impl<T> DuplicateCache<T>
impl<K, V> DuplicateCache<K, V>
where
T: Eq + std::hash::Hash + Clone,
K: Eq + std::hash::Hash + Clone,
V: Clone,
{
pub(crate) fn new(ttl: Duration) -> Self {
DuplicateCache {
len: 0,
values: FnvHashSet::default(),
values: FnvHashMap::default(),
list: VecDeque::new(),
ttl,
}
}

fn remove_expired_values(&mut self, now: Instant) {
while let Some(element) = self.list.pop_front() {
if element.expiration > now {
self.list.push_front(element);
while let Some(entry) = self.list.pop_front() {
if entry.expiration > now {
self.list.push_front(entry);
break;
}
self.len -= 1;
self.values.remove(&element.value);
self.values.remove(&entry.key);
}
}

pub(crate) fn insert(&mut self, value: T) -> bool {
pub(crate) fn insert(&mut self, key: K, value: V) -> bool {
let now = Instant::now();
self.remove_expired_values(now);
if self.values.insert(value.clone()) {

if !self.values.contains_key(&key) {
self.values.insert(key.clone(), value);
self.len += 1;
self.list.push_back(ExpiringValues {
value,
self.list.push_back(ExpiringEntry {
key,
expiration: now + self.ttl,
});
true
@@ -58,8 +60,12 @@ where
}
}

pub(crate) fn contains(&self, value: &T) -> bool {
self.values.contains(value)
pub(crate) fn contains(&self, key: &K) -> bool {
self.values.contains_key(key)
}

pub(crate) fn get(&self, key: &K) -> Option<&V> {
self.values.get(key)
}

pub(crate) fn len(&self) -> usize {
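A minimal usage sketch of the reworked DuplicateCache<K, V> (not part of the commit): since the type is pub(crate), this would have to live inside the crate, e.g. as a unit test next to the type; the module path crate::time_cache is assumed from the file name, and plain string slices stand in for TransactionId and PeerId.

use std::time::Duration;
use crate::time_cache::DuplicateCache;

#[test]
fn duplicate_cache_remembers_the_first_source() {
    let mut cache: DuplicateCache<&'static str, &'static str> =
        DuplicateCache::new(Duration::from_secs(60));

    // The first delivery of "tx-1" is accepted and its source is recorded.
    assert!(cache.insert("tx-1", "peer-A"));
    // A later delivery of the same transaction is reported as a duplicate and
    // the originally recorded source is kept.
    assert!(!cache.insert("tx-1", "peer-B"));

    assert!(cache.contains(&"tx-1"));
    assert_eq!(cache.get(&"tx-1"), Some(&"peer-A"));
    assert_eq!(cache.len(), 1);
}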
12 changes: 6 additions & 6 deletions dog/src/types.rs
@@ -125,30 +125,30 @@ pub enum ControlAction {

#[derive(Debug, Clone)]
pub struct HaveTx {
pub from: PeerId,
pub tx_id: TransactionId,
}

impl TryFrom<proto::ControlHaveTx> for HaveTx {
type Error = ParseError;

fn try_from(have_tx: proto::ControlHaveTx) -> Result<Self, Self::Error> {
PeerId::from_bytes(&have_tx.from)
.map(|from| HaveTx { from })
.map_err(|err| err)
Ok(HaveTx {
tx_id: TransactionId::new(&have_tx.tx_id),
})
}
}

impl From<HaveTx> for proto::ControlHaveTx {
fn from(have_tx: HaveTx) -> Self {
proto::ControlHaveTx {
from: have_tx.from.to_bytes(),
tx_id: have_tx.tx_id.0,
}
}
}

impl std::fmt::Display for HaveTx {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "HaveTx {{ from: {} }}", self.from)
write!(f, "HaveTx {{ tx_id: {} }}", self.tx_id)
}
}

77 changes: 3 additions & 74 deletions dog/tests/tests/dog.rs
@@ -129,21 +129,11 @@ pub async fn n_nodes_aligned() {
}
}

// Testing that a node receiving the same transaction from different nodes will eventually
// request all of them except one to stop sending it.
// We consider a random network of size N with target redundancy set to 0.0.
// Each node will publish transactions at constant intervals. We expect that after a certain amount
// of time, the routing status of the network will be stable. Moreover, we expect that, for each
// transaction, there is a single associated tree-like route from the source (root) to all other
// nodes (leaves).
// Testing that everyone receives the transactions from a node in a random network
// with default redundancy
#[tokio::test]
pub async fn random_network_no_redundancy() {
pub async fn random_network() {
let config = libp2p_dog::ConfigBuilder::default()
// We force the nodes to remove any redundancy
.target_redundancy(0.0)
.redundancy_delta_percent(0)
// Speed up have_tx unblocking
.redundancy_interval(Duration::from_millis(10))
// Disable signature to speed up the test
.validation_mode(libp2p_dog::ValidationMode::None)
.build()
@@ -201,67 +191,6 @@ pub async fn random_network_no_redundancy() {
expected.remove(index);
}
}

// Verify that no reset route messages have been sent
for (_, routing_updates) in events.iter() {
for (j, routes) in routing_updates.iter().enumerate() {
if j == 0 {
continue;
}

assert!(routes.len() > routing_updates[j - 1].len());
}
}

// Build the directed graph of the network
let mut base_adjency_list: Vec<Vec<usize>> = vec![Vec::new(); N];
for i in 0..N {
for j in bootstrap_sets[i].iter() {
base_adjency_list[i].push(*j);
}
}

let peer_id_to_index = |peer_id: &libp2p::PeerId| -> usize {
peer_ids.iter().position(|id| id == peer_id).unwrap()
};

for i in 0..N {
let mut i_adjency_list = base_adjency_list.clone();

for (j, (_, routing_updates)) in events.iter().enumerate() {
match routing_updates.last() {
Some(routes) => {
for route in routes.iter().filter(|r| r.source() == &peer_ids[i]) {
i_adjency_list[j]
.retain(|target| *target != peer_id_to_index(route.target()));
}
}
None => {
continue;
}
};
}

let mut visited = vec![false; N];
let mut stack = vec![(i, i)]; // (node, parent)
while let Some((node, parent)) = stack.pop() {
visited[node] = true;
for neighbor in i_adjency_list[node].iter() {
if *neighbor == parent {
// A -> B and B -> A is not considered as a cycle
continue;
}
if visited[*neighbor] {
panic!("Cycle detected between nodes {} and {}", node, *neighbor);
}
stack.push((*neighbor, node));
}
}

for visited in visited.iter() {
assert!(*visited);
}
}
}

// Testing that a node will request to reset a route previously blocked by a have_tx message if
