
feat(sink): support deltalake sink with rust sdk #13600

Merged · 35 commits · Dec 14, 2023
e6bc0ee
add delta lake sink
xxhZs Nov 21, 2023
013eede
fix
xxhZs Nov 21, 2023
f0047ba
Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk
xxhZs Nov 23, 2023
2ff021f
fmt
xxhZs Nov 23, 2023
7b85ea2
save
xxhZs Nov 23, 2023
6fbbac3
fix cargo lock
xxhZs Nov 24, 2023
c6a709c
fmt
xxhZs Nov 28, 2023
1ddf1f9
fix
xxhZs Nov 28, 2023
cdecda5
Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk
xxhZs Nov 28, 2023
f521372
Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk
xxhZs Nov 28, 2023
c890167
use separate arrow version for deltalake
wenym1 Dec 1, 2023
2a35a5e
Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk
wenym1 Dec 1, 2023
0aa6785
fix timestamptz
xxhZs Dec 1, 2023
9b7b543
use mod path to avoid macro
wenym1 Dec 1, 2023
e460901
add license and comment
wenym1 Dec 1, 2023
13ff6ed
Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk
wenym1 Dec 4, 2023
c187f72
update test
wenym1 Dec 5, 2023
2217f61
add comment
wenym1 Dec 5, 2023
4c503fa
Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk
wenym1 Dec 5, 2023
1aaf516
use expect
wenym1 Dec 5, 2023
e1a3814
save
xxhZs Dec 5, 2023
76547e4
fix fmt
xxhZs Dec 7, 2023
b0b1b87
add ci
xxhZs Dec 7, 2023
50899c0
Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk
xxhZs Dec 7, 2023
10b1ba6
Empty commit
xxhZs Dec 7, 2023
5a492a6
fix ci
xxhZs Dec 7, 2023
435eb73
fix
xxhZs Dec 8, 2023
cf96d64
ut timeout 20 -> 22
xxhZs Dec 12, 2023
4a83b8f
fix region
xxhZs Dec 12, 2023
814dcc9
reduce compile time and binary size
wenym1 Dec 12, 2023
f9605ad
Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk
wenym1 Dec 12, 2023
899070a
temp add hakari third party
wenym1 Dec 13, 2023
86625ab
use new delta-rs commit
wenym1 Dec 13, 2023
5c365a9
Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk
xxhZs Dec 14, 2023
a4023e5
fix ci
xxhZs Dec 14, 2023
1,011 changes: 873 additions & 138 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -128,6 +128,12 @@ arrow-flight = "49"
arrow-select = "49"
arrow-ord = "49"
arrow-row = "49"
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" }
arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "72505449e9538371fe5fda35d545dbd662facd07", features = ["s3"] }
Contributor:
Is there a particular reason for downgrading arrow?

Contributor Author:
Because both Delta.rs and IceLake need to use Arrow, version 48.0.1 is the maximum version they both support.
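For illustration, a minimal sketch (hypothetical function; the renamed crate names come from the Cargo.toml change above) of how the two arrow versions coexist in one crate. The compiler treats the two RecordBatch types as unrelated, so nothing crosses the version boundary without an explicit conversion:

// arrow-array 49 under its canonical name; arrow-array 48.0.1 under its
// Cargo.toml alias. These are distinct types to the compiler.
fn row_counts(
    batch_v49: &arrow_array::RecordBatch,
    batch_v48: &arrow_array_deltalake::RecordBatch,
) -> (usize, usize) {
    // Neither batch can be passed to the other version's APIs directly.
    (batch_v49.num_rows(), batch_v48.num_rows())
}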

parquet = "49"
thiserror-ext = "0.0.8"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
4 changes: 4 additions & 0 deletions src/common/Cargo.toml
@@ -17,9 +17,13 @@ normal = ["workspace-hack"]
anyhow = "1"
arc-swap = "1"
arrow-array = { workspace = true }
arrow-array-deltalake = { workspace = true }
arrow-buffer = { workspace = true }
arrow-buffer-deltalake = { workspace = true }
arrow-cast = { workspace = true }
arrow-cast-deltalake = { workspace = true }
arrow-schema = { workspace = true }
arrow-schema-deltalake = { workspace = true }
async-trait = "0.1"
auto_enums = "0.8"
auto_impl = "1"
@@ -15,21 +15,21 @@
//! Converts between arrays and Apache Arrow arrays.

use std::fmt::Write;
use std::sync::Arc;

use arrow_array::Array as ArrowArray;
use arrow_cast::cast;
use arrow_schema::{Field, Schema, SchemaRef, DECIMAL256_MAX_PRECISION};
use chrono::{NaiveDateTime, NaiveTime};
use itertools::Itertools;

use super::*;
use crate::types::{Int256, StructType};
use super::{arrow_array, arrow_buffer, arrow_cast, arrow_schema};
use crate::array::*;
use crate::buffer::Bitmap;
use crate::types::*;
use crate::util::iter_util::ZipEqFast;

/// Converts RisingWave array to Arrow array with the schema.
/// This function will try to convert the array if the type is not same with the schema.
pub fn to_record_batch_with_schema(
schema: SchemaRef,
schema: arrow_schema::SchemaRef,
chunk: &DataChunk,
) -> Result<arrow_array::RecordBatch, ArrayError> {
if !chunk.is_compacted() {
@@ -45,7 +45,7 @@ pub fn to_record_batch_with_schema(
if column.data_type() == field.data_type() {
Ok(column)
} else {
cast(&column, field.data_type())
arrow_cast::cast(&column, field.data_type())
.map_err(|err| ArrayError::FromArrow(err.to_string()))
}
})
@@ -73,14 +73,14 @@ impl TryFrom<&DataChunk> for arrow_array::RecordBatch {

let fields: Vec<_> = columns
.iter()
.map(|array: &Arc<dyn ArrowArray>| {
.map(|array: &Arc<dyn arrow_array::Array>| {
let nullable = array.null_count() > 0;
let data_type = array.data_type().clone();
Field::new("", data_type, nullable)
arrow_schema::Field::new("", data_type, nullable)
})
.collect();

let schema = Arc::new(Schema::new(fields));
let schema = Arc::new(arrow_schema::Schema::new(fields));
let opts =
arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
@@ -203,7 +203,7 @@ impl TryFrom<&StructType> for arrow_schema::Fields {
fn try_from(struct_type: &StructType) -> Result<Self, Self::Error> {
struct_type
.iter()
.map(|(name, ty)| Ok(Field::new(name, ty.try_into()?, true)))
.map(|(name, ty)| Ok(arrow_schema::Field::new(name, ty.try_into()?, true)))
.try_collect()
}
}
@@ -223,7 +223,7 @@ impl TryFrom<&DataType> for arrow_schema::DataType {
DataType::Int16 => Ok(Self::Int16),
DataType::Int32 => Ok(Self::Int32),
DataType::Int64 => Ok(Self::Int64),
DataType::Int256 => Ok(Self::Decimal256(DECIMAL256_MAX_PRECISION, 0)),
DataType::Int256 => Ok(Self::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0)),
DataType::Float32 => Ok(Self::Float32),
DataType::Float64 => Ok(Self::Float64),
DataType::Date => Ok(Self::Date32),
@@ -241,10 +241,10 @@
DataType::Struct(struct_type) => Ok(Self::Struct(
struct_type
.iter()
.map(|(name, ty)| Ok(Field::new(name, ty.try_into()?, true)))
.map(|(name, ty)| Ok(arrow_schema::Field::new(name, ty.try_into()?, true)))
.try_collect::<_, _, ArrayError>()?,
)),
DataType::List(datatype) => Ok(Self::List(Arc::new(Field::new(
DataType::List(datatype) => Ok(Self::List(Arc::new(arrow_schema::Field::new(
"item",
datatype.as_ref().try_into()?,
true,
@@ -546,6 +546,20 @@ impl TryFrom<&arrow_array::LargeStringArray> for JsonbArray {
}
}

impl From<arrow_buffer::i256> for Int256 {
fn from(value: arrow_buffer::i256) -> Self {
let buffer = value.to_be_bytes();
Int256::from_be_bytes(buffer)
}
}

impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
fn from(val: Int256Ref<'a>) -> Self {
let buffer = val.to_be_bytes();
arrow_buffer::i256::from_be_bytes(buffer)
}
}

impl From<&Int256Array> for arrow_array::Decimal256Array {
fn from(array: &Int256Array) -> Self {
array
@@ -622,7 +636,7 @@ impl TryFrom<&ListArray> for arrow_array::ListArray {
array,
a,
Decimal256Builder::with_capacity(a.len()).with_data_type(
arrow_schema::DataType::Decimal256(DECIMAL256_MAX_PRECISION, 0),
arrow_schema::DataType::Decimal256(arrow_schema::DECIMAL256_MAX_PRECISION, 0),
),
|b, v| b.append_option(v.map(Into::into)),
),
@@ -682,7 +696,11 @@ impl TryFrom<&ListArray> for arrow_array::ListArray {
ArrayImpl::Struct(a) => {
let values = Arc::new(arrow_array::StructArray::try_from(a)?);
arrow_array::ListArray::new(
Arc::new(Field::new("item", a.data_type().try_into()?, true)),
Arc::new(arrow_schema::Field::new(
"item",
a.data_type().try_into()?,
true,
)),
arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(
array
.offsets()
@@ -709,6 +727,7 @@ impl TryFrom<&arrow_array::ListArray> for ListArray {
type Error = ArrayError;

fn try_from(array: &arrow_array::ListArray) -> Result<Self, Self::Error> {
use arrow_array::Array;
Ok(ListArray {
value: Box::new(ArrayImpl::try_from(array.values())?),
bitmap: match array.nulls() {
@@ -886,7 +905,7 @@ mod tests {

#[test]
fn struct_array() {
use arrow_array::Array as _;
use super::arrow_array::Array as _;

// Empty array - risingwave to arrow conversion.
let test_arr = StructArray::new(StructType::empty(), vec![], Bitmap::ones(0));
6 changes: 6 additions & 0 deletions src/common/src/array/arrow/arrow_common.rs
@@ -0,0 +1,6 @@
pub use arrow_impl::to_record_batch_with_schema;
use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};

#[allow(clippy::duplicate_mod)]
#[path = "./arrow.rs"]
mod arrow_impl;
9 changes: 9 additions & 0 deletions src/common/src/array/arrow/arrow_deltalake.rs
@@ -0,0 +1,9 @@
pub use arrow_impl::to_record_batch_with_schema as to_deltalake_record_batch_with_schema;
use {
arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer,
arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema,
};

#[allow(clippy::duplicate_mod)]
#[path = "./arrow.rs"]
mod arrow_impl;
Member:
Wow, this is quite interesting! 😮 How did you come up with this? Are any other projects using this trick?

FYI, I tried forking delta-rs and upgrading their datafusion version, but it cannot compile without some extra effort because there are some breaking changes in the upgrade.

... there will always be a time when icelake and delta.rs depend on an inconsistent arrow.
If we want to finally unify both of their dependencies on arrow, i.e. only one version of arrow exists in our system, I think we can totally use 48 in this pr...

Not only deltalake and icelake use arrow, but also our internal udf. If we upgrade the arrow version of udf in the future, it doesn't make sense to be blocked by the arrow version of some external connectors. I think it's acceptable to support several versions of arrow at the same time if we don't have to duplicate the code.

So my thoughts:

  • Agree that the upgrade effort might be non-trivial, so blocking everything until we unify the dependency versions might be ineffective.
  • At the beginning, I don't think we'll need to upgrade solely the arrow version of udf. Therefore, using the greatest common divisor SGTM, but it may indeed break someday when we want to use a feature in the latest version of one dependency that has already upgraded its arrow version. Then we will run into this problem again and have to consider how to live with multiple versions of arrow.
  • I'm generally OK with multiple versions. Only concerns are:
    1. maintainability. Current solution LGTM
    2. API surface: Will we ever need to e.g., pass arrow48 to arrow49?

Nits:

  • #[allow] -> #[expect]
  • prefer using the version number as the suffix, instead of common and deltalake (common is especially confusing to me: what does "common" mean?).

Changes requested:

  • Add some comments on arrow/mod.rs and/or above mod arrow_impl; about how this trick works.
  • Add some comments on arrow_* about where it is used. i.e., why do we have to pass arrow data around.

Member:
It's generally LGTM. Might also want to hear others' opinions.

@wenym1 (Contributor), Dec 4, 2023:
How did you come up with this? Are any other projects using this trick?

Initially, in c890167, I used a macro to generate the same code for each version with the same trick, which binds a different arrow version to the same arrow mod name for each copy of the conversion code. But I think that was too hacky, and the IDE would fail to index a large part of the code. I then looked for a way to include code from an outer file so that I wouldn't have to put the code inside a long macro, and found that the current approach is a perfect match 😄 I haven't seen any other project using this trick yet.

Will we ever need to e.g., pass arrow48 to arrow49?

Currently arrow data does not flow within our internal system. It's only used to interact with external systems, so in the short term I don't think there will be a feature requirement to pass arrow48 to arrow49.
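To make the mechanism concrete, here is a condensed sketch (hypothetical file and function names, mirroring arrow_common.rs and arrow_deltalake.rs above): the shared file never names a concrete arrow crate, and each wrapper module binds its arrow version to the agreed module name before compiling the shared file as a child module.

// ---- shared_impl.rs: version-agnostic code. `super::arrow_array` resolves
// to whichever crate the including module bound to that name.
use super::arrow_array;

pub fn num_rows(batch: &arrow_array::RecordBatch) -> usize {
    batch.num_rows()
}

// ---- arrow_v49.rs: bind the canonical arrow-array 49 crate, then include
// the shared implementation.
use arrow_array;

#[path = "./shared_impl.rs"]
mod arrow_impl; // compiled against arrow 49

// ---- arrow_v48.rs: bind the renamed 48.0.1 crate to the same module name.
use arrow_array_deltalake as arrow_array;

#[allow(clippy::duplicate_mod)]
#[path = "./shared_impl.rs"]
mod arrow_impl; // the same source, compiled a second time against arrow 48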

5 changes: 5 additions & 0 deletions src/common/src/array/arrow/mod.rs
@@ -0,0 +1,5 @@
mod arrow_common;
mod arrow_deltalake;

pub use arrow_common::to_record_batch_with_schema;
pub use arrow_deltalake::to_deltalake_record_batch_with_schema;
2 changes: 1 addition & 1 deletion src/common/src/array/mod.rs
@@ -15,7 +15,7 @@
//! `Array` defines all in-memory representations of vectorized execution framework.

mod arrow;
pub use arrow::to_record_batch_with_schema;
pub use arrow::{to_deltalake_record_batch_with_schema, to_record_batch_with_schema};
mod bool_array;
pub mod bytes_array;
mod chrono_array;
14 changes: 0 additions & 14 deletions src/common/src/types/num256.rs
@@ -326,20 +326,6 @@ impl Num for Int256 {
}
}

impl From<arrow_buffer::i256> for Int256 {
fn from(value: arrow_buffer::i256) -> Self {
let buffer = value.to_be_bytes();
Int256::from_be_bytes(buffer)
}
}

impl<'a> From<Int256Ref<'a>> for arrow_buffer::i256 {
fn from(val: Int256Ref<'a>) -> Self {
let buffer = val.to_be_bytes();
arrow_buffer::i256::from_be_bytes(buffer)
}
}

impl EstimateSize for Int256 {
fn estimated_heap_size(&self) -> usize {
mem::size_of::<i128>() * 2
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -46,6 +46,7 @@ clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "6
"time",
] }
csv = "1.3"
deltalake = { workspace = true }
duration-str = "0.7.0"
easy-ext = "1"
enum-as-inner = "0.6"