Skip to content

Commit

Permalink
feat: count and time transactions metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
BastienFaivre committed Jan 21, 2025
1 parent cb74520 commit 405e106
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 60 deletions.
24 changes: 0 additions & 24 deletions benchmark/code/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,48 +13,24 @@ pub(crate) const GOSSIPSUB_TOPIC_STR: &str = "benchmark";

const MAX_TRANSMIT_SIZE: usize = 4 * 1024 * 1024; // 4 MiB

#[cfg(feature = "debug")]
#[derive(Debug)]
pub(crate) enum NetworkEvent {
Dog(libp2p_dog::Event),
Gossipsub(gossipsub::Event),
}

#[cfg(not(feature = "debug"))]
#[derive(Debug)]
pub(crate) enum NetworkEvent {
Dog,
Gossipsub,
}

#[cfg(feature = "debug")]
impl From<libp2p_dog::Event> for NetworkEvent {
fn from(event: libp2p_dog::Event) -> Self {
Self::Dog(event)
}
}

#[cfg(not(feature = "debug"))]
impl From<libp2p_dog::Event> for NetworkEvent {
fn from(_: libp2p_dog::Event) -> Self {
Self::Dog
}
}

#[cfg(feature = "debug")]
impl From<gossipsub::Event> for NetworkEvent {
fn from(event: gossipsub::Event) -> Self {
Self::Gossipsub(event)
}
}

#[cfg(not(feature = "debug"))]
impl From<gossipsub::Event> for NetworkEvent {
fn from(_: gossipsub::Event) -> Self {
Self::Gossipsub
}
}

#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "NetworkEvent")]
pub(crate) struct Behaviour {
Expand Down
46 changes: 25 additions & 21 deletions benchmark/code/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,61 @@
#[cfg(feature = "debug")]
use libp2p::{
gossipsub,
swarm::{self, SwarmEvent},
};

#[cfg(feature = "debug")]
use crate::{
behaviour::{self, NetworkEvent},
config::Config,
metrics::Metrics,
};

#[cfg(feature = "debug")]
async fn handle_dog_event(
event: libp2p_dog::Event,
_swarm: &mut swarm::Swarm<behaviour::Behaviour>,
_config: &Config,
metrics: &mut Metrics,
) {
tracing::info!("Dog event: {:?}", event);
match event {
libp2p_dog::Event::Transaction { transaction_id, .. } => {
metrics.add_delivered(
transaction_id.0,
std::time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64,
);
}
_ => {}
}
}

#[cfg(feature = "debug")]
async fn handle_gossipsub_event(
event: gossipsub::Event,
_swarm: &mut swarm::Swarm<behaviour::Behaviour>,
_config: &Config,
metrics: &mut Metrics,
) {
tracing::info!("Gossipsub event: {:?}", event);
}

#[cfg(feature = "debug")]
async fn handle_swarm_specific_event(
event: SwarmEvent<behaviour::NetworkEvent>,
_swarm: &mut swarm::Swarm<behaviour::Behaviour>,
_config: &Config,
) {
tracing::info!("Swarm event: {:?}", event);
match event {
gossipsub::Event::Message { message_id, .. } => {
metrics.add_delivered(
message_id.0,
std::time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64,
);
}
_ => {}
}
}

#[cfg(feature = "debug")]
pub(crate) async fn handle_swarm_event(
event: SwarmEvent<behaviour::NetworkEvent>,
swarm: &mut swarm::Swarm<behaviour::Behaviour>,
config: &Config,
metrics: &mut Metrics,
) {
match event {
SwarmEvent::Behaviour(NetworkEvent::Dog(event)) => {
handle_dog_event(event, swarm, config).await;
handle_dog_event(event, swarm, config, metrics).await;
}
SwarmEvent::Behaviour(NetworkEvent::Gossipsub(event)) => {
handle_gossipsub_event(event, swarm, config).await;
}
_ => {
handle_swarm_specific_event(event, swarm, config).await;
handle_gossipsub_event(event, swarm, config, metrics).await;
}
_ => {}
}
}
58 changes: 43 additions & 15 deletions benchmark/code/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use behaviour::GOSSIPSUB_TOPIC_STR;
use libp2p::{futures::StreamExt, gossipsub::IdentTopic, swarm::dial_opts::DialOpts};
use metrics::Metrics;
use prometheus_client::{encoding::text::encode, registry::Registry};
use tokio::{select, time};

Expand All @@ -16,8 +17,11 @@ mod behaviour;
mod config;
mod handler;
mod logging;
mod metrics;
mod swarm;

const STOP_DELAY_IN_SEC: u64 = 5;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
logging::init();
Expand All @@ -31,6 +35,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
prefix => Registry::with_prefix(prefix),
};

let mut metrics = Metrics::new();

let file = fs::File::create(format!("{}/{}.json", args.dir, config.node.id))?;
let mut writer = std::io::BufWriter::new(&file);
writer.write_all(b"{\"metrics\":[")?;
Expand Down Expand Up @@ -66,25 +72,26 @@ async fn main() -> Result<(), Box<dyn Error>> {
let dump_timer = time::sleep(Duration::from_secs(u64::MAX)); // Wait for start timestamp
tokio::pin!(dump_timer);

let stop_instant = start_instant + Duration::from_secs(config.benchmark.duration_in_sec);
let stop_instant = start_instant
+ Duration::from_secs(config.benchmark.duration_in_sec)
+ Duration::from_secs(STOP_DELAY_IN_SEC);
let stop_timer = time::sleep_until(stop_instant);
tokio::pin!(stop_timer);

let gossipsub_topic = IdentTopic::new(GOSSIPSUB_TOPIC_STR);

let total_transactions = config.benchmark.tps * config.benchmark.duration_in_sec;
let mut num_transactions: u64 = 0;

tracing::info!(
"Starting benchmark in {:?}",
start_instant - time::Instant::now(),
);

loop {
select! {
_event = swarm.select_next_some() => {
// We do not process event when benchmarking to avoid unnecessary overhead
#[cfg(feature = "debug")]
{
handler::handle_swarm_event(_event, &mut swarm, &config).await;
}
event = swarm.select_next_some() => {
handler::handle_swarm_event(event, &mut swarm, &config, &mut metrics).await;
}

_ = &mut start_timer, if !start_timer.is_elapsed() => {
Expand All @@ -93,8 +100,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
dump_timer.as_mut().reset(time::Instant::now() + dump_interval);
}

_ = &mut transaction_timer => {
_ = &mut transaction_timer, if num_transactions < total_transactions => {
tracing::debug!("Sending a transaction");
let timestamp = std::time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;

match config.benchmark.protocol {
config::Protocol::Dog => {
match swarm.behaviour_mut()
Expand All @@ -104,6 +113,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
.publish(vec![0 as u8; config.benchmark.tx_size_in_bytes] as Vec<u8>) {
Ok(tx_id) => {
tracing::debug!("Transaction sent with id {}", tx_id);

metrics.add_published(tx_id.0, timestamp);
}
Err(e) => {
tracing::error!("Failed to send transaction: {:?}", e);
Expand All @@ -118,6 +129,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
.publish(gossipsub_topic.clone(), vec![0 as u8; config.benchmark.tx_size_in_bytes] as Vec<u8>) {
Ok(msg_id) => {
tracing::debug!("Message sent with id {}", msg_id);

metrics.add_published(msg_id.0, timestamp);
}
Err(e) => {
tracing::error!("Failed to send message: {:?}", e);
Expand All @@ -127,6 +140,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

transaction_timer.as_mut().reset(time::Instant::now() + transaction_interval);
num_transactions += 1;
}

_ = &mut dump_timer => {
Expand All @@ -135,7 +149,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
tracing::info!("Dumping metrics");
}

if let Err(e) = dump_metrics(&mut writer, &registry) {
if let Err(e) = dump_metrics(&mut writer, &registry, &metrics) {
tracing::error!("Failed to dump metrics: {:?}", e);
}

Expand All @@ -154,36 +168,50 @@ async fn main() -> Result<(), Box<dyn Error>> {
writer.write_all(b"]}")?;
writer.flush()?;

metrics.dump_metrics(
format!("{}/published_{}.json", args.dir, config.node.id),
format!("{}/delivered_{}.json", args.dir, config.node.id),
)?;

Ok(())
}

fn dump_metrics(mut writer: impl Write, registry: &Registry) -> Result<(), Box<dyn Error>> {
fn dump_metrics(
mut writer: impl Write,
registry: &Registry,
metrics: &Metrics,
) -> Result<(), Box<dyn Error>> {
let mut output = String::new();
match encode(&mut output, &registry) {
Ok(()) => {
let mut metrics = serde_json::Map::new();
let mut metrics_map = serde_json::Map::new();

metrics.insert(
metrics_map.insert(
"timestamp".to_string(),
serde_json::Value::Number(serde_json::Number::from(
std::time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64,
)),
);

metrics_map.insert(
"total_delivered".to_string(),
serde_json::Value::Number(serde_json::Number::from(metrics.total_delivered())),
);

for line in output.lines() {
if !line.starts_with('#') {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() == 2 {
metrics.insert(
metrics_map.insert(
parts[0].to_string(),
parts[1].parse::<serde_json::Value>().unwrap(),
);
}
}
}

let metrics = serde_json::Value::Object(metrics);
serde_json::to_writer(&mut writer, &metrics)?;
let metrics_obj = serde_json::Value::Object(metrics_map);
serde_json::to_writer(&mut writer, &metrics_obj)?;
writer.write_all(b",")?;
writer.flush()?;
}
Expand Down
Loading

0 comments on commit 405e106

Please sign in to comment.