From 8a14d84ec5fe18616d8f144662b7e7101b010cdd Mon Sep 17 00:00:00 2001 From: fatalem0 Date: Wed, 18 Dec 2024 23:03:14 +0300 Subject: [PATCH] feat: add slo tests --- Cargo.lock | 442 ++++++++++++++++++++---- Cargo.toml | 1 + ydb-slo-tests/Cargo.toml | 26 ++ ydb-slo-tests/README.md | 205 +++++++++++ ydb-slo-tests/examples/native/db.rs | 393 +++++++++++++++++++++ ydb-slo-tests/examples/native/native.rs | 170 +++++++++ ydb-slo-tests/src/args.rs | 55 +++ ydb-slo-tests/src/cli.rs | 51 +++ ydb-slo-tests/src/generator.rs | 55 +++ ydb-slo-tests/src/lib.rs | 5 + ydb-slo-tests/src/row.rs | 33 ++ ydb-slo-tests/src/workers.rs | 171 +++++++++ 12 files changed, 1536 insertions(+), 71 deletions(-) create mode 100644 ydb-slo-tests/Cargo.toml create mode 100644 ydb-slo-tests/README.md create mode 100644 ydb-slo-tests/examples/native/db.rs create mode 100644 ydb-slo-tests/examples/native/native.rs create mode 100644 ydb-slo-tests/src/args.rs create mode 100644 ydb-slo-tests/src/cli.rs create mode 100644 ydb-slo-tests/src/generator.rs create mode 100644 ydb-slo-tests/src/lib.rs create mode 100644 ydb-slo-tests/src/row.rs create mode 100644 ydb-slo-tests/src/workers.rs diff --git a/Cargo.lock b/Cargo.lock index 992b87bb..cad67a9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aho-corasick" version = "0.7.18" @@ -20,6 +35,55 @@ dependencies = [ "winapi", ] +[[package]] +name = "anstream" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" +dependencies = [ + "windows-sys 0.59.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125" +dependencies = [ + "anstyle", + "windows-sys 0.59.0", +] + [[package]] name = "anyhow" version = "1.0.53" @@ -115,6 +179,21 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backtrace" +version = "0.3.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.12.3" @@ -127,6 +206,12 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "1.3.2" @@ -172,9 +257,12 @@ checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" [[package]] name = "cc" -version = "1.0.90" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" +checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" +dependencies = [ + "shlex", +] [[package]] name = "cfg-if" @@ -192,10 +280,67 @@ dependencies = [ "num-integer", "num-traits", "serde", - "time", + "time 0.1.45", "winapi", ] +[[package]] +name = "clap" +version = "4.5.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8eb5e908ef3a6efbe1ed62520fb7287959888c88485abe072543190ecc66783" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b01801b5fc6a0a232407abc821660c9c6d25a1cafc0d4f85f29fb8d9afc121" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.1", +] + +[[package]] +name = "clap_derive" +version = "4.5.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b755194d6389280185988721fffba69495eed5ee9feeee9a599b53db80318c" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.43", +] + +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + +[[package]] +name = "clocksource" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "129026dd5a8a9592d96916258f3a5379589e513ea5e86aeb0bd2530286e44e9e" +dependencies = [ + "libc", + "time 0.3.37", + "winapi", +] + +[[package]] +name = "colorchoice" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" + [[package]] name = "core-foundation" version = "0.9.3" @@ -251,7 +396,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn 1.0.109", ] @@ -276,6 +421,15 @@ dependencies = [ "stack-buf", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "derivative" version = "2.2.0" @@ -493,6 +647,12 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" + [[package]] name = "h2" version = "0.3.11" @@ -518,6 +678,12 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashers" version = "1.0.1" @@ -567,14 +733,17 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" -version = "0.1.19" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "http" @@ -633,7 +802,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.4", "tokio", "tower-service", "tracing", @@ -689,7 +858,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -707,6 +876,12 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itertools" version = "0.10.3" @@ -818,16 +993,25 @@ dependencies = [ "unicase", ] +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + [[package]] name = "mio" -version = "0.8.6" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi", "libc", - "log", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys 0.45.0", + "windows-sys 0.52.0", ] [[package]] @@ -973,6 +1157,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.44" @@ -1016,13 +1206,12 @@ dependencies = [ ] [[package]] -name = "num_cpus" -version = "1.13.1" +name = "object" +version = "0.36.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" dependencies = [ - "hermit-abi", - "libc", + "memchr", ] [[package]] @@ -1039,9 +1228,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "parking_lot" -version = "0.12.0" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", "parking_lot_core", @@ -1156,6 +1345,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.16" @@ -1212,9 +1407,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.68" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -1329,6 +1524,17 @@ dependencies = [ "rand_core", ] +[[package]] +name = "ratelimit" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36ea961700fd7260e7fa3701c8287d901b2172c51f9c1421fa0f21d7f7e184b7" +dependencies = [ + "clocksource", + "parking_lot", + "thiserror", +] + [[package]] name = "redox_syscall" version = "0.2.10" @@ -1426,6 +1632,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + [[package]] name = "rustls" version = "0.20.4" @@ -1616,6 +1828,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1658,6 +1876,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "spin" version = "0.5.2" @@ -1676,6 +1904,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "strum" version = "0.21.0" @@ -1747,22 +1981,22 @@ checksum = "507e9898683b6c43a9aa55b64259b721b52ba226e0f3779137e50ad114a4c90b" [[package]] name = "thiserror" -version = "1.0.30" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +checksum = "6e3de26b0965292219b4287ff031fcba86837900fe9cd2b34ea8ad893c0953d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.30" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +checksum = "268026685b2be38d7103e9e507c938a1fcb3d7e6eb15e87870b617bf37b6d581" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.43", ] [[package]] @@ -1785,6 +2019,37 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.3.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinyvec" version = "1.5.1" @@ -1802,22 +2067,20 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.22.0" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3" +checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" dependencies = [ - "autocfg", + "backtrace", "bytes", "libc", - "memchr", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.8", "tokio-macros", - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -1832,13 +2095,13 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.8.2" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.43", ] [[package]] @@ -1891,16 +2154,17 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", - "tracing", ] [[package]] @@ -1939,7 +2203,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", - "tokio-util 0.7.8", + "tokio-util 0.7.13", "tower", "tower-layer", "tower-service", @@ -1975,7 +2239,7 @@ dependencies = [ "rand", "slab", "tokio", - "tokio-util 0.7.8", + "tokio-util 0.7.13", "tower-layer", "tower-service", "tracing", @@ -2208,6 +2472,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.0.0" @@ -2276,7 +2546,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tungstenite", - "tokio-util 0.7.8", + "tokio-util 0.7.13", "tower-service", "tracing", ] @@ -2445,33 +2715,43 @@ dependencies = [ [[package]] name = "windows-sys" -version = "0.45.0" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ "windows-targets", ] [[package]] name = "windows-targets" -version = "0.42.2" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ "windows_aarch64_gnullvm", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm", - "windows_x86_64_msvc 0.42.2", + "windows_x86_64_msvc 0.52.6", ] [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.2" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" @@ -2481,9 +2761,9 @@ checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" [[package]] name = "windows_aarch64_msvc" -version = "0.42.2" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" @@ -2493,9 +2773,15 @@ checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" [[package]] name = "windows_i686_gnu" -version = "0.42.2" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" @@ -2505,9 +2791,9 @@ checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" [[package]] name = "windows_i686_msvc" -version = "0.42.2" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" @@ -2517,15 +2803,15 @@ checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" [[package]] name = "windows_x86_64_gnu" -version = "0.42.2" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.2" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" @@ -2535,9 +2821,9 @@ checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" [[package]] name = "windows_x86_64_msvc" -version = "0.42.2" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winreg" @@ -2550,7 +2836,7 @@ dependencies = [ [[package]] name = "ydb" -version = "0.9.3" +version = "0.9.4" dependencies = [ "async-trait", "async_once", @@ -2578,7 +2864,7 @@ dependencies = [ "strum", "tokio", "tokio-stream", - "tokio-util 0.7.8", + "tokio-util 0.7.13", "tonic", "tower", "tracing", @@ -2606,7 +2892,7 @@ dependencies = [ [[package]] name = "ydb-grpc" -version = "0.0.14" +version = "0.1.0" dependencies = [ "pbjson", "pbjson-build", @@ -2620,6 +2906,20 @@ dependencies = [ "walkdir", ] +[[package]] +name = "ydb-slo-tests" +version = "0.1.0" +dependencies = [ + "base64 0.22.1", + "clap", + "rand", + "rand_core", + "ratelimit", + "tokio", + "tokio-util 0.7.13", + "ydb", +] + [[package]] name = "zeroize" version = "1.7.0" diff --git a/Cargo.toml b/Cargo.toml index f924550b..448ecabd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,4 +5,5 @@ members = [ "ydb-grpc", # "ydb-grpc-helpers", "ydb-example-urlshortener", + "ydb-slo-tests", ] diff --git a/ydb-slo-tests/Cargo.toml b/ydb-slo-tests/Cargo.toml new file mode 100644 index 00000000..6d44cfed --- /dev/null +++ b/ydb-slo-tests/Cargo.toml @@ -0,0 +1,26 @@ +[package] +publish = false +name = "ydb-slo-tests" +version = "0.1.0" +authors = ["fatalem0 "] +edition = "2021" +license = "Apache-2.0" +description = "Crate contains SLO-tests for YDB" +repository = "https://github.com/ydb-platform/ydb-rs-sdk/tree/master/ydb-slo-tests" +rust-version = "1.60" + +[dependencies] +base64 = { version = "0.22.1" } +rand = { version = "0.8.0" } +clap = { version = "4.5.26", features = ["derive"] } +rand_core = { version = "0.6.3" } + +[dev-dependencies] +ratelimit = { version = "0.10.0" } +tokio-util = { version = "0.7.8", features = ["rt"] } +tokio = { version = "1.22.0" } +ydb = { version = "0.9.4", path="../ydb"} + +[[example]] +name = "native" +path = "examples/native/native.rs" \ No newline at end of file diff --git a/ydb-slo-tests/README.md b/ydb-slo-tests/README.md new file mode 100644 index 00000000..2cb48888 --- /dev/null +++ b/ydb-slo-tests/README.md @@ -0,0 +1,205 @@ +# SLO workload + +SLO is the type of test where app based on ydb-sdk is tested against falling YDB cluster nodes, tablets, network +(that is possible situations for distributed DBs with hundreds of nodes) + +### Usage: + +It has 3 commands: + +- `create` - creates table in database +- `cleanup` - drops table in database +- `run` - runs workload (read and write to table with sets RPS) + +[//]: # (### Run examples with all arguments:) + +[//]: # () +[//]: # (create:) + +[//]: # (`python tests/slo/src/ create localhost:2136 /local -t tableName) + +[//]: # (--min-partitions-count 6 --max-partitions-count 1000 --partition-size 1 -с 1000) + +[//]: # (--write-timeout 10000`) + +[//]: # () +[//]: # (cleanup:) + +[//]: # (`python tests/slo/src/ cleanup localhost:2136 /local -t tableName`) + +[//]: # () +[//]: # (run:) + +[//]: # (`python tests/slo/src/ run localhost:2136 /local -t tableName) + +[//]: # (--prom-pgw http://prometheus-pushgateway:9091 -report-period 250) + +[//]: # (--read-rps 1000 --read-timeout 10000) + +[//]: # (--write-rps 100 --write-timeout 10000) + +[//]: # (--time 600 --shutdown-time 30`) + +[//]: # () +[//]: # (## Arguments for commands:) + +[//]: # () +[//]: # (### create) + +[//]: # (`python tests/slo/src/ create [options]`) + +[//]: # () +[//]: # (```) + +[//]: # (Arguments:) + +[//]: # ( endpoint YDB endpoint to connect to) + +[//]: # ( db YDB database to connect to) + +[//]: # () +[//]: # (Options:) + +[//]: # ( -t --table-name table name to create) + +[//]: # () +[//]: # ( -p-min --min-partitions-count minimum amount of partitions in table) + +[//]: # ( -p-max --max-partitions-count maximum amount of partitions in table) + +[//]: # ( -p-size --partition-size partition size in mb) + +[//]: # () +[//]: # ( -c --initial-data-count amount of initially created rows) + +[//]: # () +[//]: # ( --write-timeout write timeout milliseconds) + +[//]: # () +[//]: # ( --batch-size amount of new records in each create request) + +[//]: # ( --threads number of threads to use) + +[//]: # () +[//]: # (```) + +[//]: # () +[//]: # (### cleanup) + +[//]: # (`python tests/slo/src/ cleanup [options]`) + +[//]: # () +[//]: # (```) + +[//]: # (Arguments:) + +[//]: # ( endpoint YDB endpoint to connect to) + +[//]: # ( db YDB database to connect to) + +[//]: # () +[//]: # (Options:) + +[//]: # ( -t --table-name table name to create) + +[//]: # (```) + +[//]: # () +[//]: # (### run) + +[//]: # (`python tests/slo/src/ run [options]`) + +[//]: # () +[//]: # (```) + +[//]: # (Arguments:) + +[//]: # ( endpoint YDB endpoint to connect to) + +[//]: # ( db YDB database to connect to) + +[//]: # () +[//]: # (Options:) + +[//]: # ( -t --table-name table name to create) + +[//]: # () +[//]: # ( --prom-pgw prometheus push gateway) + +[//]: # ( --report-period prometheus push period in milliseconds) + +[//]: # () +[//]: # ( --read-rps read RPS) + +[//]: # ( --read-timeout read timeout milliseconds) + +[//]: # () +[//]: # ( --write-rps write RPS) + +[//]: # ( --write-timeout write timeout milliseconds) + +[//]: # () +[//]: # ( --time run time in seconds) + +[//]: # ( --shutdown-time graceful shutdown time in seconds) + +[//]: # () +[//]: # ( --read-threads number of threads to use for write requests) + +[//]: # ( --write-threads number of threads to use for read requests) + +[//]: # (```) + +[//]: # () +[//]: # (## Authentication) + +[//]: # () +[//]: # (Workload using [auth-env](https://ydb.yandex-team.ru/docs/reference/ydb-sdk/recipes/auth-env) for authentication.) + +[//]: # () +[//]: # (## What's inside) + +[//]: # (When running `run` command, the program creates three jobs: `readJob`, `writeJob`, `metricsJob`.) + +[//]: # () +[//]: # (- `readJob` reads rows from the table one by one with random identifiers generated by writeJob) + +[//]: # (- `writeJob` generates and inserts rows) + +[//]: # (- `metricsJob` periodically sends metrics to Prometheus) + +[//]: # () +[//]: # (Table have these fields:) + +[//]: # (- `object_id Uint64`) + +[//]: # (- `object_hash Uint64 Digest::NumericHash(id)`) + +[//]: # (- `payload_str UTF8`) + +[//]: # (- `payload_double Double`) + +[//]: # (- `payload_timestamp Timestamp`) + +[//]: # () +[//]: # (Primary key: `("object_hash", "object_id")`) + +[//]: # () +[//]: # (## Collected metrics) + +[//]: # (- `oks` - amount of OK requests) + +[//]: # (- `not_oks` - amount of not OK requests) + +[//]: # (- `inflight` - amount of requests in flight) + +[//]: # (- `latency` - summary of latencies in ms) + +[//]: # (- `attempts` - summary of amount for request) + +[//]: # () +[//]: # (> You must reset metrics to keep them `0` in prometheus and grafana before beginning and after ending of jobs) + +[//]: # () +[//]: # (## Look at metrics in grafana) +[//]: # (You can get dashboard used in that test [here](https://github.com/ydb-platform/slo-tests/blob/main/k8s/helms/grafana.yaml#L69) - you will need to import json into grafana.) \ No newline at end of file diff --git a/ydb-slo-tests/examples/native/db.rs b/ydb-slo-tests/examples/native/db.rs new file mode 100644 index 00000000..4cfb8f3f --- /dev/null +++ b/ydb-slo-tests/examples/native/db.rs @@ -0,0 +1,393 @@ +use ydb::{ydb_params, ClientBuilder, Query, TableClient, YdbResult}; +use ydb_slo_tests::cli::SloTestsCli; +use ydb_slo_tests::row::{Row, RowID}; + +// pub struct Storage { +// client: Client, +// cfg: Config, +// prefix: String, +// upsert_query: Query, +// select_query: String, +// // retry_budget: Budget, // Нужно определить, как это будет работать в Rust +// } +// +// impl Storage { +// pub async fn new( +// // ctx: &tokio::task::Context<'_>, +// cli: &SloTestsCli, +// pool_size: usize, +// ) -> Result> { +// // let timeout_duration = Duration::from_secs(5 * 60); // 5 minutes +// // let ctx = ctx.clone(); +// // let cancellation = ctx.deadline().clone(); +// // +// // let retry_budget = budget::Limited::new((pool_size as f64 * 0.1) as usize); +// +// let client = ydb::ClientBuilder::new_from_connection_string(cli.endpoint)? +// .with_database(cli.db) +// .client()?; +// +// client.wait().await?; +// +// let prefix = format!("{}/{}", db.name(), label); +// +// let upsert_query = Query::new(format!( +// r#" +// PRAGMA TablePathPrefix("{}"); +// +// DECLARE $id AS Uint64; +// DECLARE $payload_str AS Utf8; +// DECLARE $payload_double AS Double; +// DECLARE $payload_timestamp AS Timestamp; +// +// UPSERT INTO `{}` ( +// id, hash, payload_str, payload_double, payload_timestamp +// ) VALUES ( +// $id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp +// ); +// "#, +// prefix, cli.table_name +// )); +// +// let select_query = format!(SELECT_TEMPLATE, prefix, cli.table_name); +// +// Ok(Storage { +// db: Arc::new(db), +// cfg, +// prefix, +// upsert_query, +// select_query, +// }) +// } +// +// pub async fn read( +// ctx: &tokio::task::Context<'_>, +// storage: &Storage, +// entry_id: u64, +// ) -> Result<(Row, usize), StorageError> { +// let timeout_duration = Duration::from_millis(storage.cfg.read_timeout as u64); +// let ctx = ctx.clone(); +// let cancellation = ctx.deadline().clone(); +// +// let session = storage.db.table().create_session().await?; +// +// let tx_control = TransactionControl::new() +// .with_serializable_read_write() +// .with_commit(); +// +// let query_params = QueryParameters::new().with_value("$id", ydb::Value::Uint64(entry_id)); +// +// let result = session +// .execute(ctx, tx_control, &storage.select_query, query_params) +// .await?; +// +// let mut attempts = 0; +// // Здесь нужно обработать трейс, чтобы получить количество попыток +// // Предположим, что это делается через трейсинг, который нужно настроить отдельно +// +// let mut rows = result.rows().await?; +// if !rows.next().await? { +// return Err(StorageError::EntryNotFound(entry_id)); +// } +// +// let row = rows.current().unwrap(); +// +// let mut rows = result.rows().await?; +// if !rows.next().await? { +// return Err(StorageError::EntryNotFound(entry_id)); +// } +// +// let row = rows.current().unwrap(); +// +// let id = row +// .get("id") +// .and_then(|cell| cell.as_uint64()) +// .ok_or_else(|| StorageError::Other("id not found".to_string()))?; +// let payload_str = row +// .get("payload_str") +// .and_then(|cell| cell.as_utf8().map(String::from)); +// let payload_double = row.get("payload_double").and_then(|cell| cell.as_double()); +// let payload_timestamp = row +// .get("payload_timestamp") +// .and_then(|cell| cell.as_timestamp().map(DateTime::::from)); +// +// let e = generator::Row { +// id, +// payload_str, +// payload_double, +// payload_timestamp, +// }; +// +// Ok((e, attempts)) +// } +// +// pub async fn write( +// ctx: &tokio::task::Context<'_>, +// table_client: &TableClient, +// storage: &Storage, +// e: Row, +// ) -> Result<(usize, ()), StorageError> { +// table_client +// .retry_transaction(|tx| async { +// let mut tx = tx; // move tx lifetime into code block +// +// tx.query( +// ydb::Query::from( +// "DECLARE $hash as Utf8; +// DECLARE $src as Utf8; +// +// REPLACE INTO +// urls (hash, src) +// VALUES +// ($hash, $src); +// ", +// ) +// .with_params(ydb_params!("$hash" => hash.clone(), "$src" => long.clone())), +// ) +// .await?; +// tx.commit().await?; +// Ok(()) +// }) +// .await?; +// +// let row = ydb_params!( +// "id" => 1_i64, +// "payload_str" => "test", +// "payload_double" => "", +// "payload_timestamp" => "", +// ); +// +// let list = Value::list_from(example, rows)?; +// +// let query = Query::new( +// "DECLARE $list AS List>; +// +// UPSERT INTO test +// SELECT * FROM AS_TABLE($list) +// ", +// ) +// .with_params(ydb_params!("$list" => list)); +// +// table_client +// .retry_transaction(|tx| async { +// let mut tx = tx; // move tx lifetime into code block +// +// tx.query( +// ydb::Query::from( +// "DECLARE $hash as Utf8; +// DECLARE $src as Utf8; +// +// REPLACE INTO +// urls (hash, src) +// VALUES +// ($hash, $src); +// ", +// ) +// .with_params(ydb_params!("$hash" => hash.clone(), "$src" => long.clone())), +// ) +// .await?; +// tx.commit().await?; +// Ok(()) +// }) +// .await?; +// +// let query_params = QueryParameters::new() +// .with_value("$id", ydb::Value::Uint64(e.id)) +// .with_value( +// "$payload_str", +// ydb::Value::Utf8(e.payload_str.unwrap_or_default()), +// ) +// .with_value( +// "$payload_double", +// ydb::Value::Double(e.payload_double.unwrap_or_default()), +// ) +// .with_value( +// "$payload_timestamp", +// ydb::Value::Timestamp(ydb::Timestamp::from_datetime( +// e.payload_timestamp.unwrap_or_default(), +// )), +// ); +// +// let result = session +// .execute(ctx, tx_control, &storage.upsert_query, query_params) +// .await?; +// +// Ok((attempts, ())) +// } +// +// async fn create_table( +// ctx: &tokio::task::Context<'_>, +// storage: &Storage, +// ) -> Result<(), StorageError> { +// let timeout_duration = Duration::from_millis(storage.cfg.write_timeout as u64); +// let ctx = ctx.clone(); +// let cancellation = ctx.deadline().clone(); +// +// let session = storage.db.table().create_session().await?; +// +// let table_path = format!("{}/{}", storage.prefix, storage.cfg.table); +// +// let column_defs = vec![ +// ydb::table::ColumnDefinition::new("hash", ydb::types::Type::Uint64).with_optional(), +// ydb::table::ColumnDefinition::new("id", ydb::types::Type::Uint64).with_optional(), +// ydb::table::ColumnDefinition::new("payload_str", ydb::types::Type::Utf8) +// .with_optional(), +// ydb::table::ColumnDefinition::new("payload_double", ydb::types::Type::Double) +// .with_optional(), +// ydb::table::ColumnDefinition::new("payload_timestamp", ydb::types::Type::Timestamp) +// .with_optional(), +// ydb::table::ColumnDefinition::new("payload_hash", ydb::types::Type::Uint64) +// .with_optional(), +// ]; +// +// let primary_key = ydb::table::PrimaryKey::new(vec!["hash", "id"]); +// +// let partitioning = ydb::table::PartitioningSettings::new() +// .with_partitioning_by_size(true) +// .with_partition_size_mb(storage.cfg.partition_size) +// .with_min_partitions_count(storage.cfg.min_partitions_count) +// .with_max_partitions_count(storage.cfg.max_partitions_count); +// +// let partitions = ydb::table::Partitions::uniform(storage.cfg.min_partitions_count); +// +// session +// .create_table( +// ctx, +// &table_path, +// column_defs, +// primary_key, +// partitioning, +// partitions, +// ) +// .await?; +// +// Ok(()) +// } +// +// fn drop_table(table_client: &Storage) { +// let _ = table_client +// .retry_execute_scheme_query("DROP TABLE test") +// .await; // ignore drop error +// } +// } + +#[derive(Clone)] +pub struct Db { + db_table_client: TableClient, + pub(crate) cli_args: SloTestsCli, + upsert_query: Query, + select_query: Query, +} + +impl Db { + pub async fn new(cli: SloTestsCli) -> YdbResult { + let client = ClientBuilder::new_from_connection_string(&cli.endpoint)? + .with_database(&cli.db) + .client()?; + + client.wait().await?; + + let table_client = client.table_client(); + + let upsert_query = Query::from(format!( + r#" +PRAGMA TablePathPrefix("{}"); + +DECLARE $id AS Uint64; +DECLARE $payload_str AS Utf8; +DECLARE $payload_double AS Double; +DECLARE $payload_timestamp AS Timestamp; + +UPSERT INTO `{}` ( + id, hash, payload_str, payload_double, payload_timestamp +) VALUES ( + $id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp +); +"#, + &cli.db, &cli.table_name + )); + + let select_query = Query::from(format!( + r#" +PRAGMA TablePathPrefix("{}"); + +DECLARE $id AS Uint64; +SELECT id, payload_str, payload_double, payload_timestamp, payload_hash +FROM `{}` WHERE id = $id AND hash = Digest::NumericHash($id); +"#, + &cli.db, &cli.table_name + )); + + Ok(Self { + db_table_client: table_client, + cli_args: cli, + upsert_query: upsert_query.clone(), + select_query: select_query.clone(), + }) + } + + pub async fn create_table(&self) -> YdbResult<()> { + self.db_table_client + .retry_execute_scheme_query(format!( + "CREATE TABLE {} + ( + hash Uint64 NOT NULL, + id Uint64 NOT NULL, + payload_str Utf8, + payload_double Double, + payload_timestamp Timestamp, + payload_hash Uint64, + PRIMARY KEY (hash, id) + )", + self.cli_args.table_name + )) + .await + } + + pub async fn read_row_by_id(&self, row_id: RowID) -> YdbResult<()> { + let query = self + .select_query + .clone() + .with_params(ydb_params!("$id" => row_id)); + + self.db_table_client + .retry_transaction(|t| async { + let mut t = t; + let res = t.query(query.clone()).await?; + Ok(res) + }) + .await? + .into_only_row()?; + + Ok(()) + } + + pub async fn insert_row(&self, row: Row) -> YdbResult<()> { + let query = self.clone().upsert_query.with_params(ydb_params!( + "$id" => row.id, + "$payload_str" => row.payload_str, + "$payload_double" => row.payload_double, + "$payload_timestamp" => row.payload_timestamp.unwrap(), //TODO: поменять + )); + + self.db_table_client + .retry_transaction(|t| async { + let mut t = t; + t.query(query.clone()).await?; + t.commit().await?; + Ok(()) + }) + .await?; + + Ok(()) + } + + pub async fn drop_table(&self) -> YdbResult<()> { + self.db_table_client + .retry_execute_scheme_query(format!("DROP TABLE {}", self.cli_args.table_name)) + .await + } +} diff --git a/ydb-slo-tests/examples/native/native.rs b/ydb-slo-tests/examples/native/native.rs new file mode 100644 index 00000000..be217409 --- /dev/null +++ b/ydb-slo-tests/examples/native/native.rs @@ -0,0 +1,170 @@ +extern crate ydb_slo_tests; + +use crate::db::Db; +use clap::Parser; +use rand::Rng; +use ratelimit::Ratelimiter; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; +use tokio::time::timeout; +use tokio_util::task::TaskTracker; +use ydb_slo_tests::cli::{Command, SloTestsCli}; +use ydb_slo_tests::generator::Generator; +use ydb_slo_tests::row::RowID; + +mod db; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // let shutdown_signal = Arc::new(Notify::new()); + // let shutdown_signal_cloned = shutdown_signal.clone(); + // let ctrl_c_signal = tokio::signal::ctrl_c(); + + let cli = SloTestsCli::parse(); + let command = cli.command.clone(); + + // let signal_task = tokio::spawn(async move { + // let _ = ctrl_c_signal.await; + // shutdown_signal_cloned.notify_one(); + // }); + + let db = Db::new(cli) + .await + .unwrap_or_else(|err| panic!("failed to initialize Ydb instance: {}", err)); + + match command { + Command::Create(create_args) => { + db.create_table() + .await + .unwrap_or_else(|err| panic!("failed to create Ydb Table: {}", err)); + + println!("Created table"); + + // let generator = Generator::new(0); + let tracker = TaskTracker::new(); + let db_ref = Arc::new(db.clone()); + let generator_ref = Arc::new(Mutex::new(Generator::new(0))); + + for _ in 0..create_args.initial_data_count { + let db_ref = Arc::clone(&db_ref); + let generator_ref = Arc::clone(&generator_ref); + + tracker.spawn(async move { + let db_ref = &db_ref; + // let mut generator_ref = &generator_ref; + let row = generator_ref.lock().await.generate(); + + db_ref.insert_row(row).await + }); + } + + tracker.close(); + tracker.wait().await; + + println!("entries write ok"); + } + Command::Cleanup => { + db.drop_table() + .await + .unwrap_or_else(|err| panic!("failed to drop Ydb Table: {}", err)); + + println!("Cleaned up table"); + } + Command::Run(run_args) => { + let db_ref = Arc::new(db.clone()); + // let workers = Workers::new(cli.clone(), storage.clone()).await?; + // let worker_shutdown = shutdown_signal.clone(); + + let tracker = TaskTracker::new(); + + let read_rate_limiter_ref = + Arc::new(Ratelimiter::builder(1, Duration::from_secs(run_args.read_rps)).build()?); + + for _ in 0..run_args.read_rps { + let db_ref = Arc::clone(&db_ref); + let read_rate_limiter_ref = Arc::clone(&read_rate_limiter_ref); + + timeout( + Duration::from_secs(run_args.time), + tracker.spawn(async move { + let db_ref = &db_ref; + let read_rate_limiter_ref = &read_rate_limiter_ref; + + read_load(db_ref, read_rate_limiter_ref, run_args.initial_data_count).await; + }), + ) + .await??; + } + + let write_rate_limiter_ref = + Arc::new(Ratelimiter::builder(1, Duration::from_secs(run_args.write_rps)).build()?); + + let generator_ref = Arc::new(Mutex::new(Generator::new( + run_args.initial_data_count as RowID, + ))); + + for _ in 0..run_args.write_rps { + let db_ref = Arc::clone(&db_ref); + let write_rate_limiter_ref = Arc::clone(&write_rate_limiter_ref); + let generator_ref = Arc::clone(&generator_ref); + + timeout( + Duration::from_secs(run_args.time), + tracker.spawn(async move { + let db_ref = &db_ref; + let write_rate_limiter_ref = &write_rate_limiter_ref; + let generator_ref = generator_ref.lock().await; + + write_load(db_ref, write_rate_limiter_ref, &generator_ref).await; + }), + ) + .await?? + } + + // let metrics_task = tokio::spawn(async move { + // workers.report_metrics(worker_shutdown).await.unwrap(); + // }); + + tracker.close(); + tracker.wait().await; + + // let _ = tokio::try_join!( + // signal_task, + // join_all(read_tasks), + // join_all(write_tasks), + // // metrics_task + // ); + + println!("workers completed"); + } + } + + println!("program finished"); + Ok(()) +} + +async fn read_load(db: &Db, limiter: &Ratelimiter, initial_data_count: u64) { + loop { + if let Err(_) = limiter.try_wait() { + break; + } + + let row_id = rand::thread_rng().gen_range(0..initial_data_count); + let _ = db.read_row_by_id(row_id).await; + } +} + +async fn write_load(db: &Db, limiter: &Ratelimiter, generator: &Generator) { + loop { + if let Err(_) = limiter.try_wait() { + break; + } + + let row = generator.to_owned().generate(); + + if let Err(err) = db.clone().insert_row(row).await { + println!("write failed: `{}'", err); + } + } +} diff --git a/ydb-slo-tests/src/args.rs b/ydb-slo-tests/src/args.rs new file mode 100644 index 00000000..639fcad3 --- /dev/null +++ b/ydb-slo-tests/src/args.rs @@ -0,0 +1,55 @@ +use clap::Args; + +#[derive(Args, Clone)] +pub struct CreateArgs { + /// minimum amount of partitions in table + #[arg(long = "min-partitions-count", default_value_t = 6)] + pub min_partitions_count: i64, + + /// maximum amount of partitions in table + #[arg(long = "max-partitions-count", default_value_t = 1000)] + pub max_partitions_count: i64, + + /// partition size in mb + #[arg(long = "partition-size", default_value_t = 1)] + pub partition_size: i64, + + /// amount of initially created rows + #[arg(long = "initial-data-count", short = 'c', default_value_t = 1000)] + pub initial_data_count: i64, +} + +#[derive(Args, Clone)] +pub struct RunArgs { + /// amount of initially created rows + #[arg(long = "initial-data-count", short = 'c', default_value_t = 1000)] + pub initial_data_count: u64, + + /// prometheus push gateway + #[arg(long = "prom-pgw", default_value_t = String::new())] + pub prom_pgw: String, + + /// prometheus push period in milliseconds + #[arg(long = "report-period", default_value_t = 250)] + pub report_period: i64, + + /// read RPS + #[arg(long = "read-rps", default_value_t = 1000)] + pub read_rps: u64, + + /// read timeout milliseconds + #[arg(long = "read-timeout", default_value_t = 10000)] + pub read_timeout: i64, + + /// write RPS + #[arg(long = "write-rps", default_value_t = 100)] + pub write_rps: u64, + + /// run time in seconds + #[arg(long, default_value_t = 600)] + pub time: u64, + + /// graceful shutdown time in seconds + #[arg(long = "shutdown-time", default_value_t = 30)] + pub shutdown_time: i64, +} diff --git a/ydb-slo-tests/src/cli.rs b/ydb-slo-tests/src/cli.rs new file mode 100644 index 00000000..65772fca --- /dev/null +++ b/ydb-slo-tests/src/cli.rs @@ -0,0 +1,51 @@ +// const VERSION: &str = env!("CARGO_YDB_SLO_TESTS_VERSION"); +// const SHA: &str = env!("GIT_HASH"); + +// static HELP_TEMPLATE: &str = "\ +// {before-help}{name} {version} +// {author} +// {about} +// +// {usage-heading} +// {usage} +// +// {all-args}{after-help}"; + +use crate::args; +use clap::{Parser, Subcommand}; + +#[derive(Clone, Parser)] +// #[command( +// version = VERSION, +// help_template(HELP_TEMPLATE), +// )] +pub struct SloTestsCli { + #[command(subcommand)] + pub command: Command, + + /// YDB endpoint to connect to + pub endpoint: String, + + /// YDB database to connect to + pub db: String, + + /// table name to create + #[arg(long = "table-name", short, default_value_t = String::from("testingTable"))] + pub table_name: String, + + /// write timeout milliseconds + #[arg(long = "write-timeout", default_value_t = 10000)] + pub write_timeout: i64, +} + +#[derive(Clone, Subcommand)] +pub enum Command { + /// creates table in database + Create(args::CreateArgs), + + /// drops table in database + Cleanup, + + /// runs workload (read and write to table with sets RPS) + Run(args::RunArgs), +} diff --git a/ydb-slo-tests/src/generator.rs b/ydb-slo-tests/src/generator.rs new file mode 100644 index 00000000..92e4a6bc --- /dev/null +++ b/ydb-slo-tests/src/generator.rs @@ -0,0 +1,55 @@ +use crate::row::{Row, RowID}; +use base64::{engine::general_purpose::STANDARD, Engine as _}; +use rand::prelude::StdRng; +use rand::Rng; +use rand_core::{OsRng, RngCore, SeedableRng}; +use std::sync::{Arc, Mutex}; +use std::time::SystemTime; + +const MIN_LENGTH: usize = 20; +const MAX_LENGTH: usize = 40; + +#[derive(Clone, Debug)] +pub struct Generator { + current_id: Arc>, + rng: StdRng, +} + +impl Generator { + pub fn new(id: RowID) -> Self { + Self { + current_id: Arc::new(Mutex::new(id)), + rng: SeedableRng::from_entropy(), + } + } + + pub fn generate(&mut self) -> Row { + let id = Some({ + let mut id_guard = self.current_id.lock().unwrap(); + *id_guard += 1; + *id_guard + }); + + let payload_double = Some(self.rng.gen::()); + let payload_timestamp = Some(SystemTime::now()); + let payload_str = Some(self.gen_payload_string()); + + Row::new( + None, + id, + payload_str, + payload_double, + payload_timestamp, + None, + ) + } + + fn gen_payload_string(&mut self) -> String { + let length = MIN_LENGTH + self.rng.gen_range(0..=(MAX_LENGTH - MIN_LENGTH)); + + let mut buffer = vec![0u8; length]; + OsRng.fill_bytes(&mut buffer); + + STANDARD.encode(&buffer) + } +} diff --git a/ydb-slo-tests/src/lib.rs b/ydb-slo-tests/src/lib.rs new file mode 100644 index 00000000..3a334de1 --- /dev/null +++ b/ydb-slo-tests/src/lib.rs @@ -0,0 +1,5 @@ +pub mod args; +pub mod cli; +pub mod generator; +pub mod row; +pub mod workers; diff --git a/ydb-slo-tests/src/row.rs b/ydb-slo-tests/src/row.rs new file mode 100644 index 00000000..ae462ffe --- /dev/null +++ b/ydb-slo-tests/src/row.rs @@ -0,0 +1,33 @@ +use std::time::SystemTime; + +pub type RowID = u64; + +#[derive(Debug, Clone)] +pub struct Row { + pub hash: Option, + pub id: Option, + pub payload_str: Option, + pub payload_double: Option, + pub payload_timestamp: Option, + pub payload_hash: Option, +} + +impl Row { + pub fn new( + hash: Option, + id: Option, + payload_str: Option, + payload_double: Option, + payload_timestamp: Option, + payload_hash: Option, + ) -> Self { + Self { + hash, + id, + payload_str, + payload_double, + payload_timestamp, + payload_hash, + } + } +} diff --git a/ydb-slo-tests/src/workers.rs b/ydb-slo-tests/src/workers.rs new file mode 100644 index 00000000..65400554 --- /dev/null +++ b/ydb-slo-tests/src/workers.rs @@ -0,0 +1,171 @@ +// use crate::rate_limiter::RateLimiter; +// use log::{error, info}; +// use rand::Rng; +// use std::sync::Arc; +// use tokio::sync::Mutex; +// use tokio_util::sync::CancellationToken; +// +// pub struct Config { +// pub push_gateway: String, +// pub some_other_config: String, +// } +// +// #[async_trait::async_trait] +// pub trait ReadWriter { +// async fn read( +// &self, +// ctx: &CancellationToken, +// row_id: u64, +// ) -> Result<(Row, usize, Option), String>; +// async fn write( +// &self, +// ctx: &CancellationToken, +// row: Row, +// ) -> Result<(usize, Option), String>; +// } +// +// pub struct Metrics { +// pub push_gateway: String, +// pub ref_label: String, +// pub job_name: String, +// } +// +// impl Metrics { +// pub fn new( +// push_gateway: &str, +// ref_label: &str, +// label: &str, +// job_name: &str, +// ) -> Result { +// Ok(Metrics { +// push_gateway: push_gateway.to_string(), +// ref_label: ref_label.to_string(), +// job_name: job_name.to_string(), +// }) +// } +// +// pub fn reset(&self) -> Result<(), String> { +// info!("Metrics reset successfully."); +// Ok(()) +// } +// } +// +// pub struct Workers { +// cfg: Arc, +// s: Arc, +// m: Arc, +// } +// +// impl Workers { +// pub fn new( +// cfg: Arc, +// s: Arc, +// ref_label: &str, +// label: &str, +// job_name: &str, +// ) -> Result { +// let m = Metrics::new(&cfg.push_gateway, ref_label, label, job_name)?; +// Ok(Workers { +// cfg, +// s, +// m: Arc::new(m), +// }) +// } +// +// pub async fn read( +// &self, +// ctx: CancellationToken, +// limiter: Arc, +// ) -> Result<(), String> { +// let limiter = limiter.clone(); +// let cancellation = ctx.clone(); +// +// while !cancellation.is_cancelled() { +// if limiter.wait().await.is_err() { +// return Ok(()); +// } +// +// match self.execute_read(ctx.clone()).await { +// Ok(_) => {} +// Err(err) => { +// if !cancellation.is_cancelled() { +// error!("Read failed: {}", err); +// } +// return Err(err); +// } +// } +// } +// Ok(()) +// } +// +// async fn execute_read(&self, ctx: CancellationToken) -> Result<(), String> { +// let id = rand::thread_rng().gen_range(0..self.cfg.initial_data_count); +// +// let metric = self.m.start(OperationType::Read); +// let (_result, attempts, err) = self.s.read(ctx.clone(), id); +// metric.finish(err.clone(), attempts); +// +// match err { +// Some(e) => Err(e), +// None => Ok(()), +// } +// } +// +// pub async fn write( +// &self, +// ctx: CancellationToken, +// limiter: Arc, +// gen: Arc>, +// ) -> Result<(), String> { +// let limiter = limiter.clone(); +// let cancellation = ctx.clone(); +// +// loop { +// if cancellation.is_cancelled() { +// break; +// } +// +// if limiter.wait().await.is_err() { +// break; +// } +// +// match self.execute_write(ctx.clone(), gen.clone()).await { +// Ok(_) => {} +// Err(err) => { +// if !cancellation.is_cancelled() { +// error!("Write failed: {}", err); +// } +// } +// } +// } +// Ok(()) +// } +// +// async fn execute_write( +// &self, +// ctx: CancellationToken, +// gen: Arc>, +// ) -> Result<(), String> { +// let mut generator = gen.lock().await; +// let row = match generator.generate() { +// Ok(row) => row, +// Err(e) => { +// error!("Generate error: {}", e); +// return Err(e); +// } +// }; +// +// let metric = self.m.start(OperationType::Write); +// let (attempts, err) = self.s.write(ctx.clone(), row).await; +// metric.finish(err.clone(), attempts); +// +// match err { +// Some(e) => Err(e), +// None => Ok(()), +// } +// } +// +// pub async fn close(&self) -> Result<(), String> { +// self.m.reset() +// } +// }