Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
fe side

fe side ok
  • Loading branch information
morningman committed Jan 31, 2025
1 parent 29055f2 commit cf02192
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 2 deletions.
1 change: 1 addition & 0 deletions fe/be-java-extensions/hudi-scanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ under the License.
</dependencies>
<build>
<finalName>hudi-scanner</finalName>
<!-- The build output directory is declared exactly once (below); a second <directory> tag
     inside <build> makes Maven reject the POM with a "Duplicated tag" error. -->
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<directory>${project.basedir}/target/</directory>
Expand Down
1 change: 1 addition & 0 deletions fe/be-java-extensions/lakesoul-scanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ under the License.

<build>
<finalName>lakesoul-scanner-jar-with-dependencies</finalName>
<!-- The build output directory is declared exactly once (below); a second <directory> tag
     inside <build> makes Maven reject the POM with a "Duplicated tag" error. -->
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<directory>${project.basedir}/target/</directory>
Expand Down
16 changes: 16 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,22 @@ under the License.
<artifactId>azure-storage-blob-batch</artifactId>
<version>${azure.sdk.batch.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3tables</artifactId>
<version>${awssdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.s3tables</groupId>
<artifactId>s3-tables-catalog-for-iceberg</artifactId>
<version>${s3tables.catalog.version}</version>
</dependency>
<!-- AWS SDK Core -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<version>${awssdk.version}</version>
</dependency>
</dependencies>
<repositories>
<!-- for huawei obs sdk -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
public static final String ICEBERG_HADOOP = "hadoop";
public static final String ICEBERG_GLUE = "glue";
public static final String ICEBERG_DLF = "dlf";
public static final String ICEBERG_S3_TABLES = "s3tables";
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
protected String icebergCatalogType;
protected Catalog catalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public static ExternalCatalog createCatalog(long catalogId, String name, String
return new IcebergDLFExternalCatalog(catalogId, name, resource, props, comment);
case IcebergExternalCatalog.ICEBERG_HADOOP:
return new IcebergHadoopExternalCatalog(catalogId, name, resource, props, comment);
case IcebergExternalCatalog.ICEBERG_S3_TABLES:
return new IcebergS3TablesExternalCatalog(catalogId, name, resource, props, comment);
default:
throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE
+ " value: " + catalogType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg;

import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.iceberg.s3tables.CustomAwsCredentialsProvider;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;

import com.google.common.collect.Maps;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import software.amazon.s3tables.iceberg.S3TablesCatalog;

import java.util.HashMap;
import java.util.Map;

/**
 * Iceberg external catalog whose underlying Iceberg {@code Catalog} is an {@link S3TablesCatalog}.
 *
 * <p>The catalog is selected with the {@code s3tables} Iceberg catalog type
 * ({@link IcebergExternalCatalog#ICEBERG_S3_TABLES}).
 */
public class IcebergS3TablesExternalCatalog extends IcebergExternalCatalog {

    /**
     * Creates the catalog and stores its properties after converting them to meta properties.
     *
     * @param catalogId id of this catalog
     * @param name      name of this catalog
     * @param resource  resource name passed through to {@link CatalogProperty}
     * @param props     user supplied catalog properties
     * @param comment   catalog comment
     */
    public IcebergS3TablesExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
            String comment) {
        super(catalogId, name, comment);
        props = PropertyConverter.convertToMetaProperties(props);
        catalogProperty = new CatalogProperty(resource, props);
    }

    /**
     * Builds and initializes the {@link S3TablesCatalog} from the catalog properties.
     *
     * @throws IllegalArgumentException if the {@code warehouse} property is missing or blank
     */
    @Override
    protected void initCatalog() {
        icebergCatalogType = ICEBERG_S3_TABLES;

        // Fail fast with a readable message instead of handing a null warehouse to the Iceberg catalog.
        String warehouse = catalogProperty.getHadoopProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
        if (warehouse == null || warehouse.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property '" + CatalogProperties.WAREHOUSE_LOCATION
                    + "' for Iceberg S3 Tables catalog '" + getName() + "'");
        }

        Map<String, String> s3TablesCatalogProperties = convertToS3TablesCatalogProperties();
        s3TablesCatalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);

        S3TablesCatalog s3TablesCatalog = new S3TablesCatalog();
        s3TablesCatalog.initialize(getName(), s3TablesCatalogProperties);
        catalog = s3TablesCatalog;
    }

    /**
     * Translates the Doris catalog properties into the client properties passed to {@link S3TablesCatalog}.
     *
     * <p>{@link CustomAwsCredentialsProvider} is always registered as the credentials provider. The access key,
     * secret key and region are forwarded only when they are present in the catalog properties.
     *
     * @return a new mutable map holding the client properties
     */
    private Map<String, String> convertToS3TablesCatalogProperties() {
        Map<String, String> props = catalogProperty.getProperties();
        Map<String, String> s3Properties = Maps.newHashMap();
        s3Properties.put("client.credentials-provider", CustomAwsCredentialsProvider.class.getName());
        // NOTE(review): presumably Iceberg strips the "client.credentials-provider." prefix before handing these
        // entries to CustomAwsCredentialsProvider.create(Map), which reads "s3.access-key-id" and
        // "s3.secret-access-key" - confirm against the Iceberg AWS client properties.
        if (props.containsKey(S3Properties.ACCESS_KEY)) {
            s3Properties.put("client.credentials-provider.s3.access-key-id", props.get(S3Properties.ACCESS_KEY));
        }
        if (props.containsKey(S3Properties.SECRET_KEY)) {
            s3Properties.put("client.credentials-provider.s3.secret-access-key", props.get(S3Properties.SECRET_KEY));
        }
        if (props.containsKey(S3Properties.REGION)) {
            s3Properties.put("client.region", props.get(S3Properties.REGION));
        }
        return s3Properties;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.s3tables;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

import java.util.Map;

/**
 * {@link AwsCredentialsProvider} that serves a fixed access key id / secret access key pair.
 *
 * <p>NOTE(review): the static {@link #create(Map)} factory is presumably looked up reflectively by the Iceberg
 * AWS client configuration when this class name is set as {@code client.credentials-provider} - confirm.
 */
public class CustomAwsCredentialsProvider implements AwsCredentialsProvider {
    /** Property key from which {@link #create(Map)} reads the access key id. */
    private static final String ACCESS_KEY_ID_PROPERTY = "s3.access-key-id";
    /** Property key from which {@link #create(Map)} reads the secret access key. */
    private static final String SECRET_ACCESS_KEY_PROPERTY = "s3.secret-access-key";

    private final String accessKeyId;
    private final String secretAccessKey;

    /**
     * Builds a provider from a property map.
     *
     * @param props properties containing {@code s3.access-key-id} and {@code s3.secret-access-key}
     * @return a provider holding whatever values the map has for those two keys
     */
    public static CustomAwsCredentialsProvider create(Map<String, String> props) {
        String keyId = props.get(ACCESS_KEY_ID_PROPERTY);
        String secret = props.get(SECRET_ACCESS_KEY_PROPERTY);
        return new CustomAwsCredentialsProvider(keyId, secret);
    }

    /**
     * @param accessKeyId     access key id returned by this provider
     * @param secretAccessKey secret access key returned by this provider
     */
    public CustomAwsCredentialsProvider(String accessKeyId, String secretAccessKey) {
        this.accessKeyId = accessKeyId;
        this.secretAccessKey = secretAccessKey;
    }

    /**
     * Creates basic credentials from the stored key pair on every call.
     *
     * @return credentials built from the configured access key id and secret access key
     */
    @Override
    public AwsCredentials resolveCredentials() {
        AwsCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
        return credentials;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckCol
case IcebergExternalCatalog.ICEBERG_DLF:
case IcebergExternalCatalog.ICEBERG_GLUE:
case IcebergExternalCatalog.ICEBERG_HADOOP:
case IcebergExternalCatalog.ICEBERG_S3_TABLES:
source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergHadoopExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergS3TablesExternalCatalog;
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaTable;
import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
Expand Down Expand Up @@ -417,6 +418,8 @@ public class GsonUtils {
.registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergHadoopExternalCatalog.class, IcebergHadoopExternalCatalog.class.getSimpleName())
.registerSubtype(IcebergS3TablesExternalCatalog.class,
IcebergS3TablesExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonFileExternalCatalog.class, PaimonFileExternalCatalog.class.getSimpleName())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.apache.doris.datasource.s3tables;

import org.apache.doris.datasource.iceberg.s3tables.CustomAwsCredentialsProvider;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.junit.jupiter.api.Test;
import software.amazon.s3tables.iceberg.S3TablesCatalog;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Manual smoke test for {@link S3TablesCatalog}: initializes the catalog, then lists namespaces, tables and
 * the data files of every table, printing everything to stdout.
 *
 * <p>The access key and secret key are intentionally left blank in the source and must be filled in locally
 * before running. All failures are caught and printed, so this test never fails on connection errors.
 */
public class S3TablesTest {

    @Test
    public void testS3TablesCatalog() {
        S3TablesCatalog s3TablesCatalog = new S3TablesCatalog();
        Map<String, String> s3Properties = new HashMap<>();

        // ak, sk
        String accessKeyId = "";
        String secretKey = "";

        s3Properties.put("client.region", "us-east-1");
        s3Properties.put("client.credentials-provider", CustomAwsCredentialsProvider.class.getName());
        s3Properties.put("client.credentials-provider.s3.access-key-id", accessKeyId);
        s3Properties.put("client.credentials-provider.s3.secret-access-key", secretKey);

        String warehouse = "arn:aws:s3tables:us-east-1:169698404049:bucket/yy-s3-table-bucket";
        s3Properties.put("warehouse", warehouse);

        try {
            s3TablesCatalog.initialize("s3tables", s3Properties);
            System.out.println("Successfully initialized S3 Tables catalog!");

            try {
                // 1. list namespaces
                List<Namespace> namespaces = s3TablesCatalog.listNamespaces();
                System.out.println("Successfully listed namespaces:");
                for (Namespace namespace : namespaces) {
                    System.out.println(namespace);
                    // 2. list tables
                    List<TableIdentifier> tblIdentifiers = s3TablesCatalog.listTables(namespace);
                    for (TableIdentifier tblId : tblIdentifiers) {
                        // 3. load table and list files
                        System.out.println(tblId);
                        Table tbl = s3TablesCatalog.loadTable(tblId);
                        System.out.println(tbl.schema());
                        TableScan scan = tbl.newScan();
                        // planFiles() returns a closeable iterable; close it so the scan planning
                        // resources are released even if printing a task throws.
                        try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
                            for (FileScanTask task : fileScanTasks) {
                                System.out.println(task.file());
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("Note: Could not list namespaces - " + e.getMessage());
            }
        } catch (Exception e) {
            System.err.println("Error connecting to S3 Tables: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
5 changes: 3 additions & 2 deletions fe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ under the License.
<module>be-java-extensions</module>
</modules>
<properties>
<doris.hive.catalog.shade.version>2.1.1</doris.hive.catalog.shade.version>
<doris.hive.catalog.shade.version>2.1.3</doris.hive.catalog.shade.version>
<avro.version>1.11.4</avro.version>
<parquet.version>1.13.1</parquet.version>
<spark.version>3.4.3</spark.version>
Expand Down Expand Up @@ -318,7 +318,7 @@ under the License.
<!-- ATTN: avro version must be consistent with Iceberg version -->
<!-- Please modify iceberg.version and avro.version together,
you can find avro version info in iceberg mvn repository -->
<iceberg.version>1.4.3</iceberg.version>
<iceberg.version>1.6.1</iceberg.version>
<maxcompute.version>0.49.0-public</maxcompute.version>
<arrow.version>17.0.0</arrow.version>
<presto.hadoop.version>2.7.4-11</presto.hadoop.version>
Expand Down Expand Up @@ -380,6 +380,7 @@ under the License.
<semver4j.version>5.3.0</semver4j.version>
<aliyun-sdk-oss.version>3.15.0</aliyun-sdk-oss.version>
<awssdk.version>2.29.26</awssdk.version>
<s3tables.catalog.version>0.1.4</s3tables.catalog.version>
</properties>
<profiles>
<profile>
Expand Down

0 comments on commit cf02192

Please sign in to comment.