From 2707bbebce4715e0cfe754d826f6020c5f81939f Mon Sep 17 00:00:00 2001
From: Jaya Kasa
Date: Tue, 28 Jan 2025 19:09:20 -0500
Subject: [PATCH 1/5] add support for creating new iceberg tables with
 partition keys

---
 .../iceberg/benches/predicate_pushdown.slt    |  3 +-
 .../iceberg/test_case/iceberg_connection.slt  |  1 +
 .../test_case/iceberg_predicate_pushdown.slt  |  3 +-
 .../test_case/iceberg_select_empty_table.slt  |  3 +-
 ...rg_sink_no_partition_append_only_table.slt |  3 +-
 .../test_case/iceberg_source_all_delete.slt   |  1 +
 .../iceberg_source_equality_delete.slt        |  2 +
 .../iceberg_source_explain_for_delete.slt     |  1 +
 .../iceberg_source_position_delete.slt        |  1 +
 e2e_test/sink/iceberg_sink.slt                |  3 +-
 src/connector/src/sink/iceberg/mod.rs         | 61 ++++++++++++++++++-
 src/connector/with_options_sink.yaml          |  4 ++
 12 files changed, 79 insertions(+), 7 deletions(-)

diff --git a/e2e_test/iceberg/benches/predicate_pushdown.slt b/e2e_test/iceberg/benches/predicate_pushdown.slt
index 355b0858db9c1..b8027e6ceb06a 100644
--- a/e2e_test/iceberg/benches/predicate_pushdown.slt
+++ b/e2e_test/iceberg/benches/predicate_pushdown.slt
@@ -19,7 +19,8 @@ CREATE SINK sink1 AS select * from t1 WITH (
     s3.access.key = 'hummockadmin',
     s3.secret.key = 'hummockadmin',
     commit_checkpoint_interval = 1,
-    create_table_if_not_exists = 'true'
+    create_table_if_not_exists = 'true',
+    partition_by = 'i1'
 );

 statement ok
diff --git a/e2e_test/iceberg/test_case/iceberg_connection.slt b/e2e_test/iceberg/test_case/iceberg_connection.slt
index abe3236e79af1..9a387140fef26 100644
--- a/e2e_test/iceberg/test_case/iceberg_connection.slt
+++ b/e2e_test/iceberg/test_case/iceberg_connection.slt
@@ -26,6 +26,7 @@ CREATE SINK sink1 from s1 WITH (
     create_table_if_not_exists = 'true',
     commit_checkpoint_interval = 1,
     primary_key = 'i1,i2',
+    partition_by = 'i1'
 );

 statement ok
diff --git a/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt
index 64a21f823b10a..1d817ebade30d 100644
--- a/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt
+++ b/e2e_test/iceberg/test_case/iceberg_predicate_pushdown.slt
@@ -37,7 +37,8 @@ CREATE SINK sink1 AS select * from mv1 WITH (
     s3.access.key = 'hummockadmin',
     s3.secret.key = 'hummockadmin',
     commit_checkpoint_interval = 1,
-    create_table_if_not_exists = 'true'
+    create_table_if_not_exists = 'true',
+    partition_by = 'i1'
 );

 statement ok
diff --git a/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt b/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt
index ddfc07220f2b3..c147cf4e3b004 100644
--- a/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt
+++ b/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt
@@ -25,7 +25,8 @@ CREATE SINK sink1 AS select * from mv1 WITH (
     s3.access.key = 'hummockadmin',
     s3.secret.key = 'hummockadmin',
     commit_checkpoint_interval = 1,
-    create_table_if_not_exists = 'true'
+    create_table_if_not_exists = 'true',
+    partition_by = 'i1'
 );

 statement ok
diff --git a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt
index 49c4cf3fb1145..efc943d77b439 100644
--- a/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt
+++ b/e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt
@@ -39,7 +39,8 @@ CREATE SINK s6 AS select * from mv6 WITH (
     s3.region = 'us-east-1',
     s3.access.key = 'hummockadmin',
     s3.secret.key = 'hummockadmin',
-    create_table_if_not_exists = 'true'
+    create_table_if_not_exists = 'true',
+    partition_by = 'year(v_date)'
 );

 statement ok
diff --git a/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
index aa43c888b1507..90c9353ccabc2 100644
--- a/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
+++ b/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
@@ -24,6 +24,7 @@ CREATE SINK sink1 AS select * from mv1 WITH (
     create_table_if_not_exists = 'true',
     commit_checkpoint_interval = 3,
     primary_key = 'i1,i2',
+    partition_by = 'i1'
 );

 statement ok
diff --git a/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt
index c265e3d6a505d..f2425e6a475ea 100644
--- a/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt
+++ b/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt
@@ -26,6 +26,7 @@ CREATE SINK sink1 AS select * from mv1 WITH (
     s3.secret.key = 'hummockadmin',
     create_table_if_not_exists = 'true',
     primary_key = 'i1,i2',
+    partition_by = 'i1'
 );

 statement ok
@@ -122,6 +123,7 @@ CREATE SINK sink2 AS select * from s2 WITH (
     s3.secret.key = 'hummockadmin',
     create_table_if_not_exists = 'true',
     primary_key = 'i2,i1',
+    partition_by = 'i1'
 );

 statement ok
diff --git a/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.slt
index b890e8732587c..dcfbb2f3da135 100644
--- a/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.slt
+++ b/e2e_test/iceberg/test_case/iceberg_source_explain_for_delete.slt
@@ -26,6 +26,7 @@ CREATE SINK sink1 AS select * from mv1 WITH (
     s3.secret.key = 'hummockadmin',
     create_table_if_not_exists = 'true',
     primary_key = 'i1,i2',
+    partition_by = 'i1'
 );

 statement ok
diff --git a/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
index 0109f39350752..28ab9bf1c85f8 100644
--- a/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
+++ b/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
@@ -24,6 +24,7 @@ CREATE SINK sink1 AS select * from mv1 WITH (
     create_table_if_not_exists = 'true',
     commit_checkpoint_interval = 3,
     primary_key = 'i1,i2',
+    partition_by = 'i1'
 );

 statement ok
diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt
index ced999e546d14..3991ef744f303 100644
--- a/e2e_test/sink/iceberg_sink.slt
+++ b/e2e_test/sink/iceberg_sink.slt
@@ -67,7 +67,8 @@ CREATE SINK s7 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
     database.name='demo_db',
     table.name='e2e_auto_create_table',
     commit_checkpoint_interval = 1,
-    create_table_if_not_exists = 'true'
+    create_table_if_not_exists = 'true',
+    partition_by = 'v1'
 );

 statement ok
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index a18287062bd47..4c4e7f5df972c 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -18,12 +18,15 @@ mod prometheus;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt::Debug;
 use std::num::NonZeroU64;
+use std::str::FromStr;
 use std::sync::Arc;

 use anyhow::{anyhow, Context};
 use async_trait::async_trait;
 use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
-use iceberg::spec::{DataFile, SerializedDataFile};
+use iceberg::spec::{
+    DataFile, SerializedDataFile, Transform, UnboundPartitionField, UnboundPartitionSpec,
+};
 use iceberg::table::Table;
 use iceberg::transaction::Transaction;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
@@ -47,6 +50,7 @@ use itertools::Itertools;
 use parquet::file::properties::WriterProperties;
 use prometheus::monitored_general_writer::MonitoredGeneralWriterBuilder;
 use prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder;
+use regex::Regex;
 use risingwave_common::array::arrow::arrow_array_iceberg::{Int32Array, RecordBatch};
 use risingwave_common::array::arrow::arrow_schema_iceberg::{
     self, DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef,
@@ -109,6 +113,13 @@ pub struct IcebergConfig {
     #[serde(skip)]
     pub java_catalog_props: HashMap<String, String>,

+    #[serde(
+        rename = "partition_by",
+        default,
+        deserialize_with = "deserialize_optional_string_seq_from_string"
+    )]
+    pub partition_by: Option<Vec<String>>,
+
     /// Commit every n(>0) checkpoints, default is 10.
     #[serde(default = "default_commit_checkpoint_interval")]
     #[serde_as(as = "DisplayFromStr")]
@@ -311,9 +322,53 @@ impl IcebergSink {
                 }
             };

+            let partition_fields = match &self.config.partition_by {
+                Some(partition_by) => {
+                    let mut partition_fields =
+                        Vec::<UnboundPartitionField>::with_capacity(partition_by.len());
+                    for partition_field in partition_by {
+                        let re = Regex::new(r"\w+(\(\w+\))?").unwrap();
+                        if !re.is_match(partition_field) {
+                            bail!(format!("Invalid partition field: {}", partition_field))
+                        }
+                        let (_, [field1, field2]) = re.captures(partition_field).unwrap().extract();
+                        let mut func = field1.to_owned();
+                        let mut column = field2.to_owned();
+                        if column.is_empty() {
+                            column.replace_range(.., &func);
+                            func.replace_range(.., "identity");
+                        }
+                        let transform = Transform::from_str(&func).unwrap();
+                        if transform == Transform::Unknown {
+                            bail!(format!("Invalid partition field: {}", partition_field))
+                        }
+                        for (pos, col) in self.param.columns.iter().enumerate() {
+                            if col.name == column {
+                                partition_fields.push(
+                                    UnboundPartitionField::builder()
+                                        .source_id(pos as i32)
+                                        .transform(transform)
+                                        .name(column.to_owned())
+                                        .build(),
+                                );
+                                break;
+                            }
+                        }
+                    }
+                    partition_fields
+                }
+                None => Vec::<UnboundPartitionField>::new(),
+            };
+
             let table_creation_builder = TableCreation::builder()
                 .name(self.config.common.table_name.clone())
-                .schema(iceberg_schema);
+                .schema(iceberg_schema)
+                .partition_spec(
+                    UnboundPartitionSpec::builder()
+                        .add_partition_fields(partition_fields)
+                        .unwrap()
+                        .build(),
+                );

             let table_creation = match location {
                 Some(location) => table_creation_builder.location(location).build(),
@@ -1402,6 +1457,7 @@ mod test {
             ("connector", "iceberg"),
             ("type", "upsert"),
             ("primary_key", "v1"),
+            ("partition_by", "v1,identity(v2)"),
             ("warehouse.path", "s3://iceberg"),
             ("s3.endpoint", "http://127.0.0.1:9301"),
             ("s3.access.key", "hummockadmin"),
@@ -1446,6 +1502,7 @@
             r#type: "upsert".to_owned(),
             force_append_only: false,
             primary_key: Some(vec!["v1".to_owned()]),
+            partition_by: Some(vec!["v1".to_owned(), "identity(v2)".to_owned()]),
             java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
                 .into_iter()
                 .map(|(k, v)| (k.to_owned(), v.to_owned()))
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index a307d1a143f29..d9984b335eb2a 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -511,6 +511,10 @@ IcebergConfig:
   - name: java_catalog_props
     field_type: HashMap<String,String>
    required: false
+  - name: partition_by
+    field_type: Vec<String>
+    required: false
+    default: Default::default
   - name: commit_checkpoint_interval
     field_type: u64
     comments: Commit every n(>0) checkpoints, default is 10.
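
Usage sketch (editorial note, not part of the patch series): with only PATCH 1/5 applied, partition_by is deserialized as a comma-separated list, so a sink that lets RisingWave create the partitioned Iceberg table could look roughly like the statement below. The sink, source and column names are hypothetical; the remaining WITH options are the ones already used in the e2e tests and the unit test above. PATCH 2/5 below reworks the parsing, so see the note after the last patch for the final accepted syntax.

CREATE SINK demo_partitioned_sink AS SELECT * FROM demo_mv WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'v1',
    warehouse.path = 's3://iceberg',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'us-east-1',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    database.name = 'demo_db',
    table.name = 'demo_table',
    create_table_if_not_exists = 'true',
    commit_checkpoint_interval = 1,
    -- one bare column plus one explicit identity transform, mirroring the unit test string
    partition_by = 'v1,identity(v2)'
);
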
From e394453cd65ade330abf987bb5abfc0c16e255dd Mon Sep 17 00:00:00 2001
From: Jaya Kasa
Date: Thu, 6 Feb 2025 11:40:04 -0500
Subject: [PATCH 2/5] address comments

---
 src/connector/src/sink/iceberg/mod.rs | 59 ++++++++++++++-------------
 src/connector/with_options_sink.yaml  |  2 +-
 2 files changed, 32 insertions(+), 29 deletions(-)

diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 4c4e7f5df972c..94e55aabce6a2 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -113,12 +113,8 @@ pub struct IcebergConfig {
     #[serde(skip)]
     pub java_catalog_props: HashMap<String, String>,

-    #[serde(
-        rename = "partition_by",
-        default,
-        deserialize_with = "deserialize_optional_string_seq_from_string"
-    )]
-    pub partition_by: Option<Vec<String>>,
+    #[serde(default)]
+    pub partition_by: Option<String>,

     /// Commit every n(>0) checkpoints, default is 10.
     #[serde(default = "default_commit_checkpoint_interval")]
     #[serde_as(as = "DisplayFromStr")]
@@ -323,25 +319,27 @@ impl IcebergSink {
             };

             let partition_fields = match &self.config.partition_by {
-                Some(partition_by) => {
-                    let mut partition_fields =
-                        Vec::<UnboundPartitionField>::with_capacity(partition_by.len());
-                    for partition_field in partition_by {
-                        let re = Regex::new(r"\w+(\(\w+\))?").unwrap();
-                        if !re.is_match(partition_field) {
-                            bail!(format!("Invalid partition field: {}", partition_field))
-                        }
-                        let (_, [field1, field2]) = re.captures(partition_field).unwrap().extract();
-                        let mut func = field1.to_owned();
-                        let mut column = field2.to_owned();
-                        if column.is_empty() {
-                            column.replace_range(.., &func);
-                            func.replace_range(.., "identity");
-                        }
-                        let transform = Transform::from_str(&func).unwrap();
-                        if transform == Transform::Unknown {
-                            bail!(format!("Invalid partition field: {}", partition_field))
-                        }
+                Some(partition_field) => {
+                    let mut partition_fields = Vec::<UnboundPartitionField>::new();
+                    // captures column, transform(column), transform(n,column), transform(n, column)
+                    let re = Regex::new(r"(?<func>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
+                    if !re.is_match(partition_field) {
+                        bail!(format!("Invalid partition fields: {}", partition_field))
+                    }
+                    let caps = re.captures_iter(partition_field);
+                    for mat in caps {
+                        let (column, transform) = if &mat["n"] == "" && &mat["field"] == "" {
+                            (&mat["func"], Transform::Identity)
+                        } else {
+                            let mut func = mat["func"].to_owned();
+                            let n = &mat["n"];
+                            if func == "bucket" || func == "truncate" {
+                                func = format!("{func}({n})");
+                            }
+                            (&mat["field"], Transform::from_str(&func).unwrap())
+                        };
+
+                        let mut exists = false;
                         for (pos, col) in self.param.columns.iter().enumerate() {
                             if col.name == column {
                                 partition_fields.push(
                                     UnboundPartitionField::builder()
                                         .source_id(pos as i32)
                                         .transform(transform)
                                         .name(column.to_owned())
-                                        .build(),
+                                        .build()
                                 );
+                                exists = true;
                                 break;
                             }
                         }
+                        // safety check
+                        if !exists {
+                            bail!(format!("Invalid partition column: {}", column))
+                        }
                     }
                     partition_fields
                 }
                 None => Vec::<UnboundPartitionField>::new(),
@@ -1457,7 +1460,7 @@ mod test {
             ("connector", "iceberg"),
             ("type", "upsert"),
             ("primary_key", "v1"),
-            ("partition_by", "v1,identity(v2)"),
+            ("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
             ("warehouse.path", "s3://iceberg"),
             ("s3.endpoint", "http://127.0.0.1:9301"),
             ("s3.access.key", "hummockadmin"),
@@ -1502,7 +1505,7 @@ mod test {
             r#type: "upsert".to_owned(),
             force_append_only: false,
             primary_key: Some(vec!["v1".to_owned()]),
-            partition_by: Some(vec!["v1".to_owned(), "identity(v2)".to_owned()]),
"identity(v2)".to_owned()]), + partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()), java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")] .into_iter() .map(|(k, v)| (k.to_owned(), v.to_owned())) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index d9984b335eb2a..2002637514c6b 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -512,7 +512,7 @@ IcebergConfig: field_type: HashMap required: false - name: partition_by - field_type: Vec + field_type: String required: false default: Default::default - name: commit_checkpoint_interval From 65e01f078b386238af93eafdc3f85e6035671a2e Mon Sep 17 00:00:00 2001 From: Jaya Kasa Date: Thu, 6 Feb 2025 11:58:49 -0500 Subject: [PATCH 3/5] check failures/warnings --- src/connector/src/sink/iceberg/mod.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 94e55aabce6a2..4d51fdf48bfce 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -322,13 +322,17 @@ impl IcebergSink { Some(partition_field) => { let mut partition_fields = Vec::::new(); // captures column, transform(column), transform(n,column), transform(n, column) - let re = Regex::new(r"(?\w+)(\(((?\d+)?(?:,|(,\s)))?(?\w+)\))?").unwrap(); + let re = Regex::new( + r"(?\w+)(\(((?\d+)?(?:,|(,\s)))?(?\w+)\))?", + ) + .unwrap(); if !re.is_match(partition_field) { bail!(format!("Invalid partition fields: {}", partition_field)) } let caps = re.captures_iter(partition_field); for mat in caps { - let (column, transform) = if &mat["n"] == "" && &mat["field"] == "" { + let (column, transform) = if mat["n"].is_empty() && mat["field"].is_empty() + { (&mat["func"], Transform::Identity) } else { let mut func = mat["func"].to_owned(); @@ -347,7 +351,7 @@ impl IcebergSink { .source_id(pos as i32) .transform(transform) .name(column.to_owned()) - .build() + .build(), ); exists = true; break; From 0d6f2467fc493d62af0b1a57f05088cfb0b5dd00 Mon Sep 17 00:00:00 2001 From: Jaya Kasa Date: Fri, 7 Feb 2025 13:27:32 -0500 Subject: [PATCH 4/5] address comments --- src/connector/src/sink/iceberg/mod.rs | 36 ++++++++++++--------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 4d51fdf48bfce..4fefeda3048c7 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -336,31 +336,26 @@ impl IcebergSink { (&mat["func"], Transform::Identity) } else { let mut func = mat["func"].to_owned(); - let n = &mat["n"]; if func == "bucket" || func == "truncate" { + let n = &mat["n"]; func = format!("{func}({n})"); } (&mat["field"], Transform::from_str(&func).unwrap()) }; - let mut exists = false; - for (pos, col) in self.param.columns.iter().enumerate() { - if col.name == column { - partition_fields.push( - UnboundPartitionField::builder() - .source_id(pos as i32) - .transform(transform) - .name(column.to_owned()) - .build(), - ); - exists = true; - break; - } - } - // safety check - if !exists { - bail!(format!("Invalid partition column: {}", column)) - } + match iceberg_schema.field_id_by_name(column) { + Some(id) => partition_fields.push( + UnboundPartitionField::builder() + .source_id(id) + .transform(transform) + .name(column.to_owned()) + .build(), + ), + None => 
+                            None => bail!(format!(
+                                "Partition column does not exist in schema: {}",
+                                column
+                            )),
+                        };
                     }
                     partition_fields
                 }
@@ -373,7 +368,8 @@ impl IcebergSink {
                 .partition_spec(
                     UnboundPartitionSpec::builder()
                         .add_partition_fields(partition_fields)
-                        .unwrap()
+                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
+                        .context("failed to add partition columns")?
                         .build(),
                 );

From 8ffef6214c08425b9f7eceac1d8ae7808cc3ce6f Mon Sep 17 00:00:00 2001
From: Jaya Kasa
Date: Mon, 10 Feb 2025 09:02:43 -0500
Subject: [PATCH 5/5] address nit

---
 src/connector/src/sink/iceberg/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 2e48380615df3..06fc34c99cec7 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -327,7 +327,7 @@ impl IcebergSink {
                     )
                     .unwrap();
                     if !re.is_match(partition_field) {
-                        bail!(format!("Invalid partition fields: {}", partition_field))
+                        bail!(format!("Invalid partition fields: {}\nHINT: Supported formats are column, transform(column), transform(n,column), transform(n, column)", partition_field))
                     }
                     let caps = re.captures_iter(partition_field);
                     for mat in caps {
@@ -352,7 +352,7 @@ impl IcebergSink {
                                     .build(),
                             ),
                             None => bail!(format!(
-                                "Partition column does not exist in schema: {}",
+                                "Partition source column does not exist in schema: {}",
                                 column
                             )),
                         };
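
Usage sketch (editorial note, not part of the patch series): after all five patches, partition_by is a single string and the regex accepts column, transform(column), transform(n,column) and transform(n, column) entries, where bucket and truncate take the integer parameter and a bare column maps to the identity transform. A sink exercising the final syntax might look like the statement below; the sink, source and column names are hypothetical, and the other WITH options are taken from the e2e tests and the unit test in mod.rs.

CREATE SINK demo_partitioned_sink AS SELECT * FROM demo_mv WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'v1',
    warehouse.path = 's3://iceberg',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'us-east-1',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    database.name = 'demo_db',
    table.name = 'demo_table',
    create_table_if_not_exists = 'true',
    commit_checkpoint_interval = 1,
    -- identity on v1, 5 hash buckets on v1, width-4 truncation on v2, and a year transform on v3
    partition_by = 'v1, bucket(5,v1), truncate(4,v2), year(v3)'
);

Columns named in partition_by must exist in the sink schema: since PATCH 4/5 the source id is resolved through the Iceberg schema rather than the sink's column positions, and an unknown column fails the CREATE SINK with the error message adjusted in PATCH 5/5.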