From cf0219202d238a1eb1b2fd6551f9ab0ffa295f07 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 25 Dec 2024 21:46:43 +0800 Subject: [PATCH] 1 fe side fe side ok --- fe/be-java-extensions/hudi-scanner/pom.xml | 1 + .../lakesoul-scanner/pom.xml | 1 + fe/fe-core/pom.xml | 16 +++++ .../iceberg/IcebergExternalCatalog.java | 1 + .../IcebergExternalCatalogFactory.java | 2 + .../IcebergS3TablesExternalCatalog.java | 69 ++++++++++++++++++ .../CustomAwsCredentialsProvider.java | 43 ++++++++++++ .../iceberg/source/IcebergScanNode.java | 1 + .../apache/doris/persist/gson/GsonUtils.java | 3 + .../datasource/s3tables/S3TablesTest.java | 70 +++++++++++++++++++ fe/pom.xml | 5 +- 11 files changed, 210 insertions(+), 2 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergS3TablesExternalCatalog.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/s3tables/CustomAwsCredentialsProvider.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/s3tables/S3TablesTest.java diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml index c8f56e55a83031..ea94cdaa80b1b6 100644 --- a/fe/be-java-extensions/hudi-scanner/pom.xml +++ b/fe/be-java-extensions/hudi-scanner/pom.xml @@ -189,6 +189,7 @@ under the License. hudi-scanner + ${project.basedir}/target/ src/main/java src/test/java ${project.basedir}/target/ diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml b/fe/be-java-extensions/lakesoul-scanner/pom.xml index be2927300b362b..b10218d4760a4b 100644 --- a/fe/be-java-extensions/lakesoul-scanner/pom.xml +++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml @@ -64,6 +64,7 @@ under the License. lakesoul-scanner-jar-with-dependencies + ${project.basedir}/target/ src/main/java src/test/java ${project.basedir}/target/ diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index e1ae5dc60db6b4..4b7452fccb5081 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -928,6 +928,22 @@ under the License. azure-storage-blob-batch ${azure.sdk.batch.version} + + software.amazon.awssdk + s3tables + ${awssdk.version} + + + software.amazon.s3tables + s3-tables-catalog-for-iceberg + ${s3tables.catalog.version} + + + + software.amazon.awssdk + sdk-core + ${awssdk.version} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 6a3265388f3345..82ae49152bac7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -43,6 +43,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String ICEBERG_HADOOP = "hadoop"; public static final String ICEBERG_GLUE = "glue"; public static final String ICEBERG_DLF = "dlf"; + public static final String ICEBERG_S3_TABLES = "s3tables"; public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name"; protected String icebergCatalogType; protected Catalog catalog; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java index e8f593f293c009..748c0805393b1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java @@ -41,6 +41,8 @@ public static ExternalCatalog createCatalog(long catalogId, String name, String return new IcebergDLFExternalCatalog(catalogId, name, resource, props, comment); case IcebergExternalCatalog.ICEBERG_HADOOP: return new IcebergHadoopExternalCatalog(catalogId, name, resource, props, comment); + case IcebergExternalCatalog.ICEBERG_S3_TABLES: + return new IcebergS3TablesExternalCatalog(catalogId, name, resource, props, comment); default: throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " value: " + catalogType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergS3TablesExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergS3TablesExternalCatalog.java new file mode 100644 index 00000000000000..762938abfd153d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergS3TablesExternalCatalog.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.iceberg.s3tables.CustomAwsCredentialsProvider; +import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.datasource.property.constants.S3Properties; + +import com.google.common.collect.Maps; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.aws.AwsClientProperties; +import org.apache.iceberg.aws.s3.S3FileIOProperties; +import software.amazon.s3tables.iceberg.S3TablesCatalog; + +import java.util.HashMap; +import java.util.Map; + +public class IcebergS3TablesExternalCatalog extends IcebergExternalCatalog { + + public IcebergS3TablesExternalCatalog(long catalogId, String name, String resource, Map props, + String comment) { + super(catalogId, name, comment); + props = PropertyConverter.convertToMetaProperties(props); + catalogProperty = new CatalogProperty(resource, props); + } + + @Override + protected void initCatalog() { + icebergCatalogType = ICEBERG_S3_TABLES; + S3TablesCatalog s3TablesCatalog = new S3TablesCatalog(); + Map s3TablesCatalogProperties = convertToS3TablesCatalogProperties(); + String warehouse = catalogProperty.getHadoopProperties().get(CatalogProperties.WAREHOUSE_LOCATION); + s3TablesCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); + s3TablesCatalog.initialize(getName(), s3TablesCatalogProperties); + catalog = s3TablesCatalog; + } + + private Map convertToS3TablesCatalogProperties() { + Map props = catalogProperty.getProperties(); + Map s3Properties = Maps.newHashMap(); + s3Properties.put("client.credentials-provider", CustomAwsCredentialsProvider.class.getName()); + if (props.containsKey(S3Properties.ACCESS_KEY)) { + s3Properties.put("client.credentials-provider.s3.access-key-id", props.get(S3Properties.ACCESS_KEY)); + } + if (props.containsKey(S3Properties.SECRET_KEY)) { + s3Properties.put("client.credentials-provider.s3.secret-access-key", props.get(S3Properties.SECRET_KEY)); + } + if (props.containsKey(S3Properties.REGION)) { + s3Properties.put("client.region", props.get(S3Properties.REGION)); + } + return s3Properties; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/s3tables/CustomAwsCredentialsProvider.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/s3tables/CustomAwsCredentialsProvider.java new file mode 100644 index 00000000000000..f42d01818dad23 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/s3tables/CustomAwsCredentialsProvider.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.s3tables; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; + +import java.util.Map; + +public class CustomAwsCredentialsProvider implements AwsCredentialsProvider { + private final String accessKeyId; + private final String secretAccessKey; + + public CustomAwsCredentialsProvider(String accessKeyId, String secretAccessKey) { + this.accessKeyId = accessKeyId; + this.secretAccessKey = secretAccessKey; + } + + @Override + public AwsCredentials resolveCredentials() { + return AwsBasicCredentials.create(accessKeyId, secretAccessKey); + } + + public static CustomAwsCredentialsProvider create(Map props) { + return new CustomAwsCredentialsProvider(props.get("s3.access-key-id"), props.get("s3.secret-access-key")); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 756c9024cdcdd1..491a09791c3e39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -121,6 +121,7 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckCol case IcebergExternalCatalog.ICEBERG_DLF: case IcebergExternalCatalog.ICEBERG_GLUE: case IcebergExternalCatalog.ICEBERG_HADOOP: + case IcebergExternalCatalog.ICEBERG_S3_TABLES: source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange); break; default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 8a5516b7616999..380dce5c24a78e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -146,6 +146,7 @@ import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergHadoopExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergS3TablesExternalCatalog; import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase; import org.apache.doris.datasource.infoschema.ExternalInfoSchemaTable; import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase; @@ -417,6 +418,8 @@ public class GsonUtils { .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergHadoopExternalCatalog.class, IcebergHadoopExternalCatalog.class.getSimpleName()) + .registerSubtype(IcebergS3TablesExternalCatalog.class, + IcebergS3TablesExternalCatalog.class.getSimpleName()) .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName()) .registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName()) .registerSubtype(PaimonFileExternalCatalog.class, PaimonFileExternalCatalog.class.getSimpleName()) diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/s3tables/S3TablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/s3tables/S3TablesTest.java new file mode 100644 index 00000000000000..dd2da5beac4627 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/s3tables/S3TablesTest.java @@ -0,0 +1,70 @@ +package org.apache.doris.datasource.s3tables; + +import org.apache.doris.datasource.iceberg.s3tables.CustomAwsCredentialsProvider; + +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableIterable; +import org.junit.jupiter.api.Test; +import software.amazon.s3tables.iceberg.S3TablesCatalog; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class S3TablesTest { + + @Test + public void testS3TablesCatalog() { + S3TablesCatalog s3TablesCatalog = new S3TablesCatalog(); + Map s3Properties = new HashMap<>(); + + // ak, sk + String accessKeyId = ""; + String secretKey = ""; + + s3Properties.put("client.region", "us-east-1"); + s3Properties.put("client.credentials-provider", CustomAwsCredentialsProvider.class.getName()); + s3Properties.put("client.credentials-provider.s3.access-key-id", accessKeyId); + s3Properties.put("client.credentials-provider.s3.secret-access-key", secretKey); + + String warehouse = "arn:aws:s3tables:us-east-1:169698404049:bucket/yy-s3-table-bucket"; + s3Properties.put("warehouse", warehouse); + + try { + s3TablesCatalog.initialize("s3tables", s3Properties); + System.out.println("Successfully initialized S3 Tables catalog!"); + + try { + // 1. list namespaces + List namespaces = s3TablesCatalog.listNamespaces(); + System.out.println("Successfully listed namespaces:"); + for (Namespace namespace : namespaces) { + System.out.println(namespace); + // 2. list tables + List tblIdentifiers = s3TablesCatalog.listTables(namespace); + for (TableIdentifier tblId : tblIdentifiers) { + // 3. load table and list files + System.out.println(tblId); + Table tbl = s3TablesCatalog.loadTable(tblId); + System.out.println(tbl.schema()); + TableScan scan = tbl.newScan(); + CloseableIterable fileScanTasks = scan.planFiles(); + for (FileScanTask task : fileScanTasks) { + System.out.println(task.file()); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Note: Could not list namespaces - " + e.getMessage()); + } + } catch (Exception e) { + System.err.println("Error connecting to S3 Tables: " + e.getMessage()); + e.printStackTrace(); + } + } +} diff --git a/fe/pom.xml b/fe/pom.xml index 2c61fe32299c38..2a908409c28d56 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -222,7 +222,7 @@ under the License. be-java-extensions - 2.1.1 + 2.1.3 1.11.4 1.13.1 3.4.3 @@ -318,7 +318,7 @@ under the License. - 1.4.3 + 1.6.1 0.49.0-public 17.0.0 2.7.4-11 @@ -380,6 +380,7 @@ under the License. 5.3.0 3.15.0 2.29.26 + 0.1.4