From bb3559e1119dd249dd99c1d0bbe227c4646a2aa2 Mon Sep 17 00:00:00 2001 From: Bastien Faivre Date: Thu, 23 Jan 2025 13:57:57 +0100 Subject: [PATCH] feat: time cache + plot script --- .gitignore | 3 +- Cargo.lock | 3 +- Cargo.toml | 3 +- benchmark/output/plot.py | 439 ++++++++++++++++++++++++++++++--------- dog/Cargo.toml | 3 +- dog/src/behaviour.rs | 20 +- dog/src/config.rs | 16 +- dog/src/lib.rs | 1 + dog/src/metrics.rs | 18 ++ dog/src/time_cache.rs | 68 ++++++ 10 files changed, 456 insertions(+), 118 deletions(-) create mode 100644 dog/src/time_cache.rs diff --git a/.gitignore b/.gitignore index da4a9d9..0c40535 100644 --- a/.gitignore +++ b/.gitignore @@ -2,8 +2,7 @@ benchmark/machines.txt benchmark/benchmark.conf benchmark/inventory.ini benchmark/*.json -benchmark/output/output-* -benchmark/output/*.html +benchmark/output/*/ benchmark/config/* !benchmark/config/README.md !benchmark/config/ntp.conf diff --git a/Cargo.lock b/Cargo.lock index b836407..18116d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1559,12 +1559,12 @@ dependencies = [ "asynchronous-codec", "bytes", "either", + "fnv", "futures", "futures-timer", "hex_fmt", "libp2p", "libp2p-core", - "lru", "prometheus-client", "quick-protobuf", "quick-protobuf-codec", @@ -1573,6 +1573,7 @@ dependencies = [ "thiserror 2.0.7", "tracing", "void", + "web-time", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 846eedd..fe2d33a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,12 +21,12 @@ asynchronous-codec = "0.7.0" bytes = "1.6" clap = "4.5.16" either = "1.13.0" +fnv = "1.0.7" futures = "0.3.30" futures-timer = "3.0.3" hex_fmt = "0.3.0" libp2p = "0.55.0" libp2p-core = "0.43.0" -lru = "0.12.5" prometheus-client = "0.22.3" quick-protobuf = "0.8.1" quick-protobuf-codec = "0.3.1" @@ -39,6 +39,7 @@ toml = "0.8.19" tracing = "0.1.37" tracing-subscriber = "0.3.18" void = "1.0.2" +web-time = "1.1.0" [workspace.lints] rust.unreachable_pub = "warn" diff --git a/benchmark/output/plot.py b/benchmark/output/plot.py index c4d7c3d..6aae192 100644 --- a/benchmark/output/plot.py +++ b/benchmark/output/plot.py @@ -1,134 +1,91 @@ -# GENERATED BY chatGPT (o1) on Jan 17, 2025 +# GENERATED BY chatGPT (o1) on Jan 21, 2025 import os import json import pandas as pd import plotly.graph_objects as go from plotly.offline import plot -def load_data_from_output_folders(base_dir='.'): +############################################################################### +# 1. LOAD NODE METRICS +############################################################################### + +def load_metrics_from_output_folders(base_dir='.'): """ - Recursively search for folders named 'output-' - within base_dir. For each node folder, read the JSON file - named 'node-.json' and extract the metrics. + Recursively search for 'output-' folders under base_dir. + For each folder, read 'node-.json' and extract 'metrics'. - Returns a DataFrame with columns: - [ - 'node_id', 'timestamp', 'disabled_routes_count', ..., - 'redundancy', ... - ] + Returns a DataFrame with columns like: + ['node_id', 'timestamp', 'redundancy', 'peers_count', ...] """ - df_list = [] + records = [] - # Scan all items in the base directory for item in os.listdir(base_dir): folder_path = os.path.join(base_dir, item) - - # We only want directories named "output-" - if ( - os.path.isdir(folder_path) and - item.startswith("output-") - ): - # Extract node_id from folder name - # e.g. 
"output-2" -> node_id = "2" + if os.path.isdir(folder_path) and item.startswith("output-"): + # extract node_id _, node_id_str = item.split("-", maxsplit=1) - # Construct the JSON filename - json_file = os.path.join(folder_path, f"node-{node_id_str}.json") - - # Check if the JSON file exists - if os.path.isfile(json_file): - with open(json_file, 'r') as f: + node_json = os.path.join(folder_path, f"node-{node_id_str}.json") + if os.path.isfile(node_json): + with open(node_json, 'r') as f: data = json.load(f) - - # Each JSON has a "metrics" key containing a list for entry in data.get("metrics", []): - # Attach the node_id to each record entry["node_id"] = node_id_str - - # Keep timestamps as-is for now (assume ms). - # We'll convert later to relative seconds. - df_list.append(entry) + records.append(entry) - # Convert to DataFrame - if not df_list: + if not records: return pd.DataFrame() - - df = pd.DataFrame(df_list) - return df + return pd.DataFrame(records) -def create_interactive_figure(df, default_metric="redundancy"): +def create_metrics_figure(df, default_metric="redundancy"): """ - Create a Plotly figure that shows one line per (node_id, metric), - with a dropdown menu to switch which metric is displayed. - - By default, only the traces for `default_metric` are visible, - and all nodes appear in the legend so the user can hide/show them. + Create an interactive line chart of node metrics over time, in ms since first event. + Returns a Plotly Figure object (or None if no data). """ if df.empty: - raise ValueError("No data found. Make sure the JSON files exist and contain 'metrics'.") - - # Identify all metrics in the DataFrame (besides timestamp and node_id) - ignore_cols = {"timestamp", "node_id", "relative_timestamp"} - all_cols = set(df.columns) - metric_cols = list(all_cols - ignore_cols) + return None + + # Compute "relative_timestamp" = ms offset from earliest event + t0 = df["timestamp"].min() + df["relative_timestamp"] = df["timestamp"] - t0 + # Determine which columns are metrics + ignore_cols = {"timestamp", "node_id", "relative_timestamp"} + metric_cols = [c for c in df.columns if c not in ignore_cols] if not metric_cols: - raise ValueError("No metric columns found in the data.") + return None - # If default_metric not in the list, just pick the first one if default_metric not in metric_cols: default_metric = metric_cols[0] - # Sort metrics (optional, for nicer dropdown ordering) metric_cols.sort() - - # Convert 'timestamp' to a relative timestamp (in seconds): - # 1. Find the earliest timestamp - t0 = df["timestamp"].min() - # 2. Create a new column "relative_timestamp" in seconds - # (assuming original 'timestamp' is in ms) - df["relative_timestamp"] = (df["timestamp"] - t0) / 1000.0 - - # Grab the unique list of nodes node_ids = df["node_id"].unique() - # We'll create one trace for each (metric, node_id) pair, - # but only the traces for the currently active metric are visible. - # Then we use update menus to show/hide the relevant traces. fig = go.Figure() - - # Keep track of the trace indices in the same order as we add them - # For each metric, we will store the list of trace indices that belong to it. 
metric_to_trace_indices = {m: [] for m in metric_cols} - # Add a trace for each (metric, node_id) pair + # Add one trace per (metric, node) for metric in metric_cols: for node in node_ids: - node_mask = (df["node_id"] == node) - # Sort by relative_timestamp to ensure line is drawn in ascending time - node_df = df[node_mask].sort_values(by="relative_timestamp") - - # Create a line trace + node_df = df[df["node_id"] == node].sort_values("relative_timestamp") trace = go.Scatter( x=node_df["relative_timestamp"], y=node_df[metric], mode='lines+markers', name=f"Node {node}", - # By default, set visibility to True only if this metric is the default. visible=(metric == default_metric) ) - fig.add_trace(trace) metric_to_trace_indices[metric].append(len(fig.data) - 1) - # Build the 'updatemenus' to switch which metric is visible + # Build dropdown to switch metrics def make_visibility_array(selected_metric): - total_traces = len(fig.data) - visibility = [False] * total_traces + total = len(fig.data) + visible = [False] * total for idx in metric_to_trace_indices[selected_metric]: - visibility[idx] = True - return visibility - + visible[idx] = True + return visible + buttons = [] for m in metric_cols: buttons.append( @@ -153,42 +110,326 @@ def make_visibility_array(selected_metric): ) ] - # Crop the x-axis to show from 0 to the maximum relative timestamp max_rel_ts = df["relative_timestamp"].max() fig.update_layout( title=f"Metric: {default_metric}", xaxis=dict( - title="Seconds since first event", - range=[0, max_rel_ts] # Crop from the earliest to the latest - ), - yaxis=dict( - title="Value (unitless)" + title="Milliseconds since first event", + range=[0, max_rel_ts] ), + yaxis=dict(title="Value (unitless)"), updatemenus=updatemenus, legend=dict( orientation="v", - x=1.02, - xanchor="left", - y=1.0, + x=1.02, + xanchor="left", + y=1.0, yanchor="top" ), margin=dict(l=50, r=200, t=100, b=50) ) + return fig + +############################################################################### +# 2. 
LOAD PUBLISHED & DELIVERED DATA +############################################################################### + +def load_published_and_delivered(base_dir='.'): + """ + Scans 'output-' directories for: + - published_node-.json + - delivered_node-.json + + Returns three items: + published_by_node: dict[node_id -> dict[tx_id -> publish_time_ms]] + earliest_publish_times: dict[tx_id -> earliest_publish_time_ms] + latest_delivery_times: dict[tx_id -> latest_delivery_time_ms] + """ + published_by_node = {} + earliest_publish_times = {} + latest_delivery_times = {} + + for item in os.listdir(base_dir): + folder_path = os.path.join(base_dir, item) + if os.path.isdir(folder_path) and item.startswith("output-"): + # node_id + _, node_id_str = item.split("-", maxsplit=1) + + if node_id_str not in published_by_node: + published_by_node[node_id_str] = {} + + # published file + published_json = os.path.join(folder_path, f"published_node-{node_id_str}.json") + if os.path.isfile(published_json): + with open(published_json, 'r') as f: + data = json.load(f) + for (tx_id, pub_time) in data.get("published", []): + # store in per-node dict + published_by_node[node_id_str][tx_id] = pub_time + # track earliest publish globally + if tx_id not in earliest_publish_times: + earliest_publish_times[tx_id] = pub_time + else: + if pub_time < earliest_publish_times[tx_id]: + earliest_publish_times[tx_id] = pub_time + + # delivered file + delivered_json = os.path.join(folder_path, f"delivered_node-{node_id_str}.json") + if os.path.isfile(delivered_json): + with open(delivered_json, 'r') as f: + data = json.load(f) + for (tx_id, del_time) in data.get("delivered", []): + # track latest delivery globally + if tx_id not in latest_delivery_times: + latest_delivery_times[tx_id] = del_time + else: + if del_time > latest_delivery_times[tx_id]: + latest_delivery_times[tx_id] = del_time + + return published_by_node, earliest_publish_times, latest_delivery_times +############################################################################### +# 3. COMPUTE DISTRIBUTION TIMES +############################################################################### + +def compute_global_worstcase_distribution_times(earliest_publish_times, latest_delivery_times): + """ + For each transaction, worst-case distribution = (latest_delivery - earliest_publish). + Returns a list of distribution times in ms. + """ + dist_times = [] + for tx_id, pub_time in earliest_publish_times.items(): + if tx_id in latest_delivery_times: + latest_del = latest_delivery_times[tx_id] + dist_ms = latest_del - pub_time + dist_times.append(dist_ms) + return dist_times + +def compute_per_node_published_distribution_times(published_by_node, latest_delivery_times): + """ + Build a dict[node_id -> list of distribution times (ms)]. + + For each node, for each tx that *this node published*, we compute: + distribution_time = (latest_delivery_times[tx] - this_node_publish_time) + if that tx actually appears in latest_delivery_times. + """ + dist_times_per_node = {} + + for node_id, tx_map in published_by_node.items(): + # tx_map: dict[tx_id -> publish_time_ms for this node] + dist_list = [] + for tx_id, pub_time in tx_map.items(): + if tx_id in latest_delivery_times: + latest_del = latest_delivery_times[tx_id] + dist_ms = latest_del - pub_time + dist_list.append(dist_ms) + dist_times_per_node[node_id] = dist_list + return dist_times_per_node + +############################################################################### +# 4. 
CREATE VIOLIN PLOTS +############################################################################### + +def create_distribution_violin_global(distribution_times): + """ + A single violin with all global worst-case distribution times (in ms). + """ + if not distribution_times: + return None + + fig = go.Figure() + fig.add_trace(go.Violin( + y=distribution_times, + meanline_visible=True, + name='Worst-case Dist Times (ms)' + )) + + fig.update_layout( + title="Global Worst-case Distribution Times", + yaxis=dict(title="Time (milliseconds)") + ) + return fig + +def create_distribution_violin_per_node(dist_times_per_node): + """ + One violin per node, each showing distribution times for the txs + published by that node. + """ + if not dist_times_per_node: + return None + + fig = go.Figure() + # We'll sort node IDs to have a consistent order on the x-axis + sorted_nodes = sorted(dist_times_per_node.keys(), key=lambda x: int(x) if x.isdigit() else x) + + for node_id in sorted_nodes: + times = dist_times_per_node[node_id] + if times: # only add a trace if there's data + fig.add_trace(go.Violin( + y=times, + x=[f"Node {node_id}"] * len(times), + meanline_visible=True, + name=f"Node {node_id}" + )) + + fig.update_layout( + title="Per-node Published Distribution Times", + yaxis=dict(title="Time (milliseconds)"), + xaxis=dict(title="Node ID", type="category"), + violinmode="group" + ) return fig +############################################################################### +# 5. WRITE .DAT FILES +############################################################################### + +def write_per_node_metric_dat_files(df_metrics, prefix_dir, prefix): + """ + For each node *and* for each metric, create a .dat file: + prefix/node-/_node__.dat + Lines of the form: + + sorted by ascending relative_timestamp. + + Example: + "myrun/node-1/myrun_node_1_redundancy.dat" + 0 1.23 + 500 1.50 + ... 
+ """ + if df_metrics.empty: + return + + # Compute relative_timestamp + t0 = df_metrics["timestamp"].min() + df_metrics["relative_timestamp"] = df_metrics["timestamp"] - t0 + + ignore_cols = {"timestamp", "node_id", "relative_timestamp"} + metric_cols = [c for c in df_metrics.columns if c not in ignore_cols] + metric_cols.sort() + + # We'll group by node, then for each metric we'll write a file + grouped = df_metrics.groupby("node_id") + + for node_id, group_df in grouped: + # Make subfolder for this node + node_folder = os.path.join(prefix_dir, f"node-{node_id}") + os.makedirs(node_folder, exist_ok=True) + + # Sort by relative_timestamp + group_df = group_df.sort_values("relative_timestamp") + + # For each metric, write a separate file + for metric in metric_cols: + out_path = os.path.join(node_folder, f"{prefix}_node_{node_id}_{metric}.dat") + with open(out_path, "w") as f: + for _, row in group_df.iterrows(): + rel_ts = row["relative_timestamp"] + val = row[metric] + f.write(f"{rel_ts} {val}\n") + +def write_global_distribution_dat_file(global_times, prefix_dir, prefix): + """ + Write _global_distribution.dat in prefix_dir, lines of the form: + + """ + if not global_times: + return + out_path = os.path.join(prefix_dir, f"{prefix}_global_distribution.dat") + with open(out_path, "w") as f: + for dist in global_times: + f.write(f"{dist}\n") + +def write_node_distribution_dat_files(dist_times_per_node, prefix_dir, prefix): + """ + For each node, create prefix/node-/_node__distribution.dat + containing lines: + + """ + for node_id, dist_list in dist_times_per_node.items(): + if dist_list: + node_folder = os.path.join(prefix_dir, f"node-{node_id}") + os.makedirs(node_folder, exist_ok=True) + + out_path = os.path.join(node_folder, f"{prefix}_node_{node_id}_distribution.dat") + with open(out_path, "w") as f: + for dist in dist_list: + f.write(f"{dist}\n") + +############################################################################### +# 6. MAIN +############################################################################### + def main(): - # 1. Load the DataFrame from all the JSON files in ./output-/node-.json - df = load_data_from_output_folders(base_dir='.') + # Ask the user for a prefix + prefix = input("Enter the prefix for the data files (and plots): ").strip() + if not prefix: + print("No prefix given. Exiting.") + return + + # Create top-level prefix folder + os.makedirs(prefix, exist_ok=True) + + # A) LOAD METRICS + df_metrics = load_metrics_from_output_folders(base_dir='.') + + # Build the interactive metrics figure + metrics_fig = create_metrics_figure(df_metrics, default_metric="redundancy") + + # B) LOAD PUBLISHED/DELIVERED & DISTRIBUTION + published_by_node, earliest_publish_times, latest_delivery_times = load_published_and_delivered(base_dir='.') + global_times = compute_global_worstcase_distribution_times(earliest_publish_times, latest_delivery_times) + dist_times_per_node = compute_per_node_published_distribution_times(published_by_node, latest_delivery_times) + + # B1) Build distribution figures + global_fig = create_distribution_violin_global(global_times) + node_fig = create_distribution_violin_per_node(dist_times_per_node) + + #--------------------------------------------------------------------------- + # WRITE .DAT FILES + #--------------------------------------------------------------------------- + + # 1. 
Metrics data: one .dat file per node per metric + if not df_metrics.empty: + write_per_node_metric_dat_files(df_metrics, prefix, prefix) + else: + print("No node metrics found.") + + # 2. Global distribution + if global_times: + write_global_distribution_dat_file(global_times, prefix, prefix) + else: + print("No global distribution data found.") + + # 3. Per-node distribution + if dist_times_per_node: + write_node_distribution_dat_files(dist_times_per_node, prefix, prefix) + else: + print("No per-node distribution data found.") + + #--------------------------------------------------------------------------- + # CREATE & SAVE HTML PLOTS + #--------------------------------------------------------------------------- + + if metrics_fig is not None: + html_path = os.path.join(prefix, f"{prefix}_metrics_plot.html") + plot(metrics_fig, filename=html_path, auto_open=True) + else: + print("No node metrics to plot.") - # 2. Create the interactive figure - # Default to showing 'redundancy' if it exists, else pick the first metric found - fig = create_interactive_figure(df, default_metric="redundancy") + if global_fig is not None: + html_path = os.path.join(prefix, f"{prefix}_distribution_violin_global.html") + plot(global_fig, filename=html_path, auto_open=True) + else: + print("No global distribution to plot.") - # 3. Display the figure in your default browser. - # This will generate an HTML file (plot.html) and open it. - plot(fig, filename='plot.html', auto_open=True) + if node_fig is not None: + html_path = os.path.join(prefix, f"{prefix}_distribution_violin_per_node.html") + plot(node_fig, filename=html_path, auto_open=True) + else: + print("No per-node distribution to plot.") if __name__ == "__main__": main() diff --git a/dog/Cargo.toml b/dog/Cargo.toml index fb969d6..0ff767c 100644 --- a/dog/Cargo.toml +++ b/dog/Cargo.toml @@ -12,12 +12,12 @@ async-channel = { workspace = true } asynchronous-codec = { workspace = true } bytes = { workspace = true } either = { workspace = true } +fnv = { workspace = true } futures = { workspace = true } futures-timer = { workspace = true } hex_fmt = { workspace = true } libp2p = { workspace = true } libp2p-core = { workspace = true } -lru = { workspace = true } prometheus-client = { workspace = true } quick-protobuf = { workspace = true } quick-protobuf-codec = { workspace = true } @@ -26,6 +26,7 @@ serde = { workspace = true, optional = true, features = ["derive"] } thiserror = { workspace = true } tracing = { workspace = true } void = { workspace = true } +web-time = { workspace = true } [lints] workspace = true diff --git a/dog/src/behaviour.rs b/dog/src/behaviour.rs index 268af23..4afa70b 100644 --- a/dog/src/behaviour.rs +++ b/dog/src/behaviour.rs @@ -1,6 +1,5 @@ use std::{ collections::{HashMap, VecDeque}, - num::NonZeroUsize, task::Poll, time::SystemTime, }; @@ -14,7 +13,6 @@ use libp2p::{ }, PeerId, }; -use lru::LruCache; use prometheus_client::registry::Registry; use quick_protobuf::{MessageWrite, Writer}; @@ -27,6 +25,7 @@ use crate::{ protocol::SIGNING_PREFIX, rpc::Sender, rpc_proto::proto, + time_cache::DuplicateCache, transform::{DataTransform, IdentityTransform}, types::{ ControlAction, HaveTx, PeerConnections, RawTransaction, ResetRoute, RpcOut, Transaction, @@ -159,7 +158,7 @@ pub struct Behaviour { redundancy_interval: Delay, redundancy_controller: Controller, router: Router, - cache: LruCache, + cache: DuplicateCache, metrics: Option, } @@ -203,7 +202,7 @@ where redundancy_interval: Delay::new(config.redundancy_interval()), 
             redundancy_controller: Controller::new(&config),
             router: Router::new(),
-            cache: LruCache::new(NonZeroUsize::new(config.cache_size()).unwrap()),
+            cache: DuplicateCache::new(config.cache_time()),
             config,
             metrics: metrics.map(Metrics::new),
         })
@@ -240,7 +239,11 @@ where
 
         tracing::trace!("Publishing transaction");
 
-        self.cache.put(tx_id.clone(), ());
+        self.cache.insert(tx_id.clone());
+
+        if let Some(m) = self.metrics.as_mut() {
+            m.set_txs_cache_size(self.cache.len());
+        }
 
         if self.config.deliver_own_transactions() {
             self.events
@@ -474,9 +477,13 @@ where
 
         // TODO: validate transaction if needed
 
-        if self.cache.put(tx_id.clone(), ()).is_some() {
+        if !self.cache.insert(tx_id.clone()) {
             tracing::debug!(transaction=%tx_id, "Transaction already received, ignoring");
 
+            if let Some(m) = self.metrics.as_mut() {
+                m.set_txs_cache_size(self.cache.len());
+            }
+
             self.redundancy_controller.incr_duplicate_txs_count();
 
             if self.redundancy_controller.is_have_tx_blocked() {
@@ -505,6 +512,7 @@ where
 
         if let Some(m) = self.metrics.as_mut() {
             m.tx_recv();
+            m.set_txs_cache_size(self.cache.len());
         }
 
         tracing::debug!("Deliver received transaction to user");
diff --git a/dog/src/config.rs b/dog/src/config.rs
index a16c5e6..6d25654 100644
--- a/dog/src/config.rs
+++ b/dog/src/config.rs
@@ -21,7 +21,7 @@ pub struct Config {
     transaction_id_fn: Arc TransactionId + Send + Sync + 'static>,
     max_transactions_per_rpc: Option,
     connection_handler_queue_len: usize,
-    cache_size: usize,
+    cache_time: Duration,
     target_redundancy: f64,
     redundancy_delta_percent: u8,
     redundancy_interval: Duration,
@@ -53,9 +53,9 @@ impl Config {
         self.connection_handler_queue_len
     }
 
-    /// The size of the cache for the `TransactionId`. The default is 1000.
-    pub fn cache_size(&self) -> usize {
-        self.cache_size
+    /// The time a transaction id is stored in the cache. The default is 30 seconds.
+    pub fn cache_time(&self) -> Duration {
+        self.cache_time
     }
 
     /// The target redundancy for the network. The default is 1.0.
@@ -128,7 +128,7 @@ impl Default for ConfigBuilder {
             }),
             max_transactions_per_rpc: None,
             connection_handler_queue_len: 5000,
-            cache_size: 1000,
+            cache_time: Duration::from_secs(30),
             target_redundancy: 1.0,
             redundancy_delta_percent: 10,
             redundancy_interval: Duration::from_secs(1),
@@ -174,9 +174,9 @@ impl ConfigBuilder {
         self
     }
 
-    /// The size of the cache for the `TransactionId`. The default is 1000.
-    pub fn cache_size(&mut self, cache_size: usize) -> &mut Self {
-        self.config.cache_size = cache_size;
+    /// The time a transaction id is stored in the cache. The default is 30 seconds.
+    pub fn cache_time(&mut self, cache_time: Duration) -> &mut Self {
+        self.config.cache_time = cache_time;
         self
     }
 
diff --git a/dog/src/lib.rs b/dog/src/lib.rs
index 440584a..f6c4feb 100644
--- a/dog/src/lib.rs
+++ b/dog/src/lib.rs
@@ -7,6 +7,7 @@
 mod metrics;
 pub mod protocol;
 mod rpc;
 mod rpc_proto;
+mod time_cache;
 mod transform;
 mod types;
diff --git a/dog/src/metrics.rs b/dog/src/metrics.rs
index da96968..412f419 100644
--- a/dog/src/metrics.rs
+++ b/dog/src/metrics.rs
@@ -39,6 +39,9 @@ pub(crate) struct Metrics {
     txs_invalid_counts: Counter,
     /// Number of bytes received.
     txs_recv_bytes: Counter,
+
+    /// Transactions cache size.
+    txs_cache_size: Gauge,
 }
 
 impl Metrics {
@@ -58,6 +61,7 @@
         let txs_recv_counts = Counter::default();
         let txs_invalid_counts = Counter::default();
         let txs_recv_bytes = Counter::default();
+        let txs_cache_size = Gauge::default();
 
         registry.register("peers_count", "Number of peers.", peers_count.clone());
         registry.register("redundancy", "Redundancy.", redundancy.clone());
@@ -126,6 +130,11 @@
             "Number of bytes received.",
             txs_recv_bytes.clone(),
         );
+        registry.register(
+            "txs_cache_size",
+            "Transactions cache size.",
+            txs_cache_size.clone(),
+        );
 
         Self {
             peers_count,
@@ -143,6 +152,7 @@
             txs_recv_counts,
             txs_invalid_counts,
             txs_recv_bytes,
+            txs_cache_size,
         }
     }
 
@@ -207,4 +217,12 @@
     pub(crate) fn register_invalid_tx(&mut self) {
         self.txs_invalid_counts.inc();
     }
+
+    pub(crate) fn set_txs_cache_size(&mut self, size: usize) {
+        if let Ok(size) = size.try_into() {
+            self.txs_cache_size.set(size);
+        } else {
+            tracing::error!("Failed to set transactions cache size");
+        }
+    }
 }
diff --git a/dog/src/time_cache.rs b/dog/src/time_cache.rs
new file mode 100644
index 0000000..ab5721f
--- /dev/null
+++ b/dog/src/time_cache.rs
@@ -0,0 +1,68 @@
+use std::{collections::VecDeque, time::Duration};
+
+use fnv::FnvHashSet;
+use web_time::Instant;
+
+struct ExpiringValues<Element> {
+    value: Element,
+    expiration: Instant,
+}
+
+pub(crate) struct DuplicateCache<T> {
+    /// Size of the cache.
+    len: usize,
+    /// Set of values in the cache.
+    values: FnvHashSet<T>,
+    /// List of values in order of expiration.
+    list: VecDeque<ExpiringValues<T>>,
+    /// The time values remain in the cache.
+    ttl: Duration,
+}
+
+impl<T> DuplicateCache<T>
+where
+    T: Eq + std::hash::Hash + Clone,
+{
+    pub(crate) fn new(ttl: Duration) -> Self {
+        DuplicateCache {
+            len: 0,
+            values: FnvHashSet::default(),
+            list: VecDeque::new(),
+            ttl,
+        }
+    }
+
+    fn remove_expired_values(&mut self, now: Instant) {
+        while let Some(element) = self.list.pop_front() {
+            if element.expiration > now {
+                self.list.push_front(element);
+                break;
+            }
+            self.len -= 1;
+            self.values.remove(&element.value);
+        }
+    }
+
+    pub(crate) fn insert(&mut self, value: T) -> bool {
+        let now = Instant::now();
+        self.remove_expired_values(now);
+        if self.values.insert(value.clone()) {
+            self.len += 1;
+            self.list.push_back(ExpiringValues {
+                value,
+                expiration: now + self.ttl,
+            });
+            true
+        } else {
+            false
+        }
+    }
+
+    pub(crate) fn contains(&self, value: &T) -> bool {
+        self.values.contains(value)
+    }
+
+    pub(crate) fn len(&self) -> usize {
+        self.len
+    }
+}
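A brief usage sketch, not part of the patch, illustrating the intended behaviour of the new DuplicateCache: insert accepts a value the first time it is seen, rejects duplicates for the configured cache_time (the ttl passed to DuplicateCache::new, 30 seconds by default via Config::cache_time), and accepts the value again once the entry has expired. The test module name, the short TTL, and the string keys below are illustrative assumptions only; on native targets web_time::Instant behaves like std::time::Instant, so a plain sleep is enough to trigger expiry.

    #[cfg(test)]
    mod tests {
        use super::*;
        use std::time::Duration;

        #[test]
        fn duplicate_cache_expires_entries() {
            // Entries live for `ttl`; duplicates inside that window are rejected.
            let mut cache = DuplicateCache::new(Duration::from_millis(100));

            assert!(cache.insert("tx-1"));  // first insertion is accepted
            assert!(!cache.insert("tx-1")); // duplicate within the TTL is rejected
            assert!(cache.contains(&"tx-1"));
            assert_eq!(cache.len(), 1);

            // Once the TTL has elapsed, the stale entry is evicted on the next
            // insert and the same value is accepted again.
            std::thread::sleep(Duration::from_millis(150));
            assert!(cache.insert("tx-1"));
            assert_eq!(cache.len(), 1);
        }
    }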