diff --git a/airbyte-config-oss/init-oss/src/main/resources/icons/starburst-galaxy.svg b/airbyte-config-oss/init-oss/src/main/resources/icons/starburst-galaxy.svg
new file mode 100644
index 0000000000000..11eb26295c03e
--- /dev/null
+++ b/airbyte-config-oss/init-oss/src/main/resources/icons/starburst-galaxy.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/airbyte-config-oss/init-oss/src/main/resources/seed/destination_definitions.yaml b/airbyte-config-oss/init-oss/src/main/resources/seed/destination_definitions.yaml
index 8eaffb3b2a314..ff2c8a310a86d 100644
--- a/airbyte-config-oss/init-oss/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config-oss/init-oss/src/main/resources/seed/destination_definitions.yaml
@@ -116,6 +116,13 @@
documentationUrl: https://docs.airbyte.io/integrations/destinations/convex
icon: convex.svg
releaseStage: alpha
+- name: Starburst Galaxy
+ destinationDefinitionId: 4528e960-6f7b-4412-8555-7e0097e1da17
+ dockerRepository: airbyte/destination-starburst-galaxy
+ dockerImageTag: 0.0.1
+ documentationUrl: https://docs.airbyte.com/integrations/destinations/starburst-galaxy
+ icon: starburst-galaxy.svg
+ releaseStage: alpha
- name: Databricks Lakehouse
destinationDefinitionId: 072d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-databricks
diff --git a/airbyte-config-oss/init-oss/src/main/resources/seed/destination_specs.yaml b/airbyte-config-oss/init-oss/src/main/resources/seed/destination_specs.yaml
index f88cd033c4928..d3469b8072e7e 100644
--- a/airbyte-config-oss/init-oss/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config-oss/init-oss/src/main/resources/seed/destination_specs.yaml
@@ -1820,6 +1820,158 @@
- "overwrite"
- "append"
- "append_dedup"
+- dockerImage: "airbyte/destination-starburst-galaxy:0.0.1"
+ spec:
+ documentationUrl: "https://docs.airbyte.com/integrations/destinations/starburst-galaxy"
+ connectionSpecification:
+ $schema: "http://json-schema.org/draft-07/schema#"
+ title: "Starburst Galaxy Destination Spec"
+ type: "object"
+ required:
+ - "accept_terms"
+ - "server_hostname"
+ - "username"
+ - "password"
+ - "catalog"
+ - "staging_object_store"
+ properties:
+ accept_terms:
+ title: "Agree to the Starburst Galaxy terms & conditions"
+ type: "boolean"
+ description: "You must agree to the Starburst Galaxy terms & conditions to use this connector."
+ default: false
+ order: 1
+ server_hostname:
+ title: "Hostname"
+ type: "string"
+ description: "Starburst Galaxy cluster hostname."
+ examples:
+ - "abc-12345678-wxyz.trino.galaxy-demo.io"
+ order: 2
+ port:
+ title: "Port"
+ type: "string"
+ description: "Starburst Galaxy cluster port."
+ default: "443"
+ examples:
+ - "443"
+ order: 3
+ username:
+ title: "User"
+ type: "string"
+ description: "Starburst Galaxy user."
+ examples:
+ - "user@example.com"
+ order: 4
+ password:
+ title: "Password"
+ type: "string"
+ description: "Starburst Galaxy password for the specified user."
+ examples:
+ - "password"
+ airbyte_secret: true
+ order: 5
+ catalog:
+ title: "Amazon S3 catalog"
+ type: "string"
+ description: "Name of the Starburst Galaxy Amazon S3 catalog."
+ examples:
+ - "sample_s3_catalog"
+ order: 6
+ catalog_schema:
+ title: "Amazon S3 catalog schema"
+ type: "string"
+ description: "The default Starburst Galaxy Amazon S3 catalog schema where\
+ \ tables are written to if the source does not specify a namespace. Defaults\
+ \ to \"public\"."
+ default: "public"
+ examples:
+ - "public"
+ order: 7
+ staging_object_store:
+ title: "Staging object store"
+ type: "object"
+ description: "Temporary storage on which temporary Iceberg table is created."
+ oneOf:
+ - title: "Amazon S3"
+ required:
+ - "object_store_type"
+ - "s3_bucket_name"
+ - "s3_bucket_path"
+ - "s3_bucket_region"
+ - "s3_access_key_id"
+ - "s3_secret_access_key"
+ properties:
+ object_store_type:
+ type: "string"
+ enum:
+ - "S3"
+ default: "S3"
+ order: 1
+ s3_bucket_name:
+ title: "S3 bucket name"
+ type: "string"
+ description: "Name of the S3 bucket"
+ examples:
+ - "airbyte_staging"
+ order: 1
+ s3_bucket_path:
+ title: "S3 bucket path"
+ type: "string"
+ description: "Directory in the S3 bucket where staging data is stored."
+ examples:
+ - "temp_airbyte__sync/test"
+ order: 2
+ s3_bucket_region:
+ title: "S3 bucket region"
+ type: "string"
+ default: "us-east-1"
+ description: "The region of the S3 bucket."
+ enum:
+ - "ap-northeast-1"
+ - "ap-southeast-1"
+ - "ap-southeast-2"
+ - "ca-central-1"
+ - "eu-central-1"
+ - "eu-west-1"
+ - "eu-west-2"
+ - "eu-west-3"
+ - "us-east-1"
+ - "us-east-2"
+ - "us-west-1"
+ - "us-west-2"
+ order: 3
+ s3_access_key_id:
+ title: "Access key"
+ type: "string"
+ description: "Access key with access to the bucket. Airbyte requires\
+ \ read and write permissions to a given bucket."
+ examples:
+ - "A012345678910EXAMPLE"
+ airbyte_secret: true
+ order: 4
+ s3_secret_access_key:
+ title: "Secret key"
+ type: "string"
+ description: "Secret key used with the specified access key."
+ examples:
+ - "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"
+ airbyte_secret: true
+ order: 5
+ order: 8
+ purge_staging_table:
+ title: "Purge staging Iceberg table"
+ type: "boolean"
+ description: "Defaults to 'true'. Switch to 'false' for debugging purposes."
+ default: true
+ order: 9
+ supportsIncremental: true
+ supportsNormalization: false
+ supportsDBT: false
+ supported_destination_sync_modes:
+ - "overwrite"
+ - "append"
- dockerImage: "airbyte/destination-databricks:1.0.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/databricks"
diff --git a/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json b/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json
index 2375bb49fcb3f..b0b72c40f32ed 100644
--- a/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json
+++ b/airbyte-config-oss/init-oss/src/main/resources/seed/oss_catalog.json
@@ -1694,6 +1694,147 @@
"public": true,
"custom": false,
"releaseStage": "alpha"
+ }, {
+ "destinationDefinitionId": "4528e960-6f7b-4412-8555-7e0097e1da17",
+ "name": "Starburst Galaxy",
+ "dockerRepository": "airbyte/destination-starburst-galaxy",
+ "dockerImageTag": "0.0.1",
+ "documentationUrl": "https://docs.airbyte.com/integrations/destinations/starburst-galaxy",
+ "icon": "starburst-galaxy.svg",
+ "spec": {
+ "documentationUrl": "https://docs.airbyte.com/integrations/destinations/starburst-galaxy",
+ "connectionSpecification": {
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "Starburst Galaxy Destination Spec",
+ "type": "object",
+ "required": [ "accept_terms", "server_hostname", "username", "password", "catalog", "staging_object_store" ],
+ "properties": {
+ "accept_terms": {
+ "title": "Agree to the Starburst Galaxy terms & conditions",
+ "type": "boolean",
+ "description": "You must agree to the Starburst Galaxy terms & conditions to use this connector.",
+ "default": false,
+ "order": 1
+ },
+ "server_hostname": {
+ "title": "Hostname",
+ "type": "string",
+ "description": "Starburst Galaxy cluster hostname.",
+ "examples": [ "abc-12345678-wxyz.trino.galaxy-demo.io" ],
+ "order": 2
+ },
+ "port": {
+ "title": "Port",
+ "type": "string",
+ "description": "Starburst Galaxy cluster port.",
+ "default": "443",
+ "examples": [ "443" ],
+ "order": 3
+ },
+ "username": {
+ "title": "User",
+ "type": "string",
+ "description": "Starburst Galaxy user.",
+ "examples": [ "user@example.com" ],
+ "order": 4
+ },
+ "password": {
+ "title": "Password",
+ "type": "string",
+ "description": "Starburst Galaxy password for the specified user.",
+ "examples": [ "password" ],
+ "airbyte_secret": true,
+ "order": 5
+ },
+ "catalog": {
+ "title": "Amazon S3 catalog",
+ "type": "string",
+ "description": "Name of the Starburst Galaxy Amazon S3 catalog.",
+ "examples": [ "sample_s3_catalog" ],
+ "order": 6
+ },
+ "catalog_schema": {
+ "title": "Amazon S3 catalog schema",
+ "type": "string",
+ "description": "The default Starburst Galaxy Amazon S3 catalog schema where tables are written to if the source does not specify a namespace. Defaults to \"public\".",
+ "default": "public",
+ "examples": [ "public" ],
+ "order": 7
+ },
+ "staging_object_store": {
+ "title": "Staging object store",
+ "type": "object",
+ "description": "Temporary storage on which temporary Iceberg table is created.",
+ "oneOf": [ {
+ "title": "Amazon S3",
+ "required": [ "object_store_type", "s3_bucket_name", "s3_bucket_path", "s3_bucket_region", "s3_access_key_id", "s3_secret_access_key" ],
+ "properties": {
+ "object_store_type": {
+ "type": "string",
+ "enum": [ "S3" ],
+ "default": "S3",
+ "order": 1
+ },
+ "s3_bucket_name": {
+ "title": "S3 bucket name",
+ "type": "string",
+ "description": "Name of the S3 bucket",
+ "examples": [ "airbyte_staging" ],
+ "order": 1
+ },
+ "s3_bucket_path": {
+ "title": "S3 bucket path",
+ "type": "string",
+ "description": "Directory in the S3 bucket where staging data is stored.",
+ "examples": [ "temp_airbyte__sync/test" ],
+ "order": 2
+ },
+ "s3_bucket_region": {
+ "title": "S3 bucket region",
+ "type": "string",
+ "default": "us-east-1",
+ "description": "The region of the S3 bucket.",
+ "enum": [ "ap-northeast-1", "ap-southeast-1", "ap-southeast-2", "ca-central-1", "eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3", "us-east-1", "us-east-2", "us-west-1", "us-west-2" ],
+ "order": 3
+ },
+ "s3_access_key_id": {
+ "title": "Access key",
+ "type": "string",
+ "description": "Access key with access to the bucket. Airbyte requires read and write permissions to a given bucket.",
+ "examples": [ "A012345678910EXAMPLE" ],
+ "airbyte_secret": true,
+ "order": 4
+ },
+ "s3_secret_access_key": {
+ "title": "Secret key",
+ "type": "string",
+ "description": "Secret key used with the specified access key.",
+ "examples": [ "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY" ],
+ "airbyte_secret": true,
+ "order": 5
+ }
+ }
+ } ],
+ "order": 8
+ },
+ "purge_staging_table": {
+ "title": "Purge staging Iceberg table",
+ "type": "boolean",
+ "description": "Defaults to 'true'. Switch to 'false' for debugging purposes.",
+ "default": true,
+ "order": 9
+ }
+ }
+ },
+ "supportsIncremental": true,
+ "supportsNormalization": false,
+ "supportsDBT": false,
+ "supported_destination_sync_modes": [ "overwrite", "append" ]
+ },
+ "tombstone": false,
+ "public": true,
+ "custom": false,
+ "releaseStage": "alpha"
}, {
"destinationDefinitionId": "072d5540-f236-4294-ba7c-ade8fd918496",
"name": "Databricks Lakehouse",
diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/factory/DatabaseDriver.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/factory/DatabaseDriver.java
index ea920a4fe7009..4e23420186595 100644
--- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/factory/DatabaseDriver.java
+++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/factory/DatabaseDriver.java
@@ -12,6 +12,7 @@ public enum DatabaseDriver {
CLICKHOUSE("com.clickhouse.jdbc.ClickHouseDriver", "jdbc:clickhouse:%s://%s:%d/%s"),
DATABRICKS("com.databricks.client.jdbc.Driver", "jdbc:databricks://%s:%s;HttpPath=%s;SSL=1;UserAgentEntry=Airbyte"),
DB2("com.ibm.db2.jcc.DB2Driver", "jdbc:db2://%s:%d/%s"),
+ STARBURST("io.trino.jdbc.TrinoDriver", "jdbc:trino://%s:%s/%s?SSL=true&source=airbyte"),
MARIADB("org.mariadb.jdbc.Driver", "jdbc:mariadb://%s:%d/%s"),
MSSQLSERVER("com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver://%s:%d/%s"),
MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql://%s:%d/%s"),
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/.dockerignore b/airbyte-integrations/connectors/destination-starburst-galaxy/.dockerignore
new file mode 100644
index 0000000000000..65c7d0ad3e73c
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/.dockerignore
@@ -0,0 +1,3 @@
+*
+!Dockerfile
+!build
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/BOOTSTRAP.md b/airbyte-integrations/connectors/destination-starburst-galaxy/BOOTSTRAP.md
new file mode 100644
index 0000000000000..8844bf5bde084
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/BOOTSTRAP.md
@@ -0,0 +1,8 @@
+# Starburst Galaxy destination connector bootstrap
+
+This destination syncs data to an Amazon S3 catalog in [Starburst Galaxy](https://www.starburst.io/platform/starburst-galaxy/) by completing the following steps:
+
+1. Persist source stream data to S3 staging storage in the Iceberg table format.
+2. Create a destination Iceberg table in Amazon S3 catalog in Starburst Galaxy from the staged Iceberg table.
+
+Learn more from [the Airbyte documentation](https://docs.airbyte.com/integrations/destinations/starburst-galaxy).
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/Dockerfile b/airbyte-integrations/connectors/destination-starburst-galaxy/Dockerfile
new file mode 100644
index 0000000000000..bc9b3595224d7
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/Dockerfile
@@ -0,0 +1,18 @@
+FROM airbyte/integration-base-java:dev AS build
+
+WORKDIR /airbyte
+ENV APPLICATION destination-starburst-galaxy
+
+COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
+
+RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar
+
+FROM airbyte/integration-base-java:dev
+
+WORKDIR /airbyte
+ENV APPLICATION destination-starburst-galaxy
+
+COPY --from=build /airbyte /airbyte
+
+LABEL io.airbyte.version=0.0.1
+LABEL io.airbyte.name=airbyte/destination-starburst-galaxy
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/README.md b/airbyte-integrations/connectors/destination-starburst-galaxy/README.md
new file mode 100644
index 0000000000000..d8ec77b405d3c
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/README.md
@@ -0,0 +1,65 @@
+# Build and run the Starburst Galaxy destination
+
+This is the repository for the Starburst Galaxy destination connector, written in Java.
+For information about how to use this connector within Airbyte, see [the user documentation](https://docs.airbyte.com/integrations/destinations/starburst-galaxy).
+
+## Local development
+
+#### Build with Gradle
+
+From the Airbyte repository root, run:
+```
+./gradlew :airbyte-integrations:connectors:destination-starburst-galaxy:build
+```
+
+#### Create credentials
+
+If you are a community contributor, you must generate the necessary credentials and place them in `secrets/config.json`, conforming to the spec file in `src/main/resources/spec.json`.
+**Note**: The `secrets` directory is git-ignored by default; sensitive information cannot be checked in.
+
+If you are an Airbyte core member, you must follow the [instructions](https://docs.airbyte.com/connector-development#using-credentials-in-ci) to set up your credentials.
+
+### Build and run a local Docker image for the connector
+
+#### Build
+
+Build the connector image with Gradle:
+```
+./gradlew :airbyte-integrations:connectors:destination-starburst-galaxy:airbyteDocker
+```
+When building with Gradle, the Docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` labels in
+the Dockerfile.
+
+#### Run
+
+The following example commands are Starburst Galaxy-specific versions of the [Airbyte protocol commands](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol):
+```
+docker run --rm airbyte/destination-starburst-galaxy:dev spec
+docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-starburst-galaxy:dev check --config /secrets/config.json
+docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-starburst-galaxy:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
+```
+
+### Run tests with Gradle
+
+All commands should be run from airbyte project root.
+
+To run unit tests:
+```
+./gradlew :airbyte-integrations:connectors:destination-starburst-galaxy:unitTest
+```
+To run acceptance and custom integration tests:
+```
+./gradlew :airbyte-integrations:connectors:destination-starburst-galaxy:integrationTest
+```
+
+## Dependency management
+
+### Publish a new version of the connector
+
+After you have implemented a feature, bug fix or enhancement, you must do the following:
+
+1. Ensure all unit and integration tests pass.
+2. Update the connector version by incrementing the value of the `io.airbyte.version` label in the Dockerfile by following the [SemVer](https://semver.org/) versioning rules.
+3. Create a Pull Request.
+
+Airbyte will review your PR and request any changes necessary to merge it into master.
\ No newline at end of file
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/build.gradle b/airbyte-integrations/connectors/destination-starburst-galaxy/build.gradle
new file mode 100644
index 0000000000000..449997bb4e6ff
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/build.gradle
@@ -0,0 +1,44 @@
+plugins {
+ id 'application'
+ id 'airbyte-docker'
+ id 'airbyte-integration-test-java'
+}
+
+application {
+ mainClass = 'io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyDestination'
+}
+
+dependencies {
+ implementation project(':airbyte-config-oss:config-models-oss')
+ implementation libs.airbyte.protocol
+ implementation project(':airbyte-integrations:bases:base-java')
+ implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
+ implementation project(':airbyte-integrations:bases:bases-destination-jdbc')
+ implementation project(path: ':airbyte-db:db-lib')
+ implementation project(path: ':airbyte-integrations:bases:base-java-s3')
+ implementation project(path: ':airbyte-integrations:connectors:destination-s3')
+
+ implementation ('io.trino:trino-iceberg:411') {exclude group: 'commons-cli', module: 'commons-cli'}
+ implementation ('io.trino:trino-main:411') {exclude group: 'commons-cli', module: 'commons-cli'}
+ implementation ('io.trino:trino-jdbc:411') {exclude group: 'commons-cli', module: 'commons-cli'}
+
+ implementation 'org.apache.avro:avro:1.11.1'
+
+ implementation 'org.apache.iceberg:iceberg-core:1.1.0'
+ implementation 'org.apache.iceberg:iceberg-bundled-guava:1.1.0'
+ implementation 'org.apache.iceberg:iceberg-aws:1.1.0'
+ implementation 'org.apache.iceberg:iceberg-parquet:1.1.0'
+
+ implementation 'org.apache.hadoop:hadoop-common:3.3.3'
+  implementation 'org.apache.hadoop:hadoop-aws:3.3.3'
+
+ implementation 'software.amazon.awssdk:bundle:2.20.20'
+ implementation 'software.amazon.awssdk:url-connection-client:2.20.20'
+
+ implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
+
+ integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
+ integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-starburst-galaxy')
+
+ implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/destination-starburst-galaxy/integration_tests/configured_catalog.json
new file mode 100644
index 0000000000000..da8be1ef3bd25
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/integration_tests/configured_catalog.json
@@ -0,0 +1,24 @@
+{
+ "streams": [
+ {
+ "stream" : {
+ "name": "users",
+ "json_schema": {
+ "type": "object",
+ "required": ["name"],
+ "properties": {
+ "name": {
+ "type": "string"
+ },
+ "age": {
+ "type": "number"
+ }
+ }
+ },
+ "supported_sync_modes": ["full_refresh"]
+ },
+ "sync_mode": "full_refresh",
+ "destination_sync_mode": "overwrite"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/sample_secrets/config.json b/airbyte-integrations/connectors/destination-starburst-galaxy/sample_secrets/config.json
new file mode 100644
index 0000000000000..b07a907fec1fa
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/sample_secrets/config.json
@@ -0,0 +1,18 @@
+{
+ "accept_terms": true,
+ "server_hostname": "abc-12345678-wxyz.galaxy.starburst.io",
+ "port": "443",
+ "username": "user@example.com",
+ "password": "password",
+ "staging_object_store": {
+ "object_store_type": "S3",
+ "s3_bucket_name": "required",
+ "s3_bucket_path": "required",
+ "s3_bucket_region": "required",
+ "s3_access_key_id": "required",
+ "s3_secret_access_key": "required"
+ },
+ "purge_staging_table": true,
+ "catalog": "s3_catalog",
+ "catalog_schema": "public"
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/ColumnMetadata.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/ColumnMetadata.java
new file mode 100644
index 0000000000000..99c3189063cd2
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/ColumnMetadata.java
@@ -0,0 +1,9 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import io.trino.spi.type.Type;
+
+public record ColumnMetadata(String name, Type galaxyIcebergType, int position) {}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/HadoopCatalogIcebergS3ParquetWriter.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/HadoopCatalogIcebergS3ParquetWriter.java
new file mode 100644
index 0000000000000..fcc679572dfbf
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/HadoopCatalogIcebergS3ParquetWriter.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.integrations.destination.s3.writer.BaseS3Writer.determineOutputFilename;
+import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS;
+import static org.apache.iceberg.CatalogProperties.FILE_IO_IMPL;
+import static org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION;
+import static org.apache.iceberg.aws.AwsProperties.S3FILEIO_ACCESS_KEY_ID;
+import static org.apache.iceberg.aws.AwsProperties.S3FILEIO_SECRET_ACCESS_KEY;
+
+import com.amazonaws.services.s3.AmazonS3;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.integrations.destination.s3.S3Format;
+import io.airbyte.integrations.destination.s3.credential.S3AccessKeyCredentialConfig;
+import io.airbyte.integrations.destination.s3.template.S3FilenameTemplateParameterObject;
+import io.airbyte.protocol.models.v0.AirbyteStream;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetAvroWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HadoopCatalogIcebergS3ParquetWriter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HadoopCatalogIcebergS3ParquetWriter.class);
+
+ private final DataWriter parquetWriter;
+ private final Table table;
+ private final S3DestinationConfig config;
+ private final AirbyteStream stream;
+ private final HadoopCatalog catalog;
+ private final AmazonS3 s3Client;
+ private final String tableStorageRelativePath;
+
+ public HadoopCatalogIcebergS3ParquetWriter(
+ final S3DestinationConfig config,
+ final ConfiguredAirbyteStream configuredStream,
+ final Schema schema,
+ final String schemaName,
+ final String tableName,
+ final Timestamp uploadTime)
+ throws IOException {
+
+ this.config = config;
+ this.stream = configuredStream.getStream();
+ this.s3Client = config.getS3Client();
+
+ String outputFilename = determineOutputFilename(S3FilenameTemplateParameterObject
+ .builder()
+ .s3Format(S3Format.PARQUET)
+ .timestamp(uploadTime)
+ .fileExtension(S3Format.PARQUET.getFileExtension())
+ .build());
+
+ String warehousePath = String.format("s3a://%s/%s", this.config.getBucketName(), this.config.getBucketPath());
+
+ this.tableStorageRelativePath = String.join("/", this.config.getBucketPath(), schemaName, tableName);
+ initializeS3Storage();
+
+ this.catalog = createCatalog(warehousePath);
+ LOGGER.info("Warehouse path {}", warehousePath);
+ Namespace namespace = Namespace.of(schemaName);
+ TableIdentifier name = TableIdentifier.of(namespace, tableName);
+ catalog.createTable(name, schema);
+    // createTable may reassign the column ids of the given schema before committing it to the
+    // metadata file, which would make the table schema inconsistent with the schema used by the
+    // parquetWriter. To share a consistent schema between the parquetWriter and the table,
+    // loadTable is used to fetch the updated schema for the parquetWriter.
+    // https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/TableMetadata.java#L102-L105
+ this.table = catalog.loadTable(name);
+ String tableLocation = table.location() + "/" + outputFilename;
+ LOGGER.info("Table {} at data file location {} is created", table.name(), tableLocation);
+
+ this.parquetWriter = Parquet.writeData(table.io().newOutputFile(tableLocation))
+ .schema(table.schema())
+ .createWriterFunc(ParquetAvroWriter::buildWriter)
+ .overwrite()
+ .withSpec(PartitionSpec.unpartitioned())
+ .build();
+ }
+
+ private void initializeS3Storage() {
+ try {
+ final String bucket = config.getBucketName();
+ if (!s3Client.doesBucketExistV2(bucket)) {
+ LOGGER.info("Bucket {} does not exist; creating...", bucket);
+ s3Client.createBucket(bucket);
+ LOGGER.info("Bucket {} has been created.", bucket);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to initialize S3 storage: ", e);
+ throw e;
+ }
+ }
+
+ public String getTableStorageRelativePath() {
+ return tableStorageRelativePath;
+ }
+
+ public Table getTable() {
+ return table;
+ }
+
+ public void write(GenericData.Record record) {
+ parquetWriter.write(record);
+ }
+
+ private void closeWhenSucceed() throws IOException {
+ parquetWriter.close();
+ }
+
+ private void closeWhenFail() throws IOException {
+ parquetWriter.close();
+ }
+
+ public void close(final boolean hasFailed)
+ throws IOException {
+ try {
+ if (hasFailed) {
+ LOGGER.warn("Failure detected. Aborting upload of stream '{}'...", stream.getName());
+ closeWhenFail();
+ LOGGER.warn("Upload of stream '{}' aborted.", stream.getName());
+ } else {
+ LOGGER.info("Uploading remaining data for stream '{}'.", stream.getName());
+ closeWhenSucceed();
+ LOGGER.info("Upload completed for stream '{}'.", stream.getName());
+ }
+ } finally {
+ table.newAppend().appendFile(parquetWriter.toDataFile()).commit();
+ catalog.close();
+ }
+ }
+
+ private HadoopCatalog createCatalog(String warehousePath) {
+ S3AccessKeyCredentialConfig credentialConfig = (S3AccessKeyCredentialConfig) config.getS3CredentialConfig();
+
+ System.setProperty("aws.region", config.getBucketRegion());
+
+ Map properties = new HashMap<>();
+ properties.put(WAREHOUSE_LOCATION, warehousePath);
+ properties.put(FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
+ properties.put(S3FILEIO_ACCESS_KEY_ID, credentialConfig.getAccessKeyId());
+ properties.put(S3FILEIO_SECRET_ACCESS_KEY, credentialConfig.getSecretAccessKey());
+
+ Configuration configuration = new Configuration();
+ configuration.set(AWS_CREDENTIALS_PROVIDER, "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ configuration.set(ACCESS_KEY, credentialConfig.getAccessKeyId());
+ configuration.set(SECRET_KEY, credentialConfig.getSecretAccessKey());
+ configuration.set(SECURE_CONNECTIONS, "true");
+
+ HadoopCatalog hadoopCatalog = new HadoopCatalog();
+ hadoopCatalog.setConf(configuration);
+
+ hadoopCatalog.initialize("hadoop-catalog", properties);
+
+ return hadoopCatalog;
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyBaseDestination.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyBaseDestination.java
new file mode 100644
index 0000000000000..d0456364ee0f3
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyBaseDestination.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.db.factory.DatabaseDriver.STARBURST;
+import static io.airbyte.integrations.destination.jdbc.copy.CopyConsumerFactory.create;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.CATALOG_SCHEMA;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.STARBURST_GALAXY_DRIVER_CLASS;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyDestinationConfig.get;
+import static java.lang.String.format;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.db.factory.DataSourceFactory;
+import io.airbyte.db.jdbc.DefaultJdbcDatabase;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.base.AirbyteMessageConsumer;
+import io.airbyte.integrations.destination.StandardNameTransformer;
+import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.CopyDestination;
+import io.airbyte.protocol.models.v0.AirbyteMessage;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
+import java.util.function.Consumer;
+import javax.sql.DataSource;
+
+public abstract class StarburstGalaxyBaseDestination
+ extends CopyDestination {
+
+ public StarburstGalaxyBaseDestination() {
+ super(CATALOG_SCHEMA);
+ }
+
+ @Override
+ public void checkPersistence(JsonNode config) {
+ checkPersistence(get(config).storageConfig());
+ }
+
+ protected abstract void checkPersistence(StarburstGalaxyStagingStorageConfig galaxyStorageConfig);
+
+ @Override
+ public AirbyteMessageConsumer getConsumer(final JsonNode config,
+ final ConfiguredAirbyteCatalog catalog,
+ final Consumer outputRecordCollector) {
+ final StarburstGalaxyDestinationConfig starburstGalaxyConfig = get(config);
+ final DataSource dataSource = getDataSource(config);
+ return create(
+ outputRecordCollector,
+ dataSource,
+ getDatabase(dataSource),
+ getSqlOperations(),
+ getNameTransformer(),
+ starburstGalaxyConfig,
+ catalog,
+ getStreamCopierFactory(),
+ starburstGalaxyConfig.galaxyCatalogSchema());
+ }
+
+ protected abstract StarburstGalaxyStreamCopierFactory getStreamCopierFactory();
+
+ @Override
+ public StandardNameTransformer getNameTransformer() {
+ return new StarburstGalaxyNameTransformer();
+ }
+
+ @Override
+ public DataSource getDataSource(final JsonNode config) {
+ final StarburstGalaxyDestinationConfig galaxyDestinationConfig = get(config);
+ return DataSourceFactory.create(
+ galaxyDestinationConfig.galaxyUsername(),
+ galaxyDestinationConfig.galaxyPassword(),
+ STARBURST_GALAXY_DRIVER_CLASS,
+ getGalaxyConnectionString(galaxyDestinationConfig));
+ }
+
+ @Override
+ public JdbcDatabase getDatabase(final DataSource dataSource) {
+ return new DefaultJdbcDatabase(dataSource);
+ }
+
+ @Override
+ public SqlOperations getSqlOperations() {
+ return new StarburstGalaxySqlOperations();
+ }
+
+ public static String getGalaxyConnectionString(final StarburstGalaxyDestinationConfig galaxyDestinationConfig) {
+ return format(STARBURST.getUrlFormatString(),
+ galaxyDestinationConfig.galaxyServerHostname(),
+ galaxyDestinationConfig.galaxyPort(),
+ galaxyDestinationConfig.galaxyCatalog());
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyConstants.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyConstants.java
new file mode 100644
index 0000000000000..5e4d77c18f599
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.db.factory.DatabaseDriver.STARBURST;
+
+public final class StarburstGalaxyConstants {
+
+ public static final String STARBURST_GALAXY_DRIVER_CLASS = STARBURST.getDriverClassName();
+ public static final String ACCEPT_TERMS = "accept_terms";
+ public static final String SERVER_HOSTNAME = "server_hostname";
+ public static final String PORT = "port";
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
+ public static final String CATALOG = "catalog";
+ public static final String CATALOG_SCHEMA = "catalog_schema";
+ public static final String OBJECT_STORE_TYPE = "object_store_type";
+ public static final String PURGE_STAGING_TABLE = "purge_staging_table";
+ public static final String STAGING_OBJECT_STORE = "staging_object_store";
+
+ private StarburstGalaxyConstants() {}
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyDestination.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyDestination.java
new file mode 100644
index 0000000000000..63a65e08c6b8a
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyDestination.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyStagingStorageType.S3;
+
+import com.google.common.collect.ImmutableMap;
+import io.airbyte.integrations.base.Destination;
+import io.airbyte.integrations.base.IntegrationRunner;
+import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
+import java.io.Closeable;
+import java.sql.DriverManager;
+
+public class StarburstGalaxyDestination extends SwitchingDestination {
+
+ public StarburstGalaxyDestination() {
+ super(StarburstGalaxyStagingStorageType.class,
+ StarburstGalaxyDestinationResolver::getStagingStorageType,
+ ImmutableMap.of(S3, new StarburstGalaxyS3Destination()));
+ }
+
+ public static void main(final String[] args) throws Exception {
+ final Destination destination = new StarburstGalaxyDestination();
+ new IntegrationRunner(destination).run(args);
+ ((Closeable) DriverManager.getDriver("jdbc:trino:")).close();
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyDestinationConfig.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyDestinationConfig.java
new file mode 100644
index 0000000000000..78fa900b95212
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyDestinationConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.ACCEPT_TERMS;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.CATALOG;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.CATALOG_SCHEMA;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.PASSWORD;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.PORT;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.PURGE_STAGING_TABLE;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.SERVER_HOSTNAME;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.STAGING_OBJECT_STORE;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.USERNAME;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyStagingStorageConfig.getStarburstGalaxyStagingStorageConfig;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public record StarburstGalaxyDestinationConfig(String galaxyServerHostname,
+ String galaxyPort,
+ String galaxyUsername,
+ String galaxyPassword,
+ String galaxyCatalog,
+ String galaxyCatalogSchema,
+ boolean purgeStagingData,
+ StarburstGalaxyStagingStorageConfig storageConfig) {
+
+ static final String DEFAULT_STARBURST_GALAXY_PORT = "443";
+ static final String DEFAULT_STARBURST_GALAXY_CATALOG_SCHEMA = "public";
+ static final boolean DEFAULT_PURGE_STAGING_TABLE = true;
+
+ public static StarburstGalaxyDestinationConfig get(final JsonNode config) {
+ checkArgument(
+ config.has(ACCEPT_TERMS) && config.get(ACCEPT_TERMS).asBoolean(),
+ "You must agree to the Starburst Galaxy Terms & Conditions to use this connector.");
+ return new StarburstGalaxyDestinationConfig(
+ config.get(SERVER_HOSTNAME).asText(),
+ config.has(PORT) ? config.get(PORT).asText() : DEFAULT_STARBURST_GALAXY_PORT,
+ config.get(USERNAME).asText(),
+ config.get(PASSWORD).asText(),
+ config.get(CATALOG).asText(),
+ config.has(CATALOG_SCHEMA) ? config.get(CATALOG_SCHEMA).asText() : DEFAULT_STARBURST_GALAXY_CATALOG_SCHEMA,
+ config.has(PURGE_STAGING_TABLE) ? config.get(PURGE_STAGING_TABLE).asBoolean() : DEFAULT_PURGE_STAGING_TABLE,
+ getStarburstGalaxyStagingStorageConfig(config.get(STAGING_OBJECT_STORE)));
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyDestinationResolver.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyDestinationResolver.java
new file mode 100644
index 0000000000000..9d823ec0eea99
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyDestinationResolver.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_BUCKET_NAME;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.STAGING_OBJECT_STORE;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyStagingStorageType.S3;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class StarburstGalaxyDestinationResolver {
+
+ public static StarburstGalaxyStagingStorageType getStagingStorageType(final JsonNode config) {
+ if (isS3StagingStore(config)) {
+ return S3;
+ }
+ throw new IllegalArgumentException("Staging storage configurations must be provided");
+ }
+
+ public static boolean isS3StagingStore(final JsonNode config) {
+ return config.has(STAGING_OBJECT_STORE) && config.get(STAGING_OBJECT_STORE).isObject() && config.get(STAGING_OBJECT_STORE).has(S_3_BUCKET_NAME);
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyNameTransformer.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyNameTransformer.java
new file mode 100644
index 0000000000000..1337156046b39
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyNameTransformer.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static java.util.Locale.ENGLISH;
+
+import io.airbyte.integrations.destination.StandardNameTransformer;
+
+public class StarburstGalaxyNameTransformer
+ extends StandardNameTransformer {
+
+ @Override
+ public String convertStreamName(final String input) {
+ return applyDefaultCase(super.convertStreamName(input));
+ }
+
+ @Override
+ public String getIdentifier(final String name) {
+ return applyDefaultCase(super.getIdentifier(name));
+ }
+
+ @Override
+ public String getTmpTableName(final String streamName) {
+ return applyDefaultCase(super.getTmpTableName(streamName));
+ }
+
+ @Override
+ public String getRawTableName(final String streamName) {
+ return applyDefaultCase(super.getRawTableName(streamName));
+ }
+
+ @Override
+ public String applyDefaultCase(final String input) {
+ return input.toLowerCase(ENGLISH);
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3Destination.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3Destination.java
new file mode 100644
index 0000000000000..d6f0b535e43d8
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3Destination.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.integrations.destination.s3.S3BaseChecks.attemptS3WriteAndDelete;
+
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.integrations.destination.s3.S3StorageOperations;
+
+public class StarburstGalaxyS3Destination
+ extends StarburstGalaxyBaseDestination {
+
+ @Override
+ protected void checkPersistence(StarburstGalaxyStagingStorageConfig galaxyStorageConfig) {
+ S3DestinationConfig s3Config = galaxyStorageConfig.getS3DestinationConfigOrThrow();
+ attemptS3WriteAndDelete(new S3StorageOperations(getNameTransformer(), s3Config.getS3Client(), s3Config), s3Config, "");
+ }
+
+ @Override
+ protected StarburstGalaxyStreamCopierFactory getStreamCopierFactory() {
+ return new StarburstGalaxyS3StreamCopierFactory();
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3StagingStorageConfig.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3StagingStorageConfig.java
new file mode 100644
index 0000000000000..cb338ef3bc691
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3StagingStorageConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_ACCESS_KEY_ID;
+import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_BUCKET_NAME;
+import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_BUCKET_PATH;
+import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_BUCKET_REGION;
+import static io.airbyte.integrations.destination.s3.constant.S3Constants.S_3_SECRET_ACCESS_KEY;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
+
+public class StarburstGalaxyS3StagingStorageConfig
+ extends StarburstGalaxyStagingStorageConfig {
+
+ private final S3DestinationConfig s3Config;
+
+ public StarburstGalaxyS3StagingStorageConfig(JsonNode config) {
+ final S3DestinationConfig.Builder builder = S3DestinationConfig.create(
+ config.get(S_3_BUCKET_NAME).asText(),
+ config.get(S_3_BUCKET_PATH).asText(),
+ config.get(S_3_BUCKET_REGION).asText())
+ .withAccessKeyCredential(
+ config.get(S_3_ACCESS_KEY_ID).asText(),
+ config.get(S_3_SECRET_ACCESS_KEY).asText())
+ .withFormatConfig(new S3ParquetFormatConfig(new ObjectMapper().createObjectNode()));
+ this.s3Config = builder.get();
+ }
+
+ @Override
+ public S3DestinationConfig getS3DestinationConfigOrThrow() {
+ return s3Config;
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3StreamCopier.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3StreamCopier.java
new file mode 100644
index 0000000000000..04742225219e6
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3StreamCopier.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.joining;
+import static org.apache.iceberg.hadoop.Util.VERSION_HINT_FILENAME;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.destination.StandardNameTransformer;
+import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.integrations.destination.s3.avro.AvroConstants;
+import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
+import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
+import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.UUID;
+import org.apache.avro.Schema;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+
+/**
+ * This implementation is similar to {@link StreamCopier}. The difference is that this
+ * implementation creates Parquet staging file(s), instead of CSV ones.
+ *
+ *
1. Parquet writer writes data stream into tmp Iceberg table in
+ * s3://bucket-name/bucket-path/namespace/schema/temp-Iceberg-table-name.
+ *
2. Creates(or modifies the schema of) the destination Iceberg table from the tmp Iceberg
+ * table schema in Galaxy Amazon S3 Catalog based on the destination sync mode
+ *
3. Copies the tmp Iceberg table data into the destination Iceberg table in Amazon S3 Galaxy
+ * Catalog.
+ *
5. Deletes the tmp Iceberg table.
+ *
+ */
+public class StarburstGalaxyS3StreamCopier
+ extends StarburstGalaxyStreamCopier {
+
+ private static final Logger LOGGER = getLogger(StarburstGalaxyS3StreamCopier.class);
+ private final AmazonS3 s3Client;
+ private final S3DestinationConfig s3Config;
+ private final HadoopCatalogIcebergS3ParquetWriter icebergWriter;
+ private final AvroRecordFactory avroRecordFactory;
+
+ public StarburstGalaxyS3StreamCopier(final String stagingFolder,
+ final String schema,
+ final ConfiguredAirbyteStream configuredStream,
+ final AmazonS3 s3Client,
+ final JdbcDatabase database,
+ final StarburstGalaxyDestinationConfig galaxyDestinationConfig,
+ final StandardNameTransformer nameTransformer,
+ final SqlOperations sqlOperations,
+ final Timestamp uploadTime)
+ throws Exception {
+ super(stagingFolder, schema, configuredStream, database, galaxyDestinationConfig, nameTransformer, sqlOperations);
+ this.s3Client = s3Client;
+ this.s3Config = galaxyDestinationConfig.storageConfig().getS3DestinationConfigOrThrow();
+ Schema avroSchema = getAvroSchema(configuredStream.getStream().getName(),
+ configuredStream.getStream().getNamespace(), configuredStream.getStream().getJsonSchema());
+ org.apache.iceberg.Schema icebergSchema = getIcebergSchema(avroSchema);
+ this.icebergWriter = new HadoopCatalogIcebergS3ParquetWriter(
+ galaxyDestinationConfig.storageConfig().getS3DestinationConfigOrThrow(), configuredStream, icebergSchema,
+ this.schemaName, this.tmpTableName, uploadTime);
+ this.avroRecordFactory = new AvroRecordFactory(avroSchema, AvroConstants.JSON_CONVERTER);
+ LOGGER.info("[Stream {}] Tmp table {} location: {}", streamName, tmpTableName, getTmpTableLocation());
+ LOGGER.info("[Stream {}] Iceberg schema: {}", streamName, icebergSchema);
+ this.galaxySchema = convertIcebergSchemaToGalaxySchema(icebergSchema);
+ }
+
+ static org.apache.iceberg.Schema getIcebergSchema(Schema avroSchema) {
+ MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
+ return ParquetSchemaUtil.convert(parquetSchema);
+ }
+
+ static Schema getAvroSchema(String streamName, String namespace, JsonNode jsonSchema) {
+ final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
+ return schemaConverter.getAvroSchema(jsonSchema, streamName, namespace, true, true, false, true);
+ }
+
+ @Override
+ public String prepareStagingFile() {
+ return String.join("/", s3Config.getBucketPath(), stagingFolder);
+ }
+
+ @Override
+ public void write(final UUID id, final AirbyteRecordMessage recordMessage, final String fileName) throws Exception {
+ recordMessage.setEmittedAt(recordMessage.getEmittedAt() * 1000); // Corresponding Galaxy type expects micro precision.
+ icebergWriter.write(avroRecordFactory.getAvroRecord(id, recordMessage));
+ }
+
+ @Override
+ public void closeStagingUploader(final boolean hasFailed) throws Exception {
+ icebergWriter.close(hasFailed);
+ }
+
+ @Override
+ protected String getTmpTableLocation() {
+ // Galaxy location privilege doesn't allow path starting with s3a
+ String tmpTableLocation = icebergWriter.getTable().location().replace("s3a://", "s3://");
+ LOGGER.info("[Stream {}] Tmp table location: {}", streamName, tmpTableLocation);
+ return tmpTableLocation;
+ }
+
+ @Override
+ protected String getTmpTableMetadataFileName()
+ throws IOException {
+ String tmpTableBasePath = icebergWriter.getTableStorageRelativePath();
+ LOGGER.info("[Stream {}] Tmp table base path: {}", streamName, tmpTableBasePath);
+ GetObjectRequest getObjectRequest = new GetObjectRequest(s3Config.getBucketName(),
+ tmpTableBasePath + "/metadata/" + VERSION_HINT_FILENAME);
+ S3Object object = s3Client.getObject(getObjectRequest);
+ String currentMetadataFileVersion = new String(object.getObjectContent().readAllBytes(), UTF_8).strip();
+ LOGGER.info("[Stream {}] Current metadata file version {}", streamName, currentMetadataFileVersion);
+ String metadataJsonFile = "v" + currentMetadataFileVersion + ".metadata.json";
+ String newMetadataJsonFileName =
+ "0".repeat(5 - currentMetadataFileVersion.length()) + currentMetadataFileVersion + "-" + randomUUID() + ".metadata.json";
+
+ // https://iceberg.apache.org/spec/#file-system-tables and
+ // https://iceberg.apache.org/spec/#metastore-tables follows different metadata file naming
+ // convention. Galaxy expect the version metadata file to always follow
+ // https://iceberg.apache.org/spec/#metastore-tables convention.
+ // Rename(copy) the metadata file name to follow Galaxy metadata file naming standards.
+ s3Client.copyObject(
+ s3Config.getBucketName(), tmpTableBasePath + "/metadata/" + metadataJsonFile,
+ s3Config.getBucketName(), tmpTableBasePath + "/metadata/" + newMetadataJsonFileName);
+
+ LOGGER.info("New metadata file: {}/{}/{}", tmpTableBasePath, "metadata", newMetadataJsonFileName);
+ return newMetadataJsonFileName;
+ }
+
+ @Override
+ public String generateMergeStatement(final String destTableName) {
+ String fields = String.join(
+ ", ",
+ galaxySchema.columns().stream()
+ .map(ColumnMetadata::name)
+ .collect(joining(", ")));
+ String insertData = format(
+ "INSERT INTO %s.%s(%s) SELECT %s FROM %s.%s",
+ quotedSchemaName,
+ destTableName,
+ fields,
+ fields,
+ quotedSchemaName,
+ tmpTableName);
+ LOGGER.info("[Stream {}] Insert source data into target: {}", streamName, insertData);
+ return insertData;
+ }
+
+ @Override
+ public void closeNonCurrentStagingFileWriters() throws Exception {
+ icebergWriter.close(false);
+ }
+
+ @Override
+ public String getCurrentFile() {
+ return "";
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3StreamCopierFactory.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3StreamCopierFactory.java
new file mode 100644
index 0000000000000..542a04381a83f
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyS3StreamCopierFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory.getSchema;
+
+import com.amazonaws.services.s3.AmazonS3;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.destination.StandardNameTransformer;
+import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import io.airbyte.protocol.models.v0.AirbyteStream;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
+import java.sql.Timestamp;
+
+public class StarburstGalaxyS3StreamCopierFactory
+ implements StarburstGalaxyStreamCopierFactory {
+
+ @Override
+ public StreamCopier create(final String configuredSchema,
+ final StarburstGalaxyDestinationConfig starburstGalaxyConfig,
+ final String stagingFolder,
+ final ConfiguredAirbyteStream configuredStream,
+ final StandardNameTransformer nameTransformer,
+ final JdbcDatabase database,
+ final SqlOperations sqlOperations) {
+ try {
+ final AirbyteStream stream = configuredStream.getStream();
+ final String schema = getSchema(stream.getNamespace(), configuredSchema, nameTransformer);
+
+ S3DestinationConfig s3Config = starburstGalaxyConfig.storageConfig().getS3DestinationConfigOrThrow();
+ final AmazonS3 s3Client = s3Config.getS3Client();
+ final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());
+ return new StarburstGalaxyS3StreamCopier(stagingFolder, schema, configuredStream, s3Client, database,
+ starburstGalaxyConfig, nameTransformer, sqlOperations, uploadTimestamp);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxySqlOperations.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxySqlOperations.java
new file mode 100644
index 0000000000000..c43ae602f6396
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxySqlOperations.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static java.lang.String.format;
+
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.base.JavaBaseConstants;
+import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
+import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
+import java.sql.SQLException;
+import java.util.List;
+
+public class StarburstGalaxySqlOperations
+ extends JdbcSqlOperations {
+
+ @Override
+ public void executeTransaction(final JdbcDatabase database, final List queries) throws Exception {
+ for (final String query : queries) {
+ database.execute(query);
+ }
+ }
+
+ @Override
+ public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
+ String createTable = format(
+ "CREATE TABLE IF NOT EXISTS %s.%s (%s VARCHAR, %s VARCHAR, %s TIMESTAMP(6)) WITH (format = 'PARQUET', type = 'ICEBERG')",
+ schemaName,
+ tableName,
+ JavaBaseConstants.COLUMN_NAME_AB_ID,
+ JavaBaseConstants.COLUMN_NAME_DATA,
+ JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
+ LOGGER.info("Create table: {}", createTable);
+ return createTable;
+ }
+
+ @Override
+ public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
+ String createSchema = format("CREATE SCHEMA IF NOT EXISTS %s", schemaName);
+ LOGGER.info("Create schema if not exists: {}", createSchema);
+ database.execute(createSchema);
+ }
+
+ @Override
+ public void insertRecordsInternal(final JdbcDatabase database,
+ final List records,
+ final String schemaName,
+ final String tmpTableName) {
+ // Do nothing. The records are copied into the table directly from the staging parquet file.
+ // So no manual insertion is needed.
+ }
+
+ @Override
+ public void dropTableIfExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException {
+ database.execute(format("DROP TABLE IF EXISTS %s.%s", schemaName, tableName));
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyStagingStorageConfig.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyStagingStorageConfig.java
new file mode 100644
index 0000000000000..9dbeafad4c774
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyStagingStorageConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyConstants.OBJECT_STORE_TYPE;
+import static io.airbyte.integrations.destination.starburst_galaxy.StarburstGalaxyStagingStorageType.S3;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.integrations.destination.s3.S3DestinationConfig;
+import org.slf4j.Logger;
+
+public abstract class StarburstGalaxyStagingStorageConfig {
+
+ private static final Logger LOGGER = getLogger(StarburstGalaxyStagingStorageConfig.class);
+
+ public static StarburstGalaxyStagingStorageConfig getStarburstGalaxyStagingStorageConfig(final JsonNode config) {
+ final JsonNode typeConfig = config.get(OBJECT_STORE_TYPE);
+ LOGGER.info("Galaxy staging storage type config: {}", typeConfig.toString());
+ final StarburstGalaxyStagingStorageType storageType = StarburstGalaxyStagingStorageType.valueOf(typeConfig.asText().toUpperCase());
+ if (storageType == S3) {
+ return new StarburstGalaxyS3StagingStorageConfig(config);
+ }
+ throw new RuntimeException("Unsupported staging object store type: " + storageType);
+ }
+
+ public S3DestinationConfig getS3DestinationConfigOrThrow() {
+ throw new UnsupportedOperationException("Cannot get S3 destination config from " + this.getClass().getSimpleName());
+ }
+
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyStagingStorageType.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyStagingStorageType.java
new file mode 100644
index 0000000000000..d30ad4c94fdaa
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyStagingStorageType.java
@@ -0,0 +1,9 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+public enum StarburstGalaxyStagingStorageType {
+ S3
+}
diff --git a/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyStreamCopier.java b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyStreamCopier.java
new file mode 100644
index 0000000000000..b622ec8cfd7cf
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-starburst-galaxy/src/main/java/io/airbyte/integrations/destination/starburst_galaxy/StarburstGalaxyStreamCopier.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.starburst_galaxy;
+
+import static io.airbyte.protocol.models.v0.DestinationSyncMode.APPEND;
+import static io.airbyte.protocol.models.v0.DestinationSyncMode.OVERWRITE;
+import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
+import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
+import static java.lang.String.format;
+import static java.util.Locale.ENGLISH;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.destination.StandardNameTransformer;
+import io.airbyte.integrations.destination.jdbc.SqlOperations;
+import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.v0.DestinationSyncMode;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This implementation is similar to {@link StreamCopier}. It performs the following operations:
+ *
+ *
1. Writes data stream into tmp Iceberg table in cloud storage.
+ *
2. Creates(or modifies the schema of) the destination Iceberg table in Galaxy Catalog based
+ * on the tmp Iceberg table schema.
+ *
4. Copies the tmp Iceberg table into the destination Iceberg table in Galaxy Catalog.