Skip to content

Commit

Permalink
fix: bytes unit on benchmark (#4347)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev authored Jan 21, 2025
1 parent 3853537 commit d7f262f
Showing 1 changed file with 36 additions and 25 deletions.
61 changes: 36 additions & 25 deletions crates/fluvio-benchmark/src/stats_collector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
f64,
sync::{atomic::AtomicU64, Arc},
time::Duration,
time::{Duration, Instant},
};

use async_channel::{Receiver, Sender};
Expand All @@ -18,6 +18,7 @@ pub(crate) struct ProducerStat {}
pub struct AtomicStats {
record_send: AtomicU64,
record_bytes: AtomicU64,
first_start_time: RwLock<Option<Instant>>,
}

pub struct Stats {
Expand Down Expand Up @@ -46,6 +47,7 @@ impl ProducerStat {
let stats = Arc::new(AtomicStats {
record_send: AtomicU64::new(0),
record_bytes: AtomicU64::new(0),
first_start_time: RwLock::new(None),
});

let histogram = Arc::new(Mutex::new(hdrhistogram::Histogram::<u64>::new(3).unwrap()));
Expand Down Expand Up @@ -78,6 +80,11 @@ impl ProducerStat {
.record_bytes
.load(std::sync::atomic::Ordering::Relaxed);
sleep(Duration::from_secs(1)).await;
let first_start_time = stats.first_start_time.read().await;
if first_start_time.is_none() {
continue;
}

let new_record_send = stats.record_send.load(std::sync::atomic::Ordering::Relaxed);
let new_record_bytes = stats
.record_bytes
Expand All @@ -87,20 +94,21 @@ impl ProducerStat {
continue;
}

let records_delta = new_record_send - old_record_send;
let bytes_delta = new_record_bytes - old_record_bytes;

let records_per_sec = records_delta;
let bytes_per_sec = bytes_delta / 1000;
let record_send = new_record_send - old_record_send;
let record_bytes = new_record_bytes - old_record_bytes;
let elapsed = first_start_time.expect("start time").elapsed();
let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0;
let records_per_sec = (new_record_send as f64 / elapsed_seconds).round() as u64;
let bytes_per_sec = (new_record_bytes as f64 / elapsed_seconds).round() as u64;

let hist = histogram.lock().await;
let latency_avg = hist.mean() as u64;
let latency_max = hist.value_at_quantile(1.0);

stats_sender
.send(Stats {
record_send: records_delta,
record_bytes: bytes_delta,
record_send,
record_bytes,
records_per_sec,
bytes_per_sec,
latency_avg,
Expand All @@ -122,14 +130,16 @@ impl ProducerStat {
) {
spawn(async move {
let hist = histogram.clone();
let first_start_time = Arc::new(RwLock::new(None));
while let Ok(event) = event_receiver.recv().await {
let hist = hist.clone();
let stats = stats.clone();
let first_start_time = first_start_time.clone();
spawn(async move {
if first_start_time.read().await.is_none() {
first_start_time.write().await.replace(event.created_at);
if stats.first_start_time.read().await.is_none() {
stats
.first_start_time
.write()
.await
.replace(event.created_at);
}
let mut hist = hist.lock().await;
hist.record(event.elapsed.as_nanos() as u64)
Expand All @@ -145,25 +155,26 @@ impl ProducerStat {
}

// send end
if stats.record_send.load(std::sync::atomic::Ordering::Relaxed) >= num_records {
let record_send = stats.record_send.load(std::sync::atomic::Ordering::Relaxed);
if record_send >= num_records {
let record_bytes = stats
.record_bytes
.load(std::sync::atomic::Ordering::Relaxed);
let hist = hist.lock().await;
let elapsed = first_start_time.read().await.expect("start time").elapsed();
let elapsed = stats
.first_start_time
.read()
.await
.expect("start time")
.elapsed();

let elapsed_seconds = elapsed.as_millis() as f64 / 1000.0;
let records_per_sec = (stats.record_send.load(std::sync::atomic::Ordering::Relaxed)
as f64
/ elapsed_seconds)
.round() as u64;
let bytes_per_sec = (stats
.record_bytes
.load(std::sync::atomic::Ordering::Relaxed)
as f64
/ elapsed_seconds)
.round() as u64;
let records_per_sec = (record_send as f64 / elapsed_seconds).round() as u64;
let bytes_per_sec = (record_bytes as f64 / elapsed_seconds).round() as u64;

let end = EndProducerStat {
histogram: hist.clone(),
total_records: stats.record_send.load(std::sync::atomic::Ordering::Relaxed),
total_records: record_send,
records_per_sec,
bytes_per_sec,
};
Expand Down

0 comments on commit d7f262f

Please sign in to comment.