Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jayakasadev committed Feb 6, 2025
1 parent 206251a commit da30d2d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 29 deletions.
59 changes: 31 additions & 28 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,8 @@ pub struct IcebergConfig {
#[serde(skip)]
pub java_catalog_props: HashMap<String, String>,

#[serde(
rename = "partition_by",
default,
deserialize_with = "deserialize_optional_string_seq_from_string"
)]
pub partition_by: Option<Vec<String>>,
#[serde(default)]
pub partition_by: Option<String>,

/// Commit every n(>0) checkpoints, default is 10.
#[serde(default = "default_commit_checkpoint_interval")]
Expand Down Expand Up @@ -323,37 +319,44 @@ impl IcebergSink {
};

let partition_fields = match &self.config.partition_by {
Some(partition_by) => {
let mut partition_fields =
Vec::<UnboundPartitionField>::with_capacity(partition_by.len());
for partition_field in partition_by {
let re = Regex::new(r"\w+(\(\w+\))?").unwrap();
if !re.is_match(partition_field) {
bail!(format!("Invalid partition field: {}", partition_field))
}
let (_, [field1, field2]) = re.captures(partition_field).unwrap().extract();
let mut func = field1.to_owned();
let mut column = field2.to_owned();
if column.is_empty() {
column.replace_range(.., &func);
func.replace_range(.., "identity");
}
let transform = Transform::from_str(&func).unwrap();
if transform == Transform::Unknown {
bail!(format!("Invalid partition field: {}", partition_field))
}
Some(partition_field) => {
let mut partition_fields = Vec::<UnboundPartitionField>::new();
// captures column, transform(column), transform(n,column), transform(n, column)
let re = Regex::new(r"(?<transform>\w+)(\(((?<n>\d+)?(?:,|(,\s)))?(?<field>\w+)\))?").unwrap();
if !re.is_match(partition_field) {
bail!(format!("Invalid partition fields: {}", partition_field))
}
let caps = re.captures_iter(partition_field);
for mat in caps {
let (column, transform) = if &mat["n"] == "" && &mat["field"] == "" {
(&mat["func"], Transform::Identity)
} else {
let mut func = mat["func"].to_owned();
let n = &mat["n"];
if func == "bucket" || func == "truncate" {
func = format!("{func}({n})");
}
(&mat["field"], Transform::from_str(&func).unwrap())
};

let mut exists = false;
for (pos, col) in self.param.columns.iter().enumerate() {
if col.name == column {
partition_fields.push(
UnboundPartitionField::builder()
.source_id(pos as i32)
.transform(transform)
.name(column.to_owned())
.build(),
.build()
);
exists = true;
break;
}
}
// safety check
if !exists {
bail!(format!("Invalid partition column: {}", column))
}
}
partition_fields
}
Expand Down Expand Up @@ -1457,7 +1460,7 @@ mod test {
("connector", "iceberg"),
("type", "upsert"),
("primary_key", "v1"),
("partition_by", "v1,identity(v2)"),
("partition_by", "v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)"),
("warehouse.path", "s3://iceberg"),
("s3.endpoint", "http://127.0.0.1:9301"),
("s3.access.key", "hummockadmin"),
Expand Down Expand Up @@ -1502,7 +1505,7 @@ mod test {
r#type: "upsert".to_owned(),
force_append_only: false,
primary_key: Some(vec!["v1".to_owned()]),
partition_by: Some(vec!["v1".to_owned(), "identity(v2)".to_owned()]),
partition_by: Some("v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)".to_owned()),
java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
.into_iter()
.map(|(k, v)| (k.to_owned(), v.to_owned()))
Expand Down
2 changes: 1 addition & 1 deletion src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ IcebergConfig:
field_type: HashMap<String,String>
required: false
- name: partition_by
field_type: Vec<String>
field_type: String
required: false
default: Default::default
- name: commit_checkpoint_interval
Expand Down

0 comments on commit da30d2d

Please sign in to comment.