Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

fix: add debug for message_id and cert_id #485

Closed
wants to merge 11 commits into from
52 changes: 26 additions & 26 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
branches:
- main
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
types: [ opened, synchronize, reopened, ready_for_review ]

workflow_dispatch:

Expand All @@ -18,16 +18,16 @@ jobs:
name: Test documentation
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: ./.github/actions/install-rust
with:
toolchain: nightly
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID}}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY}}
- name: Checkout
uses: actions/checkout@v4
- uses: ./.github/actions/install-rust
with:
toolchain: nightly
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID}}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY}}

- name: Build Documentation
run: cargo +nightly doc --no-deps --all --all-features
- name: Build Documentation
run: cargo +nightly doc --no-deps --all --all-features

test_stable:
runs-on: ubuntu-latest-16-core
Expand Down Expand Up @@ -62,19 +62,19 @@ jobs:
env:
RUST_LOG: topos=warn

cert_delivery:
runs-on: ubuntu-latest-16-core
needs: [test_stable]
strategy:
fail-fast: true
matrix:
value: ["first", "second", "third"]
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/install-rust
with:
AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID}}
AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY}}
- run: cargo nextest run cert_delivery --locked --no-default-features
env:
RUST_LOG: topos=warn
# cert_delivery:
# runs-on: ubuntu-latest-16-core
# needs: [test_stable]
# strategy:
# fail-fast: true
# matrix:
# value: ["first", "second", "third"]
# steps:
# - uses: actions/checkout@v4
# - uses: ./.github/actions/install-rust
# with:
# AWS_ACCESS_KEY_ID: ${{ secrets.ROBOT_AWS_ACCESS_KEY_ID}}
# AWS_SECRET_ACCESS_KEY: ${{ secrets.ROBOT_AWS_SECRET_ACCESS_KEY}}
# - run: cargo nextest run cert_delivery --locked --no-default-features
# env:
# RUST_LOG: topos=warn
2 changes: 1 addition & 1 deletion crates/topos-certificate-spammer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ lazy_static::lazy_static! {
std::env::var("TOPOS_PROOF_SIZE_BYTES")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1000);
.unwrap_or(1);

/// Dummy proof with specified size
static ref STARK_BLOB: Vec<u8> =
Expand Down
27 changes: 23 additions & 4 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
time::Duration,
};

use libp2p::gossipsub::MessageId;
use libp2p::swarm::{ConnectionClosed, FromSwarm};
use libp2p::PeerId;
use libp2p::{
Expand Down Expand Up @@ -43,18 +44,33 @@ impl Behaviour {
&mut self,
topic: &'static str,
message: Vec<u8>,
) -> Result<usize, &'static str> {
) -> Result<MessageId, &'static str> {
debug!("Publishing {} {}", message.len(), topic);
match topic {
TOPOS_GOSSIP => {
if let Ok(msg_id) = self.gossipsub.publish(IdentTopic::new(topic), message) {
debug!("Published on topos_gossip: {:?}", msg_id);
return Ok(msg_id);
}
}
TOPOS_ECHO | TOPOS_READY => self.pending.entry(topic).or_default().push_back(message),
_ => return Err("Invalid topic"),
TOPOS_ECHO | TOPOS_READY => {
let batch = Batch {
messages: vec![message],
};
if let Ok(msg_id) = self
.gossipsub
.publish(IdentTopic::new(topic), batch.encode_to_vec())
{
debug!("Published on topos_gossip: {:?}", msg_id);
return Ok(msg_id);
}
}
_ => {
return Err("Unknown topic");
}
}

Ok(0)
Err("Failed to publish")
}

pub fn subscribe(&mut self) -> Result<(), P2PError> {
Expand Down Expand Up @@ -231,6 +247,7 @@ impl NetworkBehaviour for Behaviour {
topic: TOPOS_GOSSIP,
message: data,
source,
message_id,
},
)))
}
Expand All @@ -240,6 +257,7 @@ impl NetworkBehaviour for Behaviour {
topic: TOPOS_ECHO,
message: data,
source,
message_id,
},
)))
}
Expand All @@ -249,6 +267,7 @@ impl NetworkBehaviour for Behaviour {
topic: TOPOS_READY,
message: data,
source,
message_id,
},
)))
}
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::future::BoxFuture;
use libp2p::gossipsub::MessageId;
use libp2p::PeerId;
use tokio::sync::{
mpsc::{self, error::SendError},
Expand Down Expand Up @@ -43,6 +44,7 @@ impl NetworkClient {
&self,
topic: &'static str,
message: T,
sender: oneshot::Sender<MessageId>,
) -> BoxFuture<'static, Result<(), SendError<Command>>> {
let network = self.sender.clone();

Expand All @@ -51,6 +53,7 @@ impl NetworkClient {
.send(Command::Gossip {
topic,
data: message.encode_to_vec(),
sender,
})
.await
})
Expand Down
2 changes: 2 additions & 0 deletions crates/topos-p2p/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use libp2p::gossipsub::MessageId;
use std::fmt::Display;

use libp2p::PeerId;
Expand All @@ -15,6 +16,7 @@ pub enum Command {
Gossip {
topic: &'static str,
data: Vec<u8>,
sender: oneshot::Sender<MessageId>,
},

/// Ask for the creation of a new proxy connection for a gRPC query.
Expand Down
8 changes: 7 additions & 1 deletion crates/topos-p2p/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use libp2p::gossipsub::MessageId;
use libp2p::{identify, kad, PeerId};

use crate::behaviour::{grpc, HealthStatus};
Expand All @@ -10,6 +11,7 @@ pub enum GossipEvent {
source: Option<PeerId>,
topic: &'static str,
message: Vec<u8>,
message_id: MessageId,
},
}

Expand Down Expand Up @@ -50,7 +52,11 @@ impl From<void::Void> for ComposedEvent {
#[derive(Debug)]
pub enum Event {
/// An event emitted when a gossip message is received
Gossip { from: PeerId, data: Vec<u8> },
Gossip {
from: PeerId,
data: Vec<u8>,
message_id: MessageId,
},
/// An event emitted when the p2p layer becomes healthy
Healthy,
/// An event emitted when the p2p layer becomes unhealthy
Expand Down
18 changes: 12 additions & 6 deletions crates/topos-p2p/src/runtime/handle_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,19 @@ impl Runtime {
Command::Gossip {
topic,
data: message,
} => match self.swarm.behaviour_mut().gossipsub.publish(topic, message) {
Ok(message_id) => {
debug!("Published message to {topic}");
P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc();
sender,
} => {
println!("Send to GossipSub: {topic}");
match self.swarm.behaviour_mut().gossipsub.publish(topic, message) {
Ok(message_id) => {
println!("Published message to {topic}");
debug!("Published message to {topic}");
P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc();
let _ = sender.send(message_id);
}
Err(err) => error!("Failed to publish message to {topic}: {err}"),
}
Err(err) => error!("Failed to publish message to {topic}: {err}"),
},
}
}
}
}
3 changes: 3 additions & 0 deletions crates/topos-p2p/src/runtime/handle_event/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ impl EventHandler<GossipEvent> for Runtime {
source: Some(source),
message,
topic,
message_id,
} = event
{
if self.event_sender.capacity() < *constants::CAPACITY_EVENT_STREAM_BUFFER {
Expand All @@ -34,6 +35,7 @@ impl EventHandler<GossipEvent> for Runtime {
.send(Event::Gossip {
from: source,
data: message,
message_id: message_id.clone(),
})
.await
{
Expand All @@ -53,6 +55,7 @@ impl EventHandler<GossipEvent> for Runtime {
.send(Event::Gossip {
from: source,
data: message,
message_id: message_id.clone(),
})
.await
{
Expand Down
8 changes: 4 additions & 4 deletions crates/topos-tce/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use topos_tce_storage::store::ReadStore;
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::validator::ValidatorStore;
use topos_tce_storage::StorageClient;
use topos_tce_synchronizer::SynchronizerEvent;
// use topos_tce_synchronizer::SynchronizerEvent;
use tracing::{error, info, warn};

mod api;
Expand Down Expand Up @@ -94,7 +94,7 @@ impl AppContext {
mut network_stream: impl Stream<Item = NetEvent> + Unpin,
mut tce_stream: impl Stream<Item = ProtocolEvents> + Unpin,
mut api_stream: impl Stream<Item = ApiEvent> + Unpin,
mut synchronizer_stream: impl Stream<Item = SynchronizerEvent> + Unpin,
// mut synchronizer_stream: impl Stream<Item = SynchronizerEvent> + Unpin,
mut broadcast_stream: impl Stream<Item = CertificateDeliveredWithPositions> + Unpin,
shutdown: (CancellationToken, mpsc::Sender<()>),
) {
Expand Down Expand Up @@ -127,8 +127,8 @@ impl AppContext {
}

// Synchronizer events
Some(_event) = synchronizer_stream.next() => {
}
// Some(_event) = synchronizer_stream.next() => {
// }

// Shutdown signal
_ = shutdown.0.cancelled() => {
Expand Down
19 changes: 16 additions & 3 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ impl AppContext {
&evt
);

if let NetEvent::Gossip { data, from } = evt {
if let NetEvent::Gossip {
data,
from,
message_id,
} = evt
{
if let Ok(DoubleEchoRequest {
request: Some(double_echo_request),
}) = DoubleEchoRequest::decode(&data[..])
Expand All @@ -38,8 +43,8 @@ impl AppContext {
entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer());
}
info!(
"Received certificate {} from GossipSub from {}",
cert.id, from
"Received certificate {} from GossipSub from {} with message id: {}",
cert.id, from, message_id,
);

match self.validator_store.insert_pending_certificate(&cert).await {
Expand Down Expand Up @@ -110,6 +115,10 @@ impl AppContext {
signature: Some(signature),
validator_id: Some(validator_id),
}) => {
debug!(
"Received Echo message, certificate_id: {} with message id: {}",
certificate_id, message_id
);
let channel = self.tce_cli.get_double_echo_channel();
spawn(async move {
let certificate_id = certificate_id.clone().try_into().map_err(|e| {
Expand Down Expand Up @@ -156,6 +165,10 @@ impl AppContext {
signature: Some(signature),
validator_id: Some(validator_id),
}) => {
debug!(
"Received Ready message, certificate_id: {} with message id: {}",
certificate_id, message_id
);
let channel = self.tce_cli.get_double_echo_channel();
spawn(async move {
let certificate_id = certificate_id.clone().try_into().map_err(|e| {
Expand Down
Loading
Loading