Skip to content

Commit

Permalink
calculate wal sync latency percentiles
Browse files Browse the repository at this point in the history
  • Loading branch information
jauhararifin committed Dec 20, 2024
1 parent 0991836 commit 668a9dc
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 7 deletions.
18 changes: 12 additions & 6 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::bins::SliceExt;
use crate::btree::{BTree, Cursor};
use crate::id::{PageId, PageIdExt, TxId};
use crate::log::{TxState, WalEntry, WalKind};
use crate::metric::HistogramPercentile;
use crate::pager::{LogContext, PageOps, Pager};
use crate::recovery::{recover, undo_txn};
use crate::runtime::{
Expand Down Expand Up @@ -32,17 +33,20 @@ pub struct Setting {
pub checkpoint_period: Duration,
}

#[derive(Clone, Copy, Debug)]
#[derive(Debug)]
pub struct Stat {
pub main_bytes_read: u64,
pub main_bytes_written: u64,
pub double_buff_bytes_written: u64,
pub wal_bytes_written: u64,
wal_flushed_total: u64,
wal_flushed_because_buffer_almost_full: u64,
wal_flushed_because_buffer_full: u64,
wal_flushed_because_manual_trigger: u64,
wal_flushed_because_sync_request: u64,

pub wal_flushed_total: u64,
pub wal_flushed_because_timeout: u64,
pub wal_flushed_because_buffer_almost_full: u64,
pub wal_flushed_because_buffer_full: u64,
pub wal_flushed_because_manual_trigger: u64,
pub wal_flushed_because_sync_request: u64,
pub wal_flush_latency: HistogramPercentile,
}

impl std::default::Default for Setting {
Expand Down Expand Up @@ -302,10 +306,12 @@ impl<R: Runtime> Db<R> {
wal_bytes_written: wal_stat.bytes_written,

wal_flushed_total: wal_stat.flushed_total,
wal_flushed_because_timeout: wal_stat.flushed_because_timeout,
wal_flushed_because_buffer_almost_full: wal_stat.flushed_because_buffer_almost_full,
wal_flushed_because_buffer_full: wal_stat.flushed_because_buffer_full,
wal_flushed_because_manual_trigger: wal_stat.flushed_because_manual_trigger,
wal_flushed_because_sync_request: wal_stat.flushed_because_sync_request,
wal_flush_latency: wal_stat.flush_latency,
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod content;
mod db;
mod id;
mod log;
mod metric;
mod os;
mod pager;
mod recovery;
Expand All @@ -12,6 +13,7 @@ mod simulation;
mod wal;

pub use db::{Db, Setting};
pub use metric::HistogramPercentile;
pub use os::OsRuntime;
pub use runtime::*;
pub use simulation::SimulatedRuntime;
122 changes: 122 additions & 0 deletions src/metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use parking_lot::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};

pub(crate) struct Histogram {
lock: RwLock<()>,
buckets: Vec<f64>,
counter: Vec<AtomicU64>,
total: AtomicU64,
}

#[derive(Debug)]
pub struct HistogramPercentile {
pub p50: f64,
pub p80: f64,
pub p90: f64,
pub p95: f64,
pub p99: f64,
}

impl Histogram {
pub(crate) fn new_exponential(start: f64, factor: f64, count: usize) -> Self {
assert!(count > 0);
assert!(start > 0.0);
assert!(factor > 1.0);
let mut buckets = Vec::with_capacity(count);
let mut counter = Vec::with_capacity(count + 1);

let mut last = start;
for _ in 0..count {
buckets.push(last);
counter.push(AtomicU64::new(0));
last *= factor;
}
counter.push(AtomicU64::new(0));

Self {
lock: RwLock::default(),
buckets,
counter,
total: AtomicU64::new(0),
}
}

pub(crate) fn observe(&self, value: f64) {
let _guard = self.lock.read();

let i = self
.buckets
.partition_point(|upper_bound| value >= *upper_bound);
self.counter[i].fetch_add(1, Ordering::SeqCst);
self.total.fetch_add(1, Ordering::SeqCst);
}

pub(crate) fn quantile(&self, p: f64) -> f64 {
assert!(p >= 0.0 && p <= 1.0);
let _guard = self.lock.write();

let total = self.total.load(Ordering::SeqCst);
let pos = p * total as f64;
let mut cum = 0.0;
let mut lb = 0.0;
for (i, f) in self.counter.iter().enumerate() {
if i == self.buckets.len() {
// Special case: if the rank lies in the last bucket where
// the upper bound is +inf, just return the bucket's lower bound
return self.buckets[i - 1];
}

let f = f.load(Ordering::SeqCst) as f64;
if cum + f >= pos {
let w = self.buckets[i] - lb;
// It is assumed that the first bucket has 0 as its lower bound
return lb + (pos - cum) / f * w;
}

lb = self.buckets[i];
cum += f;
}

unreachable!();
}

pub(crate) fn percentile(&self) -> HistogramPercentile {
HistogramPercentile {
p50: self.quantile(0.5),
p80: self.quantile(0.8),
p90: self.quantile(0.9),
p95: self.quantile(0.95),
p99: self.quantile(0.99),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

macro_rules! assert_close_to {
($left:expr, $right:expr) => {
let left = $left;
let right = $right;
assert!(
(left - right).abs() < 0.001,
"left = {left}, right = {right}"
);
};
}

#[test]
fn test_histogram() {
let histogram = Histogram::new_exponential(1.0, 1.3, 20);
histogram.observe(0.5);
histogram.observe(0.7);
histogram.observe(1.2);
assert_close_to!(0.0, histogram.quantile(0.0));
assert_close_to!(0.15, histogram.quantile(0.1));
assert_close_to!(1.3, histogram.quantile(1.0));

histogram.observe(200.0);
assert_close_to!(146.192, histogram.quantile(1.0));
}
}
15 changes: 14 additions & 1 deletion src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::id::Lsn;
use super::log::{WalDecodeResult, WalEntry, WalHeader, WalKind, WAL_HEADER_SIZE};
use super::pager::MAXIMUM_PAGE_SIZE;
use super::runtime::{File, Mutex, Runtime, RwMutex, Timer, TimerHandle};
use crate::metric::{Histogram, HistogramPercentile};
use anyhow::{anyhow, Context};
use std::io::SeekFrom;
use std::path::Path;
Expand Down Expand Up @@ -42,9 +43,11 @@ struct StatInternal {
flushed_because_buffer_full: AtomicU64,
flushed_because_manual_trigger: AtomicU64,
flushed_because_sync_request: AtomicU64,

flush_latency: Histogram,
}

#[derive(Clone, Copy, Debug)]
#[derive(Debug)]
pub(crate) struct Stat {
pub(crate) bytes_written: u64,

Expand All @@ -54,6 +57,8 @@ pub(crate) struct Stat {
pub(crate) flushed_because_buffer_full: u64,
pub(crate) flushed_because_manual_trigger: u64,
pub(crate) flushed_because_sync_request: u64,

pub(crate) flush_latency: HistogramPercentile,
}

impl Buffer {
Expand Down Expand Up @@ -102,6 +107,7 @@ impl<R: Runtime> Wal<R> {
flushed_because_buffer_full: AtomicU64::new(0),
flushed_because_manual_trigger: AtomicU64::new(0),
flushed_because_sync_request: AtomicU64::new(0),
flush_latency: Histogram::new_exponential(1.0, 1.3, 30),
});

let (mut timer, timer_handle) = R::timer(std::time::Duration::from_secs(3600));
Expand Down Expand Up @@ -309,6 +315,8 @@ impl<R: Runtime> Wal<R> {
internal.next,
);

let start = std::time::Instant::now();

if f.is_empty {
let mut buff = [0u8; WAL_HEADER_SIZE * 2];
let header = WalHeader {
Expand Down Expand Up @@ -345,6 +353,9 @@ impl<R: Runtime> Wal<R> {

f.f.sync()?;

let elapsed = start.elapsed();
stat.flush_latency.observe(elapsed.as_millis() as f64);

buffer.start_offset = 0;
buffer.end_offset = 0;
internal.first_unflushed = internal.next;
Expand Down Expand Up @@ -530,6 +541,8 @@ impl<R: Runtime> Wal<R> {
flushed_because_buffer_full,
flushed_because_manual_trigger,
flushed_because_sync_request,

flush_latency: self.stat.flush_latency.percentile(),
}
}
}
Expand Down

0 comments on commit 668a9dc

Please sign in to comment.