feat(observation): adding rate limit latency and projects registry monitoring, organizing Grafana panels (#758)

* feat: improving Redis cache monitoring and latency observation

* feat: organising grafana panels
geekbrother authored Aug 31, 2024
1 parent 95ffbe6 commit dc9e8c1
Showing 14 changed files with 268 additions and 23 deletions.
18 changes: 11 additions & 7 deletions src/handlers/identity.rs
@@ -34,7 +34,10 @@ use {
wc::future::FutureExt,
};

const CACHE_TTL: TimeDelta = TimeDelta::seconds(60 * 60 * 24);
const CACHE_TTL: u64 = 60 * 60 * 24;
const CACHE_TTL_DELTA: TimeDelta = TimeDelta::seconds(CACHE_TTL as i64);
const CACHE_TTL_STD: Duration = Duration::from_secs(CACHE_TTL);

const SELF_PROVIDER_ERROR_PREFIX: &str = "SelfProviderError: ";
const EMPTY_RPC_RESPONSE: &str = "0x";
pub const ETHEREUM_MAINNET: &str = "eip155:1";
@@ -154,7 +157,7 @@ async fn handler_internal(
}

fn ttl_from_resolved_at(resolved_at: DateTime<Utc>, now: DateTime<Utc>) -> TimeDelta {
let expires = resolved_at + CACHE_TTL;
let expires = resolved_at + CACHE_TTL_DELTA;
(expires - now).max(TimeDelta::zero())
}

@@ -277,9 +280,9 @@ async fn lookup_identity(
let res = res.clone();
// Do not block on cache write.
tokio::spawn(async move {
let cache_ttl = CACHE_TTL.to_std().expect("invalid duration");
let cache_start = SystemTime::now();
cache
.set(&cache_record_key, &res, Some(cache_ttl))
.set(&cache_record_key, &res, Some(CACHE_TTL_STD))
.await
.tap_err(|err| {
warn!(
@@ -288,6 +291,7 @@
)
})
.ok();
state.metrics.add_identity_lookup_cache_latency(cache_start);
debug!("Setting cache success");
});
}
@@ -572,14 +576,14 @@ mod tests {
#[test]
fn full_ttl_when_resolved_now() {
let now = Utc::now();
assert_eq!(ttl_from_resolved_at(now, now), CACHE_TTL);
assert_eq!(ttl_from_resolved_at(now, now), CACHE_TTL_DELTA);
}

#[test]
fn expires_now() {
let now = Utc::now();
assert_eq!(
ttl_from_resolved_at(now - CACHE_TTL, now),
ttl_from_resolved_at(now - CACHE_TTL_DELTA, now),
TimeDelta::zero()
);
}
@@ -588,7 +592,7 @@
fn expires_past() {
let now = Utc::now();
assert_eq!(
ttl_from_resolved_at(now - CACHE_TTL - TimeDelta::days(1), now),
ttl_from_resolved_at(now - CACHE_TTL_DELTA - TimeDelta::days(1), now),
TimeDelta::zero()
);
}
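The identity.rs change above keeps a single source of truth for the cache lifetime: one `u64` seconds constant, from which a `chrono::TimeDelta` (for the `DateTime<Utc>` arithmetic in `ttl_from_resolved_at`) and a `std::time::Duration` (for the cache client's `set` call) are both derived as constants, removing the runtime `.to_std().expect("invalid duration")` conversion inside the spawned cache-write task. A minimal standalone sketch of the pattern, with an illustrative `main` that is not part of the commit:

```rust
use chrono::{DateTime, TimeDelta, Utc};
use std::time::Duration;

// One seconds value, derived into both representations (mirrors the diff).
const CACHE_TTL: u64 = 60 * 60 * 24;
const CACHE_TTL_DELTA: TimeDelta = TimeDelta::seconds(CACHE_TTL as i64);
const CACHE_TTL_STD: Duration = Duration::from_secs(CACHE_TTL);

/// Remaining TTL for a record resolved at `resolved_at`, clamped at zero.
fn ttl_from_resolved_at(resolved_at: DateTime<Utc>, now: DateTime<Utc>) -> TimeDelta {
    let expires = resolved_at + CACHE_TTL_DELTA;
    (expires - now).max(TimeDelta::zero())
}

fn main() {
    let now = Utc::now();
    // A record resolved 23 hours ago still has one hour of TTL left.
    assert_eq!(
        ttl_from_resolved_at(now - TimeDelta::hours(23), now),
        TimeDelta::hours(1)
    );
    // CACHE_TTL_STD can be passed straight to APIs taking std::time::Duration.
    assert_eq!(CACHE_TTL_STD, Duration::from_secs(86_400));
}
```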
2 changes: 2 additions & 0 deletions src/lib.rs
@@ -99,9 +99,11 @@ pub async fn bootstrap(config: Config) -> RpcResult<()> {
);
RateLimit::new(
redis_addr.write(),
config.storage.redis_max_connections,
max_tokens,
chrono::Duration::seconds(refill_interval_sec as i64),
refill_rate,
metrics.clone(),
)
}
_ => {
32 changes: 31 additions & 1 deletion src/metrics.rs
@@ -59,6 +59,8 @@ pub struct Metrics {

// Rate limiting
pub rate_limited_entries_counter: Histogram<u64>,
pub rate_limiting_latency_tracker: Histogram<f64>,
pub rate_limited_responses_counter: Counter<u64>,

// Account names
pub account_names_count: ObservableGauge<u64>,
@@ -242,6 +244,16 @@ impl Metrics {
.with_description("The latency of IRN client calls")
.init();

let rate_limiting_latency_tracker = meter
.f64_histogram("rate_limiting_latency_tracker")
.with_description("Rate limiting latency tracker")
.init();

let rate_limited_responses_counter = meter
.u64_counter("rate_limited_responses_counter")
.with_description("Rate limiting responses counter")
.init();

Metrics {
rpc_call_counter,
rpc_call_retries,
@@ -274,9 +286,11 @@ impl Metrics {
cpu_usage,
memory_total,
memory_used,
rate_limited_entries_counter,
account_names_count,
irn_latency_tracker,
rate_limiting_latency_tracker,
rate_limited_entries_counter,
rate_limited_responses_counter,
}
}
}
@@ -538,6 +552,22 @@ impl Metrics {
.record(&otel::Context::new(), entry, &[]);
}

pub fn add_rate_limiting_latency(&self, start: SystemTime) {
self.rate_limiting_latency_tracker.record(
&otel::Context::new(),
start
.elapsed()
.unwrap_or(Duration::from_secs(0))
.as_secs_f64(),
&[],
);
}

pub fn add_rate_limited_response(&self) {
self.rate_limited_responses_counter
.add(&otel::Context::new(), 1, &[]);
}

pub fn add_irn_latency(&self, start: SystemTime, operation: OperationType) {
self.irn_latency_tracker.record(
&otel::Context::new(),
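Both new instruments follow the shape of the existing `irn_latency_tracker`: capture a `SystemTime` before the guarded call, record the elapsed seconds into a histogram (falling back to zero if the clock went backwards), and bump a plain counter only when a request is actually rejected. A self-contained sketch of that recording pattern, using an in-memory stub in place of the OpenTelemetry instruments (the stub and `main` are illustrative, not the commit's code):

```rust
use std::{
    sync::Mutex,
    time::{Duration, SystemTime},
};

// Stand-in for the real `Metrics` struct: it accumulates values in memory so
// the instrumentation pattern can be run without an OpenTelemetry pipeline.
#[derive(Default)]
struct Metrics {
    rate_limiting_latency_secs: Mutex<Vec<f64>>,
    rate_limited_responses: Mutex<u64>,
}

impl Metrics {
    // Elapsed seconds since `start`, clamped to zero on clock skew
    // (mirrors the `unwrap_or(Duration::from_secs(0))` in the diff).
    fn add_rate_limiting_latency(&self, start: SystemTime) {
        let secs = start
            .elapsed()
            .unwrap_or(Duration::from_secs(0))
            .as_secs_f64();
        self.rate_limiting_latency_secs.lock().unwrap().push(secs);
    }

    fn add_rate_limited_response(&self) {
        *self.rate_limited_responses.lock().unwrap() += 1;
    }
}

fn main() {
    let metrics = Metrics::default();

    // Instrument-around-a-call: latency is recorded for every check,
    // the rejection counter only when the bucket says no.
    let start = SystemTime::now();
    let allowed = false; // pretend the token bucket rejected this request
    metrics.add_rate_limiting_latency(start);
    if !allowed {
        metrics.add_rate_limited_response();
    }

    println!(
        "latency samples: {:?}, rate-limited responses: {}",
        metrics.rate_limiting_latency_secs.lock().unwrap(),
        metrics.rate_limited_responses.lock().unwrap()
    );
}
```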
38 changes: 30 additions & 8 deletions src/utils/rate_limit.rs
@@ -1,9 +1,10 @@
use {
crate::metrics::Metrics,
chrono::{Duration, Utc},
deadpool_redis::Pool,
moka::future::Cache,
serde::Deserialize,
std::sync::Arc,
std::{sync::Arc, time::SystemTime},
tracing::error,
wc::rate_limit::{token_bucket, RateLimitError, RateLimitExceeded},
};
@@ -21,18 +22,32 @@ pub struct RateLimit {
max_tokens: u32,
interval: Duration,
refill_rate: u32,
metrics: Arc<Metrics>,
}

impl RateLimit {
pub fn new(
redis_addr: &str,
redis_pool_max_size: usize,
max_tokens: u32,
interval: Duration,
refill_rate: u32,
metrics: Arc<Metrics>,
) -> Option<Self> {
let redis_pool = match deadpool_redis::Config::from_url(redis_addr)
.create_pool(Some(deadpool_redis::Runtime::Tokio1))
{
let redis_builder = deadpool_redis::Config::from_url(redis_addr)
.builder()
.map_err(|e| {
error!(
"Failed to create redis pool builder for rate limiting: {:?}",
e
);
})
.ok()?
.max_size(redis_pool_max_size)
.runtime(deadpool_redis::Runtime::Tokio1)
.build();

let redis_pool = match redis_builder {
Ok(pool) => Arc::new(pool),
Err(e) => {
error!("Failed to create redis pool for rate limiting: {:?}", e);
@@ -52,6 +67,7 @@ impl RateLimit {
max_tokens,
interval,
refill_rate,
metrics,
})
}

@@ -67,7 +83,8 @@
ip: &str,
_project_id: Option<&str>,
) -> Result<(), RateLimitExceeded> {
match token_bucket(
let call_start_time = SystemTime::now();
let result = token_bucket(
&self.mem_cache.clone(),
&self.redis_pool.clone(),
self.format_key(endpoint, ip),
@@ -76,11 +93,16 @@
self.refill_rate,
Utc::now(),
)
.await
{
.await;
self.metrics.add_rate_limiting_latency(call_start_time);

match result {
Ok(_) => Ok(()),
Err(e) => match e {
RateLimitError::RateLimitExceeded(e) => Err(e),
RateLimitError::RateLimitExceeded(e) => {
self.metrics.add_rate_limited_response();
Err(e)
}
RateLimitError::Internal(e) => {
error!("Internal rate limiting error: {:?}", e);
Ok(())
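The pool construction above moves from the `create_pool` convenience helper to the deadpool-redis builder so the rate limiter's pool size can be capped with the configured `redis_max_connections`. A minimal sketch of that builder chain, assuming the deadpool-redis crate with its Tokio runtime feature enabled; the address and size in `main` are placeholders:

```rust
use deadpool_redis::{Config, Runtime};

// Building the pool is lazy: no Redis connection is opened here, so a bad
// address only surfaces when the pool is first used.
fn build_rate_limit_pool(
    redis_addr: &str,
    redis_pool_max_size: usize,
) -> Option<deadpool_redis::Pool> {
    Config::from_url(redis_addr)
        .builder()
        .map_err(|e| eprintln!("failed to create redis pool builder: {e:?}"))
        .ok()?
        .max_size(redis_pool_max_size) // bounded by redis_max_connections in the diff
        .runtime(Runtime::Tokio1)
        .build()
        .map_err(|e| eprintln!("failed to build redis pool: {e:?}"))
        .ok()
}

fn main() {
    let pool = build_rate_limit_pool("redis://127.0.0.1:6379", 64);
    println!("pool created: {}", pool.is_some());
}
```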
14 changes: 11 additions & 3 deletions terraform/monitoring/dashboard.jsonnet
@@ -115,11 +115,19 @@ dashboard.new(
panels.proxy.latency(ds, vars) { gridPos: pos._2 },
panels.proxy.errors_provider(ds, vars) { gridPos: pos._3 },
panels.proxy.provider_retries(ds, vars) { gridPos: pos._3 },
panels.proxy.rejected_projects(ds, vars) { gridPos: pos._3 },
panels.proxy.quota_limited_projects(ds, vars) { gridPos: pos._3 },
panels.proxy.rate_limited_counter(ds, vars) { gridPos: pos._3 },
panels.proxy.http_codes(ds, vars) { gridPos: pos._3 },

row.new('Projects registry'),
panels.projects.rejected_projects(ds, vars) { gridPos: pos._4 },
panels.projects.quota_limited_projects(ds, vars) { gridPos: pos._4 },
panels.projects.cache_latency(ds, vars) { gridPos: pos._4 },
panels.projects.fetch_latency(ds, vars) { gridPos: pos._4 },

row.new('Rate limiting'),
panels.rate_limiting.counter(ds, vars) { gridPos: pos._3 },
panels.rate_limiting.latency(ds, vars) { gridPos: pos._3 },
panels.rate_limiting.rate_limited(ds, vars) { gridPos: pos._3 },

row.new('History Metrics'),
panels.history.requests(ds, vars) { gridPos: pos_short._3 },
panels.history.latency(ds, vars) { gridPos: pos_short._3 },
29 changes: 27 additions & 2 deletions terraform/monitoring/panels/identity/latency.libsonnet
@@ -1,8 +1,10 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;
local panels = grafana.panels;
local targets = grafana.targets;
local alert = grafana.alert;
local alertCondition = grafana.alertCondition;

local _configuration = defaults.configuration.timeseries
.withUnit('s')
@@ -19,6 +21,29 @@ local _configuration = defaults.configuration.timeseries
)
.configure(_configuration)

.setAlert(vars.environment, alert.new(
namespace = 'Blockchain API',
name = "%s - Identity cache high latency" % vars.environment,
message = "%s - Identity cache high latency" % vars.environment,
period = '5m',
frequency = '1m',
noDataState = 'no_data',
notifications = vars.notifications,
alertRuleTags = {
'og_priority': 'P3',
},
conditions = [
alertCondition.new(
evaluatorParams = [ 100 ],
evaluatorType = 'gt',
operatorType = 'or',
queryRefId = 'CacheLatency',
queryTimeStart = '5m',
reducerType = 'avg',
),
]
))

.addTarget(targets.prometheus(
datasource = ds.prometheus,
expr = 'sum(rate(identity_lookup_latency_tracker_sum[$__rate_interval])) / sum(rate(identity_lookup_latency_tracker_count[$__rate_interval]))',
15 changes: 13 additions & 2 deletions terraform/monitoring/panels/panels.libsonnet
@@ -31,12 +31,23 @@ local redis = panels.aws.redis;
errors_non_provider: (import 'proxy/errors_non_provider.libsonnet' ).new,
errors_provider: (import 'proxy/errors_provider.libsonnet' ).new,
provider_retries: (import 'proxy/rpc_retries.libsonnet' ).new,
rejected_projects: (import 'proxy/rejected_projects.libsonnet' ).new,
quota_limited_projects: (import 'proxy/quota_limited_projects.libsonnet').new,
rate_limited_counter: (import 'proxy/rate_limited_counter.libsonnet' ).new,
http_codes: (import 'proxy/http_codes.libsonnet' ).new,
},

projects: {
rejected_projects: (import 'projects/rejected_projects.libsonnet' ).new,
quota_limited_projects: (import 'projects/quota_limited_projects.libsonnet').new,
cache_latency: (import 'projects/cache_latency.libsonnet' ).new,
fetch_latency: (import 'projects/fetch_latency.libsonnet' ).new,
},

rate_limiting: {
counter: (import 'rate_limiting/counter.libsonnet' ).new,
latency: (import 'rate_limiting/latency.libsonnet' ).new,
rate_limited: (import 'rate_limiting/rate_limited.libsonnet').new,
},

redis: {
cpu(ds, vars): redis.cpu.panel(ds.cloudwatch, vars.namespace, vars.environment, vars.notifications, vars.redis_cluster_id),
memory(ds, vars): redis.memory.panel(ds.cloudwatch, vars.namespace, vars.environment, vars.notifications, vars.redis_cluster_id),
49 changes: 49 additions & 0 deletions terraform/monitoring/panels/projects/cache_latency.libsonnet
@@ -0,0 +1,49 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;
local alert = grafana.alert;
local alertCondition = grafana.alertCondition;

local _configuration = defaults.configuration.timeseries
.withUnit('ms');

{
new(ds, vars)::
panels.timeseries(
title = 'Cache latency',
datasource = ds.prometheus,
)
.configure(_configuration)

.setAlert(vars.environment, alert.new(
namespace = 'Blockchain API',
name = "%s - ELB High projects registry cache latency" % vars.environment,
message = "%s - ELB High projects registry cache latency" % vars.environment,
period = '5m',
frequency = '1m',
noDataState = 'no_data',
notifications = vars.notifications,
alertRuleTags = {
'og_priority': 'P3',
},
conditions = [
alertCondition.new(
evaluatorParams = [ 1000 ],
evaluatorType = 'gt',
operatorType = 'or',
queryRefId = 'ProjectsRegistryCacheLatency',
queryTimeStart = '5m',
reducerType = 'avg',
),
]
))

.addTarget(targets.prometheus(
datasource = ds.prometheus,
expr = 'sum(rate(project_data_local_cache_time_sum[$__rate_interval])) / sum(rate(project_data_local_cache_time_count[$__rate_interval]))',
refId = 'ProjectsRegistryCacheLatency',
legendFormat = 'Cache',
))
}
24 changes: 24 additions & 0 deletions terraform/monitoring/panels/projects/fetch_latency.libsonnet
@@ -0,0 +1,24 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;

local _configuration = defaults.configuration.timeseries
.withUnit('ms');

{
new(ds, vars)::
panels.timeseries(
title = 'Fetch latency',
datasource = ds.prometheus,
)
.configure(_configuration)

.addTarget(targets.prometheus(
datasource = ds.prometheus,
expr = 'sum(rate(project_data_registry_api_time_sum[$__rate_interval])) / sum(rate(project_data_registry_api_time_count[$__rate_interval]))',
refId = 'ProjectsRegistryFetchLatency',
legendFormat = 'Fetch',
))
}