diff --git a/.config/hakari.toml b/.config/hakari.toml index c1355e0892334..ea2dd0f83bb89 100644 --- a/.config/hakari.toml +++ b/.config/hakari.toml @@ -41,4 +41,5 @@ third-party = [ { name = "criterion" }, { name = "console" }, { name = "similar" }, + { name = "deltalake", git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b" }, ] diff --git a/.github/workflows/cherry-pick-to-release-branch.yml b/.github/workflows/cherry-pick-to-release-branch.yml index ca2ba9dac0d7c..a2cdc9e7b4113 100644 --- a/.github/workflows/cherry-pick-to-release-branch.yml +++ b/.github/workflows/cherry-pick-to-release-branch.yml @@ -21,7 +21,7 @@ jobs: pr_body: ${{ format('Cherry picking \#{0} onto branch v1.5.1-rc', github.event.number) }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - + release_pull_request_1_5_0: if: "contains(github.event.pull_request.labels.*.name, 'need-cherry-pick-v1.5.0') && github.event.pull_request.merged == true" runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index d4a33be1ade04..c850906cbd1cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -175,7 +175,7 @@ dependencies = [ "log", "num-bigint", "quad-rand", - "rand", + "rand 0.8.5", "regex", "serde", "serde_json", @@ -200,7 +200,7 @@ dependencies = [ "log", "num-bigint", "quad-rand", - "rand", + "rand 0.8.5", "regex-lite", "serde", "serde_json", @@ -233,27 +233,87 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "arrayref" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" + [[package]] name = "arrayvec" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "arrow" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8919668503a4f2d8b6da96fa7c16e93046bfb3412ffcfa1e5dc7d2e3adcb378" +dependencies = [ + "ahash 0.8.3", + "arrow-arith 48.0.1", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-cast 48.0.1", + "arrow-csv", + "arrow-data 48.0.1", + "arrow-ipc 48.0.1", + "arrow-json", + "arrow-ord 48.0.1", + "arrow-row 48.0.1", + "arrow-schema 48.0.1", + "arrow-select 48.0.1", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef983914f477d4278b068f13b3224b7d19eb2b807ac9048544d3bfebdf2554c4" +dependencies = [ + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "chrono", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-arith" version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34ccd45e217ffa6e53bbb0080990e77113bdd4e91ddb84e97b77649810bcf1a7" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "chrono", "half 2.3.1", "num", ] +[[package]] +name = "arrow-array" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6eaf89041fa5937940ae390294ece29e1db584f46d995608d6e5fe65a2e0e9b" +dependencies = [ + "ahash 0.8.3", + "arrow-buffer 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "chrono", + "chrono-tz", + "half 2.3.1", + "hashbrown 0.14.0", + "num", +] + [[package]] name = "arrow-array" version = "49.0.0" @@ -261,15 +321,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bda9acea48b25123c08340f3a8ac361aa0f74469bb36f5ee9acf923fce23e9d" dependencies = [ "ahash 0.8.3", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "chrono", "half 2.3.1", "hashbrown 0.14.0", "num", ] +[[package]] +name = "arrow-buffer" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55512d988c6fbd76e514fd3ff537ac50b0a675da5a245e4fdad77ecfd654205f" +dependencies = [ + "bytes", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-buffer" version = "49.0.0" @@ -281,17 +352,35 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-cast" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "655ee51a2156ba5375931ce21c1b2494b1d9260e6dcdc6d4db9060c37dc3325b" +dependencies = [ + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "arrow-select 48.0.1", + "chrono", + "comfy-table", + "half 2.3.1", + "lexical-core", + "num", +] + [[package]] name = "arrow-cast" version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dc0368ed618d509636c1e3cc20db1281148190a78f43519487b2daf07b63b4a" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", "base64 0.21.4", "chrono", "half 2.3.1", @@ -299,14 +388,45 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-csv" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258bb689997ad5b6660b3ce3638bd6b383d668ec555ed41ad7c6559cbb2e4f91" +dependencies = [ + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-cast 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "chrono", + "csv", + "csv-core", + "lazy_static", + "lexical-core", + "regex", +] + +[[package]] +name = "arrow-data" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dc2b9fec74763427e2e5575b8cc31ce96ba4c9b4eb05ce40e0616d9fad12461" +dependencies = [ + "arrow-buffer 48.0.1", + "arrow-schema 48.0.1", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-data" version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "907fafe280a3874474678c1858b9ca4cb7fd83fb8034ff5b6d6376205a08c634" dependencies = [ - "arrow-buffer", - "arrow-schema", + "arrow-buffer 49.0.0", + "arrow-schema 49.0.0", "half 2.3.1", "num", ] @@ -317,11 +437,11 @@ version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624e0dcb6b5a7a06222bfd2be3f7e905ce849a6b714ec989f18cdba330c77d38" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-ipc", - "arrow-schema", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-cast 49.0.0", + "arrow-ipc 49.0.0", + "arrow-schema 49.0.0", "base64 0.21.4", "bytes", "futures", @@ -331,35 +451,99 @@ dependencies = [ "tonic 0.10.2", ] +[[package]] +name = "arrow-ipc" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eaa6ab203cc6d89b7eaa1ac781c1dfeef325454c5d5a0419017f95e6bafc03c" +dependencies = [ + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-cast 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "flatbuffers", +] + [[package]] name = "arrow-ipc" version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79a43d6808411886b8c7d4f6f7dd477029c1e77ffffffb7923555cc6579639cd" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-schema", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-cast 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "flatbuffers", ] +[[package]] +name = "arrow-json" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb64e30d9b73f66fdc5c52d5f4cf69bbf03d62f64ffeafa0715590a5320baed7" +dependencies = [ + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-cast 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "chrono", + "half 2.3.1", + "indexmap 2.0.0", + "lexical-core", + "num", + "serde", + "serde_json", +] + +[[package]] +name = "arrow-ord" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9a818951c0d11c428dda03e908175969c262629dd20bd0850bd6c7a8c3bfe48" +dependencies = [ + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "arrow-select 48.0.1", + "half 2.3.1", + "num", +] + [[package]] name = "arrow-ord" version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b23b0e53c0db57c6749997fd343d4c0354c994be7eca67152dd2bdb9a3e1bb4" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", "half 2.3.1", "num", ] +[[package]] +name = "arrow-row" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d664318bc05f930559fc088888f0f7174d3c5bc888c0f4f9ae8f23aa398ba3" +dependencies = [ + "ahash 0.8.3", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "half 2.3.1", + "hashbrown 0.14.0", +] + [[package]] name = "arrow-row" version = "49.0.0" @@ -367,20 +551,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a" dependencies = [ "ahash 0.8.3", - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "half 2.3.1", "hashbrown 0.14.0", ] +[[package]] +name = "arrow-schema" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf4d737bba93da59f16129bec21e087aed0be84ff840e74146d4703879436cb" +dependencies = [ + "serde", +] + [[package]] name = "arrow-schema" version = "49.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09e28a5e781bf1b0f981333684ad13f5901f4cd2f20589eab7cf1797da8fc167" +[[package]] +name = "arrow-select" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "374c4c3b812ecc2118727b892252a4a4308f87a8aca1dbf09f3ce4bc578e668a" +dependencies = [ + "ahash 0.8.3", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "num", +] + [[package]] name = "arrow-select" version = "49.0.0" @@ -388,13 +595,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036" dependencies = [ "ahash 0.8.3", - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-data 49.0.0", + "arrow-schema 49.0.0", "num", ] +[[package]] +name = "arrow-string" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15aed5624bb23da09142f58502b59c23f5bea607393298bb81dab1ce60fc769" +dependencies = [ + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-data 48.0.1", + "arrow-schema 48.0.1", + "arrow-select 48.0.1", + "num", + "regex", + "regex-syntax 0.8.0", +] + [[package]] name = "assert_matches" version = "1.5.0" @@ -435,6 +658,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-compression" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5" +dependencies = [ + "bzip2", + "flate2", + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", + "tokio", + "xz2", + "zstd 0.13.0", + "zstd-safe 7.0.0", +] + [[package]] name = "async-executor" version = "1.5.1" @@ -508,7 +749,7 @@ dependencies = [ "nkeys", "nuid", "once_cell", - "rand", + "rand 0.8.5", "regex", "ring 0.16.20", "rustls", @@ -1343,6 +1584,28 @@ dependencies = [ "triple_accel", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest", +] + +[[package]] +name = "blake3" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0231f06152bf547e9c2b5194f247cd97aacf6dcd8b15d8e5ec0663f64580da87" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -1934,6 +2197,12 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6" +[[package]] +name = "constant_time_eq" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" + [[package]] name = "convert_case" version = "0.6.0" @@ -2188,7 +2457,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" dependencies = [ "generic-array", - "rand_core", + "rand_core 0.6.4", "subtle", "zeroize", ] @@ -2200,7 +2469,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" dependencies = [ "generic-array", - "rand_core", + "rand_core 0.6.4", "subtle", "zeroize", ] @@ -2469,6 +2738,221 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41b319d1b62ffbd002e057f36bebd1f42b9f97927c9577461d855f3513c4289f" +[[package]] +name = "datafusion" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "676796427e638d85e9eadf13765705212be60b34f8fc5d3934d95184c63ca1b4" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "arrow-schema 48.0.1", + "async-compression", + "async-trait", + "bytes", + "bzip2", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-optimizer", + "datafusion-physical-expr", + "datafusion-physical-plan", + "datafusion-sql", + "flate2", + "futures", + "glob", + "half 2.3.1", + "hashbrown 0.14.0", + "indexmap 2.0.0", + "itertools 0.11.0", + "log", + "num_cpus", + "object_store", + "parking_lot 0.12.1", + "parquet 48.0.1", + "pin-project-lite", + "rand 0.8.5", + "sqlparser", + "tempfile", + "tokio", + "tokio-util", + "url", + "uuid", + "xz2", + "zstd 0.13.0", +] + +[[package]] +name = "datafusion-common" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e23b3d21a6531259d291bd20ce59282ea794bda1018b0a1e278c13cd52e50c" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-schema 48.0.1", + "chrono", + "half 2.3.1", + "num_cpus", + "object_store", + "parquet 48.0.1", + "sqlparser", +] + +[[package]] +name = "datafusion-execution" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4de1fd0d8db0f2b8e4f4121bfa1c7c09d3a5c08a0a65c2229cd849eb65cff855" +dependencies = [ + "arrow", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-expr", + "futures", + "hashbrown 0.14.0", + "log", + "object_store", + "parking_lot 0.12.1", + "rand 0.8.5", + "tempfile", + "url", +] + +[[package]] +name = "datafusion-expr" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e227fe88bf6730cab378d0cd8fc4c6b2ea42bc7e414a8ea9feba7225932735" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "datafusion-common", + "sqlparser", + "strum", + "strum_macros", +] + +[[package]] +name = "datafusion-optimizer" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6648e62ea7605b9bfcd87fdc9d67e579c3b9ac563a87734ae5fe6d79ee4547" +dependencies = [ + "arrow", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr", + "hashbrown 0.14.0", + "itertools 0.11.0", + "log", + "regex-syntax 0.8.0", +] + +[[package]] +name = "datafusion-physical-expr" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f32b8574add16a32411a9b3fb3844ac1fc09ab4e7be289f86fd56d620e4f2508" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-ord 48.0.1", + "arrow-schema 48.0.1", + "base64 0.21.4", + "blake2", + "blake3", + "chrono", + "datafusion-common", + "datafusion-expr", + "half 2.3.1", + "hashbrown 0.14.0", + "hex", + "indexmap 2.0.0", + "itertools 0.11.0", + "libc", + "log", + "md-5", + "paste", + "petgraph", + "rand 0.8.5", + "regex", + "sha2", + "unicode-segmentation", + "uuid", +] + +[[package]] +name = "datafusion-physical-plan" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "796abd77d5bfecd9e5275a99daf0ec45f5b3a793ec431349ce8211a67826fd22" +dependencies = [ + "ahash 0.8.3", + "arrow", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-schema 48.0.1", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "futures", + "half 2.3.1", + "hashbrown 0.14.0", + "indexmap 2.0.0", + "itertools 0.11.0", + "log", + "once_cell", + "parking_lot 0.12.1", + "pin-project-lite", + "rand 0.8.5", + "tokio", + "uuid", +] + +[[package]] +name = "datafusion-proto" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26de2592417beb20f73f29b131a04d7de14e2a6336c631554d611584b4306236" +dependencies = [ + "arrow", + "chrono", + "datafusion", + "datafusion-common", + "datafusion-expr", + "object_store", + "prost 0.12.1", +] + +[[package]] +name = "datafusion-sql" +version = "33.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced70b8a5648ba7b95c61fc512183c33287ffe2c9f22ffe22700619d7d48c84f" +dependencies = [ + "arrow", + "arrow-schema 48.0.1", + "datafusion-common", + "datafusion-expr", + "log", + "sqlparser", +] + [[package]] name = "debugid" version = "0.8.0" @@ -2478,6 +2962,67 @@ dependencies = [ "uuid", ] +[[package]] +name = "deltalake" +version = "0.17.0" +source = "git+https://github.com/risingwavelabs/delta-rs?rev=5c2dccd4640490202ffe98adbd13b09cef8e007b#5c2dccd4640490202ffe98adbd13b09cef8e007b" +dependencies = [ + "deltalake-core", +] + +[[package]] +name = "deltalake-core" +version = "0.17.0" +source = "git+https://github.com/risingwavelabs/delta-rs?rev=5c2dccd4640490202ffe98adbd13b09cef8e007b#5c2dccd4640490202ffe98adbd13b09cef8e007b" +dependencies = [ + "arrow", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-cast 48.0.1", + "arrow-ord 48.0.1", + "arrow-row 48.0.1", + "arrow-schema 48.0.1", + "arrow-select 48.0.1", + "async-trait", + "bytes", + "cfg-if", + "chrono", + "dashmap", + "datafusion", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr", + "datafusion-proto", + "datafusion-sql", + "either", + "errno", + "fix-hidden-lifetime-bug", + "futures", + "itertools 0.11.0", + "lazy_static", + "libc", + "log", + "num-bigint", + "num-traits", + "num_cpus", + "object_store", + "once_cell", + "parking_lot 0.12.1", + "parquet 48.0.1", + "percent-encoding", + "rand 0.8.5", + "regex", + "roaring", + "serde", + "serde_json", + "sqlparser", + "thiserror", + "tokio", + "url", + "uuid", + "z85", +] + [[package]] name = "der" version = "0.6.1" @@ -2614,6 +3159,12 @@ dependencies = [ "const-random", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dotenvy" version = "0.15.7" @@ -2755,7 +3306,7 @@ dependencies = [ "generic-array", "group 0.12.1", "pkcs8 0.9.0", - "rand_core", + "rand_core 0.6.4", "sec1 0.3.0", "subtle", "zeroize", @@ -2776,7 +3327,7 @@ dependencies = [ "hkdf", "pem-rfc7468", "pkcs8 0.10.2", - "rand_core", + "rand_core 0.6.4", "sec1 0.7.3", "subtle", "zeroize", @@ -2990,7 +3541,7 @@ checksum = "fe5e43d0f78a42ad591453aedb1d7ae631ce7ee445c7643691055a9ed8d3b01c" dependencies = [ "log", "once_cell", - "rand", + "rand 0.8.5", ] [[package]] @@ -3039,7 +3590,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" dependencies = [ - "rand_core", + "rand_core 0.6.4", "subtle", ] @@ -3049,7 +3600,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449" dependencies = [ - "rand_core", + "rand_core 0.6.4", "subtle", ] @@ -3086,6 +3637,26 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "fix-hidden-lifetime-bug" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4ae9c2016a663983d4e40a9ff967d6dcac59819672f0b47f2b17574e99c33c8" +dependencies = [ + "fix-hidden-lifetime-bug-proc_macros", +] + +[[package]] +name = "fix-hidden-lifetime-bug-proc_macros" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4c81935e123ab0741c4c4f0d9b8377e5fb21d3de7e062fa4b1263b1fbcba1ea" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -3249,7 +3820,7 @@ dependencies = [ "parking_lot 0.12.1", "paste", "prometheus", - "rand", + "rand 0.8.5", "thiserror", "tracing", "twox-hash", @@ -3275,7 +3846,7 @@ dependencies = [ "memchr", "parking_lot 0.12.1", "parking_lot_core 0.9.8", - "rand", + "rand 0.8.5", "tokio", "tokio-util", "tower", @@ -3360,6 +3931,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "function_name" version = "0.3.0" @@ -3719,7 +4296,7 @@ dependencies = [ "no-std-compat", "nonzero_ext", "parking_lot 0.12.1", - "rand", + "rand 0.8.5", "smallvec", ] @@ -3730,7 +4307,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" dependencies = [ "ff 0.12.1", - "rand_core", + "rand_core 0.6.4", "subtle", ] @@ -3741,7 +4318,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" dependencies = [ "ff 0.13.0", - "rand_core", + "rand_core 0.6.4", "subtle", ] @@ -4076,14 +4653,14 @@ source = "git+https://github.com/icelake-io/icelake?rev=5cdcdffd24f4624a0a43f92c dependencies = [ "anyhow", "apache-avro 0.15.0", - "arrow-arith", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-ord", - "arrow-row", - "arrow-schema", - "arrow-select", + "arrow-arith 49.0.0", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-cast 49.0.0", + "arrow-ord 49.0.0", + "arrow-row 49.0.0", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", "async-trait", "bitvec", "bytes", @@ -4099,7 +4676,7 @@ dependencies = [ "once_cell", "opendal", "ordered-float 3.9.1", - "parquet", + "parquet 49.0.0", "prometheus", "regex", "reqwest", @@ -4782,7 +5359,7 @@ dependencies = [ "madsim-macros", "naive-timer", "panic-message", - "rand", + "rand 0.8.5", "rand_xoshiro", "rustversion", "serde", @@ -5154,7 +5731,7 @@ dependencies = [ "pem 3.0.2", "percent-encoding", "pin-project", - "rand", + "rand 0.8.5", "serde", "serde_json", "socket2 0.5.3", @@ -5190,7 +5767,7 @@ dependencies = [ "mysql-common-derive", "num-bigint", "num-traits", - "rand", + "rand 0.8.5", "regex", "rust_decimal", "saturating", @@ -5246,7 +5823,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45fc401175963954a7046238c51799a99eaa4d8be7dd7a0c52dbf00813e52f92" dependencies = [ "lazy_static", - "rand", + "rand 0.8.5", "serde", ] @@ -5284,7 +5861,7 @@ dependencies = [ "ed25519-dalek", "getrandom", "log", - "rand", + "rand 0.8.5", "signatory", ] @@ -5341,7 +5918,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" dependencies = [ - "rand", + "rand 0.8.5", ] [[package]] @@ -5381,7 +5958,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand", + "rand 0.8.5", "smallvec", "zeroize", ] @@ -5504,7 +6081,7 @@ dependencies = [ "chrono", "getrandom", "http 0.2.9", - "rand", + "rand 0.8.5", "reqwest", "serde", "serde_json", @@ -5523,6 +6100,35 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4" +dependencies = [ + "async-trait", + "base64 0.21.4", + "bytes", + "chrono", + "futures", + "humantime", + "hyper", + "itertools 0.11.0", + "parking_lot 0.12.1", + "percent-encoding", + "quick-xml 0.30.0", + "rand 0.8.5", + "reqwest", + "ring 0.16.20", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -5585,7 +6191,7 @@ dependencies = [ "oauth2", "p256 0.13.2", "p384", - "rand", + "rand 0.8.5", "rsa", "serde", "serde-value", @@ -5735,7 +6341,7 @@ dependencies = [ "opentelemetry_api", "ordered-float 3.9.1", "percent-encoding", - "rand", + "rand 0.8.5", "regex", "serde_json", "thiserror", @@ -5921,6 +6527,40 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "parquet" +version = "48.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bfe55df96e3f02f11bf197ae37d91bb79801631f82f6195dd196ef521df3597" +dependencies = [ + "ahash 0.8.3", + "arrow-array 48.0.1", + "arrow-buffer 48.0.1", + "arrow-cast 48.0.1", + "arrow-data 48.0.1", + "arrow-ipc 48.0.1", + "arrow-schema 48.0.1", + "arrow-select 48.0.1", + "base64 0.21.4", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "hashbrown 0.14.0", + "lz4_flex", + "num", + "num-bigint", + "object_store", + "paste", + "seq-macro", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd 0.13.0", +] + [[package]] name = "parquet" version = "49.0.0" @@ -5928,13 +6568,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af88740a842787da39b3d69ce5fbf6fce97d20211d3b299fee0a0da6430c74d4" dependencies = [ "ahash 0.8.3", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-ipc", - "arrow-schema", - "arrow-select", + "arrow-array 49.0.0", + "arrow-buffer 49.0.0", + "arrow-cast 49.0.0", + "arrow-data 49.0.0", + "arrow-ipc 49.0.0", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", "base64 0.21.4", "brotli", "bytes", @@ -6146,7 +6786,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -6323,7 +6963,7 @@ dependencies = [ "hmac", "md-5", "memchr", - "rand", + "rand 0.8.5", "sha2", "stringprep", ] @@ -6804,7 +7444,7 @@ dependencies = [ "prost 0.11.9", "prost-build 0.11.9", "prost-derive 0.11.9", - "rand", + "rand 0.8.5", "regex", "serde", "serde_json", @@ -6887,6 +7527,19 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.8.5" @@ -6895,7 +7548,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -6905,9 +7558,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", ] +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.6.4" @@ -6923,7 +7591,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" dependencies = [ - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -6973,6 +7641,15 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redact" version = "0.1.5" @@ -7095,6 +7772,15 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3cbb081b9784b07cceb8824c8583f86db4814d172ab043f3c23f7dc600bf83d" +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + [[package]] name = "rend" version = "0.4.0" @@ -7124,7 +7810,7 @@ dependencies = [ "once_cell", "percent-encoding", "quick-xml 0.31.0", - "rand", + "rand 0.8.5", "reqwest", "rsa", "rust-ini", @@ -7180,6 +7866,12 @@ dependencies = [ "winreg", ] +[[package]] +name = "retain_mut" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086" + [[package]] name = "rfc6979" version = "0.3.1" @@ -7337,7 +8029,7 @@ dependencies = [ "parking_lot 0.12.1", "paste", "prometheus", - "rand", + "rand 0.8.5", "risingwave_common", "risingwave_connector", "risingwave_expr", @@ -7383,7 +8075,7 @@ dependencies = [ "opentelemetry", "parking_lot 0.12.1", "prometheus", - "rand", + "rand 0.8.5", "risingwave_common", "risingwave_rt", "risingwave_storage", @@ -7454,10 +8146,14 @@ version = "1.5.0-alpha" dependencies = [ "anyhow", "arc-swap", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-schema", + "arrow-array 48.0.1", + "arrow-array 49.0.0", + "arrow-buffer 48.0.1", + "arrow-buffer 49.0.0", + "arrow-cast 48.0.1", + "arrow-cast 49.0.0", + "arrow-schema 48.0.1", + "arrow-schema 49.0.0", "async-trait", "auto_enums", "auto_impl", @@ -7511,7 +8207,7 @@ dependencies = [ "procfs 0.16.0", "prometheus", "prost 0.12.1", - "rand", + "rand 0.8.5", "regex", "reqwest", "risingwave-fields-derive", @@ -7606,7 +8302,7 @@ dependencies = [ "futures", "madsim-tokio", "prometheus", - "rand", + "rand 0.8.5", "risingwave_common", "risingwave_compactor", "risingwave_hummock_sdk", @@ -7663,7 +8359,7 @@ dependencies = [ "maplit", "pprof", "prometheus", - "rand", + "rand 0.8.5", "risingwave_batch", "risingwave_common", "risingwave_common_heap_profiling", @@ -7693,8 +8389,8 @@ version = "1.5.0-alpha" dependencies = [ "anyhow", "apache-avro 0.16.0", - "arrow-array", - "arrow-schema", + "arrow-array 49.0.0", + "arrow-schema 49.0.0", "async-nats", "async-trait", "auto_enums", @@ -7714,6 +8410,7 @@ dependencies = [ "clickhouse", "criterion", "csv", + "deltalake", "duration-str", "easy-ext", "enum-as-inner", @@ -7751,7 +8448,7 @@ dependencies = [ "protobuf-src", "pulsar", "quote", - "rand", + "rand 0.8.5", "redis", "regex", "reqwest", @@ -7769,6 +8466,7 @@ dependencies = [ "strum", "strum_macros", "syn 1.0.109", + "tempdir", "tempfile", "thiserror", "time", @@ -7858,8 +8556,8 @@ name = "risingwave_expr" version = "1.5.0-alpha" dependencies = [ "anyhow", - "arrow-array", - "arrow-schema", + "arrow-array 49.0.0", + "arrow-schema 49.0.0", "async-trait", "auto_impl", "await-tree", @@ -7943,7 +8641,7 @@ version = "1.5.0-alpha" dependencies = [ "anyhow", "arc-swap", - "arrow-schema", + "arrow-schema 49.0.0", "assert_matches", "async-recursion", "async-trait", @@ -7979,7 +8677,7 @@ dependencies = [ "pretty-xmlish", "pretty_assertions", "prometheus", - "rand", + "rand 0.8.5", "risingwave_batch", "risingwave_common", "risingwave_common_service", @@ -8037,7 +8735,7 @@ dependencies = [ "itertools 0.12.0", "madsim-tokio", "parking_lot 0.12.1", - "rand", + "rand 0.8.5", "risingwave_common", "risingwave_common_service", "risingwave_hummock_sdk", @@ -8170,7 +8868,7 @@ dependencies = [ "prometheus", "prometheus-http-query", "prost 0.12.1", - "rand", + "rand 0.8.5", "reqwest", "risingwave_backup", "risingwave_common", @@ -8262,7 +8960,7 @@ dependencies = [ "itertools 0.12.0", "madsim-tokio", "madsim-tonic", - "rand", + "rand 0.8.5", "regex", "risingwave_common", "risingwave_connector", @@ -8380,7 +9078,7 @@ dependencies = [ "madsim-tokio", "madsim-tonic", "moka", - "rand", + "rand 0.8.5", "risingwave_common", "risingwave_error", "risingwave_hummock_sdk", @@ -8446,7 +9144,7 @@ dependencies = [ "pin-project", "pretty_assertions", "prometheus", - "rand", + "rand 0.8.5", "rand_chacha", "risingwave_common", "risingwave_compactor", @@ -8487,7 +9185,7 @@ dependencies = [ "madsim-tokio", "parking_lot 0.12.1", "paste", - "rand", + "rand 0.8.5", "risingwave_common", "risingwave_connector", "risingwave_pb", @@ -8534,7 +9232,7 @@ dependencies = [ "itertools 0.12.0", "libtest-mimic", "madsim-tokio", - "rand", + "rand 0.8.5", "rand_chacha", "regex", "risingwave_common", @@ -8609,7 +9307,7 @@ dependencies = [ "procfs 0.16.0", "prometheus", "prost 0.12.1", - "rand", + "rand 0.8.5", "risingwave_backup", "risingwave_common", "risingwave_common_service", @@ -8671,7 +9369,7 @@ dependencies = [ "pin-project", "prometheus", "prost 0.12.1", - "rand", + "rand 0.8.5", "risingwave_common", "risingwave_connector", "risingwave_expr", @@ -8710,10 +9408,10 @@ dependencies = [ name = "risingwave_udf" version = "0.1.0" dependencies = [ - "arrow-array", + "arrow-array 49.0.0", "arrow-flight", - "arrow-schema", - "arrow-select", + "arrow-schema 49.0.0", + "arrow-select 49.0.0", "cfg-or-panic", "futures-util", "madsim-tokio", @@ -8775,6 +9473,17 @@ dependencies = [ "libc", ] +[[package]] +name = "roaring" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873" +dependencies = [ + "bytemuck", + "byteorder", + "retain_mut", +] + [[package]] name = "rsa" version = "0.9.2" @@ -8790,7 +9499,7 @@ dependencies = [ "num-traits", "pkcs1", "pkcs8 0.10.2", - "rand_core", + "rand_core 0.6.4", "signature 2.0.0", "spki 0.7.2", "subtle", @@ -8818,7 +9527,7 @@ dependencies = [ "bytes", "num-traits", "postgres", - "rand", + "rand 0.8.5", "rkyv", "serde", "serde_json", @@ -9582,7 +10291,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" dependencies = [ "pkcs8 0.10.2", - "rand_core", + "rand_core 0.6.4", "signature 2.0.0", "zeroize", ] @@ -9594,7 +10303,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" dependencies = [ "digest", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -9604,7 +10313,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fe458c98333f9c8152221191a77e2a44e8325d0193484af2e9421a53019e57d" dependencies = [ "digest", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -9717,6 +10426,28 @@ dependencies = [ "serde", ] +[[package]] +name = "snafu" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "snap" version = "1.1.0" @@ -9836,6 +10567,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "sqlparser" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743b4dc2cbde11890ccb254a8fc9d537fa41b36da00de2a1c5e9848c9bc42bd7" +dependencies = [ + "log", + "sqlparser_derive", +] + +[[package]] +name = "sqlparser_derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "sqlx" version = "0.7.1" @@ -9965,7 +10717,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rand", + "rand 0.8.5", "rsa", "rust_decimal", "serde", @@ -10010,7 +10762,7 @@ dependencies = [ "memchr", "num-bigint", "once_cell", - "rand", + "rand 0.8.5", "rust_decimal", "serde", "serde_json", @@ -10264,6 +11016,16 @@ dependencies = [ "madsim-tokio", ] +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand 0.4.6", + "remove_dir_all", +] + [[package]] name = "tempfile" version = "3.8.0" @@ -10562,7 +11324,7 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", - "rand", + "rand 0.8.5", "socket2 0.5.3", "tokio-util", "whoami", @@ -10575,7 +11337,7 @@ source = "git+https://github.com/madsim-rs/rust-tokio-retry.git?rev=95e2fd3#95e2 dependencies = [ "madsim-tokio", "pin-project", - "rand", + "rand 0.8.5", ] [[package]] @@ -10755,7 +11517,7 @@ dependencies = [ "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -10955,7 +11717,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand", + "rand 0.8.5", "static_assertions", ] @@ -11102,7 +11864,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ "getrandom", - "rand", + "rand 0.8.5", "serde", ] @@ -11622,9 +12384,9 @@ dependencies = [ "prost 0.12.1", "prost-types 0.12.1", "quote", - "rand", + "rand 0.8.5", "rand_chacha", - "rand_core", + "rand_core 0.6.4", "redis", "regex", "regex-automata 0.4.1", @@ -11696,7 +12458,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d36478bcf71152a2f9f6cf9bc48273333f32780c769ef90e13d464ab778db5f" dependencies = [ "libm", - "rand", + "rand 0.8.5", ] [[package]] @@ -11756,6 +12518,12 @@ dependencies = [ "url", ] +[[package]] +name = "z85" +version = "3.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a599daf1b507819c1121f0bf87fa37eb19daac6aff3aefefd4e6e2e0f2020fc" + [[package]] name = "zerocopy" version = "0.6.4" diff --git a/Cargo.toml b/Cargo.toml index 42018691f110b..54adb86211062 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,14 @@ arrow-flight = "49" arrow-select = "49" arrow-ord = "49" arrow-row = "49" +arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" } +arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" } +arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" } +arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" } +# Use a forked version which removes the dependencies on dynamo db to reduce +# compile time and binary size. +deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b", features = ["s3-no-concurrent-write"] } +parquet = "49" thiserror-ext = "0.0.10" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ diff --git a/ci/scripts/e2e-deltalake-sink-rust-test.sh b/ci/scripts/e2e-deltalake-sink-rust-test.sh new file mode 100755 index 0000000000000..71ff1eede8e4d --- /dev/null +++ b/ci/scripts/e2e-deltalake-sink-rust-test.sh @@ -0,0 +1,80 @@ +#!/usr/bin/env bash + +# Exits as soon as any line fails. +set -euo pipefail + +source ci/scripts/common.sh + +# prepare environment +export CONNECTOR_LIBS_PATH="./connector-node/libs" + +while getopts 'p:' opt; do + case ${opt} in + p ) + profile=$OPTARG + ;; + \? ) + echo "Invalid Option: -$OPTARG" 1>&2 + exit 1 + ;; + : ) + echo "Invalid option: $OPTARG requires an argument" 1>&2 + ;; + esac +done +shift $((OPTIND -1)) + +download_and_prepare_rw "$profile" source + +echo "--- Download connector node package" +buildkite-agent artifact download risingwave-connector.tar.gz ./ +mkdir ./connector-node +tar xf ./risingwave-connector.tar.gz -C ./connector-node + +echo "--- starting risingwave cluster" +mkdir -p .risingwave/log +cargo make ci-start ci-deltalake-test +sleep 1 + +# prepare minio deltalake sink +echo "--- preparing deltalake" +.risingwave/bin/mcli -C .risingwave/config/mcli mb hummock-minio/deltalake +wget https://ci-deps-dist.s3.amazonaws.com/spark-3.3.1-bin-hadoop3.tgz +tar -xf spark-3.3.1-bin-hadoop3.tgz --no-same-owner +DEPENDENCIES=io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2 +spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \ + --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \ + --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \ + --conf 'spark.hadoop.fs.s3a.access.key=hummockadmin' \ + --conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \ + --conf 'spark.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301' \ + --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ + --S --e 'create table delta.`s3a://deltalake/deltalake-test`(v1 int, v2 short, v3 long, v4 float, v5 double, v6 string, v7 date, v8 Timestamp, v9 boolean) using delta;' + + +echo "--- testing sinks" +sqllogictest -p 4566 -d dev './e2e_test/sink/deltalake_rust_sink.slt' +sleep 1 + + +spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \ + --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \ + --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \ + --conf 'spark.hadoop.fs.s3a.access.key=hummockadmin' \ + --conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \ + --conf 'spark.hadoop.fs.s3a.endpoint=http://localhost:9301' \ + --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ + --S --e 'INSERT OVERWRITE DIRECTORY "./spark-output" USING CSV SELECT * FROM delta.`s3a://deltalake/deltalake-test`;' + +# check sink destination using shell +if cat ./spark-output/*.csv | sort | awk -F "," '{ + exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T01:01:01.000Z" && $9 == "false"); }'; then + echo "DeltaLake sink check passed" +else + cat ./spark-output/*.csv + echo "The output is not as expected." + exit 1 +fi + +echo "--- Kill cluster" +cargo make ci-kill \ No newline at end of file diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 02720d6480263..843c4b11d0b6f 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -206,7 +206,7 @@ steps: config: ci/docker-compose.yml environment: - CODECOV_TOKEN - timeout_in_minutes: 20 + timeout_in_minutes: 22 retry: *auto-retry - label: "unit test (deterministic simulation)" @@ -680,7 +680,26 @@ steps: timeout_in_minutes: 14 retry: *auto-retry - - label: "end-to-end clickhouse sink test" + - label: "end-to-end deltalake sink test" + key: "e2e-deltalake-sink-rust-tests" + command: "ci/scripts/e2e-deltalake-sink-rust-test.sh -p ci-release" + if: | + !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null + || build.pull_request.labels includes "ci/run-e2e-deltalake-sink-rust-tests" + || build.env("CI_STEPS") =~ /(^|,)e2e-deltalake-sink-rust-tests?(,|$$)/ + depends_on: + - "build" + - "build-other" + plugins: + - docker-compose#v4.9.0: + run: sink-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry + + - label: "end-to-end clickhouse sink test" key: "e2e-clickhouse-sink-tests" command: "ci/scripts/e2e-clickhouse-sink-test.sh -p ci-release" if: | diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index ee31496ff0cf5..eac8de1ef49f9 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -277,6 +277,21 @@ steps: timeout_in_minutes: 10 retry: *auto-retry + - label: "end-to-end deltalake sink test" + if: build.pull_request.labels includes "ci/run- e2e-deltalake-sink-rust-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-deltalake-sink-rust-tests?(,|$$)/ + command: "ci/scripts/e2e-deltalake-sink-rust-test.sh -p ci-dev" + depends_on: + - "build" + - "build-other" + plugins: + - docker-compose#v4.9.0: + run: sink-test-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + - ./ci/plugins/upload-failure-logs + timeout_in_minutes: 10 + retry: *auto-retry + - label: "e2e java-binding test" if: build.pull_request.labels includes "ci/run-java-binding-tests" || build.env("CI_STEPS") =~ /(^|,)java-binding-tests?(,|$$)/ command: "ci/scripts/java-binding-test.sh -p ci-dev" @@ -327,7 +342,7 @@ steps: config: ci/docker-compose.yml environment: - CODECOV_TOKEN - timeout_in_minutes: 20 + timeout_in_minutes: 22 retry: *auto-retry - label: "check" diff --git a/e2e_test/sink/deltalake_rust_sink.slt b/e2e_test/sink/deltalake_rust_sink.slt new file mode 100644 index 0000000000000..4207c9446dc1b --- /dev/null +++ b/e2e_test/sink/deltalake_rust_sink.slt @@ -0,0 +1,32 @@ +statement ok +CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean); + +statement ok +CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; + +statement ok +create sink s6 as select * from mv6 +with ( + connector = 'deltalake_rust', + type = 'append-only', + force_append_only = 'true', + location = 's3a://deltalake/deltalake-test', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + s3.endpoint = 'http://127.0.0.1:9301' +); + +statement ok +INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false); + +statement ok +FLUSH; + +statement ok +DROP SINK s6; + +statement ok +DROP MATERIALIZED VIEW mv6; + +statement ok +DROP TABLE t6; \ No newline at end of file diff --git a/risedev.yml b/risedev.yml index f82aa595f2b33..17076e4f2da8e 100644 --- a/risedev.yml +++ b/risedev.yml @@ -781,6 +781,16 @@ profile: - use: frontend - use: compactor + ci-deltalake-test: + config-path: src/config/ci.toml + steps: + - use: minio + - use: meta-node + - use: compute-node + enable-tiered-cache: true + - use: frontend + - use: compactor + ci-clickhouse-test: config-path: src/config/ci.toml steps: diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index fd0406ed36961..e1bbb04be0947 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -17,9 +17,13 @@ normal = ["workspace-hack"] anyhow = "1" arc-swap = "1" arrow-array = { workspace = true } +arrow-array-deltalake = { workspace = true } arrow-buffer = { workspace = true } +arrow-buffer-deltalake = { workspace = true } arrow-cast = { workspace = true } +arrow-cast-deltalake = { workspace = true } arrow-schema = { workspace = true } +arrow-schema-deltalake = { workspace = true } async-trait = "0.1" auto_enums = "0.8" auto_impl = "1" diff --git a/src/common/src/array/arrow/arrow_default.rs b/src/common/src/array/arrow/arrow_default.rs new file mode 100644 index 0000000000000..4db984ba1db83 --- /dev/null +++ b/src/common/src/array/arrow/arrow_default.rs @@ -0,0 +1,26 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This is for arrow dependency named `arrow-xxx` such as `arrow-array` in the cargo workspace. +//! +//! This should the default arrow version to be used in our system. +//! +//! The corresponding version of arrow is currently used by `udf` and `iceberg` sink. + +pub use arrow_impl::to_record_batch_with_schema; +use {arrow_array, arrow_buffer, arrow_cast, arrow_schema}; + +#[expect(clippy::duplicate_mod)] +#[path = "./arrow_impl.rs"] +mod arrow_impl; diff --git a/src/common/src/array/arrow/arrow_deltalake.rs b/src/common/src/array/arrow/arrow_deltalake.rs new file mode 100644 index 0000000000000..c9a0fcf3a4206 --- /dev/null +++ b/src/common/src/array/arrow/arrow_deltalake.rs @@ -0,0 +1,28 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This is for arrow dependency named `arrow-xxx-deltalake` such as `arrow-array-deltalake` +//! in the cargo workspace. +//! +//! The corresponding version of arrow is currently used by `deltalake` sink. + +pub use arrow_impl::to_record_batch_with_schema as to_deltalake_record_batch_with_schema; +use { + arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer, + arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema, +}; + +#[expect(clippy::duplicate_mod)] +#[path = "./arrow_impl.rs"] +mod arrow_impl; diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow/arrow_impl.rs similarity index 92% rename from src/common/src/array/arrow.rs rename to src/common/src/array/arrow/arrow_impl.rs index c2313e883aeab..78dc79c530b9f 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -13,23 +13,47 @@ // limitations under the License. //! Converts between arrays and Apache Arrow arrays. +//! +//! This file acts as a template file for conversion code between +//! arrays and different version of Apache Arrow. +//! +//! The conversion logic will be implemented for the arrow version specified in the outer mod by +//! `super::arrow_xxx`, such as `super::arrow_array`. +//! +//! When we want to implement the conversion logic for an arrow version, we first +//! create a new mod file, and rename the corresponding arrow package name to `arrow_xxx` +//! using the `use` clause, and then declare a sub-mod and set its file path with attribute +//! `#[path = "./arrow_impl.rs"]` so that the code in this template file can be embedded to +//! the new mod file, and the conversion logic can be implemented for the corresponding arrow +//! version. +//! +//! Example can be seen in `arrow_default.rs`, which is also as followed: +//! ```ignore +//! use {arrow_array, arrow_buffer, arrow_cast, arrow_schema}; +//! +//! #[allow(clippy::duplicate_mod)] +//! #[path = "./arrow_impl.rs"] +//! mod arrow_impl; +//! ``` use std::fmt::Write; +use std::sync::Arc; -use arrow_array::Array as ArrowArray; -use arrow_cast::cast; -use arrow_schema::{Field, Schema, SchemaRef, DECIMAL256_MAX_PRECISION}; use chrono::{NaiveDateTime, NaiveTime}; use itertools::Itertools; -use super::*; -use crate::types::{Int256, StructType}; +// This is important because we want to use the arrow version specified by the outer mod. +use super::{arrow_array, arrow_buffer, arrow_cast, arrow_schema}; +// Other import should always use the absolute path. +use crate::array::*; +use crate::buffer::Bitmap; +use crate::types::*; use crate::util::iter_util::ZipEqFast; /// Converts RisingWave array to Arrow array with the schema. /// This function will try to convert the array if the type is not same with the schema. pub fn to_record_batch_with_schema( - schema: SchemaRef, + schema: arrow_schema::SchemaRef, chunk: &DataChunk, ) -> Result { if !chunk.is_compacted() { @@ -45,7 +69,7 @@ pub fn to_record_batch_with_schema( if column.data_type() == field.data_type() { Ok(column) } else { - cast(&column, field.data_type()).map_err(ArrayError::from_arrow) + arrow_cast::cast(&column, field.data_type()).map_err(ArrayError::from_arrow) } }) .try_collect::<_, _, ArrayError>()?; @@ -72,14 +96,14 @@ impl TryFrom<&DataChunk> for arrow_array::RecordBatch { let fields: Vec<_> = columns .iter() - .map(|array: &Arc| { + .map(|array: &Arc| { let nullable = array.null_count() > 0; let data_type = array.data_type().clone(); - Field::new("", data_type, nullable) + arrow_schema::Field::new("", data_type, nullable) }) .collect(); - let schema = Arc::new(Schema::new(fields)); + let schema = Arc::new(arrow_schema::Schema::new(fields)); let opts = arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity())); arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts) @@ -205,7 +229,7 @@ impl TryFrom<&StructType> for arrow_schema::Fields { fn try_from(struct_type: &StructType) -> Result { struct_type .iter() - .map(|(name, ty)| Ok(Field::new(name, ty.try_into()?, true))) + .map(|(name, ty)| Ok(arrow_schema::Field::new(name, ty.try_into()?, true))) .try_collect() } } @@ -225,7 +249,7 @@ impl TryFrom<&DataType> for arrow_schema::DataType { DataType::Int16 => Ok(Self::Int16), DataType::Int32 => Ok(Self::Int32), DataType::Int64 => Ok(Self::Int64), - DataType::Int256 => Ok(Self::Decimal256(DECIMAL256_MAX_PRECISION, 0)), + DataType::Int256 => Ok(Self::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)), DataType::Float32 => Ok(Self::Float32), DataType::Float64 => Ok(Self::Float64), DataType::Date => Ok(Self::Date32), @@ -243,10 +267,10 @@ impl TryFrom<&DataType> for arrow_schema::DataType { DataType::Struct(struct_type) => Ok(Self::Struct( struct_type .iter() - .map(|(name, ty)| Ok(Field::new(name, ty.try_into()?, true))) + .map(|(name, ty)| Ok(arrow_schema::Field::new(name, ty.try_into()?, true))) .try_collect::<_, _, ArrayError>()?, )), - DataType::List(datatype) => Ok(Self::List(Arc::new(Field::new( + DataType::List(datatype) => Ok(Self::List(Arc::new(arrow_schema::Field::new( "item", datatype.as_ref().try_into()?, true, @@ -528,6 +552,20 @@ impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray { } } +impl From for Int256 { + fn from(value: arrow_buffer::i256) -> Self { + let buffer = value.to_be_bytes(); + Int256::from_be_bytes(buffer) + } +} + +impl<'a> From> for arrow_buffer::i256 { + fn from(val: Int256Ref<'a>) -> Self { + let buffer = val.to_be_bytes(); + arrow_buffer::i256::from_be_bytes(buffer) + } +} + impl From<&Int256Array> for arrow_array::Decimal256Array { fn from(array: &Int256Array) -> Self { array @@ -604,7 +642,7 @@ impl TryFrom<&ListArray> for arrow_array::ListArray { array, a, Decimal256Builder::with_capacity(a.len()).with_data_type( - arrow_schema::DataType::Decimal256(DECIMAL256_MAX_PRECISION, 0), + arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0), ), |b, v| b.append_option(v.map(Into::into)), ), @@ -656,7 +694,11 @@ impl TryFrom<&ListArray> for arrow_array::ListArray { ArrayImpl::Struct(a) => { let values = Arc::new(arrow_array::StructArray::try_from(a)?); arrow_array::ListArray::new( - Arc::new(Field::new("item", a.data_type().try_into()?, true)), + Arc::new(arrow_schema::Field::new( + "item", + a.data_type().try_into()?, + true, + )), arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from( array .offsets() @@ -683,6 +725,7 @@ impl TryFrom<&arrow_array::ListArray> for ListArray { type Error = ArrayError; fn try_from(array: &arrow_array::ListArray) -> Result { + use arrow_array::Array; Ok(ListArray { value: Box::new(ArrayImpl::try_from(array.values())?), bitmap: match array.nulls() { @@ -731,6 +774,7 @@ impl TryFrom<&arrow_array::StructArray> for StructArray { #[cfg(test)] mod tests { + use super::arrow_array::Array as _; use super::*; #[test] @@ -860,8 +904,6 @@ mod tests { #[test] fn struct_array() { - use arrow_array::Array as _; - // Empty array - risingwave to arrow conversion. let test_arr = StructArray::new(StructType::empty(), vec![], Bitmap::ones(0)); assert_eq!( diff --git a/src/common/src/array/arrow/mod.rs b/src/common/src/array/arrow/mod.rs new file mode 100644 index 0000000000000..600938dd78b04 --- /dev/null +++ b/src/common/src/array/arrow/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod arrow_default; +mod arrow_deltalake; + +pub use arrow_default::to_record_batch_with_schema; +pub use arrow_deltalake::to_deltalake_record_batch_with_schema; diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 80d84e5245d2d..ed1e09ef0a824 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -15,7 +15,7 @@ //! `Array` defines all in-memory representations of vectorized execution framework. mod arrow; -pub use arrow::to_record_batch_with_schema; +pub use arrow::{to_deltalake_record_batch_with_schema, to_record_batch_with_schema}; mod bool_array; pub mod bytes_array; mod chrono_array; diff --git a/src/common/src/types/num256.rs b/src/common/src/types/num256.rs index 864af97deb374..e2933018b142e 100644 --- a/src/common/src/types/num256.rs +++ b/src/common/src/types/num256.rs @@ -326,20 +326,6 @@ impl Num for Int256 { } } -impl From for Int256 { - fn from(value: arrow_buffer::i256) -> Self { - let buffer = value.to_be_bytes(); - Int256::from_be_bytes(buffer) - } -} - -impl<'a> From> for arrow_buffer::i256 { - fn from(val: Int256Ref<'a>) -> Self { - let buffer = val.to_be_bytes(); - arrow_buffer::i256::from_be_bytes(buffer) - } -} - impl EstimateSize for Int256 { fn estimated_heap_size(&self) -> usize { mem::size_of::() * 2 diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index a016cc64866d2..1d7ad5ef58f77 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -46,6 +46,7 @@ clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "6 "time", ] } csv = "1.3" +deltalake = { workspace = true } duration-str = "0.7.0" easy-ext = "1" enum-as-inner = "0.6" @@ -142,6 +143,7 @@ workspace-hack = { path = "../workspace-hack" } [dev-dependencies] criterion = { workspace = true, features = ["async_tokio", "async"] } +deltalake = { workspace = true, features = ["datafusion"] } expect-test = "1" pretty_assertions = "1" quote = "1" @@ -149,6 +151,7 @@ rand = "0.8" serde = { version = "1", features = ["derive"] } serde_yaml = "0.9" syn = { version = "1", features = ["full"] } +tempdir = "0.3.7" tempfile = "3" tracing-subscriber = "0.3" tracing-test = "0.2" diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs new file mode 100644 index 0000000000000..cb9d08a610482 --- /dev/null +++ b/src/connector/src/sink/deltalake.rs @@ -0,0 +1,605 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::anyhow; +use async_trait::async_trait; +use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType}; +use deltalake::protocol::{DeltaOperation, SaveMode}; +use deltalake::table::builder::s3_storage_options::{ + AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME, + AWS_SECRET_ACCESS_KEY, +}; +use deltalake::writer::{DeltaWriter, RecordBatchWriter}; +use deltalake::DeltaTable; +use risingwave_common::array::{to_deltalake_record_batch_with_schema, StreamChunk}; +use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::Schema; +use risingwave_common::error::anyhow_error; +use risingwave_common::types::DataType; +use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; +use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; +use risingwave_pb::connector_service::SinkMetadata; +use serde_derive::{Deserialize, Serialize}; +use serde_with::serde_as; +use with_options::WithOptions; + +use super::coordinate::CoordinatedSinkWriter; +use super::writer::{LogSinkerOf, SinkWriter}; +use super::{ + Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam, + SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, +}; +use crate::sink::writer::SinkWriterExt; + +pub const DELTALAKE_SINK: &str = "deltalake_rust"; +pub const DEFAULT_REGION: &str = "us-east-1"; + +#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)] +pub struct DeltaLakeCommon { + #[serde(rename = "s3.access.key")] + pub s3_access_key: Option, + #[serde(rename = "s3.secret.key")] + pub s3_secret_key: Option, + #[serde(rename = "location")] + pub location: String, + #[serde(rename = "s3.region")] + pub s3_region: Option, + #[serde(rename = "s3.endpoint")] + pub s3_endpoint: Option, +} +impl DeltaLakeCommon { + pub async fn create_deltalake_client(&self) -> Result { + let table = match Self::get_table_url(&self.location)? { + DeltaTableUrl::S3(s3_path) => { + let mut storage_options = HashMap::new(); + storage_options.insert( + AWS_ACCESS_KEY_ID.to_string(), + self.s3_access_key.clone().ok_or_else(|| { + SinkError::Config(anyhow!("s3.access.key is required with aws s3")) + })?, + ); + storage_options.insert( + AWS_SECRET_ACCESS_KEY.to_string(), + self.s3_secret_key.clone().ok_or_else(|| { + SinkError::Config(anyhow!("s3.secret.key is required with aws s3")) + })?, + ); + if self.s3_endpoint.is_none() && self.s3_region.is_none() { + return Err(SinkError::Config(anyhow!( + "s3.endpoint and s3.region need to be filled with at least one" + ))); + } + storage_options.insert( + AWS_REGION.to_string(), + self.s3_region + .clone() + .unwrap_or_else(|| DEFAULT_REGION.to_string()), + ); + if let Some(s3_endpoint) = &self.s3_endpoint { + storage_options.insert(AWS_ENDPOINT_URL.to_string(), s3_endpoint.clone()); + } + storage_options.insert(AWS_ALLOW_HTTP.to_string(), "true".to_string()); + storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_string(), "true".to_string()); + deltalake::open_table_with_storage_options(s3_path.clone(), storage_options).await? + } + DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?, + }; + Ok(table) + } + + fn get_table_url(path: &str) -> Result { + if path.starts_with("s3://") || path.starts_with("s3a://") { + Ok(DeltaTableUrl::S3(path.to_string())) + } else if let Some(path) = path.strip_prefix("file://") { + Ok(DeltaTableUrl::Local(path.to_string())) + } else { + Err(SinkError::DeltaLake(anyhow!( + "path need to start with 's3://','s3a://'(s3) or file://(local)" + ))) + } + } +} + +enum DeltaTableUrl { + S3(String), + Local(String), +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct DeltaLakeConfig { + #[serde(flatten)] + pub common: DeltaLakeCommon, + + pub r#type: String, +} + +impl DeltaLakeConfig { + pub fn from_hashmap(properties: HashMap) -> Result { + let config = serde_json::from_value::( + serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?, + ) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + Ok(config) + } +} + +#[derive(Debug)] +pub struct DeltaLakeSink { + pub config: DeltaLakeConfig, + param: SinkParam, +} + +impl DeltaLakeSink { + pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result { + Ok(Self { config, param }) + } +} + +fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result { + let result = match rw_data_type { + DataType::Boolean => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::Boolean) + ) + } + DataType::Int16 => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::Short) + ) + } + DataType::Int32 => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::Integer) + ) + } + DataType::Int64 => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::Long) + ) + } + DataType::Float32 => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::Float) + ) + } + DataType::Float64 => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::Double) + ) + } + DataType::Decimal => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_, _)) + ) + } + DataType::Date => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::Date) + ) + } + DataType::Varchar => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::String) + ) + } + DataType::Timestamptz => { + matches!( + dl_data_type, + DeltaLakeDataType::Primitive(PrimitiveType::Timestamp) + ) + } + DataType::Struct(rw_struct) => { + if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type { + let mut result = true; + for ((rw_name, rw_type), dl_field) in rw_struct + .names() + .zip_eq_fast(rw_struct.types()) + .zip_eq_fast(dl_struct.fields()) + { + result = check_field_type(rw_type, dl_field.data_type())? + && result + && rw_name.eq(dl_field.name()); + } + result + } else { + false + } + } + DataType::List(rw_list) => { + if let DeltaLakeDataType::Array(dl_list) = dl_data_type { + check_field_type(rw_list, dl_list.element_type())? + } else { + false + } + } + _ => { + return Err(SinkError::DeltaLake(anyhow!( + "deltalake cannot support type {:?}", + rw_data_type.to_string() + ))) + } + }; + Ok(result) +} + +impl Sink for DeltaLakeSink { + type Coordinator = DeltaLakeSinkCommitter; + type LogSinker = LogSinkerOf>; + + const SINK_NAME: &'static str = DELTALAKE_SINK; + + async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + let inner = DeltaLakeSinkWriter::new( + self.config.clone(), + self.param.schema().clone(), + self.param.downstream_pk.clone(), + ) + .await?; + Ok(CoordinatedSinkWriter::new( + writer_param + .meta_client + .expect("should have meta client") + .sink_coordinate_client() + .await, + self.param.clone(), + writer_param.vnode_bitmap.ok_or_else(|| { + SinkError::Remote(anyhow_error!( + "sink needs coordination should not have singleton input" + )) + })?, + inner, + ) + .await? + .into_log_sinker(writer_param.sink_metrics)) + } + + async fn validate(&self) -> Result<()> { + if self.config.r#type != SINK_TYPE_APPEND_ONLY + && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION + { + return Err(SinkError::Config(anyhow!( + "only append-only delta lake sink is supported", + ))); + } + let table = self.config.common.create_deltalake_client().await?; + let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = table + .get_schema()? + .fields() + .iter() + .map(|f| (f.name(), f.data_type())) + .collect(); + if deltalake_fields.len() != self.param.schema().fields().len() { + return Err(SinkError::DeltaLake(anyhow!( + "column count not match, rw is {}, deltalake is {}", + self.param.schema().fields().len(), + deltalake_fields.len() + ))); + } + for field in self.param.schema().fields() { + if !deltalake_fields.contains_key(&field.name) { + return Err(SinkError::DeltaLake(anyhow!( + "column {} not found in deltalake table", + field.name + ))); + } + let deltalake_field_type: &&DeltaLakeDataType = deltalake_fields + .get(&field.name) + .ok_or_else(|| SinkError::DeltaLake(anyhow!("cannot find field type")))?; + if !check_field_type(&field.data_type, deltalake_field_type)? { + return Err(SinkError::DeltaLake(anyhow!( + "column {} type is not match, deltalake is {:?}, rw is{:?}", + field.name, + deltalake_field_type, + field.data_type + ))); + } + } + Ok(()) + } + + async fn new_coordinator(&self) -> Result { + Ok(DeltaLakeSinkCommitter { + table: self.config.common.create_deltalake_client().await?, + }) + } +} + +impl TryFrom for DeltaLakeSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let config = DeltaLakeConfig::from_hashmap(param.properties.clone())?; + DeltaLakeSink::new(config, param) + } +} + +pub struct DeltaLakeSinkWriter { + pub config: DeltaLakeConfig, + schema: Schema, + pk_indices: Vec, + writer: RecordBatchWriter, + dl_schema: Arc, + dl_table: DeltaTable, +} + +impl DeltaLakeSinkWriter { + pub async fn new( + config: DeltaLakeConfig, + schema: Schema, + pk_indices: Vec, + ) -> Result { + let dl_table = config.common.create_deltalake_client().await?; + let writer = RecordBatchWriter::for_table(&dl_table)?; + let dl_schema: Arc = + Arc::new(convert_schema(dl_table.get_schema()?)?); + + Ok(Self { + config, + schema, + pk_indices, + writer, + dl_schema, + dl_table, + }) + } + + async fn write(&mut self, chunk: StreamChunk) -> Result<()> { + let a = to_deltalake_record_batch_with_schema(self.dl_schema.clone(), &chunk) + .map_err(|err| SinkError::DeltaLake(anyhow!("convert record batch error: {}", err)))?; + self.writer.write(a).await?; + Ok(()) + } +} + +fn convert_schema(schema: &StructType) -> Result { + let mut builder = deltalake::arrow::datatypes::SchemaBuilder::new(); + for field in schema.fields() { + let dl_field = deltalake::arrow::datatypes::Field::new( + field.name(), + deltalake::arrow::datatypes::DataType::try_from(field.data_type()) + .map_err(|err| SinkError::DeltaLake(anyhow!("convert schema error: {}", err)))?, + field.is_nullable(), + ); + builder.push(dl_field); + } + Ok(builder.finish()) +} + +#[async_trait] +impl SinkWriter for DeltaLakeSinkWriter { + type CommitMetadata = Option; + + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + self.write(chunk).await + } + + async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + Ok(()) + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + + async fn barrier(&mut self, is_checkpoint: bool) -> Result> { + if !is_checkpoint { + return Ok(None); + } + + let adds = self.writer.flush().await?; + Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult { + adds, + })?)) + } + + async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { + Ok(()) + } +} + +pub struct DeltaLakeSinkCommitter { + table: DeltaTable, +} + +#[async_trait::async_trait] +impl SinkCommitCoordinator for DeltaLakeSinkCommitter { + async fn init(&mut self) -> Result<()> { + tracing::info!("DeltaLake commit coordinator inited."); + Ok(()) + } + + async fn commit(&mut self, epoch: u64, metadata: Vec) -> Result<()> { + tracing::info!("Starting DeltaLake commit in epoch {epoch}."); + + let deltalake_write_result = metadata + .iter() + .map(DeltaLakeWriteResult::try_from) + .collect::>>()?; + let write_adds: Vec = deltalake_write_result + .into_iter() + .flat_map(|v| v.adds.into_iter()) + .map(Action::Add) + .collect(); + + if write_adds.is_empty() { + return Ok(()); + } + let partition_cols = self.table.get_metadata()?.partition_columns.clone(); + let partition_by = if !partition_cols.is_empty() { + Some(partition_cols) + } else { + None + }; + let operation = DeltaOperation::Write { + mode: SaveMode::Append, + partition_by, + predicate: None, + }; + let version = deltalake::operations::transaction::commit( + self.table.log_store().as_ref(), + &write_adds, + operation, + &self.table.state, + None, + ) + .await?; + self.table.update().await?; + tracing::info!( + "Succeeded to commit ti DeltaLake table in epoch {epoch} version {version}." + ); + Ok(()) + } +} + +#[derive(Serialize, Deserialize)] +struct DeltaLakeWriteResult { + adds: Vec, +} + +impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata { + type Error = SinkError; + + fn try_from(value: &'a DeltaLakeWriteResult) -> std::prelude::v1::Result { + let metadata = serde_json::to_vec(&value.adds).map_err(|e| -> SinkError { + anyhow!("Can't serialized deltalake sink metadata: {}", e).into() + })?; + Ok(SinkMetadata { + metadata: Some(Serialized(SerializedMetadata { metadata })), + }) + } +} + +impl DeltaLakeWriteResult { + fn try_from(value: &SinkMetadata) -> Result { + if let Some(Serialized(v)) = &value.metadata { + let adds = + serde_json::from_slice::>(&v.metadata).map_err(|e| -> SinkError { + anyhow!("Can't deserialize deltalake sink metadata: {}", e).into() + })?; + Ok(DeltaLakeWriteResult { adds }) + } else { + Err(anyhow!("Can't create deltalake sink write result from empty data!").into()) + } + } +} + +#[cfg(all(test, not(madsim)))] +mod test { + use deltalake::kernel::DataType as SchemaDataType; + use deltalake::operations::create::CreateBuilder; + use maplit::hashmap; + use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array}; + use risingwave_common::catalog::{Field, Schema}; + + use super::{DeltaLakeConfig, DeltaLakeSinkWriter}; + use crate::sink::deltalake::DeltaLakeSinkCommitter; + use crate::sink::writer::SinkWriter; + use crate::sink::SinkCommitCoordinator; + use crate::source::DataType; + + #[tokio::test] + async fn test_deltalake() { + let dir = tempdir::TempDir::new("./deltalake").unwrap(); + let path = dir.path().to_str().unwrap(); + CreateBuilder::new() + .with_location(path) + .with_column("id", SchemaDataType::integer(), false, Default::default()) + .with_column("name", SchemaDataType::string(), false, Default::default()) + .await + .unwrap(); + + let properties = hashmap! { + "connector".to_string() => "deltalake_rust".to_string(), + "force_append_only".to_string() => "true".to_string(), + "type".to_string() => "append-only".to_string(), + "location".to_string() => format!("file://{}",path), + }; + + let schema = Schema::new(vec![ + Field { + data_type: DataType::Int32, + name: "id".into(), + sub_fields: vec![], + type_name: "".into(), + }, + Field { + data_type: DataType::Varchar, + name: "name".into(), + sub_fields: vec![], + type_name: "".into(), + }, + ]); + + let deltalake_config = DeltaLakeConfig::from_hashmap(properties).unwrap(); + let deltalake_table = deltalake_config + .common + .create_deltalake_client() + .await + .unwrap(); + + let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0]) + .await + .unwrap(); + let chunk = StreamChunk::new( + vec![Op::Insert, Op::Insert, Op::Insert], + vec![ + I32Array::from_iter(vec![1, 2, 3]).into_ref(), + Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(), + ], + ); + deltalake_writer.write(chunk).await.unwrap(); + let mut committer = DeltaLakeSinkCommitter { + table: deltalake_table, + }; + let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap(); + committer.commit(1, vec![metadata]).await.unwrap(); + + // The following code is to test reading the deltalake data table written with test data. + // To enable the following code, add `deltalake = { workspace = true, features = ["datafusion"] }` + // to the `dev-dependencies` section of the `Cargo.toml` file of this crate. + // + // The feature is commented and disabled because enabling the `datafusion` feature of `deltalake` + // will increase the compile time and output binary size in release build, even though it is a + // dev dependency. + + let ctx = deltalake::datafusion::prelude::SessionContext::new(); + let table = deltalake::open_table(path).await.unwrap(); + ctx.register_table("demo", std::sync::Arc::new(table)) + .unwrap(); + + let batches = ctx + .sql("SELECT * FROM demo") + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(3, batches.get(0).unwrap().column(0).len()); + assert_eq!(3, batches.get(0).unwrap().column(1).len()); + } +} diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 795e9f73b52cf..bff34b30dcd32 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -18,6 +18,7 @@ pub mod boxed; pub mod catalog; pub mod clickhouse; pub mod coordinate; +pub mod deltalake; pub mod doris; pub mod doris_starrocks_connector; pub mod encoder; @@ -39,6 +40,7 @@ pub mod writer; use std::collections::HashMap; use ::clickhouse::error::Error as ClickHouseError; +use ::deltalake::DeltaTableError; use ::redis::RedisError; use anyhow::anyhow; use async_trait::async_trait; @@ -84,6 +86,7 @@ macro_rules! for_all_sinks { { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, + { DeltaLakeRust, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { Test, $crate::sink::test_sink::TestSink }, { Table, $crate::sink::table::TableSink } @@ -438,6 +441,12 @@ pub enum SinkError { ), #[error("Doris error: {0}")] Doris(String), + #[error("DeltaLake error: {0}")] + DeltaLake( + #[source] + #[backtrace] + anyhow::Error, + ), #[error("Starrocks error: {0}")] Starrocks(String), #[error("Pulsar error: {0}")] @@ -478,6 +487,12 @@ impl From for SinkError { } } +impl From for SinkError { + fn from(value: DeltaTableError) -> Self { + SinkError::DeltaLake(anyhow_error!("{}", value)) + } +} + impl From for SinkError { fn from(value: RedisError) -> Self { SinkError::Redis(format!("{}", value)) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index b407f90c40f45..4770fd621ad3c 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -66,6 +66,26 @@ ClickHouseConfig: - name: r#type field_type: String required: true +DeltaLakeConfig: + fields: + - name: s3.access.key + field_type: String + required: false + - name: s3.secret.key + field_type: String + required: false + - name: location + field_type: String + required: true + - name: s3.region + field_type: String + required: false + - name: s3.endpoint + field_type: String + required: false + - name: r#type + field_type: String + required: true DorisConfig: fields: - name: doris.url