diff --git a/Cargo.lock b/Cargo.lock index 698ac530d99..d85101239f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -468,7 +468,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -479,7 +479,7 @@ checksum = "1b1244b10dcd56c92219da4e14caa97e312079e185f04ba3eea25061561dc0a0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -653,7 +653,7 @@ checksum = "a539389a13af092cd345a2b47ae7dec12deb306d660b2223d25cd3419b253ebe" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -910,35 +910,6 @@ dependencies = [ "clap_derive", ] -[[package]] -name = "clap_blocks" -version = "0.1.0" -source = "git+https://github.com/influxdata/influxdb3_core?rev=a5f6076c966f4940a67998e0b85d12c3e8596715#a5f6076c966f4940a67998e0b85d12c3e8596715" -dependencies = [ - "async-trait", - "clap", - "ed25519-dalek", - "http 0.2.12", - "http 1.2.0", - "humantime", - "iox_catalog", - "iox_time", - "itertools 0.13.0", - "libc", - "metric", - "non-empty-string", - "object_store", - "observability_deps", - "paste", - "snafu", - "sysinfo 0.33.1", - "tokio", - "trace_exporters", - "trogging", - "url", - "workspace-hack", -] - [[package]] name = "clap_builder" version = "4.5.23" @@ -960,7 +931,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -1282,33 +1253,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "curve25519-dalek" -version = "4.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" -dependencies = [ - "cfg-if", - "cpufeatures", - "curve25519-dalek-derive", - "digest", - "fiat-crypto", - "rustc_version", - "subtle", - "zeroize", -] - -[[package]] -name = "curve25519-dalek-derive" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.94", -] - [[package]] name = "darling" version = "0.20.10" @@ -1330,7 +1274,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -1341,7 +1285,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -1855,7 +1799,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -1876,30 +1820,6 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" -[[package]] -name = "ed25519" -version = "2.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" -dependencies = [ - "pkcs8", - "signature", -] - -[[package]] -name = "ed25519-dalek" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" -dependencies = [ - "curve25519-dalek", - "ed25519", - "serde", - "sha2", - "subtle", - "zeroize", -] - [[package]] name = "either" version = "1.13.0" @@ -2013,12 +1933,6 @@ version = "2.3.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" -[[package]] -name = "fiat-crypto" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" - [[package]] name = "fixedbitset" version = "0.4.2" @@ -2171,7 +2085,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -2742,7 +2656,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -2826,7 +2740,6 @@ dependencies = [ "backtrace", "base64 0.21.7", "clap", - "clap_blocks", "console-subscriber", "datafusion_util", "dotenvy", @@ -2942,16 +2855,32 @@ dependencies = [ name = "influxdb3_clap_blocks" version = "0.1.0" dependencies = [ + "async-trait", "clap", "datafusion", "futures", + "http 0.2.12", + "http 1.2.0", "humantime", + "iox_catalog", "iox_query", + "iox_time", + "itertools 0.13.0", "libc", + "metric", + "non-empty-string", + "object_store", "observability_deps", "paste", + "snafu", + "sysinfo 0.30.13", + "tempfile", "test-log", + "test_helpers", "tokio", + "trace_exporters", + "trogging", + "url", ] [[package]] @@ -3290,13 +3219,13 @@ dependencies = [ [[package]] name = "insta" -version = "1.41.1" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e9ffc4d4892617c50a928c52b2961cb5174b6fc6ebf252b2fac9d21955c48b8" +checksum = "6513e4067e16e69ed1db5ab56048ed65db32d10ba5fc1217f5393f8f17d8b5a5" dependencies = [ "console", - "lazy_static", "linked-hash-map", + "once_cell", "pest", "pest_derive", "serde", @@ -3870,7 +3799,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -4382,7 +4311,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -4446,29 +4375,29 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95" +checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" +checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] name = "pin-project-lite" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" [[package]] name = "pin-utils" @@ -4609,12 +4538,12 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.25" +version = "0.2.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" +checksum = "483f8c21f64f3ea09fe0f30f5d48c3e8eefe5dac9129f0075f76593b4c1da705" dependencies = [ "proc-macro2", - 
"syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -4719,7 +4648,7 @@ dependencies = [ "prost 0.12.6", "prost-types 0.12.6", "regex", - "syn 2.0.94", + "syn 2.0.95", "tempfile", ] @@ -4746,7 +4675,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -4759,7 +4688,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -4827,7 +4756,7 @@ dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -4840,7 +4769,7 @@ dependencies = [ "proc-macro2", "pyo3-build-config", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -5480,7 +5409,7 @@ checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -5534,7 +5463,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -5695,7 +5624,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -5761,7 +5690,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -5841,7 +5770,7 @@ dependencies = [ "quote", "sqlx-core", "sqlx-macros-core", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -5864,7 +5793,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "syn 2.0.94", + "syn 2.0.95", "tempfile", "tokio", "url", @@ -6024,7 +5953,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -6046,9 +5975,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.94" +version = "2.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "987bc0be1cdea8b10216bd06e2ca407d40b9543468fafd3ddfb02f36e77f71f3" +checksum = "46f71c0377baf4ef1cc3e3402ded576dccc315800fbc62dfc7fe04b009773b4a" dependencies = [ "proc-macro2", "quote", @@ -6078,7 +6007,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -6176,7 +6105,7 @@ checksum = "5999e24eaa32083191ba4e425deb75cdf25efefabe5aaccb7446dd0d4122a3f5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -6225,7 +6154,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -6236,7 +6165,7 @@ checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -6414,7 +6343,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -6576,7 +6505,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -6724,7 +6653,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -7034,7 +6963,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", "wasm-bindgen-shared", ] @@ -7069,7 +6998,7 @@ checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" 
dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -7218,7 +7147,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -7229,7 +7158,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -7520,7 +7449,7 @@ dependencies = [ "sqlx-sqlite", "strum", "subtle", - "syn 2.0.94", + "syn 2.0.95", "thrift", "tokio", "tokio-metrics", @@ -7593,7 +7522,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", "synstructure", ] @@ -7615,7 +7544,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -7635,7 +7564,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", "synstructure", ] @@ -7656,7 +7585,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] @@ -7678,7 +7607,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.94", + "syn 2.0.95", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index baef78af488..2b2f5817bb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,8 @@ members = [ "influxdb3", "influxdb3_cache", - "influxdb3_catalog", "influxdb3_clap_blocks", + "influxdb3_catalog", + "influxdb3_clap_blocks", "influxdb3_client", "influxdb3_id", "influxdb3_load_generator", @@ -77,10 +78,12 @@ humantime = "2.1.0" hyper = "0.14" insta = { version = "1.39", features = ["json", "redactions", "yaml"] } indexmap = { version = "2.2.6" } +itertools = "0.13.0" libc = { version = "0.2" } mime = "0.3.17" mockito = { version = "1.4.0", default-features = false } mockall = { version = "0.13.0" } +non-empty-string = "0.2.5" num_cpus = "1.16.0" object_store = "0.11.1" parking_lot = "0.12.1" @@ -105,9 +108,11 @@ serde_json = "1.0.127" serde_urlencoded = "0.7.0" serde_with = "3.8.1" sha2 = "0.10.8" +snafu = "0.8" snap = "1.0.0" sqlparser = "0.48.0" sysinfo = "0.30.8" +tempfile = "3.14.0" test-log = { version = "0.2.16", features = ["trace"] } thiserror = "1.0" tokio = { version = "1.42", features = ["full"] } @@ -126,7 +131,6 @@ num = { version = "0.4.3" } # Core.git crates we depend on arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } -clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } executor = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } @@ -154,7 +158,7 @@ trace = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c9 
trace_exporters = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } trace_http = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } tracker = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } -trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715" } +trogging = { git = "https://github.com/influxdata/influxdb3_core", rev = "a5f6076c966f4940a67998e0b85d12c3e8596715", features = ["clap"] } [workspace.lints.rust] missing_copy_implementations = "deny" diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index 457163c4541..b222e937026 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -8,7 +8,6 @@ license.workspace = true [dependencies] # Core Crates authz.workspace = true -clap_blocks.workspace = true datafusion_util.workspace = true iox_query.workspace = true iox_time.workspace = true @@ -63,9 +62,9 @@ console-subscriber = { version = "0.1.10", optional = true, features = ["parking [features] default = ["jemalloc_replacing_malloc", "azure", "gcp", "aws"] -azure = ["clap_blocks/azure"] # Optional Azure Object store support -gcp = ["clap_blocks/gcp"] # Optional GCP object store support -aws = ["clap_blocks/aws"] # Optional AWS / S3 object store support +azure = ["influxdb3_clap_blocks/azure"] # Optional Azure Object store support +gcp = ["influxdb3_clap_blocks/gcp"] # Optional GCP object store support +aws = ["influxdb3_clap_blocks/aws"] # Optional AWS / S3 object store support # Enable tokio_console support (https://github.com/tokio-rs/console) # diff --git a/influxdb3/src/commands/activate.rs b/influxdb3/src/commands/activate.rs new file mode 100644 index 00000000000..dd74e871423 --- /dev/null +++ b/influxdb3/src/commands/activate.rs @@ -0,0 +1,63 @@ +use crate::commands::common::InfluxDb3Config; +use influxdb3_client::Client; +use secrecy::ExposeSecret; +use std::error::Error; + +#[derive(Debug, clap::Parser)] +pub struct Config { + #[clap(subcommand)] + cmd: SubCommand, +} + +impl Config { + fn get_client(&self) -> Result> { + let (host_url, auth_token) = match &self.cmd { + SubCommand::Trigger(TriggerConfig { + influxdb3_config: + InfluxDb3Config { + host_url, + auth_token, + .. + }, + .. + }) => (host_url, auth_token), + }; + let mut client = Client::new(host_url.clone())?; + if let Some(token) = &auth_token { + client = client.with_auth_token(token.expose_secret()); + } + Ok(client) + } +} + +#[derive(Debug, clap::Subcommand)] +enum SubCommand { + /// Activate a trigger to enable plugin execution + Trigger(TriggerConfig), +} + +#[derive(Debug, clap::Parser)] +struct TriggerConfig { + #[clap(flatten)] + influxdb3_config: InfluxDb3Config, + + /// Name of trigger to manage + #[clap(required = true)] + trigger_name: String, +} + +pub async fn command(config: Config) -> Result<(), Box> { + let client = config.get_client()?; + match config.cmd { + SubCommand::Trigger(TriggerConfig { + influxdb3_config: InfluxDb3Config { database_name, .. 
}, + trigger_name, + }) => { + client + .api_v3_configure_processing_engine_trigger_activate(database_name, &trigger_name) + .await?; + println!("Trigger {} activated successfully", trigger_name); + } + } + Ok(()) +} diff --git a/influxdb3/src/commands/common.rs b/influxdb3/src/commands/common.rs index cb189ec23d9..8161eb11465 100644 --- a/influxdb3/src/commands/common.rs +++ b/influxdb3/src/commands/common.rs @@ -1,25 +1,26 @@ -use std::str::FromStr; - use clap::Parser; use secrecy::Secret; +use std::error::Error; +use std::fmt::Display; +use std::str::FromStr; use url::Url; #[derive(Debug, Parser)] pub struct InfluxDb3Config { - /// The host URL of the running InfluxDB 3.0 server + /// The host URL of the running InfluxDB 3 Core server #[clap( - short = 'h', + short = 'H', long = "host", env = "INFLUXDB3_HOST_URL", default_value = "http://127.0.0.1:8181" )] pub host_url: Url, - /// The database name to run the query against - #[clap(short = 'd', long = "dbname", env = "INFLUXDB3_DATABASE_NAME")] + /// The name of the database to operate on + #[clap(short = 'd', long = "database", env = "INFLUXDB3_DATABASE_NAME")] pub database_name: String, - /// The token for authentication with the InfluxDB 3.0 server + /// The token for authentication with the InfluxDB 3 Core server #[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")] pub auth_token: Option>, } @@ -52,3 +53,61 @@ impl IntoIterator for SeparatedList { self.0.into_iter() } } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum DataType { + Int64, + Uint64, + Float64, + Utf8, + Bool, +} + +#[derive(Debug, PartialEq, Eq, thiserror::Error)] +#[error("{0} is not a valid data type, values are int64, uint64, float64, utf8, and bool")] +pub struct ParseDataTypeError(String); + +impl FromStr for DataType { + type Err = ParseDataTypeError; + fn from_str(s: &str) -> Result { + match s { + "int64" => Ok(Self::Int64), + "uint64" => Ok(Self::Uint64), + "float64" => Ok(Self::Float64), + "utf8" => Ok(Self::Utf8), + "bool" => Ok(Self::Bool), + _ => Err(ParseDataTypeError(s.into())), + } + } +} + +impl Display for DataType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Int64 => write!(f, "int64"), + Self::Uint64 => write!(f, "uint64"), + Self::Float64 => write!(f, "float64"), + Self::Utf8 => write!(f, "utf8"), + Self::Bool => write!(f, "bool"), + } + } +} + +impl From for String { + fn from(data: DataType) -> Self { + data.to_string() + } +} + +/// Parse a single key-value pair +pub fn parse_key_val(s: &str) -> Result<(T, U), Box> +where + T: std::str::FromStr, + T::Err: Error + Send + Sync + 'static, + U: std::str::FromStr, + U::Err: Error + Send + Sync + 'static, +{ + let pos = s + .find(':') + .ok_or_else(|| format!("invalid FIELD:VALUE. 
No `:` found in `{s}`"))?; + Ok((s[..pos].parse()?, s[pos + 1..].parse()?)) +} diff --git a/influxdb3/src/commands/create.rs b/influxdb3/src/commands/create.rs new file mode 100644 index 00000000000..471158add16 --- /dev/null +++ b/influxdb3/src/commands/create.rs @@ -0,0 +1,450 @@ +use crate::commands::common::{parse_key_val, DataType, InfluxDb3Config, SeparatedList}; +use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64; +use base64::Engine as _; +use influxdb3_client::Client; +use influxdb3_wal::TriggerSpecificationDefinition; +use rand::rngs::OsRng; +use rand::RngCore; +use secrecy::ExposeSecret; +use secrecy::Secret; +use sha2::Digest; +use sha2::Sha512; +use std::error::Error; +use std::fs; +use std::num::NonZeroUsize; +use std::str; +use std::time::Duration; +use url::Url; + +#[derive(Debug, clap::Parser)] +pub struct Config { + #[clap(subcommand)] + cmd: SubCommand, +} + +impl Config { + fn get_client(&self) -> Result> { + match &self.cmd { + SubCommand::Database(DatabaseConfig { + host_url, + auth_token, + .. + }) + | SubCommand::LastCache(LastCacheConfig { + influxdb3_config: + InfluxDb3Config { + host_url, + auth_token, + .. + }, + .. + }) + | SubCommand::MetaCache(MetaCacheConfig { + influxdb3_config: + InfluxDb3Config { + host_url, + auth_token, + .. + }, + .. + }) + | SubCommand::Plugin(PluginConfig { + influxdb3_config: + InfluxDb3Config { + host_url, + auth_token, + .. + }, + .. + }) + | SubCommand::Table(TableConfig { + influxdb3_config: + InfluxDb3Config { + host_url, + auth_token, + .. + }, + .. + }) + | SubCommand::Trigger(TriggerConfig { + influxdb3_config: + InfluxDb3Config { + host_url, + auth_token, + .. + }, + .. + }) => { + let mut client = Client::new(host_url.clone())?; + if let Some(token) = &auth_token { + client = client.with_auth_token(token.expose_secret()); + } + Ok(client) + } + SubCommand::Token => unreachable!(), + } + } +} + +#[derive(Debug, clap::Subcommand)] +pub enum SubCommand { + /// Create a new database + Database(DatabaseConfig), + /// Create a new last value cache + #[clap(name = "last_cache")] + LastCache(LastCacheConfig), + /// Create a new metadata cache + #[clap(name = "meta_cache")] + MetaCache(MetaCacheConfig), + /// Create a new processing engine plugin + Plugin(PluginConfig), + /// Create a new table in a database + Table(TableConfig), + /// Create a new auth token + Token, + /// Create a new trigger for the processing engine + Trigger(TriggerConfig), +} + +#[derive(Debug, clap::Args)] +pub struct DatabaseConfig { + /// The host URL of the running InfluxDB 3 Core server + #[clap( + short = 'H', + long = "host", + env = "INFLUXDB3_HOST_URL", + default_value = "http://127.0.0.1:8181" + )] + pub host_url: Url, + + /// The token for authentication with the InfluxDB 3 Core server + #[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")] + pub auth_token: Option>, + + /// The name of the database to create. Valid database names are + /// alphanumeric with - and _ allowed and starts with a letter or number + #[clap(env = "INFLUXDB3_DATABASE_NAME", required = true)] + pub database_name: String, +} +#[derive(Debug, clap::Args)] +pub struct LastCacheConfig { + #[clap(flatten)] + influxdb3_config: InfluxDb3Config, + + /// The table name for which the cache is being created + #[clap(short = 't', long = "table")] + table: String, + + /// Which columns in the table to use as keys in the cache. This is a comma separated list. 
+    ///
+    /// Example: --key-columns "foo,bar,baz"
+    #[clap(long = "key-columns")]
+    key_columns: Option<SeparatedList<String>>,
+
+    /// Which columns in the table to store as values in the cache. This is a comma-separated list.
+    ///
+    /// Example: --value-columns "foo,bar,baz"
+    #[clap(long = "value-columns")]
+    value_columns: Option<SeparatedList<String>>,
+
+    /// The number of entries per unique key column combination the cache will store
+    #[clap(long = "count")]
+    count: Option<usize>,
+
+    /// The time-to-live (TTL) for entries in a cache. This uses a humantime form, for example: --ttl "10s",
+    /// --ttl "1min 30sec", --ttl "3 hours"
+    ///
+    /// See the parse_duration docs for more details about acceptable forms:
+    /// <https://docs.rs/humantime/latest/humantime/fn.parse_duration.html>
+    #[clap(long = "ttl", value_parser = humantime::parse_duration)]
+    ttl: Option<Duration>,
+
+    /// Give a name for the cache.
+    #[clap(required = false)]
+    cache_name: Option<String>,
+}
+
+#[derive(Debug, clap::Args)]
+pub struct MetaCacheConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+
+    /// The table name for which the cache is being created
+    #[clap(short = 't', long = "table")]
+    table: String,
+
+    /// Which columns in the table to cache distinct values for, as a comma-separated list of the
+    /// column names.
+    ///
+    /// The cache is a hierarchical structure, with a level for each column specified; the order
+    /// specified here will determine the order of the levels from top-to-bottom of the cache
+    /// hierarchy.
+    #[clap(long = "columns")]
+    columns: SeparatedList<String>,
+
+    /// The maximum number of distinct value combinations to hold in the cache
+    #[clap(long = "max-cardinality")]
+    max_cardinality: Option<NonZeroUsize>,
+
+    /// The maximum age of an entry in the cache entered as a human-readable duration, e.g., "30d", "24h"
+    #[clap(long = "max-age")]
+    max_age: Option<humantime::Duration>,
+
+    /// Give the name of the cache.
+    ///
+    /// This will be automatically generated if not provided
+    #[clap(required = false)]
+    cache_name: Option<String>,
+}
+
+#[derive(Debug, clap::Parser)]
+pub struct PluginConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+    /// Python file containing the plugin code
+    #[clap(long = "code-filename")]
+    code_file: String,
+    /// Entry point function for the plugin
+    #[clap(long = "entry-point")]
+    function_name: String,
+    /// Type of trigger the plugin processes
+    #[clap(long = "plugin-type", default_value = "wal_rows")]
+    plugin_type: String,
+    /// Name of the plugin to create
+    plugin_name: String,
+}
+
+#[derive(Debug, clap::Args)]
+pub struct TableConfig {
+    #[clap(long = "tags", required = true, num_args=0..)]
+    /// The list of tag names to be created for the table. Tags are alphanumeric, can contain - and _, and must start with a letter or number
+    tags: Vec<String>,
+
+    #[clap(short = 'f', long = "fields", value_parser = parse_key_val::<String, DataType>, num_args=0..)]
+    /// The list of field names and their data type to be created for the table. Fields are alphanumeric, can contain - and _, and must start with a letter or number.
+    /// The expected format is a list like so: 'field_name:data_type'. Valid data types are: int64, uint64, float64, utf8, and bool
+    fields: Vec<(String, DataType)>,
+
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+
+    #[clap(required = true)]
+    /// The name of the table to be created
+    table_name: String,
+}
+
+#[derive(Debug, clap::Parser)]
+pub struct TriggerConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+
+    /// Plugin to execute when trigger fires
+    #[clap(long = "plugin")]
+    plugin_name: String,
+    /// When the trigger should fire
+    #[clap(long = "trigger-spec",
+        value_parser = TriggerSpecificationDefinition::from_string_rep,
+        help = "Trigger specification format: 'table:<table_name>' or 'all_tables'")]
+    trigger_specification: TriggerSpecificationDefinition,
+    /// Create trigger in disabled state
+    #[clap(long)]
+    disabled: bool,
+    /// Name for the new trigger
+    trigger_name: String,
+}
+
+pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
+    let client = config.get_client()?;
+    match config.cmd {
+        SubCommand::Database(DatabaseConfig { database_name, .. }) => {
+            client.api_v3_configure_db_create(&database_name).await?;
+
+            println!("Database {:?} created successfully", &database_name);
+        }
+        SubCommand::LastCache(LastCacheConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            table,
+            cache_name,
+            key_columns,
+            value_columns,
+            count,
+            ttl,
+        }) => {
+            let mut b = client.api_v3_configure_last_cache_create(database_name, table);
+
+            // Add optional parameters:
+            if let Some(name) = cache_name {
+                b = b.name(name);
+            }
+            if let Some(keys) = key_columns {
+                b = b.key_columns(keys);
+            }
+            if let Some(vals) = value_columns {
+                b = b.value_columns(vals);
+            }
+            if let Some(count) = count {
+                b = b.count(count);
+            }
+            if let Some(ttl) = ttl {
+                b = b.ttl(ttl.as_secs());
+            }
+
+            // Make the request:
+            match b.send().await? {
+                Some(def) => println!(
+                    "new cache created: {}",
+                    serde_json::to_string_pretty(&def)
+                        .expect("serialize last cache definition as JSON")
+                ),
+                None => println!("a cache already exists for the provided parameters"),
+            }
+        }
+        SubCommand::MetaCache(MetaCacheConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            table,
+            cache_name,
+            columns,
+            max_cardinality,
+            max_age,
+        }) => {
+            let mut b = client.api_v3_configure_meta_cache_create(database_name, table, columns);
+
+            // Add the optional stuff:
+            if let Some(name) = cache_name {
+                b = b.name(name);
+            }
+            if let Some(max_cardinality) = max_cardinality {
+                b = b.max_cardinality(max_cardinality);
+            }
+            if let Some(max_age) = max_age {
+                b = b.max_age(max_age.into());
+            }
+
+            match b.send().await? {
+                Some(def) => println!(
+                    "new cache created: {}",
+                    serde_json::to_string_pretty(&def)
+                        .expect("serialize meta cache definition as JSON")
+                ),
+                None => println!("a cache already exists for the provided parameters"),
+            }
+        }
+        SubCommand::Plugin(PluginConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            plugin_name,
+            code_file,
+            function_name,
+            plugin_type,
+        }) => {
+            let code = fs::read_to_string(&code_file)?;
+            client
+                .api_v3_configure_processing_engine_plugin_create(
+                    database_name,
+                    &plugin_name,
+                    code,
+                    function_name,
+                    plugin_type,
+                )
+                .await?;
+            println!("Plugin {} created successfully", plugin_name);
+        }
+        SubCommand::Table(TableConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            table_name,
+            tags,
+            fields,
+        }) => {
+            client
+                .api_v3_configure_table_create(&database_name, &table_name, tags, fields)
+                .await?;
+
+            println!(
+                "Table {:?}.{:?} created successfully",
+                &database_name, &table_name
+            );
+        }
+        SubCommand::Token => {
+            let token = {
+                let mut token = String::from("apiv3_");
+                let mut key = [0u8; 64];
+                OsRng.fill_bytes(&mut key);
+                token.push_str(&B64.encode(key));
+                token
+            };
+            println!(
+                "\
+                Token: {token}\n\
+                Hashed Token: {hashed}\n\n\
+                Start the server with `influxdb3 serve --bearer-token {hashed}`\n\n\
+                HTTP requests require the following header: \"Authorization: Bearer {token}\"\n\
+                This will grant you access to every HTTP endpoint or deny it otherwise
+                ",
+                hashed = hex::encode(&Sha512::digest(&token)[..])
+            );
+        }
+        SubCommand::Trigger(TriggerConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            trigger_name,
+            plugin_name,
+            trigger_specification,
+            disabled,
+        }) => {
+            client
+                .api_v3_configure_processing_engine_trigger_create(
+                    database_name,
+                    &trigger_name,
+                    plugin_name,
+                    trigger_specification.string_rep(),
+                    disabled,
+                )
+                .await?;
+            println!("Trigger {} created successfully", trigger_name);
+        }
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use clap::Parser;
+
+    #[test]
+    fn parse_args() {
+        let args = super::Config::parse_from([
+            "create",
+            "last_cache",
+            "--database",
+            "bar",
+            "--table",
+            "foo",
+            "--key-columns",
+            "tag1,tag2,tag3",
+            "--value-columns",
+            "field1,field2,field3",
+            "--ttl",
+            "1 hour",
+            "--count",
+            "5",
+            "bar",
+        ]);
+        let super::SubCommand::LastCache(super::LastCacheConfig {
+            table,
+            cache_name,
+            key_columns,
+            value_columns,
+            count,
+            ttl,
+            influxdb3_config: crate::commands::common::InfluxDb3Config { database_name, .. },
+        }) = args.cmd
+        else {
+            panic!("Did not parse args correctly: {args:#?}")
+        };
+        assert_eq!("bar", database_name);
+        assert_eq!("foo", table);
+        assert!(cache_name.is_some_and(|n| n == "bar"));
+        assert!(key_columns.is_some_and(|keys| keys.0 == ["tag1", "tag2", "tag3"]));
+        assert!(value_columns.is_some_and(|vals| vals.0 == ["field1", "field2", "field3"]));
+        assert!(count.is_some_and(|c| c == 5));
+        assert!(ttl.is_some_and(|t| t.as_secs() == 3600));
+    }
+}
diff --git a/influxdb3/src/commands/deactivate.rs b/influxdb3/src/commands/deactivate.rs
new file mode 100644
index 00000000000..5f78c030b54
--- /dev/null
+++ b/influxdb3/src/commands/deactivate.rs
@@ -0,0 +1,63 @@
+use crate::commands::common::InfluxDb3Config;
+use influxdb3_client::Client;
+use secrecy::ExposeSecret;
+use std::error::Error;
+
+#[derive(Debug, clap::Parser)]
+pub struct Config {
+    #[clap(subcommand)]
+    cmd: SubCommand,
+}
+
+impl Config {
+    fn get_client(&self) -> Result<Client, Box<dyn Error>> {
+        let (host_url, auth_token) = match &self.cmd {
+            SubCommand::Trigger(TriggerConfig {
+                influxdb3_config:
+                    InfluxDb3Config {
+                        host_url,
+                        auth_token,
+                        ..
+                    },
+                ..
+            }) => (host_url, auth_token),
+        };
+        let mut client = Client::new(host_url.clone())?;
+        if let Some(token) = &auth_token {
+            client = client.with_auth_token(token.expose_secret());
+        }
+        Ok(client)
+    }
+}
+
+#[derive(Debug, clap::Subcommand)]
+enum SubCommand {
+    /// Deactivate a trigger to disable plugin execution
+    Trigger(TriggerConfig),
+}
+
+#[derive(Debug, clap::Parser)]
+struct TriggerConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+
+    /// Name of trigger to manage
+    #[clap(required = true)]
+    trigger_name: String,
+}
+
+pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
+    let client = config.get_client()?;
+    match config.cmd {
+        SubCommand::Trigger(TriggerConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            trigger_name,
+        }) => {
+            client
+                .api_v3_configure_processing_engine_trigger_deactivate(database_name, &trigger_name)
+                .await?;
+            println!("Trigger {} deactivated successfully", trigger_name);
+        }
+    }
+    Ok(())
+}
diff --git a/influxdb3/src/commands/delete.rs b/influxdb3/src/commands/delete.rs
new file mode 100644
index 00000000000..39f10563344
--- /dev/null
+++ b/influxdb3/src/commands/delete.rs
@@ -0,0 +1,265 @@
+use super::common::InfluxDb3Config;
+use influxdb3_client::Client;
+use secrecy::ExposeSecret;
+use secrecy::Secret;
+use std::error::Error;
+use std::io;
+use url::Url;
+
+#[derive(Debug, clap::Parser)]
+pub struct Config {
+    #[clap(subcommand)]
+    cmd: SubCommand,
+}
+
+impl Config {
+    fn get_client(&self) -> Result<Client, Box<dyn Error>> {
+        match &self.cmd {
+            SubCommand::Database(DatabaseConfig {
+                host_url,
+                auth_token,
+                ..
+            })
+            | SubCommand::LastCache(LastCacheConfig {
+                influxdb3_config:
+                    InfluxDb3Config {
+                        host_url,
+                        auth_token,
+                        ..
+                    },
+                ..
+            })
+            | SubCommand::MetaCache(MetaCacheConfig {
+                influxdb3_config:
+                    InfluxDb3Config {
+                        host_url,
+                        auth_token,
+                        ..
+                    },
+                ..
+            })
+            | SubCommand::Plugin(PluginConfig {
+                influxdb3_config:
+                    InfluxDb3Config {
+                        host_url,
+                        auth_token,
+                        ..
+                    },
+                ..
+            })
+            | SubCommand::Table(TableConfig {
+                influxdb3_config:
+                    InfluxDb3Config {
+                        host_url,
+                        auth_token,
+                        ..
+                    },
+                ..
+            })
+            | SubCommand::Trigger(TriggerConfig {
+                influxdb3_config:
+                    InfluxDb3Config {
+                        host_url,
+                        auth_token,
+                        ..
+                    },
+                ..
+            }) => {
+                let mut client = Client::new(host_url.clone())?;
+                if let Some(token) = &auth_token {
+                    client = client.with_auth_token(token.expose_secret());
+                }
+                Ok(client)
+            }
+        }
+    }
+}
+
+#[derive(Debug, clap::Subcommand)]
+pub enum SubCommand {
+    /// Delete a database
+    Database(DatabaseConfig),
+    /// Delete a last value cache
+    #[clap(name = "last_cache")]
+    LastCache(LastCacheConfig),
+    /// Delete a metadata cache
+    #[clap(name = "meta_cache")]
+    MetaCache(MetaCacheConfig),
+    /// Delete an existing processing engine plugin
+    Plugin(PluginConfig),
+    /// Delete a table in a database
+    Table(TableConfig),
+    /// Delete a trigger
+    Trigger(TriggerConfig),
+}
+
+#[derive(Debug, clap::Args)]
+pub struct DatabaseConfig {
+    /// The host URL of the running InfluxDB 3 Core server
+    #[clap(
+        short = 'H',
+        long = "host",
+        env = "INFLUXDB3_HOST_URL",
+        default_value = "http://127.0.0.1:8181"
+    )]
+    pub host_url: Url,
+
+    /// The token for authentication with the InfluxDB 3 Core server
+    #[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
+    pub auth_token: Option<Secret<String>>,
+
+    /// The name of the database to be deleted
+    #[clap(env = "INFLUXDB3_DATABASE_NAME", required = true)]
+    pub database_name: String,
+}
+
+#[derive(Debug, clap::Args)]
+pub struct LastCacheConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+
+    /// The table under which the cache is being deleted
+    #[clap(short = 't', long = "table")]
+    table: String,
+
+    /// The name of the cache being deleted
+    #[clap(required = true)]
+    cache_name: String,
+}
+
+#[derive(Debug, clap::Args)]
+pub struct MetaCacheConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+
+    /// The table under which the cache is being deleted
+    #[clap(short = 't', long = "table")]
+    table: String,
+
+    /// The name of the cache being deleted
+    #[clap(required = true)]
+    cache_name: String,
+}
+
+#[derive(Debug, clap::Parser)]
+pub struct PluginConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+
+    /// Name of the plugin to delete
+    #[clap(required = true)]
+    plugin_name: String,
+}
+
+#[derive(Debug, clap::Args)]
+pub struct TableConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+    #[clap(required = true)]
+    /// The name of the table to be deleted
+    table_name: String,
+}
+
+#[derive(Debug, clap::Parser)]
+pub struct TriggerConfig {
+    #[clap(flatten)]
+    influxdb3_config: InfluxDb3Config,
+
+    /// Force deletion even if trigger is active
+    #[clap(long)]
+    force: bool,
+
+    /// Name of trigger to delete
+    #[clap(required = true)]
+    trigger_name: String,
+}
+
+pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
+    let client = config.get_client()?;
+    match config.cmd {
+        SubCommand::Database(DatabaseConfig { database_name, .. }) => {
+            println!(
+                "Are you sure you want to delete {:?}? Enter 'yes' to confirm",
+                database_name
+            );
+            let mut confirmation = String::new();
+            let _ = io::stdin().read_line(&mut confirmation);
+            if confirmation.trim() != "yes" {
+                println!("Cannot delete database without confirmation");
+            } else {
+                client.api_v3_configure_db_delete(&database_name).await?;
+
+                println!("Database {:?} deleted successfully", &database_name);
+            }
+        }
+        SubCommand::LastCache(LastCacheConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            table,
+            cache_name,
+        }) => {
+            client
+                .api_v3_configure_last_cache_delete(database_name, table, cache_name)
+                .await?;
+
+            println!("last cache deleted successfully");
+        }
+        SubCommand::MetaCache(MetaCacheConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            table,
+            cache_name,
+        }) => {
+            client
+                .api_v3_configure_meta_cache_delete(database_name, table, cache_name)
+                .await?;
+
+            println!("meta cache deleted successfully");
+        }
+        SubCommand::Plugin(PluginConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            plugin_name,
+        }) => {
+            client
+                .api_v3_configure_processing_engine_plugin_delete(database_name, &plugin_name)
+                .await?;
+            println!("Plugin {} deleted successfully", plugin_name);
+        }
+        SubCommand::Table(TableConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            table_name,
+        }) => {
+            println!(
+                "Are you sure you want to delete {:?}.{:?}? Enter 'yes' to confirm",
+                database_name, &table_name,
+            );
+            let mut confirmation = String::new();
+            let _ = io::stdin().read_line(&mut confirmation);
+            if confirmation.trim() != "yes" {
+                println!("Cannot delete table without confirmation");
+            } else {
+                client
+                    .api_v3_configure_table_delete(&database_name, &table_name)
+                    .await?;
+
+                println!(
+                    "Table {:?}.{:?} deleted successfully",
+                    &database_name, &table_name
+                );
+            }
+        }
+        SubCommand::Trigger(TriggerConfig {
+            influxdb3_config: InfluxDb3Config { database_name, .. },
+            trigger_name,
+            force,
+        }) => {
+            client
+                .api_v3_configure_processing_engine_trigger_delete(
+                    database_name,
+                    &trigger_name,
+                    force,
+                )
+                .await?;
+            println!("Trigger {} deleted successfully", trigger_name);
+        }
+    }
+    Ok(())
+}
diff --git a/influxdb3/src/commands/last_cache/create.rs b/influxdb3/src/commands/last_cache/create.rs
deleted file mode 100644
index 09dd4d04e7a..00000000000
--- a/influxdb3/src/commands/last_cache/create.rs
+++ /dev/null
@@ -1,127 +0,0 @@
-use std::error::Error;
-
-use secrecy::ExposeSecret;
-
-use crate::commands::common::{InfluxDb3Config, SeparatedList};
-
-#[derive(Debug, clap::Parser)]
-pub struct Config {
-    #[clap(flatten)]
-    influxdb3_config: InfluxDb3Config,
-
-    #[clap(flatten)]
-    last_cache_config: LastCacheConfig,
-}
-
-#[derive(Debug, clap::Parser)]
-pub struct LastCacheConfig {
-    /// The table name for which the cache is being created
-    #[clap(short = 't', long = "table")]
-    table: String,
-
-    /// Give a name for the cache.
-    #[clap(long = "cache-name")]
-    cache_name: Option<String>,
-
-    /// Which columns in the table to use as keys in the cache
-    #[clap(long = "key-columns")]
-    key_columns: Option<SeparatedList<String>>,
-
-    /// Which columns in the table to store as values in the cache
-    #[clap(long = "value-columns")]
-    value_columns: Option<SeparatedList<String>>,
-
-    /// The number of entries per unique key column combination the cache will store
-    #[clap(long = "count")]
-    count: Option<usize>,
-
-    /// The time-to-live (TTL) for entries in a cache in seconds
-    #[clap(long = "ttl")]
-    ttl: Option<u64>,
-}
-
-pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
-    let InfluxDb3Config {
-        host_url,
-        database_name,
-        auth_token,
-    } = config.influxdb3_config;
-    let LastCacheConfig {
-        table,
-        cache_name,
-        key_columns,
-        value_columns,
-        count,
-        ttl,
-        ..
-    } = config.last_cache_config;
-    let mut client = influxdb3_client::Client::new(host_url)?;
-    if let Some(t) = auth_token {
-        client = client.with_auth_token(t.expose_secret());
-    }
-    let mut b = client.api_v3_configure_last_cache_create(database_name, table);
-
-    // Add optional parameters:
-    if let Some(name) = cache_name {
-        b = b.name(name);
-    }
-    if let Some(keys) = key_columns {
-        b = b.key_columns(keys);
-    }
-    if let Some(vals) = value_columns {
-        b = b.value_columns(vals);
-    }
-    if let Some(count) = count {
-        b = b.count(count);
-    }
-    if let Some(ttl) = ttl {
-        b = b.ttl(ttl);
-    }
-
-    // Make the request:
-    match b.send().await? {
-        Some(def) => println!(
-            "new cache created: {}",
-            serde_json::to_string_pretty(&def).expect("serialize last cache definition as JSON")
-        ),
-        None => println!("a cache already exists for the provided parameters"),
-    }
-
-    Ok(())
-}
-
-#[cfg(test)]
-mod tests {
-    use clap::Parser;
-
-    use crate::commands::last_cache::create::LastCacheConfig;
-
-    #[test]
-    fn parse_args() {
-        let args = LastCacheConfig::parse_from([
-            "last_cache_create",
-            "--table",
-            "foo",
-            "--cache-name",
-            "bar",
-            "--key-columns",
-            "tag1,tag2,tag3",
-            "--value-columns",
-            "field1,field2,field3",
-            "--ttl",
-            "3600",
-            "--count",
-            "5",
-        ]);
-        assert_eq!("foo", args.table);
-        assert!(args.cache_name.is_some_and(|n| n == "bar"));
-        assert!(args
-            .key_columns
-            .is_some_and(|keys| keys.0 == ["tag1", "tag2", "tag3"]));
-        assert!(args
-            .value_columns
-            .is_some_and(|vals| vals.0 == ["field1", "field2", "field3"]));
-        assert!(args.count.is_some_and(|c| c == 5));
-        assert!(args.ttl.is_some_and(|t| t == 3600));
-    }
-}
diff --git a/influxdb3/src/commands/last_cache/delete.rs b/influxdb3/src/commands/last_cache/delete.rs
deleted file mode 100644
index b0c7327c815..00000000000
--- a/influxdb3/src/commands/last_cache/delete.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-use std::error::Error;
-
-use secrecy::ExposeSecret;
-
-use crate::commands::common::InfluxDb3Config;
-
-#[derive(Debug, clap::Parser)]
-pub struct Config {
-    #[clap(flatten)]
-    influxdb3_config: InfluxDb3Config,
-
-    /// The table under which the cache is being deleted
-    #[clap(short = 't', long = "table")]
-    table: String,
-
-    /// The name of the cache being deleted
-    #[clap(short = 'n', long = "cache-name")]
-    cache_name: String,
-}
-
-pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
-    let InfluxDb3Config {
-        host_url,
-        database_name,
-        auth_token,
-    } = config.influxdb3_config;
-    let mut client = influxdb3_client::Client::new(host_url)?;
-    if let Some(t) = auth_token {
-        client = client.with_auth_token(t.expose_secret());
-    }
-    client
-        .api_v3_configure_last_cache_delete(database_name, config.table, config.cache_name)
-        .await?;
-
-    println!("last cache deleted successfully");
-
-    Ok(())
-}
diff --git a/influxdb3/src/commands/last_cache/mod.rs b/influxdb3/src/commands/last_cache/mod.rs
deleted file mode 100644
index 6e8b26f7b48..00000000000
--- a/influxdb3/src/commands/last_cache/mod.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use std::error::Error;
-
-pub mod create;
-pub mod delete;
-
-#[derive(Debug, clap::Parser)]
-pub(crate) struct Config {
-    #[clap(subcommand)]
-    command: Command,
-}
-
-#[derive(Debug, clap::Parser)]
-enum Command {
-    /// Create a new last-n-value cache
-    Create(create::Config),
-    /// Delete an existing last-n-value cache
-    Delete(delete::Config),
-}
-
-pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
-    match config.command {
-        Command::Create(config) => create::command(config).await,
-        Command::Delete(config) => delete::command(config).await,
-    }
-}
diff --git a/influxdb3/src/commands/manage/database.rs b/influxdb3/src/commands/manage/database.rs
deleted file mode 100644
index ae519d08f96..00000000000
--- a/influxdb3/src/commands/manage/database.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use std::{error::Error, io};
-
-use secrecy::ExposeSecret;
-
-use crate::commands::common::InfluxDb3Config;
-
-#[derive(Debug, clap::Parser)]
-pub(crate) struct Config {
-    #[clap(subcommand)]
-    command: Command,
-}
-
-#[derive(Debug, clap::Parser)]
-enum Command {
-    Create(DatabaseConfig),
-    Delete(DatabaseConfig),
-}
-
-#[derive(Debug, clap::Parser)]
-pub struct DatabaseConfig {
-    #[clap(flatten)]
-    influxdb3_config: InfluxDb3Config,
-}
-
-pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
-    match config.command {
-        Command::Create(config) => {
-            let InfluxDb3Config {
-                host_url,
-                database_name,
-                auth_token,
-            } = config.influxdb3_config;
-
-            let mut client = influxdb3_client::Client::new(host_url)?;
-
-            if let Some(t) = auth_token {
-                client = client.with_auth_token(t.expose_secret());
-            }
-
-            client.api_v3_configure_db_create(&database_name).await?;
-
-            println!("Database {:?} created successfully", &database_name);
-        }
-        Command::Delete(config) => {
-            let InfluxDb3Config {
-                host_url,
-                database_name,
-                auth_token,
-            } = config.influxdb3_config;
-            println!(
-                "Are you sure you want to delete {:?}? Enter 'yes' to confirm",
-                database_name
-            );
-            let mut confirmation = String::new();
-            let _ = io::stdin().read_line(&mut confirmation);
-            if confirmation.trim() != "yes" {
-                println!("Cannot delete database without confirmation");
-            } else {
-                let mut client = influxdb3_client::Client::new(host_url)?;
-                if let Some(t) = auth_token {
-                    client = client.with_auth_token(t.expose_secret());
-                }
-                client.api_v3_configure_db_delete(&database_name).await?;
-
-                println!("Database {:?} deleted successfully", &database_name);
-            }
-        }
-    }
-    Ok(())
-}
diff --git a/influxdb3/src/commands/manage/mod.rs b/influxdb3/src/commands/manage/mod.rs
deleted file mode 100644
index b66a9b61b1b..00000000000
--- a/influxdb3/src/commands/manage/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod database;
-pub mod table;
diff --git a/influxdb3/src/commands/manage/table.rs b/influxdb3/src/commands/manage/table.rs
deleted file mode 100644
index 20b49c79b42..00000000000
--- a/influxdb3/src/commands/manage/table.rs
+++ /dev/null
@@ -1,159 +0,0 @@
-use std::{error::Error, fmt::Display, io, str::FromStr};
-
-use secrecy::ExposeSecret;
-
-use crate::commands::common::InfluxDb3Config;
-
-#[derive(Debug, clap::Parser)]
-pub(crate) struct Config {
-    #[clap(subcommand)]
-    command: Command,
-}
-
-#[derive(Debug, clap::Parser)]
-enum Command {
-    Create(CreateTableConfig),
-    Delete(DeleteTableConfig),
-}
-
-#[derive(Debug, clap::Parser)]
-pub struct DeleteTableConfig {
-    #[clap(short = 't', long = "table", required = true)]
-    table_name: String,
-
-    #[clap(flatten)]
-    influxdb3_config: InfluxDb3Config,
-}
-
-#[derive(Debug, clap::Parser)]
-pub struct CreateTableConfig {
-    #[clap(short = 't', long = "table", required = true)]
-    table_name: String,
-
-    #[clap(long = "tags", required = true, num_args=0..)]
-    tags: Vec<String>,
-
-    #[clap(short = 'f', long = "fields", value_parser = parse_key_val::<String, DataType>, num_args=0..)]
-    fields: Vec<(String, DataType)>,
-
-    #[clap(flatten)]
-    influxdb3_config: InfluxDb3Config,
-}
-
-pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
-    match config.command {
Command::Create(CreateTableConfig { - table_name, - tags, - fields, - influxdb3_config: - InfluxDb3Config { - host_url, - database_name, - auth_token, - }, - }) => { - let mut client = influxdb3_client::Client::new(host_url)?; - if let Some(t) = auth_token { - client = client.with_auth_token(t.expose_secret()); - } - client - .api_v3_configure_table_create(&database_name, &table_name, tags, fields) - .await?; - - println!( - "Table {:?}.{:?} created successfully", - &database_name, &table_name - ); - } - Command::Delete(config) => { - let InfluxDb3Config { - host_url, - database_name, - auth_token, - } = config.influxdb3_config; - println!( - "Are you sure you want to delete {:?}.{:?}? Enter 'yes' to confirm", - database_name, &config.table_name, - ); - let mut confirmation = String::new(); - let _ = io::stdin().read_line(&mut confirmation); - if confirmation.trim() != "yes" { - println!("Cannot delete table without confirmation"); - } else { - let mut client = influxdb3_client::Client::new(host_url)?; - if let Some(t) = auth_token { - client = client.with_auth_token(t.expose_secret()); - } - client - .api_v3_configure_table_delete(&database_name, &config.table_name) - .await?; - - println!( - "Table {:?}.{:?} deleted successfully", - &database_name, &config.table_name - ); - } - } - } - Ok(()) -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum DataType { - Int64, - Uint64, - Float64, - Utf8, - Bool, -} - -#[derive(Debug, PartialEq, Eq, thiserror::Error)] -#[error("{0} is not a valid data type, values are int64, uint64, float64, utf8, and bool")] -pub struct ParseDataTypeError(String); - -impl FromStr for DataType { - type Err = ParseDataTypeError; - fn from_str(s: &str) -> Result { - match s { - "int64" => Ok(Self::Int64), - "uint64" => Ok(Self::Uint64), - "float64" => Ok(Self::Float64), - "utf8" => Ok(Self::Utf8), - "bool" => Ok(Self::Bool), - _ => Err(ParseDataTypeError(s.into())), - } - } -} - -impl Display for DataType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Int64 => write!(f, "int64"), - Self::Uint64 => write!(f, "uint64"), - Self::Float64 => write!(f, "float64"), - Self::Utf8 => write!(f, "utf8"), - Self::Bool => write!(f, "bool"), - } - } -} - -impl From for String { - fn from(data: DataType) -> Self { - data.to_string() - } -} - -/// Parse a single key-value pair -fn parse_key_val(s: &str) -> Result<(T, U), Box> -where - T: std::str::FromStr, - T::Err: Error + Send + Sync + 'static, - U: std::str::FromStr, - U::Err: Error + Send + Sync + 'static, -{ - let pos = s - .find(':') - .ok_or_else(|| format!("invalid FIELD:VALUE. No `:` found in `{s}`"))?; - Ok((s[..pos].parse()?, s[pos + 1..].parse()?)) -} diff --git a/influxdb3/src/commands/meta_cache/create.rs b/influxdb3/src/commands/meta_cache/create.rs deleted file mode 100644 index 0f55e2afbb3..00000000000 --- a/influxdb3/src/commands/meta_cache/create.rs +++ /dev/null @@ -1,86 +0,0 @@ -use std::{error::Error, num::NonZeroUsize}; - -use secrecy::ExposeSecret; - -use crate::commands::common::{InfluxDb3Config, SeparatedList}; - -#[derive(Debug, clap::Parser)] -pub struct Config { - #[clap(flatten)] - influxdb3_config: InfluxDb3Config, - - #[clap(flatten)] - meta_cache_config: MetaCacheConfig, -} - -#[derive(Debug, clap::Parser)] -pub struct MetaCacheConfig { - /// The table name for which the cache is being created - #[clap(short = 't', long = "table")] - table: String, - - /// Give the name of the cache. 
- /// - /// This will be automatically generated if not provided - #[clap(long = "cache-name")] - cache_name: Option, - - /// Which columns in the table to cache distinct values for, as a comma-separated list of the - /// column names. - /// - /// The cache is a hieararchical structure, with a level for each column specified; the order - /// specified here will determine the order of the levels from top-to-bottom of the cache - /// hierarchy. - #[clap(long = "columns")] - columns: SeparatedList, - - /// The maximum number of distinct value combinations to hold in the cache - #[clap(long = "max-cardinality")] - max_cardinality: Option, - - /// The maximum age of an entry in the cache entered as a human-readable duration, e.g., "30d", "24h" - #[clap(long = "max-age")] - max_age: Option, -} - -pub(super) async fn command(config: Config) -> Result<(), Box> { - let InfluxDb3Config { - host_url, - database_name, - auth_token, - } = config.influxdb3_config; - let MetaCacheConfig { - table, - cache_name, - columns, - max_cardinality, - max_age, - } = config.meta_cache_config; - - let mut client = influxdb3_client::Client::new(host_url)?; - if let Some(t) = auth_token { - client = client.with_auth_token(t.expose_secret()); - } - let mut b = client.api_v3_configure_meta_cache_create(database_name, table, columns); - - // Add the optional stuff: - if let Some(name) = cache_name { - b = b.name(name); - } - if let Some(max_cardinality) = max_cardinality { - b = b.max_cardinality(max_cardinality); - } - if let Some(max_age) = max_age { - b = b.max_age(max_age.into()); - } - - match b.send().await? { - Some(def) => println!( - "new cache created: {}", - serde_json::to_string_pretty(&def).expect("serialize meta cache definition as JSON") - ), - None => println!("a cache already exists for the provided parameters"), - } - - Ok(()) -} diff --git a/influxdb3/src/commands/meta_cache/delete.rs b/influxdb3/src/commands/meta_cache/delete.rs deleted file mode 100644 index 0f052a0b31d..00000000000 --- a/influxdb3/src/commands/meta_cache/delete.rs +++ /dev/null @@ -1,38 +0,0 @@ -use std::error::Error; - -use secrecy::ExposeSecret; - -use crate::commands::common::InfluxDb3Config; - -#[derive(Debug, clap::Parser)] -pub struct Config { - #[clap(flatten)] - influxdb3_config: InfluxDb3Config, - - /// The table under which the cache is being deleted - #[clap(short = 't', long = "table")] - table: String, - - /// The name of the cache being deleted - #[clap(short = 'n', long = "cache-name")] - cache_name: String, -} - -pub(super) async fn command(config: Config) -> Result<(), Box> { - let InfluxDb3Config { - host_url, - database_name, - auth_token, - } = config.influxdb3_config; - let mut client = influxdb3_client::Client::new(host_url)?; - if let Some(t) = auth_token { - client = client.with_auth_token(t.expose_secret()); - } - client - .api_v3_configure_meta_cache_delete(database_name, config.table, config.cache_name) - .await?; - - println!("meta cache deleted successfully"); - - Ok(()) -} diff --git a/influxdb3/src/commands/meta_cache/mod.rs b/influxdb3/src/commands/meta_cache/mod.rs deleted file mode 100644 index 9917d31cde3..00000000000 --- a/influxdb3/src/commands/meta_cache/mod.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::error::Error; - -pub mod create; -pub mod delete; - -#[derive(Debug, clap::Parser)] -pub(crate) struct Config { - #[clap(subcommand)] - command: Command, -} - -#[derive(Debug, clap::Parser)] -enum Command { - /// Create a new metadata cache - Create(create::Config), - - /// Delete a metadata cache 
- Delete(delete::Config), -} - -pub(crate) async fn command(config: Config) -> Result<(), Box> { - match config.command { - Command::Create(config) => create::command(config).await, - Command::Delete(config) => delete::command(config).await, - } -} diff --git a/influxdb3/src/commands/processing_engine/mod.rs b/influxdb3/src/commands/processing_engine/mod.rs deleted file mode 100644 index 99726de06c0..00000000000 --- a/influxdb3/src/commands/processing_engine/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -mod plugin; -mod trigger; - -use std::error::Error; - -#[derive(Debug, clap::Parser)] -pub(crate) struct Config { - #[clap(subcommand)] - command: Command, -} - -#[derive(Debug, clap::Parser)] -enum Command { - /// Manage plugins (create, delete, update, etc.) - Plugin(plugin::Config), - /// Manage triggers (create, delete, activate, deactivate, etc.) - Trigger(trigger::Config), -} - -pub(crate) async fn command(config: Config) -> Result<(), Box> { - match config.command { - Command::Plugin(plugin_config) => { - plugin::command(plugin_config).await?; - } - Command::Trigger(trigger_config) => { - trigger::command(trigger_config).await?; - } - } - Ok(()) -} diff --git a/influxdb3/src/commands/processing_engine/plugin.rs b/influxdb3/src/commands/processing_engine/plugin.rs deleted file mode 100644 index 963ec08aaa4..00000000000 --- a/influxdb3/src/commands/processing_engine/plugin.rs +++ /dev/null @@ -1,97 +0,0 @@ -use crate::commands::common::InfluxDb3Config; -use secrecy::ExposeSecret; -use std::error::Error; -use std::fs::File; -use std::io::Read; - -#[derive(Debug, clap::Parser)] -pub struct Config { - #[clap(subcommand)] - action: PluginAction, -} - -#[derive(Debug, clap::Subcommand)] -enum PluginAction { - /// Create a new processing engine plugin - Create(CreateConfig), - /// Delete an existing processing engine plugin - Delete(DeleteConfig), -} - -impl PluginAction { - pub fn get_influxdb3_config(&self) -> &InfluxDb3Config { - match self { - Self::Create(create) => &create.influxdb3_config, - Self::Delete(delete) => &delete.influxdb3_config, - } - } -} - -#[derive(Debug, clap::Parser)] -struct CreateConfig { - #[clap(flatten)] - influxdb3_config: InfluxDb3Config, - - /// Name of the plugin to create - #[clap(long = "plugin-name")] - plugin_name: String, - /// Python file containing the plugin code - #[clap(long = "code-filename")] - code_file: String, - /// Entry point function for the plugin - #[clap(long = "entry-point")] - function_name: String, - /// Type of trigger the plugin processes - #[clap(long = "plugin-type", default_value = "wal_rows")] - plugin_type: String, -} - -#[derive(Debug, clap::Parser)] -struct DeleteConfig { - #[clap(flatten)] - influxdb3_config: InfluxDb3Config, - - /// Name of the plugin to delete - #[clap(long = "plugin-name")] - plugin_name: String, -} - -pub(super) async fn command(config: Config) -> Result<(), Box> { - let influxdb3_config = config.action.get_influxdb3_config(); - let mut client = influxdb3_client::Client::new(influxdb3_config.host_url.clone())?; - if let Some(token) = &influxdb3_config.auth_token { - client = client.with_auth_token(token.expose_secret()); - } - match config.action { - PluginAction::Create(create_config) => { - let code = open_file(&create_config.code_file)?; - client - .api_v3_configure_processing_engine_plugin_create( - create_config.influxdb3_config.database_name, - &create_config.plugin_name, - code, - create_config.function_name, - create_config.plugin_type, - ) - .await?; - println!("Plugin {} created successfully", 
create_config.plugin_name); - } - PluginAction::Delete(delete_config) => { - client - .api_v3_configure_processing_engine_plugin_delete( - delete_config.influxdb3_config.database_name, - &delete_config.plugin_name, - ) - .await?; - println!("Plugin {} deleted successfully", delete_config.plugin_name); - } - } - Ok(()) -} - -fn open_file(file: &str) -> Result<String, Box<dyn Error>> { - let mut file = File::open(file)?; - let mut contents = String::new(); - file.read_to_string(&mut contents)?; - Ok(contents) -} diff --git a/influxdb3/src/commands/processing_engine/trigger.rs b/influxdb3/src/commands/processing_engine/trigger.rs deleted file mode 100644 index a2000f4ccd2..00000000000 --- a/influxdb3/src/commands/processing_engine/trigger.rs +++ /dev/null @@ -1,146 +0,0 @@ -use crate::commands::common::InfluxDb3Config; -use influxdb3_client::Client; -use influxdb3_wal::TriggerSpecificationDefinition; -use secrecy::ExposeSecret; -use std::error::Error; - -#[derive(Debug, clap::Parser)] -pub struct Config { - #[clap(subcommand)] - action: TriggerAction, -} - -impl Config { - fn get_client(&self) -> Result<Client, Box<dyn Error>> { - let influxdb3_config = match &self.action { - TriggerAction::Create(create) => &create.influxdb3_config, - TriggerAction::Activate(manage) | TriggerAction::Deactivate(manage) => { - &manage.influxdb3_config - } - TriggerAction::Delete(delete) => &delete.influxdb3_config, - }; - let mut client = Client::new(influxdb3_config.host_url.clone())?; - if let Some(token) = &influxdb3_config.auth_token { - client = client.with_auth_token(token.expose_secret()); - } - Ok(client) - } -} - -#[derive(Debug, clap::Subcommand)] -enum TriggerAction { - /// Create a new trigger using an existing plugin - Create(CreateConfig), - /// Activate a trigger to enable plugin execution - Activate(ManageConfig), - /// Deactivate a trigger to disable plugin execution - Deactivate(ManageConfig), - /// Delete a trigger - Delete(DeleteConfig), -} - -#[derive(Debug, clap::Parser)] -struct CreateConfig { - #[clap(flatten)] - influxdb3_config: InfluxDb3Config, - - /// Name for the new trigger - #[clap(long = "trigger-name")] - trigger_name: String, - /// Plugin to execute when trigger fires - #[clap(long = "plugin-name")] - plugin_name: String, - /// When the trigger should fire - #[clap(long = "trigger-spec", - value_parser = TriggerSpecificationDefinition::from_string_rep, - help = "Trigger specification format: 'table:<table_name>' or 'all_tables'")] - trigger_specification: TriggerSpecificationDefinition, - /// Create trigger in disabled state - #[clap(long)] - disabled: bool, -} - -#[derive(Debug, clap::Parser)] -struct ManageConfig { - #[clap(flatten)] - influxdb3_config: InfluxDb3Config, - - /// Name of trigger to manage - #[clap(long = "trigger-name")] - trigger_name: String, -} - -#[derive(Debug, clap::Parser)] -struct DeleteConfig { - #[clap(flatten)] - influxdb3_config: InfluxDb3Config, - - /// Name of trigger to delete - #[clap(long = "trigger-name")] - trigger_name: String, - - /// Force deletion even if trigger is active - #[clap(long)] - force: bool, -} - -// [Previous CreateConfig and ManageConfig structs remain unchanged] - -pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> { - let client = config.get_client()?; - match config.action { - TriggerAction::Create(create_config) => { - client - .api_v3_configure_processing_engine_trigger_create( - create_config.influxdb3_config.database_name, - &create_config.trigger_name, - &create_config.plugin_name, - create_config.trigger_specification.string_rep(), - create_config.disabled,
) - .await?; - println!( - "Trigger {} created successfully", - create_config.trigger_name - ); - } - TriggerAction::Activate(manage_config) => { - client - .api_v3_configure_processing_engine_trigger_activate( - manage_config.influxdb3_config.database_name, - &manage_config.trigger_name, - ) - .await?; - println!( - "Trigger {} activated successfully", - manage_config.trigger_name - ); - } - TriggerAction::Deactivate(manage_config) => { - client - .api_v3_configure_processing_engine_trigger_deactivate( - manage_config.influxdb3_config.database_name, - &manage_config.trigger_name, - ) - .await?; - println!( - "Trigger {} deactivated successfully", - manage_config.trigger_name - ); - } - TriggerAction::Delete(delete_config) => { - client - .api_v3_configure_processing_engine_trigger_delete( - delete_config.influxdb3_config.database_name, - &delete_config.trigger_name, - delete_config.force, - ) - .await?; - println!( - "Trigger {} deleted successfully", - delete_config.trigger_name - ); - } - } - Ok(()) -} diff --git a/influxdb3/src/commands/query.rs b/influxdb3/src/commands/query.rs index 5ffbbe01f75..baa71d1dc35 100644 --- a/influxdb3/src/commands/query.rs +++ b/influxdb3/src/commands/query.rs @@ -35,14 +35,14 @@ pub type Result<T, E = Error> = std::result::Result<T, E>; #[derive(Debug, Parser)] #[clap(visible_alias = "q", trailing_var_arg = true)] pub struct Config { - /// Common InfluxDB 3.0 config + /// Common InfluxDB 3 Core config #[clap(flatten)] influxdb3_config: InfluxDb3Config, /// The query language used to format the provided query string #[clap( value_enum, - long = "lang", + long = "language", short = 'l', default_value_t = QueryLanguage::Sql, )] @@ -50,9 +50,9 @@ pub struct Config { /// The format in which to output the query /// - /// If `--fmt` is set to `parquet`, then you must also specify an output + /// If `--format` is set to `parquet`, then you must also specify an output /// file path with `--output`. - #[clap(value_enum, long = "fmt", default_value = "pretty")] + #[clap(value_enum, long = "format", default_value = "pretty")] output_format: Format, /// Put all query output into `output` diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 9424f142f4f..03ca053be8a 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -1,18 +1,19 @@ -//! Entrypoint for InfluxDB 3.0 OSS Server +//!
Entrypoint for InfluxDB 3 Core Server use anyhow::{bail, Context}; -use clap_blocks::{ - memory_size::MemorySize, - object_store::{ObjectStoreConfig, ObjectStoreType}, - socket_addr::SocketAddr, -}; use datafusion_util::config::register_iox_object_store; use influxdb3_cache::{ last_cache::{self, LastCacheProvider}, meta_cache::MetaCacheProvider, parquet_cache::create_cached_obj_store_and_oracle, }; -use influxdb3_clap_blocks::{datafusion::IoxQueryDatafusionConfig, tokio::TokioDatafusionConfig}; +use influxdb3_clap_blocks::{ + datafusion::IoxQueryDatafusionConfig, + memory_size::MemorySize, + object_store::{ObjectStoreConfig, ObjectStoreType}, + socket_addr::SocketAddr, + tokio::TokioDatafusionConfig, +}; use influxdb3_process::{ build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID, }; @@ -59,7 +60,7 @@ pub const DEFAULT_TELMETRY_ENDPOINT: &str = #[derive(Debug, Error)] pub enum Error { #[error("Cannot parse object store config: {0}")] - ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError), + ObjectStoreParsing(#[from] influxdb3_clap_blocks::object_store::ParseError), #[error("Tracing config error: {0}")] TracingConfig(#[from] trace_exporters::Error), diff --git a/influxdb3/src/commands/token.rs b/influxdb3/src/commands/token.rs deleted file mode 100644 index 3f5e7f7e311..00000000000 --- a/influxdb3/src/commands/token.rs +++ /dev/null @@ -1,45 +0,0 @@ -use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64; -use base64::Engine as _; -use rand::rngs::OsRng; -use rand::RngCore; -use sha2::Digest; -use sha2::Sha512; -use std::error::Error; -use std::str; - -#[derive(Debug, clap::Parser)] -pub struct Config { - #[clap(subcommand)] - cmd: SubCommand, -} - -#[derive(Debug, clap::Parser)] -pub enum SubCommand { - /// Create a new auth token - Create, -} - -pub fn command(config: Config) -> Result<(), Box<dyn Error>> { - match config.cmd { - SubCommand::Create => { - let token = { - let mut token = String::from("apiv3_"); - let mut key = [0u8; 64]; - OsRng.fill_bytes(&mut key); - token.push_str(&B64.encode(key)); - token - }; - println!( - "\ - Token: {token}\n\ - Hashed Token: {hashed}\n\n\ - Start the server with `influxdb3 serve --bearer-token {hashed}`\n\n\ - HTTP requests require the following header: \"Authorization: Bearer {token}\"\n\ - This will grant you access to every HTTP endpoint or deny it otherwise - ", - hashed = hex::encode(&Sha512::digest(&token)[..]) - ); - } - } - Ok(()) -} diff --git a/influxdb3/src/commands/write.rs b/influxdb3/src/commands/write.rs index ad7556e98f6..f053d804bb1 100644 --- a/influxdb3/src/commands/write.rs +++ b/influxdb3/src/commands/write.rs @@ -21,7 +21,7 @@ pub(crate) type Result<T, E = Error> = std::result::Result<T, E>; #[derive(Debug, Parser)] #[clap(visible_alias = "w", trailing_var_arg = true)] pub struct Config { - /// Common InfluxDB 3.0 config + /// Common InfluxDB 3 Core config #[clap(flatten)] influxdb3_config: InfluxDb3Config, diff --git a/influxdb3/src/main.rs b/influxdb3/src/main.rs index 287e1233f7d..eed7a0e387f 100644 --- a/influxdb3/src/main.rs +++ b/influxdb3/src/main.rs @@ -21,14 +21,13 @@ use trogging::{ }; mod commands { + pub mod activate; pub(crate) mod common; - pub mod last_cache; - pub mod manage; - pub mod meta_cache; - pub mod processing_engine; + pub mod create; + pub mod deactivate; + pub mod delete; pub mod query; pub mod serve; - pub mod token; pub mod write; } @@ -43,25 +42,29 @@ version = &VERSION_STRING[..], disable_help_flag = true, arg( clap::Arg::new("help") +.short('h') .long("help")
.help("Print help information") .action(clap::ArgAction::Help) .global(true) ), -about = "InfluxDB 3.0 OSS server and command line tools", -long_about = r#"InfluxDB 3.0 OSS server and command line tools +about = "InfluxDB 3 Core server and command line tools", +long_about = r#"InfluxDB 3 Core server and command line tools Examples: - # Run the InfluxDB 3.0 OSS server + # Run the InfluxDB 3 Core server influxdb3 serve --object-store file --data-dir ~/.influxdb3 --host_id my_host_name - # Display all commands + # Display all commands short form + influxdb3 -h + + # Display all commands long form influxdb3 --help - # Run the InfluxDB 3.0 OSS server with extra verbose logging + # Run the InfluxDB 3 Core server with extra verbose logging influxdb3 serve -v --object-store file --data-dir ~/.influxdb3 --host_id my_host_name - # Run InfluxDB 3.0 OSS with full debug logging specified with LOG_FILTER + # Run InfluxDB 3 Core with full debug logging specified with LOG_FILTER LOG_FILTER=debug influxdb3 serve --object-store file --data-dir ~/.influxdb3 --host_id my_host_name "# )] @@ -79,32 +82,26 @@ struct Config { #[derive(Debug, clap::Parser)] #[allow(clippy::large_enum_variant)] enum Command { - /// Run the InfluxDB 3.0 server - Serve(commands::serve::Config), - - /// Perform a query against a running InfluxDB 3.0 server - Query(commands::query::Config), - - /// Perform a set of writes to a running InfluxDB 3.0 server - Write(commands::write::Config), + /// Activate a resource such as a trigger + Activate(commands::activate::Config), - /// Manage tokens for your InfluxDB 3.0 server - Token(commands::token::Config), + /// Create a resource such as a database or auth token + Create(commands::create::Config), - /// Manage last-n-value caches - LastCache(commands::last_cache::Config), + /// Deactivate a resource such as a trigger + Deactivate(commands::deactivate::Config), - /// Manage metadata caches - MetaCache(commands::meta_cache::Config), + /// Delete a resource such as a database or table + Delete(commands::delete::Config), - /// Manage processing engine plugins and triggers - ProcessingEngine(commands::processing_engine::Config), + /// Perform a query against a running InfluxDB 3 Core server + Query(commands::query::Config), - /// Manage database (delete only for the moment) - Database(commands::manage::database::Config), + /// Run the InfluxDB 3 Core server + Serve(commands::serve::Config), - /// Manage table (delete only for the moment) - Table(commands::manage::table::Config), + /// Perform a set of writes to a running InfluxDB 3 Core server + Write(commands::write::Config), } fn main() -> Result<(), std::io::Error> { @@ -130,60 +127,48 @@ fn main() -> Result<(), std::io::Error> { } match config.command { - None => println!("command required, --help for help"), - Some(Command::Serve(config)) => { - let _tracing_guard = - handle_init_logs(init_logs_and_tracing(&config.logging_config)); - if let Err(e) = commands::serve::command(config).await { - eprintln!("Serve command failed: {e}"); + None => println!("command required, -h/--help for help"), + Some(Command::Activate(config)) => { + if let Err(e) = commands::activate::command(config).await { + eprintln!("Activate command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } - Some(Command::Query(config)) => { - if let Err(e) = commands::query::command(config).await { - eprintln!("Query command failed: {e}"); + Some(Command::Create(config)) => { + if let Err(e) = commands::create::command(config).await { + eprintln!("Create command 
failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } - Some(Command::Write(config)) => { - if let Err(e) = commands::write::command(config).await { - eprintln!("Write command failed: {e}"); + Some(Command::Deactivate(config)) => { + if let Err(e) = commands::deactivate::command(config).await { + eprintln!("Deactivate command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } - Some(Command::Token(config)) => { - if let Err(e) = commands::token::command(config) { - eprintln!("Token command failed: {e}"); + Some(Command::Delete(config)) => { - if let Err(e) = commands::delete::command(config).await { + eprintln!("Delete command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } - Some(Command::LastCache(config)) => { - if let Err(e) = commands::last_cache::command(config).await { - eprintln!("Last Cache command failed: {e}"); - std::process::exit(ReturnCode::Failure as _) - } - } - Some(Command::MetaCache(config)) => { - if let Err(e) = commands::meta_cache::command(config).await { - eprintln!("Metadata Cache command failed: {e}"); - std::process::exit(ReturnCode::Failure as _) - } - } - Some(Command::ProcessingEngine(config)) => { - if let Err(e) = commands::processing_engine::command(config).await { - eprintln!("Processing engine command failed: {e}"); + Some(Command::Serve(config)) => { + let _tracing_guard = + handle_init_logs(init_logs_and_tracing(&config.logging_config)); + if let Err(e) = commands::serve::command(config).await { + eprintln!("Serve command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } - Some(Command::Database(config)) => { - if let Err(e) = commands::manage::database::command(config).await { - eprintln!("Database command failed: {e}"); + Some(Command::Query(config)) => { + if let Err(e) = commands::query::command(config).await { + eprintln!("Query command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } - Some(Command::Table(config)) => { - if let Err(e) = commands::manage::table::command(config).await { - eprintln!("Table command failed: {e}"); + Some(Command::Write(config)) => { + if let Err(e) = commands::write::command(config).await { + eprintln!("Write command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index 4e9354249cf..6e824135a05 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -91,14 +91,7 @@ async fn test_create_database() { let server = TestServer::spawn().await; let server_addr = server.client_addr(); let db_name = "foo"; - let result = run_with_confirmation(&[ - "database", - "create", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + let result = run_with_confirmation(&["create", "database", db_name, "--host", &server_addr]); debug!(result = ?result, "create database"); assert_contains!(&result, "Database \"foo\" created successfully"); } @@ -110,26 +103,13 @@ async fn test_create_database_limit() { let server = TestServer::spawn().await; let server_addr = server.client_addr(); let db_name = "foo"; for i in 0..5 { let name = format!("{db_name}{i}"); - let result = run_with_confirmation(&[ - "database", - "create", - "--dbname", - &name, - "--host", - &server_addr, - ]); + let result = run_with_confirmation(&["create", "database", &name, "--host", &server_addr]); debug!(result = ?result, "create database"); assert_contains!(&result, format!("Database \"{name}\" created successfully")); } - let result = run_with_confirmation_and_err(&[ - "database", - "create", - "--dbname", - "foo5", - "--host", - &server_addr, - ]); + let result
= + run_with_confirmation_and_err(&["create", "database", "foo5", "--host", &server_addr]); debug!(result = ?result, "create database"); assert_contains!( &result, @@ -150,14 +130,7 @@ async fn test_delete_database() { ) .await .expect("write to db"); - let result = run_with_confirmation(&[ - "database", - "delete", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + let result = run_with_confirmation(&["delete", "database", db_name, "--host", &server_addr]); debug!(result = ?result, "delete database"); assert_contains!(&result, "Database \"foo\" deleted successfully"); } @@ -167,14 +140,8 @@ async fn test_delete_missing_database() { let server = TestServer::spawn().await; let server_addr = server.client_addr(); let db_name = "foo"; - let result = run_with_confirmation_and_err(&[ - "database", - "delete", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + let result = + run_with_confirmation_and_err(&["delete", "database", db_name, "--host", &server_addr]); debug!(result = ?result, "delete missing database"); assert_contains!(&result, "404"); } @@ -185,23 +152,15 @@ async fn test_create_table() { let server_addr = server.client_addr(); let db_name = "foo"; let table_name = "bar"; - let result = run_with_confirmation(&[ - "database", - "create", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + let result = run_with_confirmation(&["create", "database", db_name, "--host", &server_addr]); debug!(result = ?result, "create database"); assert_contains!(&result, "Database \"foo\" created successfully"); let result = run_with_confirmation(&[ - "table", "create", - "--dbname", - db_name, - "--table", + "table", table_name, + "--database", + db_name, "--host", &server_addr, "--tags", @@ -279,12 +238,11 @@ async fn test_delete_table() { .await .expect("write to db"); let result = run_with_confirmation(&[ - "table", "delete", - "--dbname", - db_name, - "--table", + "table", table_name, + "--database", + db_name, "--host", &server_addr, ]); @@ -308,12 +266,11 @@ async fn test_delete_missing_table() { .expect("write to db"); let result = run_with_confirmation_and_err(&[ - "table", "delete", - "--dbname", - db_name, - "--table", + "table", "cpu", + "--database", + db_name, "--host", &server_addr, ]); @@ -338,34 +295,32 @@ async fn test_create_delete_meta_cache() { let cache_name = "baz"; // first create the cache: let result = run(&[ - "meta-cache", "create", + "meta_cache", "--host", &server_addr, - "--dbname", + "--database", db_name, "--table", table_name, - "--cache-name", - cache_name, "--columns", "t1,t2", + cache_name, ]); assert_contains!(&result, "new cache created"); // doing the same thing over again will be a no-op let result = run(&[ - "meta-cache", "create", + "meta_cache", "--host", &server_addr, - "--dbname", + "--database", db_name, "--table", table_name, - "--cache-name", - cache_name, "--columns", "t1,t2", + cache_name, ]); assert_contains!( &result, @@ -373,29 +328,27 @@ async fn test_create_delete_meta_cache() { ); // now delete it: let result = run(&[ - "meta-cache", "delete", + "meta_cache", "--host", &server_addr, - "--dbname", + "--database", db_name, "--table", table_name, - "--cache-name", cache_name, ]); assert_contains!(&result, "meta cache deleted successfully"); // trying to delete again should result in an error as the cache no longer exists: let result = run_and_err(&[ - "meta-cache", "delete", + "meta_cache", "--host", &server_addr, - "--dbname", + "--database", db_name, "--table", table_name, - "--cache-name", cache_name, ]); 
assert_contains!(&result, "[404 Not Found]: cache not found"); @@ -408,14 +361,7 @@ async fn test_create_plugin() { let plugin_name = "test_plugin"; // Create database first - let result = run_with_confirmation(&[ - "database", - "create", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + let result = run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); assert_contains!(&result, "Database \"foo\" created successfully"); // Create plugin file @@ -428,19 +374,17 @@ def process_rows(iterator, output): // Create plugin let result = run_with_confirmation(&[ - "processing-engine", - "plugin", "create", - "--dbname", + "plugin", + "--database", db_name, "--host", &server_addr, - "--plugin-name", - plugin_name, "--code-filename", plugin_file.path().to_str().unwrap(), "--entry-point", "process_rows", + plugin_name, ]); debug!(result = ?result, "create plugin"); assert_contains!(&result, "Plugin test_plugin created successfully"); @@ -454,14 +398,7 @@ async fn test_delete_plugin() { let plugin_name = "test_plugin"; // Setup: create database and plugin - run_with_confirmation(&[ - "database", - "create", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); let plugin_file = create_plugin_file( r#" @@ -471,31 +408,27 @@ def process_rows(iterator, output): ); run_with_confirmation(&[ - "processing-engine", - "plugin", "create", - "--dbname", + "plugin", + "--database", db_name, "--host", &server_addr, - "--plugin-name", - plugin_name, "--code-filename", plugin_file.path().to_str().unwrap(), "--entry-point", "process_rows", + plugin_name, ]); // Delete plugin let result = run_with_confirmation(&[ - "processing-engine", - "plugin", "delete", - "--dbname", + "plugin", + "--database", db_name, "--host", &server_addr, - "--plugin-name", plugin_name, ]); debug!(result = ?result, "delete plugin"); @@ -511,14 +444,7 @@ async fn test_create_trigger() { let trigger_name = "test_trigger"; // Setup: create database and plugin - run_with_confirmation(&[ - "database", - "create", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); let plugin_file = create_plugin_file( r#" @@ -528,36 +454,32 @@ def process_rows(iterator, output): ); run_with_confirmation(&[ - "processing-engine", - "plugin", "create", - "--dbname", + "plugin", + "--database", db_name, "--host", &server_addr, - "--plugin-name", - plugin_name, "--code-filename", plugin_file.path().to_str().unwrap(), "--entry-point", "process_rows", + plugin_name, ]); // Create trigger let result = run_with_confirmation(&[ - "processing-engine", - "trigger", "create", - "--dbname", + "trigger", + "--database", db_name, "--host", &server_addr, - "--trigger-name", - trigger_name, - "--plugin-name", + "--plugin", plugin_name, "--trigger-spec", "all_tables", + trigger_name, ]); debug!(result = ?result, "create trigger"); assert_contains!(&result, "Trigger test_trigger created successfully"); @@ -572,14 +494,7 @@ async fn test_trigger_activation() { let trigger_name = "test_trigger"; // Setup: create database, plugin, and trigger - run_with_confirmation(&[ - "database", - "create", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); let plugin_file = create_plugin_file( r#" @@ -589,47 +504,41 @@ def process_rows(iterator, output): ); run_with_confirmation(&[ - 
"processing-engine", - "plugin", "create", - "--dbname", + "plugin", + "--database", db_name, "--host", &server_addr, - "--plugin-name", - plugin_name, "--code-filename", plugin_file.path().to_str().unwrap(), "--entry-point", "process_rows", + plugin_name, ]); run_with_confirmation(&[ - "processing-engine", - "trigger", "create", - "--dbname", + "trigger", + "--database", db_name, "--host", &server_addr, - "--trigger-name", - trigger_name, - "--plugin-name", + "--plugin", plugin_name, "--trigger-spec", "all_tables", + trigger_name, ]); // Test activation let result = run_with_confirmation(&[ - "processing-engine", - "trigger", "activate", - "--dbname", + "trigger", + "--database", db_name, "--host", &server_addr, - "--trigger-name", trigger_name, ]); debug!(result = ?result, "activate trigger"); @@ -637,14 +546,12 @@ def process_rows(iterator, output): // Test deactivation let result = run_with_confirmation(&[ - "processing-engine", - "trigger", "deactivate", - "--dbname", + "trigger", + "--database", db_name, "--host", &server_addr, - "--trigger-name", trigger_name, ]); debug!(result = ?result, "deactivate trigger"); @@ -660,14 +567,7 @@ async fn test_delete_active_trigger() { let trigger_name = "test_trigger"; // Setup: create database, plugin, and active trigger - run_with_confirmation(&[ - "database", - "create", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); let plugin_file = create_plugin_file( r#" @@ -677,59 +577,51 @@ def process_rows(iterator, output): ); run_with_confirmation(&[ - "processing-engine", - "plugin", "create", - "--dbname", + "plugin", + "--database", db_name, "--host", &server_addr, - "--plugin-name", - plugin_name, "--code-filename", plugin_file.path().to_str().unwrap(), "--entry-point", "process_rows", + plugin_name, ]); run_with_confirmation(&[ - "processing-engine", - "trigger", "create", - "--dbname", + "trigger", + "--database", db_name, "--host", &server_addr, - "--trigger-name", - trigger_name, - "--plugin-name", + "--plugin", plugin_name, "--trigger-spec", "all_tables", + trigger_name, ]); run_with_confirmation(&[ - "processing-engine", - "trigger", "activate", - "--dbname", + "trigger", + "--database", db_name, "--host", &server_addr, - "--trigger-name", trigger_name, ]); // Try to delete active trigger without force flag let result = run_with_confirmation_and_err(&[ - "processing-engine", - "trigger", "delete", - "--dbname", + "trigger", + "--database", db_name, "--host", &server_addr, - "--trigger-name", trigger_name, ]); debug!(result = ?result, "delete active trigger without force"); @@ -737,14 +629,12 @@ def process_rows(iterator, output): // Delete active trigger with force flag let result = run_with_confirmation(&[ - "processing-engine", - "trigger", "delete", - "--dbname", + "trigger", + "--database", db_name, "--host", &server_addr, - "--trigger-name", trigger_name, "--force", ]); @@ -762,28 +652,20 @@ async fn test_table_specific_trigger() { let trigger_name = "test_trigger"; // Setup: create database, table, and plugin - run_with_confirmation(&[ - "database", - "create", - "--dbname", - db_name, - "--host", - &server_addr, - ]); + run_with_confirmation(&["create", "database", "--host", &server_addr, db_name]); run_with_confirmation(&[ - "table", "create", - "--dbname", + "table", + "--database", db_name, - "--table", - table_name, "--host", &server_addr, "--tags", "tag1", "--fields", "field1:float64", + table_name, ]); let plugin_file = 
create_plugin_file( @@ -794,36 +676,32 @@ def process_rows(iterator, output): ); run_with_confirmation(&[ - "processing-engine", - "plugin", "create", - "--dbname", + "plugin", + "--database", db_name, "--host", &server_addr, - "--plugin-name", - plugin_name, "--code-filename", plugin_file.path().to_str().unwrap(), "--entry-point", "process_rows", + plugin_name, ]); // Create table-specific trigger let result = run_with_confirmation(&[ - "processing-engine", - "trigger", "create", - "--dbname", + "trigger", + "--database", db_name, "--host", &server_addr, - "--trigger-name", - trigger_name, - "--plugin-name", + "--plugin", plugin_name, "--trigger-spec", &format!("table:{}", table_name), + trigger_name, ]); debug!(result = ?result, "create table-specific trigger"); assert_contains!(&result, "Trigger test_trigger created successfully"); diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index c25a8b09bc5..498bb0046da 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -188,11 +188,11 @@ impl Serialize for Catalog { } impl Catalog { - /// Limit for the number of Databases that InfluxDB 3.0 OSS can have + /// Limit for the number of Databases that InfluxDB 3 Core OSS can have pub(crate) const NUM_DBS_LIMIT: usize = 5; - /// Limit for the number of columns per table that InfluxDB 3.0 OSS can have + /// Limit for the number of columns per table that InfluxDB 3 Core OSS can have pub(crate) const NUM_COLUMNS_PER_TABLE_LIMIT: usize = 500; - /// Limit for the number of tables across all DBs that InfluxDB 3.0 OSS can have + /// Limit for the number of tables across all DBs that InfluxDB 3 Core OSS can have pub(crate) const NUM_TABLES_LIMIT: usize = 2000; pub fn new(host_id: Arc, instance_id: Arc) -> Self { diff --git a/influxdb3_clap_blocks/Cargo.toml b/influxdb3_clap_blocks/Cargo.toml index 25da081dd70..26861fe7ab9 100644 --- a/influxdb3_clap_blocks/Cargo.toml +++ b/influxdb3_clap_blocks/Cargo.toml @@ -11,16 +11,38 @@ iox_query.workspace = true observability_deps.workspace = true # crates.io dependencies +async-trait.workspace = true clap.workspace = true datafusion.workspace = true +http.workspace = true +# object store crate uses the new version of the http crate +http_1 = { version = "1.1", package = "http" } humantime.workspace = true +iox_catalog.workspace = true +iox_time.workspace = true +itertools.workspace = true libc.workspace = true +metric.workspace = true +non-empty-string.workspace = true +object_store.workspace = true paste.workspace = true +snafu.workspace = true +sysinfo.workspace = true tokio.workspace = true +trace_exporters.workspace = true +trogging.workspace = true +url.workspace = true [dev-dependencies] +tempfile.workspace = true +test_helpers.workspace = true futures.workspace = true test-log.workspace = true [lints] workspace = true + +[features] +azure = ["object_store/azure"] +gcp = ["object_store/gcp"] +aws = ["object_store/aws"] diff --git a/influxdb3_clap_blocks/src/lib.rs b/influxdb3_clap_blocks/src/lib.rs index f8d7844a299..e5880e50c7d 100644 --- a/influxdb3_clap_blocks/src/lib.rs +++ b/influxdb3_clap_blocks/src/lib.rs @@ -1,4 +1,7 @@ //! 
Configuration options for the `influxdb3` CLI which uses the `clap` crate pub mod datafusion; +pub mod memory_size; +pub mod object_store; +pub mod socket_addr; pub mod tokio; diff --git a/influxdb3_clap_blocks/src/memory_size.rs b/influxdb3_clap_blocks/src/memory_size.rs new file mode 100644 index 00000000000..ed401a4029f --- /dev/null +++ b/influxdb3_clap_blocks/src/memory_size.rs @@ -0,0 +1,138 @@ +//! Helper types to express memory size. + +use std::{str::FromStr, sync::OnceLock}; + +use observability_deps::tracing::info; +use sysinfo::System; + +/// Memory size. +/// +/// # Parsing +/// This can be parsed from strings in one of the following formats: +/// +/// - **absolute:** just use a non-negative number to specify the absolute +/// bytes, e.g. `1024` +/// - **relative:** use a percentage between 0 and 100 (both inclusive) to specify +/// a relative amount of the total available memory, e.g. `50%` +/// +/// # Limits +/// +/// Memory limits are read from the following, stopping when a valid value is +/// found: +/// +/// - `/sys/fs/cgroup/memory/memory.limit_in_bytes` (cgroup) +/// - `/sys/fs/cgroup/memory.max` (cgroup2) +/// - Platform specific syscall (infallible) +/// +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct MemorySize(usize); + +impl MemorySize { + /// Number of bytes. + pub fn bytes(&self) -> usize { + self.0 + } +} + +impl std::fmt::Debug for MemorySize { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::fmt::Display for MemorySize { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl FromStr for MemorySize { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.strip_suffix('%') { + Some(s) => { + let percentage = u64::from_str(s).map_err(|e| e.to_string())?; + if percentage > 100 { + return Err(format!( + "relative memory size must be in [0, 100] but is {percentage}" + )); + } + let total = total_mem_bytes(); + let bytes = (percentage as f64 / 100f64 * total as f64).round() as usize; + Ok(Self(bytes)) + } + None => { + let bytes = usize::from_str(s).map_err(|e| e.to_string())?; + Ok(Self(bytes)) + } + } + } +} + +/// Total available memory size in bytes. +pub fn total_mem_bytes() -> usize { + // Keep this in a global state so that we only need to inspect the system once during IOx startup. + static TOTAL_MEM_BYTES: OnceLock<usize> = OnceLock::new(); + + *TOTAL_MEM_BYTES.get_or_init(get_memory_limit) +} + +/// Resolve the amount of memory available to this process. +/// +/// This attempts to find a cgroup limit first, before falling back to the +/// amount of system RAM available.
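+/// +/// Illustrative sketch (editor's addition, not part of this change): in a container whose cgroup2 file `/sys/fs/cgroup/memory.max` reads `2147483648`, this returns 2 GiB even when the host itself has more RAM available.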
+fn get_memory_limit() -> usize { + let mut sys = System::new(); + sys.refresh_memory(); + + let limit = sys + .cgroup_limits() + .map(|v| v.total_memory) + .unwrap_or_else(|| sys.total_memory()) as usize; + + info!(%limit, "detected process memory available"); + + limit +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse() { + assert_ok("0", 0); + assert_ok("1", 1); + assert_ok("1024", 1024); + assert_ok("0%", 0); + + assert_gt_zero("50%"); + + assert_err("-1", "invalid digit found in string"); + assert_err("foo", "invalid digit found in string"); + assert_err("-1%", "invalid digit found in string"); + assert_err( + "101%", + "relative memory size must be in [0, 100] but is 101", + ); + } + + #[track_caller] + fn assert_ok(s: &'static str, expected: usize) { + let parsed: MemorySize = s.parse().unwrap(); + assert_eq!(parsed.bytes(), expected); + } + + #[track_caller] + fn assert_gt_zero(s: &'static str) { + let parsed: MemorySize = s.parse().unwrap(); + assert!(parsed.bytes() > 0); + } + + #[track_caller] + fn assert_err(s: &'static str, expected: &'static str) { + let err = MemorySize::from_str(s).unwrap_err(); + assert_eq!(err, expected); + } +} diff --git a/influxdb3_clap_blocks/src/object_store.rs b/influxdb3_clap_blocks/src/object_store.rs new file mode 100644 index 00000000000..d5071dc5df7 --- /dev/null +++ b/influxdb3_clap_blocks/src/object_store.rs @@ -0,0 +1,1286 @@ +//! CLI handling for object store config (via CLI arguments and environment variables). + +use async_trait::async_trait; +use non_empty_string::NonEmptyString; +use object_store::{ + local::LocalFileSystem, + memory::InMemory, + path::Path, + throttle::{ThrottleConfig, ThrottledStore}, + DynObjectStore, +}; +use observability_deps::tracing::{info, warn}; +use snafu::{ResultExt, Snafu}; +use std::{convert::Infallible, fs, num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration}; +use url::Url; + +#[derive(Debug, Snafu)] +#[allow(missing_docs)] +pub enum ParseError { + #[snafu(display("Unable to create database directory {:?}: {}", path, source))] + CreatingDatabaseDirectory { + path: PathBuf, + source: std::io::Error, + }, + + #[snafu(display("Unable to create local store {:?}: {}", path, source))] + CreateLocalFileSystem { + path: PathBuf, + source: object_store::Error, + }, + + #[snafu(display( + "Specified {:?} for the object store, required configuration missing for {}", + object_store, + missing + ))] + MissingObjectStoreConfig { + object_store: ObjectStoreType, + missing: String, + }, + + #[snafu(display("Object store not specified"))] + UnspecifiedObjectStore, + + // Creating a new S3 object store can fail if the region is *specified* but + // not *parseable* as a rusoto `Region`. The other object store constructors + // don't return `Result`. + #[snafu(display("Error configuring Amazon S3: {}", source))] + InvalidS3Config { source: object_store::Error }, + + #[snafu(display("Error configuring GCS: {}", source))] + InvalidGCSConfig { source: object_store::Error }, + + #[snafu(display("Error configuring Microsoft Azure: {}", source))] + InvalidAzureConfig { source: object_store::Error }, +} + +/// The AWS region to use for Amazon S3 based object storage if none is +/// specified. +pub const FALLBACK_AWS_REGION: &str = "us-east-1"; + +/// A `clap` `value_parser` which returns `None` when given an empty string and +/// `Some(NonEmptyString)` otherwise. 
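+/// +/// Illustrative sketch (editor's addition, not part of this change): `parse_optional_string("")` yields `Ok(None)`, so an empty `AWS_ACCESS_KEY_ID` behaves as if the flag were omitted, while `parse_optional_string("key")` yields `Ok(Some(_))`.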
+fn parse_optional_string(s: &str) -> Result<Option<NonEmptyString>, Infallible> { + Ok(NonEmptyString::new(s.to_string()).ok()) +} + +/// Endpoint for S3 & Co. +/// +/// This is a [`Url`] without a trailing slash. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Endpoint(String); + +impl std::fmt::Display for Endpoint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl From<Endpoint> for String { + fn from(value: Endpoint) -> Self { + value.0 + } +} + +impl std::str::FromStr for Endpoint { + type Err = Box<dyn std::error::Error>; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + // try to parse it + Url::parse(s)?; + + // strip trailing slash + let s = s.strip_suffix("/").unwrap_or(s); + + Ok(Self(s.to_owned())) + } +} + +/// Creation of an `ObjectStoreConfig` struct for `clap` argument handling. +/// +/// This allows for multiple object store configurations to be produced when +/// needed, denoted by a particular use-case prefix. +macro_rules! object_store_config { + ($prefix:expr) => { + object_store_config_inner!($prefix); + }; + () => { + // Creates the original ObjectStoreConfig to maintain backwards + // compatibility. + object_store_config_inner!("_"); + }; +} + +/// Helper macro to generate the relevant name, used by the ID/long attributes +/// for `clap`. +macro_rules! gen_name { + ($prefix:expr, $name:expr) => { + paste::paste! { + if $prefix == "_" { + $name + } else { + concat!(stringify!([<$prefix:lower>]), "-", $name) + } + } + }; +} + +/// Helper macro to generate the appropriate environment variable, used by the +/// env attribute for `clap`. +macro_rules! gen_env { + ($prefix:expr, $name:expr) => { + paste::paste! { + if $prefix == "_" { + $name + } else { + concat!(stringify!([<$prefix:upper>]), "_", $name) + } + } + }; +} + +macro_rules! object_store_config_inner { + ($prefix:expr) => { + paste::paste! { + /// CLI config for object stores. + #[derive(Debug, Clone, clap::Parser)] + pub struct [<$prefix:camel ObjectStoreConfig>] { + /// Which object storage to use. If not specified, defaults to memory. + /// + /// Possible values (case insensitive): + /// + /// * memory (default): Effectively no object persistence. + /// * memorythrottled: Like `memory` but with latency and throughput that somewhat resemble a cloud + /// object store. Useful for testing and benchmarking. + /// * file: Stores objects in the local filesystem. Must also set `--data-dir`. + /// * s3: Amazon S3. Must also set `--bucket`, `--aws-access-key-id`, `--aws-secret-access-key`, and + /// possibly `--aws-default-region`. + /// * google: Google Cloud Storage. Must also set `--bucket` and `--google-service-account`. + /// * azure: Microsoft Azure blob storage. Must also set `--bucket`, `--azure-storage-account`, + /// and `--azure-storage-access-key`. + #[clap( + value_enum, + id = gen_name!($prefix, "object-store"), + long = gen_name!($prefix, "object-store"), + env = gen_env!($prefix, "INFLUXDB3_OBJECT_STORE"), + ignore_case = true, + action, + verbatim_doc_comment + )] + pub object_store: Option<ObjectStoreType>, + + /// Name of the bucket to use for the object store. Must also set + /// `--object-store` to a cloud object storage to have any effect. + /// + /// If using Google Cloud Storage for the object store, this item as well + /// as `--google-service-account` must be set. + /// + /// If using S3 for the object store, must set this item as well + /// as `--aws-access-key-id` and `--aws-secret-access-key`. Can also set + /// `--aws-default-region` if not using the fallback region.
+ /// + /// If using Azure for the object store, set this item to the name of a + /// container you've created in the associated storage account, under + /// Blob Service > Containers. Must also set `--azure-storage-account` and + /// `--azure-storage-access-key`. + #[clap( + id = gen_name!($prefix, "bucket"), + long = gen_name!($prefix, "bucket"), + env = gen_env!($prefix, "INFLUXDB3_BUCKET"), + action + )] + pub bucket: Option<String>, + + /// The location InfluxDB 3 Core will use to store files locally. + #[clap( + id = gen_name!($prefix, "data-dir"), + long = gen_name!($prefix, "data-dir"), + env = gen_env!($prefix, "INFLUXDB3_DB_DIR"), + action + )] + pub database_directory: Option<PathBuf>, + + /// When using Amazon S3 as the object store, set this to an access key that + /// has permission to read from and write to the specified S3 bucket. + /// + /// Must also set `--object-store=s3`, `--bucket`, and + /// `--aws-secret-access-key`. Can also set `--aws-default-region` if not + /// using the fallback region. + /// + /// Prefer the environment variable over the command line flag in shared + /// environments. + /// + /// An empty string value is equivalent to omitting the flag. + /// Note: must refer to std::option::Option explicitly, see + #[clap( + id = gen_name!($prefix, "aws-access-key-id"), + long = gen_name!($prefix, "aws-access-key-id"), + env = gen_env!($prefix, "AWS_ACCESS_KEY_ID"), + value_parser = parse_optional_string, + default_value = "", + action + )] + pub aws_access_key_id: std::option::Option<NonEmptyString>, + + /// When using Amazon S3 as the object store, set this to the secret access + /// key that goes with the specified access key ID. + /// + /// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`. + /// Can also set `--aws-default-region` if not using the fallback region. + /// + /// Prefer the environment variable over the command line flag in shared + /// environments. + /// + /// An empty string value is equivalent to omitting the flag. + /// Note: must refer to std::option::Option explicitly, see + #[clap( + id = gen_name!($prefix, "aws-secret-access-key"), + long = gen_name!($prefix, "aws-secret-access-key"), + env = gen_env!($prefix, "AWS_SECRET_ACCESS_KEY"), + value_parser = parse_optional_string, + default_value = "", + action + )] + pub aws_secret_access_key: std::option::Option<NonEmptyString>, + + /// When using Amazon S3 as the object store, set this to the region + /// that goes with the specified bucket if different from the fallback + /// value. + /// + /// Must also set `--object-store=s3`, `--bucket`, `--aws-access-key-id`, + /// and `--aws-secret-access-key`. + #[clap( + id = gen_name!($prefix, "aws-default-region"), + long = gen_name!($prefix, "aws-default-region"), + env = gen_env!($prefix, "AWS_DEFAULT_REGION"), + default_value = FALLBACK_AWS_REGION, + action + )] + pub aws_default_region: String, + + /// When using an Amazon S3-compatible storage service, set this to the + /// endpoint. + /// + /// Must also set `--object-store=s3`, `--bucket`. Can also set `--aws-default-region` + /// if not using the fallback region. + /// + /// Prefer the environment variable over the command line flag in shared + /// environments. + #[clap( + id = gen_name!($prefix, "aws-endpoint"), + long = gen_name!($prefix, "aws-endpoint"), + env = gen_env!($prefix, "AWS_ENDPOINT"), + action + )] + pub aws_endpoint: Option<Endpoint>, + + /// When using Amazon S3 as an object store, set this to the session token. This is handy when using a federated + /// login / SSO and you fetch credentials via the UI.
+ /// + /// It is assumed that the session is valid as long as the InfluxDB 3 Core server is running. + /// + /// Prefer the environment variable over the command line flag in shared + /// environments. + #[clap( + id = gen_name!($prefix, "aws-session-token"), + long = gen_name!($prefix, "aws-session-token"), + env = gen_env!($prefix, "AWS_SESSION_TOKEN"), + action + )] + pub aws_session_token: Option<String>, + + /// Allow unencrypted HTTP connection to AWS. + #[clap( + id = gen_name!($prefix, "aws-allow-http"), + long = gen_name!($prefix, "aws-allow-http"), + env = gen_env!($prefix, "AWS_ALLOW_HTTP"), + action + )] + pub aws_allow_http: bool, + + /// If enabled, S3 stores will not fetch credentials and will not sign requests. + /// + /// This can be useful when interacting with public S3 buckets that deny authorized requests or for when working + /// with in-cluster proxies that handle the credentials already. + #[clap( + id = gen_name!($prefix, "aws-skip-signature"), + long = gen_name!($prefix, "aws-skip-signature"), + env = gen_env!($prefix, "AWS_SKIP_SIGNATURE"), + action + )] + pub aws_skip_signature: bool, + + /// When using Google Cloud Storage as the object store, set this to the + /// path to the JSON file that contains the Google credentials. + /// + /// Must also set `--object-store=google` and `--bucket`. + #[clap( + id = gen_name!($prefix, "google-service-account"), + long = gen_name!($prefix, "google-service-account"), + env = gen_env!($prefix, "GOOGLE_SERVICE_ACCOUNT"), + action + )] + pub google_service_account: Option<String>, + + /// When using Microsoft Azure as the object store, set this to the + /// name you see when going to All Services > Storage accounts > `[name]`. + /// + /// Must also set `--object-store=azure`, `--bucket`, and + /// `--azure-storage-access-key`. + #[clap( + id = gen_name!($prefix, "azure-storage-account"), + long = gen_name!($prefix, "azure-storage-account"), + env = gen_env!($prefix, "AZURE_STORAGE_ACCOUNT"), + action + )] + pub azure_storage_account: Option<String>, + + /// When using Microsoft Azure as the object store, set this to one of the + /// Key values in the Storage account's Settings > Access keys. + /// + /// Must also set `--object-store=azure`, `--bucket`, and + /// `--azure-storage-account`. + /// + /// Prefer the environment variable over the command line flag in shared + /// environments. + #[clap( + id = gen_name!($prefix, "azure-storage-access-key"), + long = gen_name!($prefix, "azure-storage-access-key"), + env = gen_env!($prefix, "AZURE_STORAGE_ACCESS_KEY"), + action + )] + pub azure_storage_access_key: Option<String>, + + /// When using a network-based object store, limit the number of connections to this value. + #[clap( + id = gen_name!($prefix, "object-store-connection-limit"), + long = gen_name!($prefix, "object-store-connection-limit"), + env = gen_env!($prefix, "OBJECT_STORE_CONNECTION_LIMIT"), + default_value = "16", + action + )] + pub object_store_connection_limit: NonZeroUsize, + + /// Force HTTP/2 connection to network-based object stores. + /// + /// This implies "prior knowledge" as per RFC7540 section 3.4. + #[clap( + id = gen_name!($prefix, "object-store-http2-only"), + long = gen_name!($prefix, "object-store-http2-only"), + env = gen_env!($prefix, "OBJECT_STORE_HTTP2_ONLY"), + action + )] + pub http2_only: bool, + + /// Set max frame size (in bytes/octets) for HTTP/2 connection. + /// + /// If not set, this uses the `object_store`/`reqwest` default.
+ /// + /// Usually you want to set this as high as possible -- the maximum allowed by the standard is `2^24-1 = 16,777,215`. + /// However under some circumstances (like buggy middleware or upstream providers getting unhappy), you may be + /// required to pick something else. + #[clap( + id = gen_name!($prefix, "object-store-http2-max-frame-size"), + long = gen_name!($prefix, "object-store-http2-max-frame-size"), + env = gen_env!($prefix, "OBJECT_STORE_HTTP2_MAX_FRAME_SIZE"), + action + )] + pub http2_max_frame_size: Option<u32>, + + /// The maximum number of times to retry a request + /// + /// Set to 0 to disable retries + #[clap( + id = gen_name!($prefix, "object-store-max-retries"), + long = gen_name!($prefix, "object-store-max-retries"), + env = gen_env!($prefix, "OBJECT_STORE_MAX_RETRIES"), + action + )] + pub max_retries: Option<usize>, + + /// The maximum length of time from the initial request + /// after which no further retries will be attempted + /// + /// This not only bounds the length of time before a server + /// error will be surfaced to the application, but also bounds + /// the length of time a request's credentials must remain valid. + /// + /// As requests are retried without renewing credentials or + /// regenerating request payloads, this number should be kept + /// below 5 minutes to avoid errors due to expired credentials + /// and/or request payloads + #[clap( + id = gen_name!($prefix, "object-store-retry-timeout"), + long = gen_name!($prefix, "object-store-retry-timeout"), + env = gen_env!($prefix, "OBJECT_STORE_RETRY_TIMEOUT"), + value_parser = humantime::parse_duration, + action + )] + pub retry_timeout: Option<Duration>, + + + /// Endpoint of an S3 compatible, HTTP/2 enabled object store cache. + #[clap( + id = gen_name!($prefix, "object-store-cache-endpoint"), + long = gen_name!($prefix, "object-store-cache-endpoint"), + env = gen_env!($prefix, "OBJECT_STORE_CACHE_ENDPOINT"), + action + )] + pub cache_endpoint: Option<Endpoint>, + } + + impl [<$prefix:camel ObjectStoreConfig>] { + + /// Create a new instance for all-in-one mode, only allowing some arguments.
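+ /// + /// Illustrative sketch (editor's addition, not part of this change): `ObjectStoreConfig::new(Some("/var/lib/influxdb3".into()))` selects the `file` store rooted at that (hypothetical) directory, while `ObjectStoreConfig::new(None)` falls back to the in-memory store.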
+ pub fn new(database_directory: Option<PathBuf>) -> Self { + match &database_directory { + Some(dir) => info!("Object store: File-based in `{}`", dir.display()), + None => info!("Object store: In-memory"), + } + + let object_store = database_directory.as_ref().map(|_| ObjectStoreType::File); + + Self { + aws_access_key_id: Default::default(), + aws_allow_http: Default::default(), + aws_default_region: Default::default(), + aws_endpoint: Default::default(), + aws_secret_access_key: Default::default(), + aws_session_token: Default::default(), + aws_skip_signature: Default::default(), + azure_storage_access_key: Default::default(), + azure_storage_account: Default::default(), + bucket: Default::default(), + database_directory, + google_service_account: Default::default(), + object_store, + object_store_connection_limit: NonZeroUsize::new(16).unwrap(), + http2_only: Default::default(), + http2_max_frame_size: Default::default(), + max_retries: Default::default(), + retry_timeout: Default::default(), + cache_endpoint: Default::default(), + } + } + + #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))] + fn client_options(&self) -> object_store::ClientOptions { + let mut options = object_store::ClientOptions::new(); + + if self.http2_only { + options = options.with_http2_only(); + } + if let Some(sz) = self.http2_max_frame_size { + options = options.with_http2_max_frame_size(sz); + } + + options + } + + #[cfg(feature = "gcp")] + fn new_gcs(&self) -> Result<Arc<DynObjectStore>, ParseError> { + use object_store::gcp::GoogleCloudStorageBuilder; + use object_store::limit::LimitStore; + + info!(bucket=?self.bucket, object_store_type="GCS", "Object Store"); + + let mut builder = GoogleCloudStorageBuilder::new().with_client_options(self.client_options()).with_retry(self.retry_config()); + + if let Some(bucket) = &self.bucket { + builder = builder.with_bucket_name(bucket); + } + if let Some(account) = &self.google_service_account { + builder = builder.with_service_account_path(account); + } + + Ok(Arc::new(LimitStore::new( + builder.build().context(InvalidGCSConfigSnafu)?, + self.object_store_connection_limit.get(), + ))) + } + + #[cfg(not(feature = "gcp"))] + fn new_gcs(&self) -> Result<Arc<DynObjectStore>, ParseError> { + panic!("GCS support not enabled, recompile with the gcp feature enabled") + } + + #[cfg(feature = "aws")] + fn new_s3(&self) -> Result<Arc<DynObjectStore>, ParseError> { + use object_store::limit::LimitStore; + + info!( + bucket=?self.bucket, + endpoint=?self.aws_endpoint, + object_store_type="S3", + "Object Store" + ); + + Ok(Arc::new(LimitStore::new( + self.build_s3()?, + self.object_store_connection_limit.get(), + ))) + } + + #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))] + fn retry_config(&self) -> object_store::RetryConfig { + let mut retry_config = object_store::RetryConfig::default(); + + if let Some(max_retries) = self.max_retries { + retry_config.max_retries = max_retries; + } + + if let Some(retry_timeout) = self.retry_timeout { + retry_config.retry_timeout = retry_timeout; + } + + retry_config + } + + /// If further configuration of S3 is needed beyond what this module provides, use this function + /// to create an [`object_store::aws::AmazonS3Builder`] and further customize, then call `.build()` + /// directly.
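+ /// + /// A minimal sketch (editor's addition; assumes the `aws` feature and a hypothetical bucket name): `config.s3_builder().with_bucket_name("my-bucket").build()` returns an `AmazonS3` with the client, retry, and region options above already applied.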
+ #[cfg(feature = "aws")] + pub fn s3_builder(&self) -> object_store::aws::AmazonS3Builder { + use object_store::aws::AmazonS3Builder; + + let mut builder = AmazonS3Builder::new() + .with_client_options(self.client_options()) + .with_allow_http(self.aws_allow_http) + .with_region(&self.aws_default_region) + .with_retry(self.retry_config()) + .with_skip_signature(self.aws_skip_signature) + .with_imdsv1_fallback(); + + if let Some(bucket) = &self.bucket { + builder = builder.with_bucket_name(bucket); + } + if let Some(key_id) = &self.aws_access_key_id { + builder = builder.with_access_key_id(key_id.get()); + } + if let Some(token) = &self.aws_session_token { + builder = builder.with_token(token); + } + if let Some(secret) = &self.aws_secret_access_key { + builder = builder.with_secret_access_key(secret.get()); + } + if let Some(endpoint) = &self.aws_endpoint { + builder = builder.with_endpoint(endpoint.clone()); + } + + builder + } + + #[cfg(feature = "aws")] + fn build_s3(&self) -> Result<object_store::aws::AmazonS3, ParseError> { + let builder = self.s3_builder(); + + builder.build().context(InvalidS3ConfigSnafu) + } + + #[cfg(not(feature = "aws"))] + fn new_s3(&self) -> Result<Arc<DynObjectStore>, ParseError> { + panic!("S3 support not enabled, recompile with the aws feature enabled") + } + + #[cfg(feature = "azure")] + fn new_azure(&self) -> Result<Arc<DynObjectStore>, ParseError> { + use object_store::azure::MicrosoftAzureBuilder; + use object_store::limit::LimitStore; + + info!(bucket=?self.bucket, account=?self.azure_storage_account, + object_store_type="Azure", "Object Store"); + + let mut builder = MicrosoftAzureBuilder::new().with_client_options(self.client_options()); + + if let Some(bucket) = &self.bucket { + builder = builder.with_container_name(bucket); + } + if let Some(account) = &self.azure_storage_account { + builder = builder.with_account(account) + } + if let Some(key) = &self.azure_storage_access_key { + builder = builder.with_access_key(key) + } + + Ok(Arc::new(LimitStore::new( + builder.build().context(InvalidAzureConfigSnafu)?, + self.object_store_connection_limit.get(), + ))) + } + + #[cfg(not(feature = "azure"))] + fn new_azure(&self) -> Result<Arc<DynObjectStore>, ParseError> { + panic!("Azure blob storage support not enabled, recompile with the azure feature enabled") + } + + /// Build cache store. + #[cfg(feature = "aws")] + pub fn make_cache_store( + &self + ) -> Result<Option<Arc<DynObjectStore>>, ParseError> { + let Some(endpoint) = &self.cache_endpoint else { + return Ok(None); + }; + + let store = object_store::aws::AmazonS3Builder::new() + // bucket name is ignored by our cache server + .with_bucket_name(self.bucket.as_deref().unwrap_or("placeholder")) + .with_client_options( + object_store::ClientOptions::new() + .with_allow_http(true) + .with_http2_only() + // this is the maximum that is allowed by the HTTP/2 standard and is meant to lower the overhead of + // submitting TCP packets to the kernel + .with_http2_max_frame_size(16777215), + ) + .with_endpoint(endpoint.clone()) + .with_retry(object_store::RetryConfig { + max_retries: 3, + ..Default::default() + }) + .with_skip_signature(true) + .build() + .context(InvalidS3ConfigSnafu)?; + + Ok(Some(Arc::new(store))) + } + + /// Build cache store. + #[cfg(not(feature = "aws"))] + pub fn make_cache_store( + &self + ) -> Result<Option<Arc<DynObjectStore>>, ParseError> { + match &self.cache_endpoint { + Some(_) => panic!("Cache support not enabled, recompile with the aws feature enabled"), + None => Ok(None), + } + } + + /// Create a config-dependent object store.
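+ /// + /// Illustrative sketch (editor's addition, not part of this change): `--object-store memory` yields an `InMemory` store, while `--object-store file --data-dir /path` yields a `LocalFileSystem` rooted at the (hypothetical) `/path`.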
+ pub fn make_object_store(&self) -> Result<Arc<DynObjectStore>, ParseError> { + if let Some(data_dir) = &self.database_directory { + if !matches!(&self.object_store, Some(ObjectStoreType::File)) { + warn!(?data_dir, object_store_type=?self.object_store, + "--data-dir / `INFLUXDB3_DB_DIR` ignored. It only affects 'file' object stores"); + } + } + + let remote_store: Arc<DynObjectStore> = match &self.object_store { + None => return Err(ParseError::UnspecifiedObjectStore), + Some(ObjectStoreType::Memory) => { + info!(object_store_type = "Memory", "Object Store"); + Arc::new(InMemory::new()) + } + Some(ObjectStoreType::MemoryThrottled) => { + let config = ThrottleConfig { + // for every call: assume a 100ms latency + wait_delete_per_call: Duration::from_millis(100), + wait_get_per_call: Duration::from_millis(100), + wait_list_per_call: Duration::from_millis(100), + wait_list_with_delimiter_per_call: Duration::from_millis(100), + wait_put_per_call: Duration::from_millis(100), + + // for list operations: assume we need 1 call per 1k entries at 100ms + wait_list_per_entry: Duration::from_millis(100) / 1_000, + wait_list_with_delimiter_per_entry: Duration::from_millis(100) / 1_000, + + // for upload/download: assume 1GByte/s + wait_get_per_byte: Duration::from_secs(1) / 1_000_000_000, + }; + + info!(?config, object_store_type = "Memory", "Object Store"); + Arc::new(ThrottledStore::new(InMemory::new(), config)) + } + + Some(ObjectStoreType::Google) => self.new_gcs()?, + Some(ObjectStoreType::S3) => self.new_s3()?, + Some(ObjectStoreType::Azure) => self.new_azure()?, + Some(ObjectStoreType::File) => self.new_local_file_system()?, + }; + + Ok(remote_store) + } + + fn new_local_file_system(&self) -> Result<Arc<LocalFileSystem>, ParseError> { + match self.database_directory.as_ref() { + Some(db_dir) => { + info!(?db_dir, object_store_type = "Directory", "Object Store"); + fs::create_dir_all(db_dir).context(CreatingDatabaseDirectorySnafu { path: db_dir })?; + + let store = LocalFileSystem::new_with_prefix(db_dir) + .context(CreateLocalFileSystemSnafu { path: db_dir })?; + Ok(Arc::new(store)) + } + None => MissingObjectStoreConfigSnafu { + object_store: ObjectStoreType::File, + missing: "data-dir", + } + .fail()?, + } + } + + } + } + }; +} + +object_store_config!("source"); // SourceObjectStoreConfig +object_store_config!("sink"); // SinkObjectStoreConfig +object_store_config!(); // ObjectStoreConfig + +/// Object-store type. +#[derive(Debug, Copy, Clone, PartialEq, Eq, clap::ValueEnum)] +pub enum ObjectStoreType { + /// In-memory. + Memory, + + /// In-memory with additional throttling applied for testing + MemoryThrottled, + + /// Filesystem. + File, + + /// AWS S3. + S3, + + /// GCS. + Google, + + /// Azure object store. + Azure, +} + +impl ObjectStoreType { + /// Map the enum variant to a static string, following the inverse of clap's parsing rules. + pub fn as_str(&self) -> &str { + match self { + Self::Memory => "memory", + Self::MemoryThrottled => "memory-throttled", + Self::File => "file", + Self::S3 => "s3", + Self::Google => "google", + Self::Azure => "azure", + } + } +} + +/// The `object_store::signer::Signer` trait is implemented for AWS and local file systems, so when +/// the AWS feature is enabled and the configured object store is S3 or the local file system, +/// return a signer.
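+/// +/// Illustrative sketch (editor's addition, not part of this change): with `--object-store s3` this returns `Ok(Some(..))` backed by the S3 store, while `--object-store memory` yields `Ok(None)`.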
+/// The `object_store::signer::Signer` trait is implemented for AWS and local file systems, so when
+/// the AWS feature is enabled and the configured object store is S3 or the local file system,
+/// return a signer.
+#[cfg(feature = "aws")]
+pub fn make_presigned_url_signer(
+    config: &ObjectStoreConfig,
+) -> Result<Option<Arc<dyn object_store::signer::Signer>>, ParseError> {
+    match &config.object_store {
+        Some(ObjectStoreType::S3) => Ok(Some(Arc::new(config.build_s3()?))),
+        Some(ObjectStoreType::File) => Ok(Some(Arc::new(LocalUploadSigner::new(config)?))),
+        _ => Ok(None),
+    }
+}
+
+/// The `object_store::signer::Signer` trait is implemented for AWS and local file systems, so if
+/// the AWS feature isn't enabled, only return a signer for local file systems.
+#[cfg(not(feature = "aws"))]
+pub fn make_presigned_url_signer(
+    config: &ObjectStoreConfig,
+) -> Result<Option<Arc<dyn object_store::signer::Signer>>, ParseError> {
+    match &config.object_store {
+        Some(ObjectStoreType::File) => Ok(Some(Arc::new(LocalUploadSigner::new(config)?))),
+        _ => Ok(None),
+    }
+}
+
+/// An implementation of `object_store::signer::Signer` suitable for local testing.
+/// Does NOT actually create presigned URLs; it only resolves the given path to an absolute
+/// `file://` URL, which the bulk ingester can write to directly only when it is running on the
+/// same system.
+///
+/// Again, this will not work (and is not intended to work) in production, but it is useful for
+/// local testing.
+#[derive(Debug)]
+pub struct LocalUploadSigner {
+    inner: Arc<LocalFileSystem>,
+}
+
+impl LocalUploadSigner {
+    fn new(config: &ObjectStoreConfig) -> Result<Self, ParseError> {
+        Ok(Self {
+            inner: config.new_local_file_system()?,
+        })
+    }
+}
+
+#[async_trait]
+impl object_store::signer::Signer for LocalUploadSigner {
+    async fn signed_url(
+        &self,
+        _method: http_1::Method,
+        path: &Path,
+        _expires_in: Duration,
+    ) -> Result<Url, object_store::Error> {
+        self.inner.path_to_filesystem(path).and_then(|path| {
+            Url::from_file_path(&path).map_err(|_| object_store::Error::InvalidPath {
+                source: object_store::path::Error::InvalidPath { path },
+            })
+        })
+    }
+}
+
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub enum CheckError {
+    #[snafu(display("Cannot read from object store: {}", source))]
+    CannotReadObjectStore { source: object_store::Error },
+}
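For context, this is roughly how a consumer would use the signer, sketched under the assumption that `http_1` is this crate's alias for the `http` 1.x crate (as in `signed_url` above); the path and expiry values are illustrative:

```rust
// Hypothetical consumer of make_presigned_url_signer.
async fn presigned_put_url(
    config: &ObjectStoreConfig,
) -> Result<Option<url::Url>, Box<dyn std::error::Error>> {
    let Some(signer) = make_presigned_url_signer(config)? else {
        return Ok(None); // configured store type has no signer (e.g. memory)
    };
    let path = object_store::path::Path::parse("1/2/file.parquet")?;
    // For S3 this is a real presigned URL; for `file` it is the absolute
    // file:// URL produced by LocalUploadSigner.
    let url = signer
        .signed_url(http_1::Method::PUT, &path, std::time::Duration::from_secs(300))
        .await?;
    Ok(Some(url))
}
```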
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use clap::Parser;
+    use object_store::ObjectStore;
+    use std::{env, str::FromStr};
+    use tempfile::TempDir;
+
+    /// The current object store configurations.
+    enum StoreConfigs {
+        Base(ObjectStoreConfig),
+        Source(SourceObjectStoreConfig),
+        Sink(SinkObjectStoreConfig),
+    }
+
+    impl StoreConfigs {
+        pub(crate) fn make_object_store(&self) -> Result<Arc<DynObjectStore>, ParseError> {
+            self.object_store_inner()
+        }
+
+        fn object_store_inner(&self) -> Result<Arc<DynObjectStore>, ParseError> {
+            match self {
+                Self::Base(o) => o.make_object_store(),
+                Self::Source(o) => o.make_object_store(),
+                Self::Sink(o) => o.make_object_store(),
+            }
+        }
+    }
+
+    #[test]
+    fn object_store_flag_is_required() {
+        let configs = vec![
+            StoreConfigs::Base(ObjectStoreConfig::try_parse_from(["server"]).unwrap()),
+            StoreConfigs::Source(SourceObjectStoreConfig::try_parse_from(["server"]).unwrap()),
+            StoreConfigs::Sink(SinkObjectStoreConfig::try_parse_from(["server"]).unwrap()),
+        ];
+        for config in configs {
+            let err = config.make_object_store().unwrap_err().to_string();
+            assert_eq!(err, "Object store not specified");
+        }
+    }
+
+    #[test]
+    fn explicitly_set_object_store_to_memory() {
+        let configs = vec![
+            StoreConfigs::Base(
+                ObjectStoreConfig::try_parse_from(["server", "--object-store", "memory"]).unwrap(),
+            ),
+            StoreConfigs::Source(
+                SourceObjectStoreConfig::try_parse_from([
+                    "server",
+                    "--source-object-store",
+                    "memory",
+                ])
+                .unwrap(),
+            ),
+            StoreConfigs::Sink(
+                SinkObjectStoreConfig::try_parse_from(["server", "--sink-object-store", "memory"])
+                    .unwrap(),
+            ),
+        ];
+        for config in configs {
+            let object_store = config.make_object_store().unwrap();
+            assert_eq!(&object_store.to_string(), "InMemory")
+        }
+    }
+
+    #[test]
+    fn default_url_signer_is_none() {
+        let config = ObjectStoreConfig::try_parse_from(["server"]).unwrap();
+
+        let signer = make_presigned_url_signer(&config).unwrap();
+        assert!(signer.is_none(), "Expected None, got {signer:?}");
+    }
+
+    #[test]
+    #[cfg(feature = "aws")]
+    fn valid_s3_config() {
+        let configs = vec![
+            StoreConfigs::Base(
+                ObjectStoreConfig::try_parse_from([
+                    "server",
+                    "--object-store",
+                    "s3",
+                    "--bucket",
+                    "mybucket",
+                    "--aws-access-key-id",
+                    "NotARealAWSAccessKey",
+                    "--aws-secret-access-key",
+                    "NotARealAWSSecretAccessKey",
+                ])
+                .unwrap(),
+            ),
+            StoreConfigs::Source(
+                SourceObjectStoreConfig::try_parse_from([
+                    "server",
+                    "--source-object-store",
+                    "s3",
+                    "--source-bucket",
+                    "mybucket",
+                    "--source-aws-access-key-id",
+                    "NotARealAWSAccessKey",
+                    "--source-aws-secret-access-key",
+                    "NotARealAWSSecretAccessKey",
+                ])
+                .unwrap(),
+            ),
+            StoreConfigs::Sink(
+                SinkObjectStoreConfig::try_parse_from([
+                    "server",
+                    "--sink-object-store",
+                    "s3",
+                    "--sink-bucket",
+                    "mybucket",
+                    "--sink-aws-access-key-id",
+                    "NotARealAWSAccessKey",
+                    "--sink-aws-secret-access-key",
+                    "NotARealAWSSecretAccessKey",
+                ])
+                .unwrap(),
+            ),
+        ];
+
+        for config in configs {
+            let object_store = config.make_object_store().unwrap();
+            assert_eq!(
+                &object_store.to_string(),
+                "LimitStore(16, AmazonS3(mybucket))"
+            )
+        }
+    }
+
+    #[test]
+    #[cfg(feature = "aws")]
+    fn valid_s3_endpoint_url() {
+        ObjectStoreConfig::try_parse_from(["server", "--aws-endpoint", "http://whatever.com"])
+            .expect("must successfully parse config with absolute AWS endpoint URL");
+    }
+
+    #[test]
+    #[cfg(feature = "aws")]
+    fn invalid_s3_endpoint_url_fails_clap_parsing() {
+        let result =
+            ObjectStoreConfig::try_parse_from(["server", "--aws-endpoint", "whatever.com"]);
+        assert!(result.is_err(), "{result:?}");
+        let result = SourceObjectStoreConfig::try_parse_from([
+            "server",
+            "--source-aws-endpoint",
+            "whatever.com",
+        ]);
+        assert!(result.is_err(), "{result:?}");
+        let result = SinkObjectStoreConfig::try_parse_from([
+            "server",
+            "--sink-aws-endpoint",
+            "whatever.com",
+        ]);
+        assert!(result.is_err(), "{result:?}");
+    }
+
+    #[test]
+    #[cfg(feature = "aws")]
+    fn s3_config_missing_params() {
+        let mut config =
+            ObjectStoreConfig::try_parse_from(["server", "--object-store", "s3"]).unwrap();
+
+        // clear out any bucket setting that may have leaked in via env variables
+        config.bucket = None;
+
+        let err = config.make_object_store().unwrap_err().to_string();
+
+        assert_eq!(
+            err,
+            "Error configuring Amazon S3: Generic S3 error: Missing bucket name"
+        );
+    }
+
+    #[test]
+    #[cfg(feature = "aws")]
+    fn valid_s3_url_signer() {
+        let config = ObjectStoreConfig::try_parse_from([
+            "server",
+            "--object-store",
+            "s3",
+            "--bucket",
+            "mybucket",
+            "--aws-access-key-id",
+            "NotARealAWSAccessKey",
+            "--aws-secret-access-key",
+            "NotARealAWSSecretAccessKey",
+        ])
+        .unwrap();
+
+        assert!(make_presigned_url_signer(&config).unwrap().is_some());
+
+        // Even with the aws feature on, object stores (other than local files) shouldn't create a
+        // signer.
+        let config =
+            ObjectStoreConfig::try_parse_from(["server", "--object-store", "memory"]).unwrap();
+
+        let signer = make_presigned_url_signer(&config).unwrap();
+        assert!(signer.is_none(), "Expected None, got {signer:?}");
+    }
+
+    #[test]
+    #[cfg(feature = "aws")]
+    fn s3_url_signer_config_missing_params() {
+        let mut config =
+            ObjectStoreConfig::try_parse_from(["server", "--object-store", "s3"]).unwrap();
+
+        // clear out any bucket setting that may have leaked in via env variables
+        config.bucket = None;
+
+        let err = make_presigned_url_signer(&config).unwrap_err().to_string();
+
+        assert_eq!(
+            err,
+            "Error configuring Amazon S3: Generic S3 error: Missing bucket name"
+        );
+    }
+
+    #[test]
+    #[cfg(feature = "gcp")]
+    fn valid_google_config() {
+        use std::io::Write;
+        use tempfile::NamedTempFile;
+
+        let mut file = NamedTempFile::new().expect("tempfile should be created");
+        const FAKE_KEY: &str = r#"{"private_key": "private_key", "private_key_id": "private_key_id", "client_email":"client_email", "disable_oauth":true}"#;
+        writeln!(file, "{FAKE_KEY}").unwrap();
+        let path = file.path().to_str().expect("file path should exist");
+
+        let configs = vec![
+            StoreConfigs::Base(
+                ObjectStoreConfig::try_parse_from([
+                    "server",
+                    "--object-store",
+                    "google",
+                    "--bucket",
+                    "mybucket",
+                    "--google-service-account",
+                    path,
+                ])
+                .unwrap(),
+            ),
+            StoreConfigs::Source(
+                SourceObjectStoreConfig::try_parse_from([
+                    "server",
+                    "--source-object-store",
+                    "google",
+                    "--source-bucket",
+                    "mybucket",
+                    "--source-google-service-account",
+                    path,
+                ])
+                .unwrap(),
+            ),
+            StoreConfigs::Sink(
+                SinkObjectStoreConfig::try_parse_from([
+                    "server",
+                    "--sink-object-store",
+                    "google",
+                    "--sink-bucket",
+                    "mybucket",
+                    "--sink-google-service-account",
+                    path,
+                ])
+                .unwrap(),
+            ),
+        ];
+
+        for config in configs {
+            let object_store = config.make_object_store().unwrap();
+            assert_eq!(
+                &object_store.to_string(),
+                "LimitStore(16, GoogleCloudStorage(mybucket))"
+            )
+        }
+    }
+
+    #[test]
+    #[cfg(feature = "gcp")]
+    fn google_config_missing_params() {
+        let mut config =
+            ObjectStoreConfig::try_parse_from(["server", "--object-store", "google"]).unwrap();
+
+        // clear out any bucket setting that may have leaked in via env variables
+        config.bucket = None;
+
+        let err = config.make_object_store().unwrap_err().to_string();
+
+        assert_eq!(
+            err,
+            "Error configuring GCS: Generic GCS error: Missing bucket name"
+        );
+    }
+
+    #[test]
+    #[cfg(feature = "azure")]
+    fn valid_azure_config() {
+        let config = ObjectStoreConfig::try_parse_from([
+            "server",
+            "--object-store",
+            "azure",
+            "--bucket",
+            "mybucket",
+            "--azure-storage-account",
+            "NotARealStorageAccount",
+            "--azure-storage-access-key",
+            "Zm9vYmFy", // base64 encoded "foobar"
+        ])
+        .unwrap();
+
+        let object_store = config.make_object_store().unwrap();
+        assert_eq!(&object_store.to_string(), "LimitStore(16, MicrosoftAzure { account: NotARealStorageAccount, container: mybucket })")
+    }
+
+    #[test]
+    #[cfg(feature = "azure")]
+    fn azure_config_missing_params() {
+        let mut config =
+            ObjectStoreConfig::try_parse_from(["server", "--object-store", "azure"]).unwrap();
+
+        // clear out any bucket setting that may have leaked in via env variables
+        config.bucket = None;
+
+        let err = config.make_object_store().unwrap_err().to_string();
+
+        assert_eq!(
+            err,
+            "Error configuring Microsoft Azure: Generic MicrosoftAzure error: Container name must be specified"
+        );
+    }
+
+    #[test]
+    fn valid_file_config() {
+        let root = TempDir::new().unwrap();
+        let root_path = root.path().to_str().unwrap();
+
+        let config = ObjectStoreConfig::try_parse_from([
+            "server",
+            "--object-store",
+            "file",
+            "--data-dir",
+            root_path,
+        ])
+        .unwrap();
+
+        let object_store = config.make_object_store().unwrap().to_string();
+        assert!(
+            object_store.starts_with("LocalFileSystem"),
+            "{}",
+            object_store
+        )
+    }
+
+    #[test]
+    fn file_config_missing_params() {
+        // This test checks that object store creation fails because the data-dir
+        // configuration is missing. If the INFLUXDB3_DB_DIR env variable were set,
+        // the test would fail because the configuration would actually be present.
+        env::remove_var("INFLUXDB3_DB_DIR");
+
+        let configs = vec![
+            StoreConfigs::Base(
+                ObjectStoreConfig::try_parse_from(["server", "--object-store", "file"]).unwrap(),
+            ),
+            StoreConfigs::Source(
+                SourceObjectStoreConfig::try_parse_from([
+                    "server",
+                    "--source-object-store",
+                    "file",
+                ])
+                .unwrap(),
+            ),
+            StoreConfigs::Sink(
+                SinkObjectStoreConfig::try_parse_from(["server", "--sink-object-store", "file"])
+                    .unwrap(),
+            ),
+        ];
+
+        for config in configs {
+            let err = config.make_object_store().unwrap_err().to_string();
+            assert_eq!(
+                err,
+                "Specified File for the object store, required configuration missing for \
+                data-dir"
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn local_url_signer() {
+        let root = TempDir::new().unwrap();
+        let root_path = root.path().to_str().unwrap();
+        let parquet_file_path = "1/2/something.parquet";
+
+        let signer = make_presigned_url_signer(
+            &ObjectStoreConfig::try_parse_from([
+                "server",
+                "--object-store",
+                "file",
+                "--data-dir",
+                root_path,
+            ])
+            .unwrap(),
+        )
+        .unwrap()
+        .unwrap();
+
+        let object_store_parquet_file_path = Path::parse(parquet_file_path).unwrap();
+        let upload_url = signer
+            .signed_url(
+                http_1::Method::PUT,
+                &object_store_parquet_file_path,
+                Duration::from_secs(100),
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(
+            upload_url.as_str(),
+            &format!(
+                "file://{}",
+                std::fs::canonicalize(root.path())
+                    .unwrap()
+                    .join(parquet_file_path)
+                    .display()
+            )
+        );
+    }
+
+    #[test]
+    fn endpoint() {
+        assert_eq!(
+            Endpoint::from_str("http://localhost:8080")
+                .unwrap()
+                .to_string(),
+            "http://localhost:8080",
+        );
+        assert_eq!(
+            Endpoint::from_str("http://localhost:8080/")
+                .unwrap()
+                .to_string(),
+            "http://localhost:8080",
+        );
+        assert_eq!(
+            Endpoint::from_str("whatever.com").unwrap_err().to_string(),
+            "relative URL without a base",
+        );
+    }
+}
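The file-store tests above lean on the flag/env-var duality. A sketch of that equivalence, assuming `INFLUXDB3_DB_DIR` is the env name bound to `--data-dir` (as the warning in `make_object_store` states) and that `database_directory` is accessible to the caller:

```rust
// Two equivalent ways to configure a file-backed store; sketch only.
use clap::Parser;

// Via flags:
let by_flag = ObjectStoreConfig::try_parse_from([
    "server", "--object-store", "file", "--data-dir", "/tmp/influxdb3",
]).unwrap();

// Via environment:
std::env::set_var("INFLUXDB3_DB_DIR", "/tmp/influxdb3");
let by_env = ObjectStoreConfig::try_parse_from(["server", "--object-store", "file"]).unwrap();

assert_eq!(by_flag.database_directory, by_env.database_directory);
```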
diff --git a/influxdb3_clap_blocks/src/socket_addr.rs b/influxdb3_clap_blocks/src/socket_addr.rs
new file mode 100644
index 00000000000..02a1014a853
--- /dev/null
+++ b/influxdb3_clap_blocks/src/socket_addr.rs
@@ -0,0 +1,77 @@
+//! Config for socket addresses.
+use std::{net::ToSocketAddrs, ops::Deref};
+
+/// Parsable socket address.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct SocketAddr(std::net::SocketAddr);
+
+impl Deref for SocketAddr {
+    type Target = std::net::SocketAddr;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl std::fmt::Display for SocketAddr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
+impl std::str::FromStr for SocketAddr {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_socket_addrs() {
+            Ok(mut addrs) => {
+                if let Some(addr) = addrs.next() {
+                    Ok(Self(addr))
+                } else {
+                    Err(format!("Found no addresses for '{s}'"))
+                }
+            }
+            Err(e) => Err(format!("Cannot parse socket address '{s}': {e}")),
+        }
+    }
+}
+
+impl From<SocketAddr> for std::net::SocketAddr {
+    fn from(addr: SocketAddr) -> Self {
+        addr.0
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::{
+        net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
+        str::FromStr,
+    };
+
+    #[test]
+    fn test_socketaddr() {
+        let addr: std::net::SocketAddr = SocketAddr::from_str("127.0.0.1:1234").unwrap().into();
+        assert_eq!(addr, std::net::SocketAddr::from(([127, 0, 0, 1], 1234)),);
+
+        let addr: std::net::SocketAddr = SocketAddr::from_str("localhost:1234").unwrap().into();
+        // depending on where the test runs, localhost will either resolve to an IPv4 or
+        // an IPv6 addr.
+        match addr {
+            std::net::SocketAddr::V4(so) => {
+                assert_eq!(so, SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234))
+            }
+            std::net::SocketAddr::V6(so) => assert_eq!(
+                so,
+                SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 1234, 0, 0)
+            ),
+        };
+
+        assert_eq!(
+            SocketAddr::from_str("!@INv_a1d(ad0/resp_!").unwrap_err(),
+            "Cannot parse socket address '!@INv_a1d(ad0/resp_!': invalid socket address",
+        );
+    }
+}
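The newtype exists so clap can accept hostnames as well as raw IPs, resolving them at parse time via `ToSocketAddrs`. A sketch of the intended wiring (field name and default here are illustrative, not from this patch):

```rust
use clap::Parser;

#[derive(Debug, Parser)]
struct ServeArgs {
    /// Bind address; hostnames are resolved (to their first address) at parse time.
    #[clap(long = "http-bind", default_value = "127.0.0.1:8181")]
    http_bind: SocketAddr,
}

fn main() {
    let args = ServeArgs::parse_from(["server", "--http-bind", "localhost:8181"]);
    // Deref/From hand back a std::net::SocketAddr for the actual listener.
    let addr: std::net::SocketAddr = args.http_bind.into();
    println!("binding {addr}");
}
```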
diff --git a/influxdb3_clap_blocks/src/tokio.rs b/influxdb3_clap_blocks/src/tokio.rs
index 39a9d5db3ab..8e2eff8239d 100644
--- a/influxdb3_clap_blocks/src/tokio.rs
+++ b/influxdb3_clap_blocks/src/tokio.rs
@@ -177,7 +177,7 @@ macro_rules! tokio_rt_config {
                 let thread_counter = Arc::new(AtomicUsize::new(1));
                 let name = name.to_owned();
                 builder.thread_name_fn(move || {
-                    format!("InfluxDB 3.0 Tokio {} {}", name, thread_counter.fetch_add(1, Ordering::SeqCst))
+                    format!("InfluxDB 3 Core Tokio {} {}", name, thread_counter.fetch_add(1, Ordering::SeqCst))
                 });
 
                 // worker thread count
@@ -286,7 +286,7 @@ mod tests {
             .builder()
             .unwrap(),
         || {
-            assert_thread_name("InfluxDB 3.0 Tokio IO");
+            assert_thread_name("InfluxDB 3 Core Tokio IO");
         },
     );
     assert_runtime_thread_property(
@@ -294,7 +294,7 @@ mod tests {
             .builder()
             .unwrap(),
         || {
-            assert_thread_name("InfluxDB 3.0 Tokio Datafusion");
+            assert_thread_name("InfluxDB 3 Core Tokio Datafusion");
         },
     );
     assert_runtime_thread_property(
@@ -302,7 +302,7 @@ mod tests {
             .builder_with_name("foo")
            .unwrap(),
         || {
-            assert_thread_name("InfluxDB 3.0 Tokio foo");
+            assert_thread_name("InfluxDB 3 Core Tokio foo");
         },
     );
 }
diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs
index a72a403ffaf..6000d9e989a 100644
--- a/influxdb3_client/src/lib.rs
+++ b/influxdb3_client/src/lib.rs
@@ -64,12 +64,12 @@ impl Error {
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-/// The InfluxDB 3.0 Client
+/// The InfluxDB 3 Core Client
 ///
-/// For programmatic access to the HTTP API of InfluxDB 3.0
+/// For programmatic access to the HTTP API of InfluxDB 3 Core
 #[derive(Debug, Clone)]
 pub struct Client {
-    /// The base URL for making requests to a running InfluxDB 3.0 server
+    /// The base URL for making requests to a running InfluxDB 3 Core server
     base_url: Url,
     /// The `Bearer` token to use for authenticating on each request to the server
     auth_token: Option<Secret<String>>,
diff --git a/influxdb3_load_generator/src/commands/common.rs b/influxdb3_load_generator/src/commands/common.rs
index d768664d757..a9b032740a9 100644
--- a/influxdb3_load_generator/src/commands/common.rs
+++ b/influxdb3_load_generator/src/commands/common.rs
@@ -14,7 +14,7 @@ use crate::{
 
 #[derive(Debug, Parser)]
 pub(crate) struct InfluxDb3Config {
-    /// The host URL of the running InfluxDB 3.0 server
+    /// The host URL of the running InfluxDB 3 Core server
     #[clap(
         short = 'h',
         long = "host",
@@ -32,7 +32,7 @@ pub(crate) struct InfluxDb3Config {
     )]
     pub(crate) database_name: String,
 
-    /// The token for authentication with the InfluxDB 3.0 server
+    /// The token for authentication with the InfluxDB 3 Core server
     #[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
     pub(crate) auth_token: Option<Secret<String>>,
diff --git a/influxdb3_load_generator/src/commands/full.rs b/influxdb3_load_generator/src/commands/full.rs
index 682f3b6b0fe..aa556ee6a51 100644
--- a/influxdb3_load_generator/src/commands/full.rs
+++ b/influxdb3_load_generator/src/commands/full.rs
@@ -9,7 +9,7 @@ use super::{common::InfluxDb3Config, query::QueryConfig, write::WriteConfig};
 
 #[derive(Debug, Parser)]
 pub(crate) struct Config {
-    /// Common InfluxDB 3.0 config
+    /// Common InfluxDB 3 Core config
     #[clap(flatten)]
     common: InfluxDb3Config,
 
diff --git a/influxdb3_load_generator/src/commands/query.rs b/influxdb3_load_generator/src/commands/query.rs
index 1227d5fdeb3..c1fb5768d46 100644
--- a/influxdb3_load_generator/src/commands/query.rs
+++ b/influxdb3_load_generator/src/commands/query.rs
@@ -18,7 +18,7 @@ use super::common::InfluxDb3Config;
 #[derive(Debug, Parser)]
 #[clap(visible_alias = "q", trailing_var_arg = true)]
 pub(crate) struct Config {
-    /// Common InfluxDB 3.0 config
+    /// Common InfluxDB 3 Core config
     #[clap(flatten)]
     common: InfluxDb3Config,
 
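The tokio.rs hunks above only rename the thread-name prefix; as a standalone illustration of what the `thread_name_fn` closure produces (a sketch outside any runtime, mirroring the macro body):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

fn main() {
    // One shared counter per runtime name gives each worker a distinct, 1-based suffix.
    let thread_counter = Arc::new(AtomicUsize::new(1));
    let name = "IO".to_owned();
    let name_fn = move || {
        format!("InfluxDB 3 Core Tokio {} {}", name, thread_counter.fetch_add(1, Ordering::SeqCst))
    };
    assert_eq!(name_fn(), "InfluxDB 3 Core Tokio IO 1");
    assert_eq!(name_fn(), "InfluxDB 3 Core Tokio IO 2");
}
```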
diff --git a/influxdb3_load_generator/src/commands/write.rs b/influxdb3_load_generator/src/commands/write.rs
index b3f62990395..d288d964cae 100644
--- a/influxdb3_load_generator/src/commands/write.rs
+++ b/influxdb3_load_generator/src/commands/write.rs
@@ -19,7 +19,7 @@ use super::common::InfluxDb3Config;
 #[derive(Debug, Parser)]
 #[clap(visible_alias = "w", trailing_var_arg = true)]
 pub(crate) struct Config {
-    /// Common InfluxDB 3.0 config
+    /// Common InfluxDB 3 Core config
     #[clap(flatten)]
     common: InfluxDb3Config,
 
diff --git a/influxdb3_load_generator/src/main.rs b/influxdb3_load_generator/src/main.rs
index de220eeb595..3ea5ec3473d 100644
--- a/influxdb3_load_generator/src/main.rs
+++ b/influxdb3_load_generator/src/main.rs
@@ -45,8 +45,8 @@
         clap::Arg::new("help")
             .action(clap::ArgAction::Help)
             .global(true)
     ),
-about = "InfluxDB 3.0 Load Generator for writes and queries",
-long_about = r#"InfluxDB 3.0 Load Generator for writes and queries
+about = "InfluxDB 3 Core Load Generator for writes and queries",
+long_about = r#"InfluxDB 3 Core Load Generator for writes and queries
 
 Examples:
     # Run the write load generator
@@ -76,13 +76,13 @@ struct Config {
 
 #[derive(Debug, clap::Parser)]
 #[allow(clippy::large_enum_variant)]
 enum Command {
-    /// Perform a query against a running InfluxDB 3.0 server
+    /// Perform a query against a running InfluxDB 3 Core server
     Query(commands::query::Config),
 
-    /// Perform a set of writes to a running InfluxDB 3.0 server
+    /// Perform a set of writes to a running InfluxDB 3 Core server
     Write(commands::write::Config),
 
-    /// Perform both writes and queries against a running InfluxDB 3.0 server
+    /// Perform both writes and queries against a running InfluxDB 3 Core server
     Full(commands::full::Config),
 }
diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs
index 732b9991d51..931583319e6 100644
--- a/influxdb3_server/src/lib.rs
+++ b/influxdb3_server/src/lib.rs
@@ -1,4 +1,4 @@
-//! InfluxDB 3.0 OSS server implementation
+//! InfluxDB 3 Core server implementation
 //!
 //! The server is responsible for handling the HTTP API
 #![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs
index 3d9bc8c6141..f008456c507 100644
--- a/influxdb3_wal/src/lib.rs
+++ b/influxdb3_wal/src/lib.rs
@@ -1,4 +1,4 @@
-//! This crate provides a Write Ahead Log (WAL) for InfluxDB 3.0. The WAL is used to buffer writes
+//! This crate provides a Write Ahead Log (WAL) for InfluxDB 3 Core. The WAL is used to buffer writes
 //! in memory and persist them as individual files in an object store. The WAL is used to make
 //! writes durable until they can be written in larger batches as Parquet files and other snapshot and
 //! index files in object storage.