Skip to content

Commit

Permalink
[590] Add Iceberg HMS Catalog Sync implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kroushan-nit committed Dec 27, 2024
1 parent 2fe117f commit 88c0471
Show file tree
Hide file tree
Showing 24 changed files with 2,509 additions and 4 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version>
<hadoop.version>3.4.0</hadoop.version>
<hudi.version>0.14.0</hudi.version>
<hive.version>2.3.9</hive.version>
<maven-source-plugin.version>3.3.1</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.8.0</maven-javadoc-plugin.version>
<maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ConversionConfig {
SyncMode syncMode) {
this.sourceTable = sourceTable;
this.targetTables = targetTables;
this.targetCatalogs = targetCatalogs;
Preconditions.checkArgument(
targetTables != null && !targetTables.isEmpty(),
"Please provide at-least one format to sync");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public enum ErrorCode {
UNSUPPORTED_SCHEMA_TYPE(10007),
UNSUPPORTED_FEATURE(10008),
PARSE_EXCEPTION(10009),
CATALOG_REFRESH_EXCEPTION(10010);
CATALOG_REFRESH_EXCEPTION(10010),
CATALOG_SYNC_GENERIC_EXCEPTION(10011);

private final int errorCode;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.model.storage;

public class CatalogType {
public static final String HMS = "HMS";
}
92 changes: 92 additions & 0 deletions xtable-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,105 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-runtime</artifactId>
<version>${iceberg.version}</version>
</dependency>

<!-- HMS dependencies -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<classifier>core</classifier>
<exclusions>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.janino</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Mockito -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
</dependency>

<!-- Junit -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;

/**
* The interface for creating/updating catalog table object, each catalog can have its own
* implementation that can be plugged in.
*/
public interface CatalogTableBuilder<REQUEST, TABLE> {
public REQUEST getCreateTableRequest(InternalTable table, CatalogTableIdentifier tableIdentifier);

public REQUEST getUpdateTableRequest(
InternalTable table, TABLE catalogTable, CatalogTableIdentifier tableIdentifier);
}
27 changes: 27 additions & 0 deletions xtable-core/src/main/java/org/apache/xtable/catalog/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

public class Constants {

public static final String PROP_SPARK_SQL_SOURCES_PROVIDER = "spark.sql.sources.provider";
public static final String PROP_PATH = "path";
public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";
public static final String PROP_EXTERNAL = "EXTERNAL";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,27 @@

import java.util.Map;

import org.apache.xtable.catalog.hms.HMSCatalogConversionSource;
import org.apache.xtable.catalog.hms.HMSCatalogSyncClient;
import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.storage.CatalogType;

/** A factory class which returns {@link ExternalCatalogConfig} based on catalogType. */
public class ExternalCatalogConfigFactory {

public static ExternalCatalogConfig fromCatalogType(
String catalogType, String catalogId, Map<String, String> properties) {
// TODO: Choose existing implementation based on catalogType.
String catalogSyncClientImpl = "";
String catalogConversionSourceImpl = "";
String catalogSyncClientImpl;
String catalogConversionSourceImpl;
switch (catalogType) {
case CatalogType.HMS:
catalogSyncClientImpl = HMSCatalogSyncClient.class.getName();
catalogConversionSourceImpl = HMSCatalogConversionSource.class.getName();
break;
default:
throw new NotSupportedException("Unsupported catalogType: " + catalogType);
}
return ExternalCatalogConfig.builder()
.catalogType(catalogType)
.catalogSyncClientImpl(catalogSyncClientImpl)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog;

import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.xtable.catalog.Constants.PROP_SPARK_SQL_SOURCES_PROVIDER;

import java.util.Map;

import org.apache.iceberg.TableProperties;

import com.google.common.base.Strings;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.storage.TableFormat;

public class TableFormatUtils {

public static String getTableDataLocation(
String tableFormat, String tableLocation, Map<String, String> properties) {
switch (tableFormat) {
case TableFormat.ICEBERG:
return getIcebergDataLocation(tableLocation, properties);
case TableFormat.DELTA:
case TableFormat.HUDI:
return tableLocation;
default:
throw new NotSupportedException("Unsupported table format: " + tableFormat);
}
}

/** Get iceberg table data files location */
private static String getIcebergDataLocation(
String tableLocation, Map<String, String> properties) {
String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION);
if (dataLocation == null) {
dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
if (dataLocation == null) {
dataLocation = properties.get(TableProperties.OBJECT_STORE_PATH);
if (dataLocation == null) {
dataLocation = String.format("%s/data", tableLocation);
}
}
}
return dataLocation;
}

// Get table format name from table properties
public static String getTableFormat(Map<String, String> properties) {
// - In case of ICEBERG, table_type param will give the table format
// - In case of DELTA, table_type or spark.sql.sources.provider param will give the table
// format
// - In case of HUDI, spark.sql.sources.provider param will give the table format
String tableFormat = properties.get(TABLE_TYPE_PROP);
if (Strings.isNullOrEmpty(tableFormat)) {
tableFormat = properties.get(PROP_SPARK_SQL_SOURCES_PROVIDER);
}
return tableFormat;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.catalog.hms;

import java.util.Map;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

@Getter
@EqualsAndHashCode
@ToString
public class HMSCatalogConfig {

private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

@JsonProperty("externalCatalog.hms.serverUrl")
private String serverUrl;

protected static HMSCatalogConfig of(Map<String, String> properties) {
try {
return OBJECT_MAPPER.readValue(
OBJECT_MAPPER.writeValueAsString(properties), HMSCatalogConfig.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
Loading

0 comments on commit 88c0471

Please sign in to comment.