forked from aembke/fred.rs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_redis.rs
116 lines (101 loc) · 3.3 KB
/
_redis.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use crate::{utils, Argv};
use bb8_redis::{
bb8::{self, Pool, PooledConnection},
redis::{cmd, AsyncCommands, ErrorKind as RedisErrorKind, RedisError},
RedisConnectionManager, RedisMultiplexedConnectionManager,
};
use futures::TryStreamExt;
use indicatif::ProgressBar;
use opentelemetry::trace::FutureExt;
use std::{
error::Error,
sync::{atomic::AtomicUsize, Arc},
time::{Duration, SystemTime},
};
use tokio::task::JoinHandle;
async fn incr_key(pool: &Pool<RedisMultiplexedConnectionManager>, key: &str) -> i64 {
let mut conn = pool.get().await.map_err(utils::crash).unwrap();
cmd("INCR")
.arg(key)
.query_async(&mut *conn)
.await
.map_err(utils::crash)
.unwrap()
}
async fn del_key(pool: &Pool<RedisMultiplexedConnectionManager>, key: &str) -> i64 {
let mut conn = pool.get().await.map_err(utils::crash).unwrap();
cmd("DEL")
.arg(key)
.query_async(&mut *conn)
.await
.map_err(utils::crash)
.unwrap()
}
fn spawn_client_task(
bar: &Option<ProgressBar>,
pool: &Pool<RedisMultiplexedConnectionManager>,
counter: &Arc<AtomicUsize>,
argv: &Arc<Argv>,
) -> JoinHandle<()> {
let (bar, pool, counter, argv) = (bar.clone(), pool.clone(), counter.clone(), argv.clone());
tokio::spawn(async move {
let key = utils::random_string(15);
let mut expected = 0;
while utils::incr_atomic(&counter) < argv.count {
expected += 1;
let actual = incr_key(&pool, &key).await;
#[cfg(feature = "assert-expected")]
{
if actual != expected {
println!("Unexpected result: {} == {}", actual, expected);
std::process::exit(1);
}
}
if let Some(ref bar) = bar {
bar.inc(1);
}
}
})
}
// TODO support clustered deployments
async fn init(argv: &Arc<Argv>) -> Pool<RedisMultiplexedConnectionManager> {
let (username, password) = utils::read_auth_env();
let url = if let Some(password) = password {
let username = username.map(|s| format!("{s}:")).unwrap_or("".into());
format!("redis://{}{}@{}:{}", username, password, argv.host, argv.port)
} else {
format!("redis://{}:{}", argv.host, argv.port)
};
debug!("Redis conn: {}", url);
let manager = RedisMultiplexedConnectionManager::new(url).expect("Failed to create redis connection manager");
let pool = bb8::Pool::builder()
.max_size(argv.pool as u32)
.build(manager)
.await
.expect("Failed to create client pool");
// try to warm up the pool first
let mut warmup_ft = Vec::with_capacity(argv.pool + 1);
for _ in 0..argv.pool + 1 {
warmup_ft.push(async { incr_key(&pool, "foo").await });
}
futures::future::join_all(warmup_ft).await;
del_key(&pool, "foo").await;
pool
}
pub async fn run(argv: Arc<Argv>, counter: Arc<AtomicUsize>, bar: Option<ProgressBar>) -> Duration {
info!("Running with redis-rs");
if argv.cluster || argv.replicas {
panic!("Cluster or replica features are not supported yet with redis-rs benchmarks.");
}
let pool = init(&argv).await;
let mut tasks = Vec::with_capacity(argv.tasks);
info!("Starting commands...");
let started = SystemTime::now();
for _ in 0..argv.tasks {
tasks.push(spawn_client_task(&bar, &pool, &counter, &argv));
}
futures::future::join_all(tasks).await;
SystemTime::now()
.duration_since(started)
.expect("Failed to calculate duration")
}