diff --git a/pom.xml b/pom.xml index 7a5973428..5450f174d 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ 1.18.20.0 3.4.0 0.14.0 + 2.3.9 3.3.1 3.8.0 3.2.4 diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 63e9d6733..0d50537bb 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -51,6 +51,7 @@ public class ConversionConfig { SyncMode syncMode) { this.sourceTable = sourceTable; this.targetTables = targetTables; + this.targetCatalogs = targetCatalogs; Preconditions.checkArgument( targetTables != null && !targetTables.isEmpty(), "Please provide at-least one format to sync"); diff --git a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java index af85c9007..f02adc308 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java @@ -31,7 +31,8 @@ public enum ErrorCode { UNSUPPORTED_SCHEMA_TYPE(10007), UNSUPPORTED_FEATURE(10008), PARSE_EXCEPTION(10009), - CATALOG_REFRESH_EXCEPTION(10010); + CATALOG_REFRESH_EXCEPTION(10010), + CATALOG_SYNC_GENERIC_EXCEPTION(10011); private final int errorCode; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java new file mode 100644 index 000000000..f1a91c98d --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.model.storage; + +public class CatalogType { + public static final String HMS = "HMS"; +} diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index 80de22991..cb0e7df0e 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -137,6 +137,94 @@ spark-sql_${scala.binary.version} provided + + org.apache.iceberg + iceberg-hive-runtime + ${iceberg.version} + + + + + org.apache.hive + hive-metastore + ${hive.version} + + + log4j + * + + + org.slf4j + * + + + org.apache.logging.log4j + * + + + org.apache.parquet + * + + + + + org.apache.hive + hive-jdbc + ${hive.version} + + + log4j + * + + + org.slf4j + * + + + org.apache.logging.log4j + * + + + org.apache.parquet + * + + + + + org.apache.hive + hive-common + ${hive.version} + + + org.apache.logging.log4j + * + + + + + org.apache.hive + hive-exec + ${hive.version} + core + + + org.pentaho + * + + + org.apache.logging.log4j + * + + + org.codehaus.janino + * + + + log4j + * + + + @@ -144,6 +232,10 @@ mockito-core test + + org.mockito + mockito-junit-jupiter + diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogTableBuilder.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogTableBuilder.java new file mode 100644 index 000000000..6216d111c --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogTableBuilder.java @@ -0,0 +1,33 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; + +/** + * The interface for creating/updating catalog table object, each catalog can have its own + * implementation that can be plugged in. + */ +public interface CatalogTableBuilder { + public REQUEST getCreateTableRequest(InternalTable table, CatalogTableIdentifier tableIdentifier); + + public REQUEST getUpdateTableRequest( + InternalTable table, TABLE catalogTable, CatalogTableIdentifier tableIdentifier); +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/Constants.java b/xtable-core/src/main/java/org/apache/xtable/catalog/Constants.java new file mode 100644 index 000000000..fec8607fd --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/Constants.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +public class Constants { + + public static final String PROP_SPARK_SQL_SOURCES_PROVIDER = "spark.sql.sources.provider"; + public static final String PROP_PATH = "path"; + public static final String PROP_SERIALIZATION_FORMAT = "serialization.format"; + public static final String PROP_EXTERNAL = "EXTERNAL"; +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java index 3649ae8e0..109fe0de4 100644 --- a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java @@ -20,16 +20,27 @@ import java.util.Map; +import org.apache.xtable.catalog.hms.HMSCatalogConversionSource; +import org.apache.xtable.catalog.hms.HMSCatalogSyncClient; import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.storage.CatalogType; /** A factory class which returns {@link ExternalCatalogConfig} based on catalogType. */ public class ExternalCatalogConfigFactory { public static ExternalCatalogConfig fromCatalogType( String catalogType, String catalogId, Map properties) { - // TODO: Choose existing implementation based on catalogType. 
- String catalogSyncClientImpl = ""; - String catalogConversionSourceImpl = ""; + String catalogSyncClientImpl; + String catalogConversionSourceImpl; + switch (catalogType) { + case CatalogType.HMS: + catalogSyncClientImpl = HMSCatalogSyncClient.class.getName(); + catalogConversionSourceImpl = HMSCatalogConversionSource.class.getName(); + break; + default: + throw new NotSupportedException("Unsupported catalogType: " + catalogType); + } return ExternalCatalogConfig.builder() .catalogType(catalogType) .catalogSyncClientImpl(catalogSyncClientImpl) diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/TableFormatUtils.java b/xtable-core/src/main/java/org/apache/xtable/catalog/TableFormatUtils.java new file mode 100644 index 000000000..78825ae42 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/TableFormatUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.catalog; + +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; +import static org.apache.xtable.catalog.Constants.PROP_SPARK_SQL_SOURCES_PROVIDER; + +import java.util.Map; + +import org.apache.iceberg.TableProperties; + +import com.google.common.base.Strings; + +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.storage.TableFormat; + +public class TableFormatUtils { + + public static String getTableDataLocation( + String tableFormat, String tableLocation, Map properties) { + switch (tableFormat) { + case TableFormat.ICEBERG: + return getIcebergDataLocation(tableLocation, properties); + case TableFormat.DELTA: + case TableFormat.HUDI: + return tableLocation; + default: + throw new NotSupportedException("Unsupported table format: " + tableFormat); + } + } + + /** Get iceberg table data files location */ + private static String getIcebergDataLocation( + String tableLocation, Map properties) { + String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION); + if (dataLocation == null) { + dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + if (dataLocation == null) { + dataLocation = properties.get(TableProperties.OBJECT_STORE_PATH); + if (dataLocation == null) { + dataLocation = String.format("%s/data", tableLocation); + } + } + } + return dataLocation; + } + + // Get table format name from table properties + public static String getTableFormat(Map properties) { + // - In case of ICEBERG, table_type param will give the table format + // - In case of DELTA, table_type or spark.sql.sources.provider param will give the table + // format + // - In case of HUDI, spark.sql.sources.provider param will give the table format + String tableFormat = properties.get(TABLE_TYPE_PROP); + if (Strings.isNullOrEmpty(tableFormat)) { + tableFormat = properties.get(PROP_SPARK_SQL_SOURCES_PROVIDER); + } + return tableFormat; + } +} diff --git 
a/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogConfig.java b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogConfig.java new file mode 100644 index 000000000..ba8b18dcb --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogConfig.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.catalog.hms; + +import java.util.Map; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +@Getter +@EqualsAndHashCode +@ToString +public class HMSCatalogConfig { + + private static final ObjectMapper OBJECT_MAPPER = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @JsonProperty("externalCatalog.hms.serverUrl") + private String serverUrl; + + protected static HMSCatalogConfig of(Map properties) { + try { + return OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(properties), HMSCatalogConfig.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogConversionSource.java new file mode 100644 index 000000000..18c1c2899 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogConversionSource.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.hms; + +import java.util.Locale; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.thrift.TException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; + +import org.apache.xtable.catalog.TableFormatUtils; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.spi.extractor.CatalogConversionSource; + +public class HMSCatalogConversionSource implements CatalogConversionSource { + + private final HMSCatalogConfig hmsCatalogConfig; + private final IMetaStoreClient metaStoreClient; + + public HMSCatalogConversionSource( + ExternalCatalogConfig catalogConfig, Configuration configuration) { + this.hmsCatalogConfig = HMSCatalogConfig.of(catalogConfig.getCatalogProperties()); + try { + this.metaStoreClient = new HMSClientProvider(hmsCatalogConfig, configuration).getMSC(); + } catch (MetaException | HiveException e) { + throw new CatalogSyncException("HiveMetastoreClient could not be created", e); + } + } + + @VisibleForTesting + HMSCatalogConversionSource(HMSCatalogConfig hmsCatalogConfig, 
IMetaStoreClient metaStoreClient) { + this.hmsCatalogConfig = hmsCatalogConfig; + this.metaStoreClient = metaStoreClient; + } + + @Override + public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { + try { + Table table = + metaStoreClient.getTable( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName()); + if (table == null) { + throw new IllegalStateException(String.format("table: %s is null", tableIdentifier)); + } + + String tableFormat = TableFormatUtils.getTableFormat(table.getParameters()); + if (Strings.isNullOrEmpty(tableFormat)) { + throw new IllegalStateException( + String.format("TableFormat is null or empty for table: %s", tableIdentifier)); + } + tableFormat = tableFormat.toUpperCase(Locale.ENGLISH); + + String tableLocation = table.getSd().getLocation(); + String dataPath = + TableFormatUtils.getTableDataLocation(tableFormat, tableLocation, table.getParameters()); + + Properties tableProperties = new Properties(); + tableProperties.putAll(table.getParameters()); + return SourceTable.builder() + .name(table.getTableName()) + .basePath(tableLocation) + .dataPath(dataPath) + .formatName(tableFormat) + .additionalProperties(tableProperties) + .build(); + } catch (TException e) { + throw new CatalogSyncException("Failed to get table: " + tableIdentifier, e); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogSyncClient.java b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogSyncClient.java new file mode 100644 index 000000000..494024ef3 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogSyncClient.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.hms; + +import java.time.ZonedDateTime; +import java.util.Collections; + +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.thrift.TException; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +@Log4j2 +public class HMSCatalogSyncClient implements CatalogSyncClient { + + private static final String TEMP_SUFFIX = "_temp"; + private final ExternalCatalogConfig catalogConfig; + private final HMSCatalogConfig hmsCatalogConfig; + private final Configuration configuration; + private final IMetaStoreClient metaStoreClient; + private final CatalogTableBuilder tableBuilder; + + public HMSCatalogSyncClient( + 
ExternalCatalogConfig catalogConfig, Configuration configuration, String tableFormat) { + this.catalogConfig = catalogConfig; + this.hmsCatalogConfig = HMSCatalogConfig.of(catalogConfig.getCatalogProperties()); + this.configuration = configuration; + try { + this.metaStoreClient = new HMSClientProvider(hmsCatalogConfig, configuration).getMSC(); + } catch (MetaException | HiveException e) { + throw new CatalogSyncException("HiveMetastoreClient could not be created", e); + } + this.tableBuilder = + HMSCatalogTableBuilderFactory.getTableBuilder(tableFormat, this.configuration); + } + + @VisibleForTesting + HMSCatalogSyncClient( + ExternalCatalogConfig catalogConfig, + HMSCatalogConfig hmsCatalogConfig, + Configuration configuration, + IMetaStoreClient metaStoreClient, + CatalogTableBuilder tableBuilder) { + this.catalogConfig = catalogConfig; + this.hmsCatalogConfig = hmsCatalogConfig; + this.configuration = configuration; + this.metaStoreClient = metaStoreClient; + this.tableBuilder = tableBuilder; + } + + @Override + public String getCatalogId() { + return catalogConfig.getCatalogId(); + } + + @Override + public String getStorageLocation(Table table) { + if (table == null || table.getSd() == null) { + return null; + } + return table.getSd().getLocation(); + } + + @Override + public boolean hasDatabase(String databaseName) { + try { + return metaStoreClient.getDatabase(databaseName) != null; + } catch (NoSuchObjectException e) { + return false; + } catch (TException e) { + throw new CatalogSyncException("Failed to get database: " + databaseName, e); + } + } + + @Override + public void createDatabase(String databaseName) { + try { + Database database = + new Database( + databaseName, + "Created by " + this.getClass().getName(), + null, + Collections.emptyMap()); + metaStoreClient.createDatabase(database); + } catch (TException e) { + throw new CatalogSyncException("Failed to create database: " + databaseName, e); + } + } + + @Override + public Table 
getTable(CatalogTableIdentifier tableIdentifier) { + try { + return metaStoreClient.getTable( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName()); + } catch (NoSuchObjectException e) { + return null; + } catch (TException e) { + throw new CatalogSyncException("Failed to get table: " + tableIdentifier, e); + } + } + + @Override + public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + Table hmsTable = tableBuilder.getCreateTableRequest(table, tableIdentifier); + try { + metaStoreClient.createTable(hmsTable); + } catch (TException e) { + throw new CatalogSyncException("Failed to create table: " + tableIdentifier, e); + } + } + + @Override + public void refreshTable( + InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) { + catalogTable = tableBuilder.getUpdateTableRequest(table, catalogTable, tableIdentifier); + try { + metaStoreClient.alter_table( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), catalogTable); + } catch (TException e) { + throw new CatalogSyncException("Failed to refresh table: " + tableIdentifier, e); + } + } + + @Override + public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + // validate before dropping the table + validateTempTableCreation(table, tableIdentifier); + dropTable(table, tableIdentifier); + createTable(table, tableIdentifier); + } + + @Override + public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + try { + metaStoreClient.dropTable(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName()); + } catch (TException e) { + throw new CatalogSyncException("Failed to drop table: " + tableIdentifier, e); + } + } + + /** + * creates a temp table with new metadata and properties to ensure table creation succeeds before + * dropping the table and recreating it. 
This ensures that actual table is not dropped in case + * there are any issues + */ + private void validateTempTableCreation( + InternalTable table, CatalogTableIdentifier tableIdentifier) { + String tempTableName = + tableIdentifier.getTableName() + TEMP_SUFFIX + ZonedDateTime.now().toEpochSecond(); + CatalogTableIdentifier tempTableIdentifier = + CatalogTableIdentifier.builder() + .tableName(tempTableName) + .databaseName(tableIdentifier.getDatabaseName()) + .build(); + createTable(table, tempTableIdentifier); + dropTable(table, tempTableIdentifier); + } + + @Override + public void close() throws Exception { + if (metaStoreClient != null) { + metaStoreClient.close(); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogTableBuilderFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogTableBuilderFactory.java new file mode 100644 index 000000000..df81c838a --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSCatalogTableBuilderFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.catalog.hms; + +import java.io.IOException; +import java.time.ZonedDateTime; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.catalog.hms.table.IcebergHMSCatalogTableBuilder; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.storage.TableFormat; + +public class HMSCatalogTableBuilderFactory { + + public static CatalogTableBuilder getTableBuilder( + String tableFormat, Configuration configuration) { + switch (tableFormat) { + case TableFormat.ICEBERG: + return new IcebergHMSCatalogTableBuilder(configuration); + default: + throw new NotSupportedException("Unsupported table format: " + tableFormat); + } + } + + public static Table newHmsTable( + CatalogTableIdentifier tableIdentifier, + StorageDescriptor storageDescriptor, + Map params) { + try { + Table newTb = new Table(); + newTb.setDbName(tableIdentifier.getDatabaseName()); + newTb.setTableName(tableIdentifier.getTableName()); + newTb.setOwner(UserGroupInformation.getCurrentUser().getShortUserName()); + newTb.setCreateTime((int) ZonedDateTime.now().toEpochSecond()); + newTb.setSd(storageDescriptor); + newTb.setTableType(TableType.EXTERNAL_TABLE.toString()); + newTb.setParameters(params); + return newTb; + } catch (IOException e) { + throw new RuntimeException("Failed to set owner for hms table: " + tableIdentifier, e); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSClientProvider.java b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSClientProvider.java new file mode 100644 index 000000000..4aeca35c3 --- 
/dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSClientProvider.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.hms; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; + +import java.lang.reflect.InvocationTargetException; + +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +@Log4j2 +public class HMSClientProvider { + + private final HMSCatalogConfig hmsCatalogConfig; + private final Configuration configuration; + + public HMSClientProvider(HMSCatalogConfig hmsCatalogConfig, Configuration configuration) { + this.hmsCatalogConfig = hmsCatalogConfig; + this.configuration = configuration; + } + + public IMetaStoreClient getMSC() throws MetaException, HiveException { + HiveConf hiveConf = new HiveConf(configuration, HiveConf.class); + hiveConf.set(METASTOREURIS.varname, hmsCatalogConfig.getServerUrl()); 
+ IMetaStoreClient metaStoreClient; + try { + metaStoreClient = + ((Hive) + Hive.class + .getMethod("getWithoutRegisterFns", HiveConf.class) + .invoke(null, hiveConf)) + .getMSC(); + } catch (NoSuchMethodException + | IllegalAccessException + | IllegalArgumentException + | InvocationTargetException ex) { + metaStoreClient = Hive.get(hiveConf).getMSC(); + } + log.debug("Connected to metastore with uri: {}", hmsCatalogConfig.getServerUrl()); + return metaStoreClient; + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSSchemaExtractor.java new file mode 100644 index 000000000..7e6dc9825 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/HMSSchemaExtractor.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.catalog.hms; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; + +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.exception.SchemaExtractorException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class HMSSchemaExtractor { + + private static final HMSSchemaExtractor INSTANCE = new HMSSchemaExtractor(); + + public static HMSSchemaExtractor getInstance() { + return INSTANCE; + } + + /** + * Extract HMS schema from OneTable schema + * + * @param tableFormat tableFormat to handle format specific type conversion + * @param tableSchema OneTable schema + * @return HMS Field schema list + */ + public List toColumns(String tableFormat, InternalSchema tableSchema) { + return tableSchema.getFields().stream() + .map( + field -> + new FieldSchema( + field.getName(), + convertToTypeString(field.getSchema()), + field.getSchema().getComment())) + .collect(Collectors.toList()); + } + + private String convertToTypeString(InternalSchema fieldSchema) { + switch (fieldSchema.getDataType()) { + case BOOLEAN: + return "boolean"; + case INT: + return "int"; + case LONG: + return "bigint"; + case FLOAT: + return "float"; + case DOUBLE: + return "double"; + case DATE: + return "date"; + case ENUM: + case STRING: + return "string"; + case TIMESTAMP: + case TIMESTAMP_NTZ: + return "timestamp"; + case FIXED: + case BYTES: + return "binary"; + case DECIMAL: + Map metadata = fieldSchema.getMetadata(); + if (metadata == null || metadata.isEmpty()) { + throw new NotSupportedException("Invalid decimal type, precision and scale is missing"); + } + int precision = + (int) + metadata.computeIfAbsent( + InternalSchema.MetadataKey.DECIMAL_PRECISION, + k -> 
{ + throw new NotSupportedException("Invalid decimal type, precision is missing"); + }); + int scale = + (int) + metadata.computeIfAbsent( + InternalSchema.MetadataKey.DECIMAL_SCALE, + k -> { + throw new NotSupportedException("Invalid decimal type, scale is missing"); + }); + return String.format("decimal(%s,%s)", precision, scale); + case RECORD: + final String nameToType = + fieldSchema.getFields().stream() + .map(f -> String.format("%s:%s", f.getName(), convertToTypeString(f.getSchema()))) + .collect(Collectors.joining(",")); + return String.format("struct<%s>", nameToType); + case LIST: + InternalField arrayElement = + fieldSchema.getFields().stream() + .filter( + arrayField -> + InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals( + arrayField.getName())) + .findFirst() + .orElseThrow(() -> new SchemaExtractorException("Invalid array schema")); + return String.format("array<%s>", convertToTypeString(arrayElement.getSchema())); + case MAP: + InternalField key = + fieldSchema.getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new SchemaExtractorException("Invalid map schema")); + InternalField value = + fieldSchema.getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new SchemaExtractorException("Invalid map schema")); + return String.format( + "map<%s,%s>", + convertToTypeString(key.getSchema()), convertToTypeString(value.getSchema())); + default: + throw new NotSupportedException("Unsupported type: " + fieldSchema.getDataType()); + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/hms/table/IcebergHMSCatalogTableBuilder.java b/xtable-core/src/main/java/org/apache/xtable/catalog/hms/table/IcebergHMSCatalogTableBuilder.java new file mode 100644 index 000000000..c24cf6a09 --- /dev/null +++ 
package org.apache.xtable.catalog.hms.table;

import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.xtable.catalog.Constants.PROP_EXTERNAL;
import static org.apache.xtable.catalog.hms.HMSCatalogTableBuilderFactory.newHmsTable;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.apache.iceberg.mr.hive.HiveIcebergOutputFormat;
import org.apache.iceberg.mr.hive.HiveIcebergSerDe;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;

import com.google.common.annotations.VisibleForTesting;

import org.apache.xtable.catalog.CatalogTableBuilder;
import org.apache.xtable.catalog.hms.HMSSchemaExtractor;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.storage.TableFormat;

/**
 * Builds Hive Metastore create/update {@link Table} requests for Iceberg tables. The Iceberg
 * metadata file location is read from the table on the filesystem via {@link HadoopTables}.
 */
public class IcebergHMSCatalogTableBuilder implements CatalogTableBuilder {

  private static final String ICEBERG_CATALOG_NAME_PROP = "iceberg.catalog";
  private static final String ICEBERG_HADOOP_TABLE_NAME = "location_based_table";
  // Renamed from `tableFormat`: static final constants use UPPER_SNAKE_CASE.
  private static final String TABLE_FORMAT = TableFormat.ICEBERG;
  private final HMSSchemaExtractor schemaExtractor;
  private final HadoopTables hadoopTables;

  public IcebergHMSCatalogTableBuilder(Configuration configuration) {
    this.schemaExtractor = HMSSchemaExtractor.getInstance();
    this.hadoopTables = new HadoopTables(configuration);
  }

  @VisibleForTesting
  IcebergHMSCatalogTableBuilder(HMSSchemaExtractor schemaExtractor, HadoopTables hadoopTables) {
    this.schemaExtractor = schemaExtractor;
    this.hadoopTables = hadoopTables;
  }

  /** Builds the HMS table object used to register a new Iceberg table. */
  @Override
  public Table getCreateTableRequest(InternalTable table, CatalogTableIdentifier tableIdentifier) {
    return newHmsTable(
        tableIdentifier,
        getStorageDescriptor(table),
        getTableParameters(loadTableFromFs(table.getBasePath())));
  }

  /**
   * Builds an updated copy of {@code catalogTable}: refreshes Iceberg properties, rolls the
   * current metadata location into the "previous" slot, and re-derives the column list.
   */
  @Override
  public Table getUpdateTableRequest(
      InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) {
    BaseTable icebergTable = loadTableFromFs(table.getBasePath());
    Table copyTb = new Table(catalogTable); // deep copy; the caller's object stays untouched
    Map<String, String> parameters = copyTb.getParameters();
    parameters.putAll(icebergTable.properties());
    String currentMetadataLocation = parameters.get(METADATA_LOCATION_PROP);
    // Only record a previous location when one exists; storing a null value in the parameter
    // map risks breaking Thrift serialization of the table (NOTE(review): confirm against the
    // metastore client in use).
    if (currentMetadataLocation != null) {
      parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation);
    }
    parameters.put(METADATA_LOCATION_PROP, getMetadataFileLocation(icebergTable));
    copyTb.setParameters(parameters);
    copyTb.getSd().setCols(schemaExtractor.toColumns(TABLE_FORMAT, table.getReadSchema()));
    return copyTb;
  }

  /** Storage descriptor pointing at the table base path with Hive-Iceberg I/O classes. */
  @VisibleForTesting
  StorageDescriptor getStorageDescriptor(InternalTable table) {
    final StorageDescriptor storageDescriptor = new StorageDescriptor();
    storageDescriptor.setCols(schemaExtractor.toColumns(TABLE_FORMAT, table.getReadSchema()));
    storageDescriptor.setLocation(table.getBasePath());
    storageDescriptor.setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
    storageDescriptor.setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
    SerDeInfo serDeInfo = new SerDeInfo();
    serDeInfo.setSerializationLib(HiveIcebergSerDe.class.getCanonicalName());
    storageDescriptor.setSerdeInfo(serDeInfo);
    return storageDescriptor;
  }

  /** Table parameters marking the table EXTERNAL/Iceberg and recording the metadata location. */
  @VisibleForTesting
  Map<String, String> getTableParameters(BaseTable icebergTable) {
    Map<String, String> parameters = new HashMap<>(icebergTable.properties());
    parameters.put(PROP_EXTERNAL, "TRUE");
    parameters.put(TABLE_TYPE_PROP, TABLE_FORMAT);
    parameters.put(METADATA_LOCATION_PROP, getMetadataFileLocation(icebergTable));
    parameters.put(
        hive_metastoreConstants.META_TABLE_STORAGE,
        HiveIcebergStorageHandler.class.getCanonicalName());
    parameters.put(ICEBERG_CATALOG_NAME_PROP, ICEBERG_HADOOP_TABLE_NAME);
    return parameters;
  }

  private BaseTable loadTableFromFs(String tableBasePath) {
    return (BaseTable) hadoopTables.load(tableBasePath);
  }

  private String getMetadataFileLocation(BaseTable table) {
    return table.operations().current().metadataFileLocation();
  }
}
+1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.exception; + +import org.apache.xtable.model.exception.ErrorCode; +import org.apache.xtable.model.exception.InternalException; + +public class CatalogSyncException extends InternalException { + + public CatalogSyncException(ErrorCode errorCode, String message, Throwable e) { + super(errorCode, message, e); + } + + public CatalogSyncException(String message, Throwable e) { + super(ErrorCode.CATALOG_SYNC_GENERIC_EXCEPTION, message, e); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestSchemaExtractorBase.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestSchemaExtractorBase.java new file mode 100644 index 000000000..c8c4cc3b7 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestSchemaExtractorBase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package org.apache.xtable.catalog;

import java.util.Collections;
import java.util.Map;

import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;

/** Shared fixture helpers for schema-extractor tests: builds primitive {@link InternalField}s. */
public class TestSchemaExtractorBase {

  /** Primitive field with no parent path and no metadata. */
  protected static InternalField getPrimitiveOneField(
      String fieldName, String schemaName, InternalType dataType, boolean isNullable, int fieldId) {
    return getPrimitiveOneField(
        fieldName, schemaName, dataType, isNullable, fieldId, Collections.emptyMap());
  }

  /** Primitive field with a parent path and no metadata. */
  protected static InternalField getPrimitiveOneField(
      String fieldName,
      String schemaName,
      InternalType dataType,
      boolean isNullable,
      int fieldId,
      String parentPath) {
    return getPrimitiveOneField(
        fieldName, schemaName, dataType, isNullable, fieldId, parentPath, Collections.emptyMap());
  }

  /** Primitive field with schema metadata and no parent path. */
  protected static InternalField getPrimitiveOneField(
      String fieldName,
      String schemaName,
      InternalType dataType,
      boolean isNullable,
      int fieldId,
      Map<InternalSchema.MetadataKey, Object> metadata) {
    return getPrimitiveOneField(
        fieldName, schemaName, dataType, isNullable, fieldId, null, metadata);
  }

  /** Fully-specified variant; every other overload delegates here. */
  protected static InternalField getPrimitiveOneField(
      String fieldName,
      String schemaName,
      InternalType dataType,
      boolean isNullable,
      int fieldId,
      String parentPath,
      Map<InternalSchema.MetadataKey, Object> metadata) {
    InternalSchema fieldSchema =
        InternalSchema.builder()
            .name(schemaName)
            .dataType(dataType)
            .isNullable(isNullable)
            .metadata(metadata)
            .build();
    return InternalField.builder()
        .name(fieldName)
        .parentPath(parentPath)
        .schema(fieldSchema)
        .fieldId(fieldId)
        .build();
  }
}
package org.apache.xtable.catalog.hms;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.mockito.Mock;

import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.CatalogType;
import org.apache.xtable.model.storage.TableFormat;

/** Common mocks, constants and HMS object factories shared by the HMS catalog-sync tests. */
public class HMSCatalogSyncClientTestBase {

  @Mock protected IMetaStoreClient mockMetaStoreClient;
  @Mock protected HMSCatalogConfig mockHMSCatalogConfig;
  @Mock protected HMSSchemaExtractor mockHmsSchemaExtractor;
  protected Configuration testConfiguration = new Configuration();

  protected static final String TEST_HMS_DATABASE = "hms_db";
  protected static final String TEST_HMS_TABLE = "hms_table";
  protected static final String TEST_BASE_PATH = "base-path";
  protected static final String TEST_CATALOG_NAME = "hms-1";
  protected static final ExternalCatalogConfig TEST_CATALOG_CONFIG =
      ExternalCatalogConfig.builder()
          .catalogId(TEST_CATALOG_NAME)
          .catalogType(CatalogType.HMS)
          .catalogSyncClientImpl(HMSCatalogSyncClient.class.getCanonicalName())
          .catalogProperties(Collections.emptyMap())
          .build();

  protected static final String ICEBERG_METADATA_FILE_LOCATION = "base-path/metadata";
  protected static final String ICEBERG_METADATA_FILE_LOCATION_V2 = "base-path/v2-metadata";
  // Internal-table fixtures with an intentionally empty read schema.
  protected static final InternalTable TEST_ICEBERG_INTERNAL_TABLE =
      InternalTable.builder()
          .basePath(TEST_BASE_PATH)
          .tableFormat(TableFormat.ICEBERG)
          .readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
          .build();
  protected static final InternalTable TEST_HUDI_INTERNAL_TABLE =
      InternalTable.builder()
          .basePath(TEST_BASE_PATH)
          .tableFormat(TableFormat.HUDI)
          .readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
          .build();
  protected static final CatalogTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER =
      CatalogTableIdentifier.builder()
          .databaseName(TEST_HMS_DATABASE)
          .tableName(TEST_HMS_TABLE)
          .build();

  /** Bare HMS table with empty parameters. */
  protected Table newTable(String dbName, String tableName) {
    return newTable(dbName, tableName, new HashMap<>());
  }

  /** HMS table with the given parameters and no storage descriptor. */
  protected Table newTable(String dbName, String tableName, Map<String, String> params) {
    Table table = new Table();
    table.setDbName(dbName);
    table.setTableName(tableName);
    table.setParameters(params);
    return table;
  }

  /** HMS table with parameters and a storage descriptor. */
  protected Table newTable(
      String dbName, String tableName, Map<String, String> params, StorageDescriptor sd) {
    Table table = newTable(dbName, tableName, params);
    table.setSd(sd);
    return table;
  }

  /** HMS database object matching what {@link HMSCatalogSyncClient} creates. */
  protected Database newDatabase(String dbName) {
    return new Database(
        dbName, "Created by " + HMSCatalogSyncClient.class.getName(), null, Collections.emptyMap());
  }
}
package org.apache.xtable.catalog.hms;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

/** Tests property parsing in {@code HMSCatalogConfig#of(Map)}. */
public class TestHMSCatalogConfig {

  private static final String HMS_CATALOG_SERVER_URL_KEY = "externalCatalog.hms.serverUrl";
  private static final String HMS_CATALOG_SERVER_URL_VALUE = "thrift://localhost:9083";

  @Test
  void testGetHmsCatalogConfig_withNoPropertiesSet() {
    // No properties at all: the server URL stays unset.
    Map<String, String> props = new HashMap<>();
    HMSCatalogConfig catalogConfig = HMSCatalogConfig.of(props);
    assertNull(catalogConfig.getServerUrl());
  }

  @Test
  void testGetHmsCatalogConfig_withUnknownProperty() {
    // Properties belonging to other catalogs must be silently ignored.
    Map<String, String> props =
        createProps("externalCatalog.glue.unknownProperty", "unknown-property-value");
    assertDoesNotThrow(() -> HMSCatalogConfig.of(props));
  }

  @Test
  void testGetHmsCatalogConfig() {
    // The HMS server URL property is picked up verbatim.
    Map<String, String> props =
        createProps(HMS_CATALOG_SERVER_URL_KEY, HMS_CATALOG_SERVER_URL_VALUE);
    HMSCatalogConfig catalogConfig = HMSCatalogConfig.of(props);
    assertEquals(HMS_CATALOG_SERVER_URL_VALUE, catalogConfig.getServerUrl());
  }

  /** Builds a map from alternating key/value varargs. */
  private Map<String, String> createProps(String... keyValues) {
    Map<String, String> props = new HashMap<>();
    for (int i = 0; i < keyValues.length; i += 2) {
      props.put(keyValues[i], keyValues[i + 1]);
    }
    return props;
  }
}
package org.apache.xtable.catalog.hms;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

import lombok.SneakyThrows;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.storage.TableFormat;

/** Tests {@code HMSCatalogConversionSource#getSourceTable(CatalogTableIdentifier)}. */
@ExtendWith(MockitoExtension.class)
class TestHMSCatalogConversionSource {

  @Mock private HMSCatalogConfig mockCatalogConfig;
  @Mock private IMetaStoreClient mockMetaStoreClient;
  private HMSCatalogConversionSource catalogConversionSource;
  private static final String HMS_DB = "hms_db";
  private static final String HMS_TABLE = "hms_tbl";
  private static final String TABLE_BASE_PATH = "/var/data/table";
  private final CatalogTableIdentifier tableIdentifier =
      CatalogTableIdentifier.builder().databaseName(HMS_DB).tableName(HMS_TABLE).build();

  @BeforeEach
  void init() {
    catalogConversionSource =
        new HMSCatalogConversionSource(mockCatalogConfig, mockMetaStoreClient);
  }

  @SneakyThrows
  @Test
  void testGetSourceTable_errorGettingTableFromHMS() {
    // A generic thrift failure surfaces as CatalogSyncException.
    when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
        .thenThrow(new TException("something went wrong"));
    assertThrows(
        CatalogSyncException.class, () -> catalogConversionSource.getSourceTable(tableIdentifier));

    verify(mockMetaStoreClient, times(1)).getTable(HMS_DB, HMS_TABLE);
  }

  @SneakyThrows
  @Test
  void testGetSourceTable_tableNotFoundInHMS() {
    // A missing table also surfaces as CatalogSyncException.
    when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
        .thenThrow(new NoSuchObjectException("table not found"));
    assertThrows(
        CatalogSyncException.class, () -> catalogConversionSource.getSourceTable(tableIdentifier));

    verify(mockMetaStoreClient, times(1)).getTable(HMS_DB, HMS_TABLE);
  }

  @SneakyThrows
  @Test
  void testGetSourceTable_tableFormatNotPresent() {
    // A table whose parameters do not identify any table format is rejected.
    when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
        .thenReturn(newHmsTable(HMS_DB, HMS_TABLE, Collections.emptyMap(), null));
    IllegalStateException exception =
        assertThrows(
            IllegalStateException.class,
            () -> catalogConversionSource.getSourceTable(tableIdentifier));
    assertEquals("TableFormat is null or empty for table: hms_db.hms_tbl", exception.getMessage());

    verify(mockMetaStoreClient, times(1)).getTable(HMS_DB, HMS_TABLE);
  }

  @SneakyThrows
  @ParameterizedTest
  @CsvSource(value = {"ICEBERG", "HUDI", "DELTA"})
  void testGetSourceTable(String tableFormat) {
    StorageDescriptor sd = new StorageDescriptor();
    sd.setLocation(TABLE_BASE_PATH);
    Map<String, String> tableParams = new HashMap<>();
    if (Objects.equals(tableFormat, TableFormat.ICEBERG)) {
      // Iceberg tables advertise themselves via table_type and carry a separate data path.
      tableParams.put("write.data.path", String.format("%s/iceberg", TABLE_BASE_PATH));
      tableParams.put("table_type", tableFormat);
    } else {
      tableParams.put("spark.sql.sources.provider", tableFormat);
    }

    String dataPath =
        tableFormat.equals(TableFormat.ICEBERG)
            ? String.format("%s/iceberg", TABLE_BASE_PATH)
            : TABLE_BASE_PATH;
    SourceTable expected =
        newSourceTable(HMS_TABLE, TABLE_BASE_PATH, dataPath, tableFormat, tableParams);
    when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
        .thenReturn(newHmsTable(HMS_DB, HMS_TABLE, tableParams, sd));
    SourceTable output = catalogConversionSource.getSourceTable(tableIdentifier);
    assertEquals(expected, output);
  }

  /** HMS table fixture with the given parameters and storage descriptor. */
  private Table newHmsTable(
      String dbName, String tableName, Map<String, String> params, StorageDescriptor sd) {
    Table table = new Table();
    table.setDbName(dbName);
    table.setTableName(tableName);
    table.setParameters(params);
    table.setSd(sd);
    return table;
  }

  /** Expected SourceTable fixture mirroring what the conversion source should produce. */
  private SourceTable newSourceTable(
      String tblName,
      String basePath,
      String dataPath,
      String tblFormat,
      Map<String, String> params) {
    Properties tblProperties = new Properties();
    tblProperties.putAll(params);
    return SourceTable.builder()
        .name(tblName)
        .basePath(basePath)
        .dataPath(dataPath)
        .formatName(tblFormat)
        .additionalProperties(tblProperties)
        .build();
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.hms; + +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Collections; + +import lombok.SneakyThrows; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.exception.CatalogSyncException; +import 
org.apache.xtable.model.catalog.CatalogTableIdentifier; + +@ExtendWith(MockitoExtension.class) +public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { + + @Mock private CatalogTableBuilder mockTableBuilder; + private HMSCatalogSyncClient hmsCatalogSyncClient; + + private HMSCatalogSyncClient createHMSCatalogSyncClient() { + return new HMSCatalogSyncClient( + TEST_CATALOG_CONFIG, + mockHMSCatalogConfig, + testConfiguration, + mockMetaStoreClient, + mockTableBuilder); + } + + void setupCommonMocks() { + hmsCatalogSyncClient = createHMSCatalogSyncClient(); + } + + @SneakyThrows + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testHasDatabase(boolean isDbPresent) { + setupCommonMocks(); + Database db = new Database(TEST_HMS_DATABASE, null, null, Collections.emptyMap()); + if (isDbPresent) { + when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE)).thenReturn(db); + } else { + when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE)) + .thenThrow(new NoSuchObjectException("db not found")); + } + boolean output = hmsCatalogSyncClient.hasDatabase(TEST_HMS_DATABASE); + if (isDbPresent) { + assertTrue(output); + } else { + assertFalse(output); + } + verify(mockMetaStoreClient, times(1)).getDatabase(TEST_HMS_DATABASE); + } + + @SneakyThrows + @Test + void testHasDatabaseFailure() { + setupCommonMocks(); + when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE)) + .thenThrow(new TException("something went wrong")); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, () -> hmsCatalogSyncClient.hasDatabase(TEST_HMS_DATABASE)); + assertEquals( + String.format("Failed to get database: %s", TEST_HMS_DATABASE), exception.getMessage()); + verify(mockMetaStoreClient, times(1)).getDatabase(TEST_HMS_DATABASE); + } + + @SneakyThrows + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testGetTable(boolean isTablePresent) { + setupCommonMocks(); + Table table = newTable(TEST_HMS_DATABASE, 
TEST_HMS_TABLE); + if (isTablePresent) { + when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE)).thenReturn(table); + } else { + when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE)) + .thenThrow(new NoSuchObjectException("db not found")); + } + Table hmsTable = hmsCatalogSyncClient.getTable(TEST_CATALOG_TABLE_IDENTIFIER); + if (isTablePresent) { + assertEquals(table, hmsTable); + } else { + assertNull(hmsTable); + } + verify(mockMetaStoreClient, times(1)).getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE); + } + + @SneakyThrows + @Test + void testGetTableFailure() { + setupCommonMocks(); + when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE)) + .thenThrow(new TException("something went wrong")); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> hmsCatalogSyncClient.getTable(TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to get table: %s.%s", TEST_HMS_DATABASE, TEST_HMS_TABLE), + exception.getMessage()); + verify(mockMetaStoreClient, times(1)).getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE); + } + + @SneakyThrows + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testCreateDatabase(boolean shouldFail) { + setupCommonMocks(); + Database database = newDatabase(TEST_HMS_DATABASE); + if (shouldFail) { + Mockito.doThrow(new TException("something went wrong")) + .when(mockMetaStoreClient) + .createDatabase(database); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> hmsCatalogSyncClient.createDatabase(TEST_HMS_DATABASE)); + assertEquals( + String.format("Failed to create database: %s", TEST_HMS_DATABASE), + exception.getMessage()); + } else { + hmsCatalogSyncClient.createDatabase(TEST_HMS_DATABASE); + } + verify(mockMetaStoreClient, times(1)).createDatabase(database); + } + + @SneakyThrows + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testDropTable(boolean shouldFail) { + setupCommonMocks(); 
+ if (shouldFail) { + Mockito.doThrow(new TException("something went wrong")) + .when(mockMetaStoreClient) + .dropTable(TEST_HMS_DATABASE, TEST_HMS_TABLE); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + hmsCatalogSyncClient.dropTable( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to drop table: %s.%s", TEST_HMS_DATABASE, TEST_HMS_TABLE), + exception.getMessage()); + } else { + hmsCatalogSyncClient.dropTable(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + } + verify(mockMetaStoreClient, times(1)).dropTable(TEST_HMS_DATABASE, TEST_HMS_TABLE); + } + + @SneakyThrows + @Test + void testCreateTable_Success() { + setupCommonMocks(); + Table testTable = new Table(); + when(mockTableBuilder.getCreateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(testTable); + hmsCatalogSyncClient.createTable(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockMetaStoreClient, times(1)).createTable(testTable); + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + } + + @SneakyThrows + @Test + void testCreateTable_ErrorGettingTableInput() { + setupCommonMocks(); + + // error when getting iceberg table input + doThrow(new RuntimeException("something went wrong")) + .when(mockTableBuilder) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + assertThrows( + RuntimeException.class, + () -> + hmsCatalogSyncClient.createTable( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)); + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockMetaStoreClient, never()).createTable(any()); + } + + @SneakyThrows + @Test + void testCreateTable_ErrorCreatingTable() { + setupCommonMocks(); + + // error when creating table + Table testTable 
= new Table(); + when(mockTableBuilder.getCreateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(testTable); + doThrow(new TException("something went wrong")) + .when(mockMetaStoreClient) + .createTable(testTable); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + hmsCatalogSyncClient.createTable( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to create table: %s.%s", TEST_HMS_DATABASE, TEST_HMS_TABLE), + exception.getMessage()); + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockMetaStoreClient, times(1)).createTable(testTable); + } + + @SneakyThrows + @Test + void testRefreshTable_Success() { + setupCommonMocks(); + Table origTable = new Table(); + Table updatedTable = new Table(origTable); + updatedTable.putToParameters(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION_V2); + when(mockTableBuilder.getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, origTable, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(updatedTable); + hmsCatalogSyncClient.refreshTable( + TEST_ICEBERG_INTERNAL_TABLE, origTable, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockMetaStoreClient, times(1)) + .alter_table(TEST_HMS_DATABASE, TEST_HMS_TABLE, updatedTable); + verify(mockTableBuilder, times(1)) + .getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, origTable, TEST_CATALOG_TABLE_IDENTIFIER); + } + + @SneakyThrows + @Test + void testRefreshTable_ErrorGettingUpdatedTable() { + setupCommonMocks(); + + // error when getting iceberg table input + Table testTable = new Table(); + doThrow(new RuntimeException("something went wrong")) + .when(mockTableBuilder) + .getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, testTable, TEST_CATALOG_TABLE_IDENTIFIER); + assertThrows( + RuntimeException.class, + () -> + hmsCatalogSyncClient.refreshTable( + 
TEST_ICEBERG_INTERNAL_TABLE, testTable, TEST_CATALOG_TABLE_IDENTIFIER)); + verify(mockTableBuilder, times(1)) + .getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, testTable, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockMetaStoreClient, never()).alter_table(any(), any(), any()); + } + + @SneakyThrows + @Test + void testRefreshTable_ErrorRefreshingTable() { + setupCommonMocks(); + + // error when creating table + Table origTable = new Table(); + Table updatedTable = new Table(origTable); + updatedTable.putToParameters(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION_V2); + when(mockTableBuilder.getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, origTable, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(updatedTable); + doThrow(new TException("something went wrong")) + .when(mockMetaStoreClient) + .alter_table(TEST_HMS_DATABASE, TEST_HMS_TABLE, updatedTable); + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + hmsCatalogSyncClient.refreshTable( + TEST_ICEBERG_INTERNAL_TABLE, origTable, TEST_CATALOG_TABLE_IDENTIFIER)); + assertEquals( + String.format("Failed to refresh table: %s.%s", TEST_HMS_DATABASE, TEST_HMS_TABLE), + exception.getMessage()); + verify(mockTableBuilder, times(1)) + .getUpdateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, origTable, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockMetaStoreClient, times(1)) + .alter_table(TEST_HMS_DATABASE, TEST_HMS_TABLE, updatedTable); + } + + @SneakyThrows + @Test + void testCreateOrReplaceTable() { + setupCommonMocks(); + + ZonedDateTime zonedDateTime = + Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault()); + try (MockedStatic mockZonedDateTime = mockStatic(ZonedDateTime.class)) { + mockZonedDateTime.when(ZonedDateTime::now).thenReturn(zonedDateTime); + + String tempTableName = TEST_HMS_TABLE + "_temp" + ZonedDateTime.now().toEpochSecond(); + final CatalogTableIdentifier tempTableIdentifier = + CatalogTableIdentifier.builder() + 
.databaseName(TEST_HMS_DATABASE) + .tableName(tempTableName) + .build(); + + Table table = newTable(TEST_HMS_DATABASE, TEST_HMS_TABLE); + Table tempTable = newTable(TEST_HMS_DATABASE, tempTableName); + + when(mockTableBuilder.getCreateTableRequest( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)) + .thenReturn(table); + when(mockTableBuilder.getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, tempTableIdentifier)) + .thenReturn(tempTable); + + hmsCatalogSyncClient.createOrReplaceTable( + TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + + verify(mockMetaStoreClient, times(1)).createTable(table); + verify(mockMetaStoreClient, times(1)) + .dropTable(TEST_HMS_DATABASE, TEST_CATALOG_TABLE_IDENTIFIER.getTableName()); + verify(mockMetaStoreClient, times(1)).createTable(tempTable); + verify(mockMetaStoreClient, times(1)) + .dropTable(TEST_HMS_DATABASE, tempTableIdentifier.getTableName()); + + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + verify(mockTableBuilder, times(1)) + .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, tempTableIdentifier); + } + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/hms/TestHMSSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/catalog/hms/TestHMSSchemaExtractor.java new file mode 100644 index 000000000..d8e749466 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/hms/TestHMSSchemaExtractor.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog.hms; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.junit.jupiter.api.Test; + +import org.apache.xtable.catalog.TestSchemaExtractorBase; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.storage.TableFormat; + +public class TestHMSSchemaExtractor extends TestSchemaExtractorBase { + + private FieldSchema getFieldSchema(String name, String type) { + return new FieldSchema(name, type, null); + } + + @Test + void testPrimitiveTypes() { + int precision = 10; + int scale = 5; + Map doubleMetadata = new HashMap<>(); + doubleMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, precision); + doubleMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, scale); + String tableFormat = TableFormat.ICEBERG; + + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "requiredBoolean", "boolean", InternalType.BOOLEAN, false, 1), + getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 2), + 
getPrimitiveOneField("requiredInt", "integer", InternalType.INT, false, 3), + getPrimitiveOneField("requiredLong", "long", InternalType.LONG, false, 4), + getPrimitiveOneField("requiredDouble", "double", InternalType.DOUBLE, false, 5), + getPrimitiveOneField("requiredFloat", "float", InternalType.FLOAT, false, 6), + getPrimitiveOneField("requiredString", "string", InternalType.STRING, false, 7), + getPrimitiveOneField("requiredBytes", "binary", InternalType.BYTES, false, 8), + getPrimitiveOneField("requiredDate", "date", InternalType.DATE, false, 9), + getPrimitiveOneField( + "requiredDecimal", + "decimal", + InternalType.DECIMAL, + false, + 10, + doubleMetadata), + getPrimitiveOneField( + "requiredTimestamp", "timestamp", InternalType.TIMESTAMP, false, 11), + getPrimitiveOneField( + "requiredTimestampNTZ", + "timestamp_ntz", + InternalType.TIMESTAMP_NTZ, + false, + 12))) + .build(); + + List expected = + Arrays.asList( + getFieldSchema("requiredBoolean", "boolean"), + getFieldSchema("optionalBoolean", "boolean"), + getFieldSchema("requiredInt", "int"), + getFieldSchema("requiredLong", "bigint"), + getFieldSchema("requiredDouble", "double"), + getFieldSchema("requiredFloat", "float"), + getFieldSchema("requiredString", "string"), + getFieldSchema("requiredBytes", "binary"), + getFieldSchema("requiredDate", "date"), + getFieldSchema("requiredDecimal", String.format("decimal(%s,%s)", precision, scale)), + getFieldSchema("requiredTimestamp", "timestamp"), + getFieldSchema("requiredTimestampNTZ", "timestamp")); + + assertEquals(expected, HMSSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testTimestamps() { + String tableFormat = TableFormat.ICEBERG; + Map millisTimestamp = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS); + + Map microsTimestamp = + Collections.singletonMap( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); + + 
InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "requiredTimestampMillis", + "timestamp", + InternalType.TIMESTAMP, + false, + 1, + millisTimestamp), + getPrimitiveOneField( + "requiredTimestampMicros", + "timestamp", + InternalType.TIMESTAMP, + false, + 2, + microsTimestamp), + getPrimitiveOneField( + "requiredTimestampNTZMillis", + "timestamp_ntz", + InternalType.TIMESTAMP_NTZ, + false, + 3, + millisTimestamp), + getPrimitiveOneField( + "requiredTimestampNTZMicros", + "timestamp_ntz", + InternalType.TIMESTAMP_NTZ, + false, + 4, + microsTimestamp))) + .build(); + + List expected = + Arrays.asList( + getFieldSchema("requiredTimestampMillis", "timestamp"), + getFieldSchema("requiredTimestampMicros", "timestamp"), + getFieldSchema("requiredTimestampNTZMillis", "timestamp"), + getFieldSchema("requiredTimestampNTZMicros", "timestamp")); + + assertEquals(expected, HMSSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testMaps() { + String tableFormat = TableFormat.ICEBERG; + InternalSchema recordMapElementSchema = + InternalSchema.builder() + .name("struct") + .isNullable(true) + .fields( + Arrays.asList( + getPrimitiveOneField( + "requiredDouble", + "double", + InternalType.DOUBLE, + false, + 1, + "recordMap._one_field_value"), + getPrimitiveOneField( + "optionalString", + "string", + InternalType.STRING, + true, + 2, + "recordMap._one_field_value"))) + .dataType(InternalType.RECORD) + .build(); + + InternalSchema oneSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("intMap") + .fieldId(1) + .schema( + InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + getPrimitiveOneField( + 
InternalField.Constants.MAP_KEY_FIELD_NAME, + "string", + InternalType.STRING, + false, + 3, + "intMap"), + getPrimitiveOneField( + InternalField.Constants.MAP_VALUE_FIELD_NAME, + "integer", + InternalType.INT, + false, + 4, + "intMap"))) + .build()) + .build(), + InternalField.builder() + .name("recordMap") + .fieldId(2) + .schema( + InternalSchema.builder() + .name("map") + .isNullable(true) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + getPrimitiveOneField( + InternalField.Constants.MAP_KEY_FIELD_NAME, + "integer", + InternalType.INT, + false, + 5, + "recordMap"), + InternalField.builder() + .name(InternalField.Constants.MAP_VALUE_FIELD_NAME) + .fieldId(6) + .parentPath("recordMap") + .schema(recordMapElementSchema) + .build())) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + + List expected = + Arrays.asList( + getFieldSchema("intMap", "map"), + getFieldSchema( + "recordMap", "map>")); + + assertEquals(expected, HMSSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testLists() { + String tableFormat = TableFormat.ICEBERG; + InternalSchema recordListElementSchema = + InternalSchema.builder() + .name("struct") + .isNullable(true) + .fields( + Arrays.asList( + getPrimitiveOneField( + "requiredDouble", + "double", + InternalType.DOUBLE, + false, + 11, + "recordMap._one_field_value"), + getPrimitiveOneField( + "optionalString", + "string", + InternalType.STRING, + true, + 12, + "recordMap._one_field_value"))) + .dataType(InternalType.RECORD) + .build(); + + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .name("record") + .isNullable(false) + .fields( + Arrays.asList( + InternalField.builder() + .name("intList") + .fieldId(1) + .schema( + InternalSchema.builder() + .name("list") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Collections.singletonList( + getPrimitiveOneField( + 
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME, + "integer", + InternalType.INT, + false, + 13, + "intList"))) + .build()) + .build(), + InternalField.builder() + .name("recordList") + .fieldId(2) + .schema( + InternalSchema.builder() + .name("list") + .isNullable(true) + .dataType(InternalType.LIST) + .fields( + Collections.singletonList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .fieldId(14) + .parentPath("recordList") + .schema(recordListElementSchema) + .build())) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + + List expected = + Arrays.asList( + getFieldSchema("intList", "array"), + getFieldSchema( + "recordList", "array>")); + + assertEquals(expected, HMSSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testNestedRecords() { + String tableFormat = TableFormat.ICEBERG; + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .name("record") + .isNullable(false) + .fields( + Collections.singletonList( + InternalField.builder() + .name("nestedOne") + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .fieldId(1) + .schema( + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + getPrimitiveOneField( + "nestedOptionalInt", + "integer", + InternalType.INT, + true, + 11, + "nestedOne"), + getPrimitiveOneField( + "nestedRequiredDouble", + "double", + InternalType.DOUBLE, + false, + 12, + "nestedOne"), + InternalField.builder() + .name("nestedTwo") + .parentPath("nestedOne") + .fieldId(13) + .schema( + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Collections.singletonList( + getPrimitiveOneField( + "doublyNestedString", + "string", + InternalType.STRING, + true, + 14, + "nestedOne.nestedTwo"))) + .build()) + .build())) + .build()) + .build())) + .build(); + 
+ List expected = + Arrays.asList( + getFieldSchema( + "nestedOne", + "struct>")); + assertEquals(expected, HMSSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + } + + @Test + void testUnsupportedType() { + String tableFormat = TableFormat.ICEBERG; + // Unknown "UNION" type + InternalSchema oneSchema = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 2), + InternalField.builder() + .name("unionField") + .schema( + InternalSchema.builder() + .name("unionSchema") + .dataType(InternalType.UNION) + .isNullable(true) + .build()) + .fieldId(2) + .build())) + .build(); + + NotSupportedException exception = + assertThrows( + NotSupportedException.class, + () -> HMSSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema)); + assertEquals("Unsupported type: InternalType.UNION(name=union)", exception.getMessage()); + + // Invalid decimal type (precision and scale metadata is missing) + InternalSchema oneSchema2 = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 1), + getPrimitiveOneField( + "optionalDecimal", "decimal", InternalType.DECIMAL, true, 2))) + .build(); + + exception = + assertThrows( + NotSupportedException.class, + () -> HMSSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema2)); + assertEquals("Invalid decimal type, precision and scale is missing", exception.getMessage()); + + // Invalid decimal type (scale metadata is missing) + Map doubleMetadata = new HashMap<>(); + doubleMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10); + InternalSchema oneSchema3 = + InternalSchema.builder() + .dataType(InternalType.RECORD) + .isNullable(false) + .name("record") + .fields( + Arrays.asList( + 
getPrimitiveOneField( + "optionalBoolean", "boolean", InternalType.BOOLEAN, true, 1), + getPrimitiveOneField( + "optionalDecimal", + "decimal", + InternalType.DECIMAL, + true, + 2, + doubleMetadata))) + .build(); + + exception = + assertThrows( + NotSupportedException.class, + () -> HMSSchemaExtractor.getInstance().toColumns(tableFormat, oneSchema3)); + assertEquals("Invalid decimal type, scale is missing", exception.getMessage()); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/hms/table/TestIcebergHMSCatalogTableBuilder.java b/xtable-core/src/test/java/org/apache/xtable/catalog/hms/table/TestIcebergHMSCatalogTableBuilder.java new file mode 100644 index 000000000..bd3178eeb --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/hms/table/TestIcebergHMSCatalogTableBuilder.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.catalog.hms.table; + +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import lombok.SneakyThrows; + +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.hadoop.HadoopTables; + +import org.apache.xtable.catalog.hms.HMSCatalogSyncClientTestBase; +import org.apache.xtable.model.storage.TableFormat; + +@ExtendWith(MockitoExtension.class) +public class TestIcebergHMSCatalogTableBuilder extends HMSCatalogSyncClientTestBase { + + @Mock private HadoopTables mockIcebergHadoopTables; + @Mock private BaseTable mockIcebergBaseTable; + @Mock private TableOperations mockIcebergTableOperations; + @Mock private TableMetadata mockIcebergTableMetadata; + + private IcebergHMSCatalogTableBuilder mockIcebergHmsCatalogSyncRequestProvider; + + private IcebergHMSCatalogTableBuilder createIcebergHMSHelper() { + return 
new IcebergHMSCatalogTableBuilder(mockHmsSchemaExtractor, mockIcebergHadoopTables);
+  }
+
+  void setupCommonMocks() {
+    mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper();
+  }
+
+  void mockHadoopTables() {
+    when(mockIcebergHadoopTables.load(TEST_BASE_PATH)).thenReturn(mockIcebergBaseTable);
+    mockMetadataFileLocation();
+  }
+
+  void mockMetadataFileLocation() {
+    when(mockIcebergBaseTable.operations()).thenReturn(mockIcebergTableOperations);
+    when(mockIcebergTableOperations.current()).thenReturn(mockIcebergTableMetadata);
+    when(mockIcebergTableMetadata.metadataFileLocation())
+        .thenReturn(ICEBERG_METADATA_FILE_LOCATION);
+  }
+
+  @SneakyThrows
+  @Test
+  void testGetCreateTableRequest() {
+    mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper();
+    mockHadoopTables();
+    when(mockHmsSchemaExtractor.toColumns(
+            TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()))
+        .thenReturn(Collections.emptyList());
+    ZonedDateTime zonedDateTime =
+        Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault());
+    try (MockedStatic<ZonedDateTime> mockZonedDateTime = mockStatic(ZonedDateTime.class)) {
+      mockZonedDateTime.when(ZonedDateTime::now).thenReturn(zonedDateTime);
+      Table expected = new Table();
+      expected.setDbName(TEST_HMS_DATABASE);
+      expected.setTableName(TEST_HMS_TABLE);
+      expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+      expected.setCreateTime((int) zonedDateTime.toEpochSecond());
+      expected.setSd(getTestStorageDescriptor());
+      expected.setTableType("EXTERNAL_TABLE");
+      expected.setParameters(getTestParameters());
+
+      assertEquals(
+          expected,
+          mockIcebergHmsCatalogSyncRequestProvider.getCreateTableRequest(
+              TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER));
+      verify(mockHmsSchemaExtractor, times(1))
+          .toColumns(TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema());
+      verify(mockIcebergBaseTable, times(1)).properties();
+      verify(mockIcebergHadoopTables, 
times(1)).load(TEST_BASE_PATH);
+    }
+  }
+
+  @SneakyThrows
+  @Test
+  void testGetUpdateTableRequest() {
+    setupCommonMocks();
+    mockHadoopTables();
+    when(mockHmsSchemaExtractor.toColumns(
+            TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()))
+        .thenReturn(Collections.emptyList());
+
+    Map<String, String> tableParams = new HashMap<>();
+    tableParams.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION);
+    Table hmsTable =
+        newTable(TEST_HMS_DATABASE, TEST_HMS_TABLE, tableParams, getTestStorageDescriptor());
+
+    when(mockIcebergTableMetadata.metadataFileLocation())
+        .thenReturn(ICEBERG_METADATA_FILE_LOCATION_V2);
+    when(mockIcebergBaseTable.properties()).thenReturn(Collections.emptyMap());
+    Table output =
+        mockIcebergHmsCatalogSyncRequestProvider.getUpdateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, hmsTable, TEST_CATALOG_TABLE_IDENTIFIER);
+    tableParams.put(PREVIOUS_METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION);
+    tableParams.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION_V2);
+    Table expected =
+        newTable(TEST_HMS_DATABASE, TEST_HMS_TABLE, tableParams, getTestStorageDescriptor());
+    assertEquals(expected, output);
+    assertEquals(tableParams, hmsTable.getParameters());
+    verify(mockHmsSchemaExtractor, times(1))
+        .toColumns(TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema());
+  }
+
+  @Test
+  void testGetStorageDescriptor() {
+    mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper();
+    when(mockHmsSchemaExtractor.toColumns(
+            TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()))
+        .thenReturn(Collections.emptyList());
+    StorageDescriptor expected = getTestStorageDescriptor();
+    assertEquals(
+        expected,
+        mockIcebergHmsCatalogSyncRequestProvider.getStorageDescriptor(TEST_ICEBERG_INTERNAL_TABLE));
+    verify(mockHmsSchemaExtractor, times(1))
+        .toColumns(TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema());
+  }
+
+  @Test
+  void testGetTableParameters() {
+    
mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper();
+    mockMetadataFileLocation();
+    when(mockIcebergBaseTable.properties()).thenReturn(Collections.emptyMap());
+    Map<String, String> expected = getTestParameters();
+    assertEquals(
+        expected,
+        mockIcebergHmsCatalogSyncRequestProvider.getTableParameters(mockIcebergBaseTable));
+    verify(mockIcebergBaseTable, times(1)).properties();
+    verify(mockIcebergHadoopTables, never()).load(any());
+  }
+
+  private StorageDescriptor getTestStorageDescriptor() {
+    StorageDescriptor storageDescriptor = new StorageDescriptor();
+    SerDeInfo serDeInfo = new SerDeInfo();
+    storageDescriptor.setCols(Collections.emptyList());
+    storageDescriptor.setLocation(TEST_BASE_PATH);
+    storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat");
+    storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
+    serDeInfo.setSerializationLib("org.apache.iceberg.mr.hive.HiveIcebergSerDe");
+    storageDescriptor.setSerdeInfo(serDeInfo);
+    return storageDescriptor;
+  }
+
+  private Map<String, String> getTestParameters() {
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("EXTERNAL", "TRUE");
+    parameters.put("table_type", "ICEBERG");
+    parameters.put("metadata_location", ICEBERG_METADATA_FILE_LOCATION);
+    parameters.put("storage_handler", "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler");
+    parameters.put("iceberg.catalog", "location_based_table");
+    return parameters;
+  }
+}