From 0c3fa31d80a2c0ce6ab5a7d25603345e020afe3a Mon Sep 17 00:00:00 2001 From: Jaya Kasa Date: Mon, 27 Jan 2025 13:38:48 -0500 Subject: [PATCH] add 'glue.id' to iceberg sink config --- src/connector/src/connector_common/connection.rs | 5 +++++ src/connector/src/connector_common/iceberg/mod.rs | 8 ++++++++ src/connector/src/sink/iceberg/mod.rs | 1 + 3 files changed, 14 insertions(+) diff --git a/src/connector/src/connector_common/connection.rs b/src/connector/src/connector_common/connection.rs index 3604b1bd2f99c..78be1b97c0362 100644 --- a/src/connector/src/connector_common/connection.rs +++ b/src/connector/src/connector_common/connection.rs @@ -135,6 +135,10 @@ pub struct IcebergConnection { /// Path of iceberg warehouse, only applicable in storage catalog. #[serde(rename = "warehouse.path")] pub warehouse_path: Option, + /// Catalog id, can be omitted for storage catalog or when + /// caller's AWS account ID matches catalog id + #[serde(rename = "catalog.id")] + pub catalog_id: Option, /// Catalog name, can be omitted for storage catalog, but /// must be set for other catalogs. #[serde(rename = "catalog.name")] @@ -255,6 +259,7 @@ impl Connection for IcebergConnection { secret_key: self.secret_key.clone(), gcs_credential: self.gcs_credential.clone(), warehouse_path: self.warehouse_path.clone(), + catalog_id: self.catalog_id.clone(), catalog_name: self.catalog_name.clone(), catalog_uri: self.catalog_uri.clone(), credential: self.credential.clone(), diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index c04e20e53c202..a5d9b36b5f52e 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -57,6 +57,10 @@ pub struct IcebergCommon { /// Path of iceberg warehouse, only applicable in storage catalog. #[serde(rename = "warehouse.path")] pub warehouse_path: Option, + /// Catalog id, can be omitted for storage catalog or when + /// caller's AWS account ID matches catalog id + #[serde(rename = "catalog.id")] + pub catalog_id: Option, /// Catalog name, can be omitted for storage catalog, but /// must be set for other catalogs. #[serde(rename = "catalog.name")] @@ -274,6 +278,10 @@ impl IcebergCommon { format!("https://glue.{}.amazonaws.com", region), ); } + + if let Some(catalog_id) = self.catalog_id.as_deref() { + java_catalog_configs.insert("glue.id".to_owned(), catalog_id.to_owned()); + } } _ => {} } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index f84d0aa2cb339..e33155a6500c8 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -1432,6 +1432,7 @@ mod test { secret_key: Some("hummockadmin".to_owned()), gcs_credential: None, catalog_type: Some("jdbc".to_owned()), + catalog_id: None, catalog_name: Some("demo".to_owned()), database_name: Some("demo_db".to_owned()), table_name: "demo_table".to_owned(),