Skip to content

Commit

Permalink
feat: add slo tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fatalem0 committed Jan 21, 2025
1 parent c882b4c commit 2227063
Show file tree
Hide file tree
Showing 12 changed files with 1,138 additions and 82 deletions.
456 changes: 374 additions & 82 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ members = [
"ydb-grpc",
# "ydb-grpc-helpers",
"ydb-example-urlshortener",
"ydb-slo-tests",
]
25 changes: 25 additions & 0 deletions ydb-slo-tests/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
publish = false
name = "ydb-slo-tests"
version = "0.1.0"
authors = ["fatalem0 <[email protected]>"]
edition = "2021"
license = "Apache-2.0"
description = "Crate contains SLO-tests for YDB"
repository = "https://github.com/ydb-platform/ydb-rs-sdk/tree/master/ydb-slo-tests"
rust-version = "1.74"

[dependencies]
base64 = { version = "0.22.1" }
rand = { version = "0.8.5" }
clap = { version = "4.5.27", features = ["derive"] }
rand_core = { version = "0.6.4" }
ratelimit = { version = "0.10.0" }
tokio = { version = "1.22" }
tokio-util = { version = "0.7.13", features = ["rt"] }
ydb = { version = "0.9.4", path = "../ydb" }
async-trait = "0.1"

[[example]]
name = "native"
path = "examples/native/native.rs"
205 changes: 205 additions & 0 deletions ydb-slo-tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# SLO workload

SLO is the type of test where app based on ydb-sdk is tested against falling YDB cluster nodes, tablets, network
(that is possible situations for distributed DBs with hundreds of nodes)

### Usage:

It has 3 commands:

- `create` - creates table in database
- `cleanup` - drops table in database
- `run` - runs workload (read and write to table with sets RPS)

[//]: # (### Run examples with all arguments:)

[//]: # ()
[//]: # (create:)

[//]: # (`python tests/slo/src/ create localhost:2136 /local -t tableName)

[//]: # (--min-partitions-count 6 --max-partitions-count 1000 --partition-size 1 -с 1000)

[//]: # (--write-timeout 10000`)

[//]: # ()
[//]: # (cleanup:)

[//]: # (`python tests/slo/src/ cleanup localhost:2136 /local -t tableName`)

[//]: # ()
[//]: # (run:)

[//]: # (`python tests/slo/src/ run localhost:2136 /local -t tableName)

[//]: # (--prom-pgw http://prometheus-pushgateway:9091 -report-period 250)

[//]: # (--read-rps 1000 --read-timeout 10000)

[//]: # (--write-rps 100 --write-timeout 10000)

[//]: # (--time 600 --shutdown-time 30`)

[//]: # ()
[//]: # (## Arguments for commands:)

[//]: # ()
[//]: # (### create)

[//]: # (`python tests/slo/src/ create <endpoint> <db> [options]`)

[//]: # ()
[//]: # (```)

[//]: # (Arguments:)

[//]: # ( endpoint YDB endpoint to connect to)

[//]: # ( db YDB database to connect to)

[//]: # ()
[//]: # (Options:)

[//]: # ( -t --table-name <string> table name to create)

[//]: # ()
[//]: # ( -p-min --min-partitions-count <int> minimum amount of partitions in table)

[//]: # ( -p-max --max-partitions-count <int> maximum amount of partitions in table)

[//]: # ( -p-size --partition-size <int> partition size in mb)

[//]: # ()
[//]: # ( -c --initial-data-count <int> amount of initially created rows)

[//]: # ()
[//]: # ( --write-timeout <int> write timeout milliseconds)

[//]: # ()
[//]: # ( --batch-size <int> amount of new records in each create request)

[//]: # ( --threads <int> number of threads to use)

[//]: # ()
[//]: # (```)

[//]: # ()
[//]: # (### cleanup)

[//]: # (`python tests/slo/src/ cleanup <endpoint> <db> [options]`)

[//]: # ()
[//]: # (```)

[//]: # (Arguments:)

[//]: # ( endpoint YDB endpoint to connect to)

[//]: # ( db YDB database to connect to)

[//]: # ()
[//]: # (Options:)

[//]: # ( -t --table-name <string> table name to create)

[//]: # (```)

[//]: # ()
[//]: # (### run)

[//]: # (`python tests/slo/src/ run <endpoint> <db> [options]`)

[//]: # ()
[//]: # (```)

[//]: # (Arguments:)

[//]: # ( endpoint YDB endpoint to connect to)

[//]: # ( db YDB database to connect to)

[//]: # ()
[//]: # (Options:)

[//]: # ( -t --table-name <string> table name to create)

[//]: # ()
[//]: # ( --prom-pgw <string> prometheus push gateway)

[//]: # ( --report-period <int> prometheus push period in milliseconds)

[//]: # ()
[//]: # ( --read-rps <int> read RPS)

[//]: # ( --read-timeout <int> read timeout milliseconds)

[//]: # ()
[//]: # ( --write-rps <int> write RPS)

[//]: # ( --write-timeout <int> write timeout milliseconds)

[//]: # ()
[//]: # ( --time <int> run time in seconds)

[//]: # ( --shutdown-time <int> graceful shutdown time in seconds)

[//]: # ()
[//]: # ( --read-threads <int> number of threads to use for write requests)

[//]: # ( --write-threads <int> number of threads to use for read requests)

[//]: # (```)

[//]: # ()
[//]: # (## Authentication)

[//]: # ()
[//]: # (Workload using [auth-env]&#40;https://ydb.yandex-team.ru/docs/reference/ydb-sdk/recipes/auth-env&#41; for authentication.)

[//]: # ()
[//]: # (## What's inside)

[//]: # (When running `run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`.)

[//]: # ()
[//]: # (- `readJob` reads rows from the table one by one with random identifiers generated by writeJob)

[//]: # (- `writeJob` generates and inserts rows)

[//]: # (- `metricsJob` periodically sends metrics to Prometheus)

[//]: # ()
[//]: # (Table have these fields:)

[//]: # (- `object_id Uint64`)

[//]: # (- `object_hash Uint64 Digest::NumericHash&#40;id&#41;`)

[//]: # (- `payload_str UTF8`)

[//]: # (- `payload_double Double`)

[//]: # (- `payload_timestamp Timestamp`)

[//]: # ()
[//]: # (Primary key: `&#40;"object_hash", "object_id"&#41;`)

[//]: # ()
[//]: # (## Collected metrics)

[//]: # (- `oks` - amount of OK requests)

[//]: # (- `not_oks` - amount of not OK requests)

[//]: # (- `inflight` - amount of requests in flight)

[//]: # (- `latency` - summary of latencies in ms)

[//]: # (- `attempts` - summary of amount for request)

[//]: # ()
[//]: # (> You must reset metrics to keep them `0` in prometheus and grafana before beginning and after ending of jobs)

[//]: # ()
[//]: # (## Look at metrics in grafana)
[//]: # (You can get dashboard used in that test [here]&#40;https://github.com/ydb-platform/slo-tests/blob/main/k8s/helms/grafana.yaml#L69&#41; - you will need to import json into grafana.)
121 changes: 121 additions & 0 deletions ydb-slo-tests/examples/native/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use async_trait::async_trait;
use ydb::{
ydb_params, ClientBuilder, Query, Row, TableClient, YdbResult, YdbResultWithCustomerErr,
};
use ydb_slo_tests::cli::SloTestsCli;
use ydb_slo_tests::row::{RowID, TestRow};
use ydb_slo_tests::workers::ReadWriter;

#[derive(Clone)]
pub struct Database {
db_table_client: TableClient,
cli_args: SloTestsCli,
}

impl Database {
pub async fn new(cli: SloTestsCli) -> YdbResult<Self> {
let client = ClientBuilder::new_from_connection_string(&cli.endpoint)?
.with_database(&cli.db)
.client()?;

client.wait().await?;

let table_client = client.table_client();

Ok(Self {
db_table_client: table_client,
cli_args: cli,
})
}

pub async fn create_table(&self) -> YdbResult<()> {
self.db_table_client
.retry_execute_scheme_query(format!(
"CREATE TABLE IF NOT EXISTS {table}
(
hash Uint64?,
id Uint64?,
payload_str Text?,
payload_double Double?,
payload_timestamp Timestamp?,
payload_hash Uint64?,
PRIMARY KEY (hash, id)
)",
table = self.cli_args.table_name,
))
.await
}

pub async fn drop_table(&self) -> YdbResult<()> {
self.db_table_client
.retry_execute_scheme_query(format!("DROP TABLE {}", self.cli_args.table_name))
.await
}
}

#[async_trait]
impl ReadWriter for Database {
async fn read(&self, row_id: RowID) -> YdbResult<Row> {
let query = Query::from(format!(
r#"
DECLARE $id AS Uint64;
SELECT id, payload_str, payload_double, payload_timestamp, payload_hash
FROM {table}
WHERE id = $id AND hash = Digest::NumericHash($id);
"#,
table = self.cli_args.table_name
))
.with_params(ydb_params!("$id" => row_id));

self.db_table_client
.retry_transaction(|t| async {
let mut t = t;
let res = t.query(query.clone()).await?;
Ok(res)
})
.await?
.into_only_row()
}

async fn write(&self, row: TestRow) -> YdbResultWithCustomerErr<()> {
let query = Query::from(format!(
r#"
DECLARE $id AS Uint64;
DECLARE $payload_str AS Utf8;
DECLARE $payload_double AS Double;
DECLARE $payload_timestamp AS Timestamp;
UPSERT INTO {table} (
id,
hash,
payload_str,
payload_double,
payload_timestamp
) VALUES (
$id,
Digest::NumericHash($id),
$payload_str,
$payload_double,
$payload_timestamp
);
"#,
table = &self.cli_args.table_name,
))
.with_params(ydb_params!(
"$id" => row.id,
"$payload_str" => row.payload_str,
"$payload_double" => row.payload_double,
"$payload_timestamp" => row.payload_timestamp, //TODO: поменять
));

self.db_table_client
.retry_transaction(|t| async {
let mut t = t;
t.query(query.clone()).await?;
t.commit().await?;
Ok(())
})
.await
}
}
Loading

0 comments on commit 2227063

Please sign in to comment.