diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java index 148cb2fbf..6cacd9076 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java @@ -26,13 +26,16 @@ import lombok.Value; /** - * Defines the configuration for an external catalog, user needs to populate at-least one of - * catalogType or catalogImpl + * Defines the configuration for an external catalog, user needs to populate at-least one of {@link + * ExternalCatalogConfig#catalogType} or {@link ExternalCatalogConfig#catalogSyncClientImpl} */ @Value @Builder public class ExternalCatalogConfig { - /** The name of the catalog, it also acts as a unique identifier for each catalog */ + /** + * A user-defined unique identifier for the catalog, allows user to sync table to multiple + * catalogs of the same name/type eg: HMS catalog with url1, HMS catalog with url2 + */ @NonNull String catalogId; /** @@ -42,13 +45,21 @@ public class ExternalCatalogConfig { String catalogType; /** - * (Optional) A fully qualified class name that implements the interfaces for CatalogSyncClient, - * it can be used if the implementation for catalogType doesn't exist in XTable. + * (Optional) A fully qualified class name that implements the interface for {@link + * org.apache.xtable.spi.sync.CatalogSyncClient}, it can be used if the implementation for + * catalogType doesn't exist in XTable. + */ + String catalogSyncClientImpl; + + /** + * (Optional) A fully qualified class name that implements the interface for {@link + * org.apache.xtable.spi.extractor.CatalogConversionSource} it can be used if the implementation + * for catalogType doesn't exist in XTable. */ - String catalogImpl; + String catalogConversionSourceImpl; /** * The properties for each catalog, used for providing any custom behaviour during catalog sync */ - @NonNull @Builder.Default Map catalogOptions = Collections.emptyMap(); + @NonNull @Builder.Default Map catalogProperties = Collections.emptyMap(); } diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java index 2a4e5d2ee..37778eb8e 100644 --- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java @@ -42,7 +42,7 @@ public static CatalogConversionFactory getInstance() { public static CatalogConversionSource createCatalogConversionSource( ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) { return ReflectionUtils.createInstanceOfClass( - sourceCatalogConfig.getCatalogImpl(), sourceCatalogConfig, configuration); + sourceCatalogConfig.getCatalogConversionSourceImpl(), sourceCatalogConfig, configuration); } /** @@ -53,8 +53,11 @@ public static CatalogConversionSource createCatalogConversionSource( * @param configuration hadoop configuration */ public CatalogSyncClient createCatalogSyncClient( - ExternalCatalogConfig targetCatalogConfig, Configuration configuration) { + ExternalCatalogConfig targetCatalogConfig, String tableFormat, Configuration configuration) { return ReflectionUtils.createInstanceOfClass( - targetCatalogConfig.getCatalogImpl(), targetCatalogConfig, configuration); + targetCatalogConfig.getCatalogSyncClientImpl(), + targetCatalogConfig, + tableFormat, + configuration); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java index f053df32e..3649ae8e0 100644 --- a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java @@ -28,11 +28,14 @@ public class ExternalCatalogConfigFactory { public static ExternalCatalogConfig fromCatalogType( String catalogType, String catalogId, Map properties) { // TODO: Choose existing implementation based on catalogType. - String catalogImpl = ""; + String catalogSyncClientImpl = ""; + String catalogConversionSourceImpl = ""; return ExternalCatalogConfig.builder() - .catalogImpl(catalogImpl) + .catalogType(catalogType) + .catalogSyncClientImpl(catalogSyncClientImpl) + .catalogConversionSourceImpl(catalogConversionSourceImpl) .catalogId(catalogId) - .catalogOptions(properties) + .catalogProperties(properties) .build(); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index be0b7168f..cd79ccb27 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -142,7 +142,9 @@ public Map syncTableAcrossCatalogs( TargetCatalogConfig::getCatalogTableIdentifier, targetCatalog -> catalogConversionFactory.createCatalogSyncClient( - targetCatalog.getCatalogConfig(), conf))); + targetCatalog.getCatalogConfig(), + targetTable.getFormatName(), + conf))); catalogSyncResults.put( targetTable.getFormatName(), syncCatalogsForTargetTable( diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java index 386ecd641..53e689236 100644 --- a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java @@ -30,7 +30,8 @@ import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.spi.extractor.CatalogConversionSource; import org.apache.xtable.spi.sync.CatalogSyncClient; -import org.apache.xtable.testutil.ITTestUtils.TestCatalogImpl; +import org.apache.xtable.testutil.ITTestUtils.TestCatalogConversionSourceImpl; +import org.apache.xtable.testutil.ITTestUtils.TestCatalogSyncImpl; class TestCatalogConversionFactory { @@ -39,12 +40,14 @@ void createSourceForConfig() { ExternalCatalogConfig sourceCatalog = ExternalCatalogConfig.builder() .catalogId("catalogId") - .catalogImpl(TestCatalogImpl.class.getName()) - .catalogOptions(Collections.emptyMap()) + .catalogConversionSourceImpl(TestCatalogConversionSourceImpl.class.getName()) + .catalogProperties(Collections.emptyMap()) .build(); CatalogConversionSource catalogConversionSource = CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, new Configuration()); - assertEquals(catalogConversionSource.getClass().getName(), TestCatalogImpl.class.getName()); + assertEquals( + catalogConversionSource.getClass().getName(), + TestCatalogConversionSourceImpl.class.getName()); } @Test @@ -54,8 +57,8 @@ void createForCatalog() { .catalogConfig( ExternalCatalogConfig.builder() .catalogId("catalogId") - .catalogImpl(TestCatalogImpl.class.getName()) - .catalogOptions(Collections.emptyMap()) + .catalogSyncClientImpl(TestCatalogSyncImpl.class.getName()) + .catalogProperties(Collections.emptyMap()) .build()) .catalogTableIdentifier( CatalogTableIdentifier.builder() @@ -65,7 +68,8 @@ void createForCatalog() { .build(); CatalogSyncClient catalogSyncClient = CatalogConversionFactory.getInstance() - .createCatalogSyncClient(targetCatalogConfig.getCatalogConfig(), new Configuration()); - assertEquals(catalogSyncClient.getClass().getName(), TestCatalogImpl.class.getName()); + .createCatalogSyncClient( + targetCatalogConfig.getCatalogConfig(), "TABLE_FORMAT", new Configuration()); + assertEquals(catalogSyncClient.getClass().getName(), TestCatalogSyncImpl.class.getName()); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index c1fb98635..0600c5eec 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -459,10 +459,10 @@ void testNoTableFormatConversionWithMultipleCatalogSync() { .thenReturn(tableFormatSyncResults); // Mocks for catalogSync. when(mockCatalogConversionFactory.createCatalogSyncClient( - targetCatalogs.get(0).getCatalogConfig(), mockConf)) + eq(targetCatalogs.get(0).getCatalogConfig()), any(), eq(mockConf))) .thenReturn(mockCatalogSyncClient1); when(mockCatalogConversionFactory.createCatalogSyncClient( - targetCatalogs.get(1).getCatalogConfig(), mockConf)) + eq(targetCatalogs.get(1).getCatalogConfig()), any(), eq(mockConf))) .thenReturn(mockCatalogSyncClient2); when(catalogSync.syncTable( eq( @@ -590,8 +590,8 @@ private TargetCatalogConfig getTargetCatalog(String suffix) { .catalogConfig( ExternalCatalogConfig.builder() .catalogId("catalogId-" + suffix) - .catalogImpl("catalogImpl-" + suffix) - .catalogOptions(Collections.emptyMap()) + .catalogSyncClientImpl("catalogImpl-" + suffix) + .catalogProperties(Collections.emptyMap()) .build()) .catalogTableIdentifier( CatalogTableIdentifier.builder() diff --git a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java index 9a4f18ada..04be989dc 100644 --- a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java +++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java @@ -51,18 +51,10 @@ public static void validateTable( Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields()); } - public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient { + public static class TestCatalogSyncImpl implements CatalogSyncClient { - public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {} - - @Override - public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { - return SourceTable.builder() - .name("source_table_name") - .basePath("file://base_path/v1/") - .formatName("ICEBERG") - .build(); - } + public TestCatalogSyncImpl( + ExternalCatalogConfig catalogConfig, String tableFormat, Configuration hadoopConf) {} @Override public String getCatalogId() { @@ -103,4 +95,18 @@ public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifie @Override public void close() throws Exception {} } + + public static class TestCatalogConversionSourceImpl implements CatalogConversionSource { + public TestCatalogConversionSourceImpl( + ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) {} + + @Override + public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { + return SourceTable.builder() + .name("source_table_name") + .basePath("file://base_path/v1/") + .formatName("ICEBERG") + .build(); + } + } } diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index b77ebd09e..a423fbc29 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -42,7 +42,6 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import com.fasterxml.jackson.databind.ObjectMapper; @@ -50,7 +49,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.apache.xtable.catalog.CatalogConversionFactory; -import org.apache.xtable.catalog.ExternalCatalogConfigFactory; import org.apache.xtable.conversion.ConversionConfig; import org.apache.xtable.conversion.ConversionController; import org.apache.xtable.conversion.ConversionSourceProvider; @@ -132,12 +130,12 @@ public static void main(String[] args) throws Exception { RunSync.TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig); - Map catalogsByName = + Map catalogsById = datasetConfig.getTargetCatalogs().stream() - .collect(Collectors.toMap(DatasetConfig.Catalog::getCatalogId, Function.identity())); - ExternalCatalogConfig sourceCatalogConfig = getCatalogConfig(datasetConfig.getSourceCatalog()); + .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity())); CatalogConversionSource catalogConversionSource = - CatalogConversionFactory.createCatalogConversionSource(sourceCatalogConfig, hadoopConf); + CatalogConversionFactory.createCatalogConversionSource( + datasetConfig.getSourceCatalog(), hadoopConf); ConversionController conversionController = new ConversionController(hadoopConf); for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) { SourceTable sourceTable = null; @@ -181,9 +179,7 @@ public static void main(String[] args) throws Exception { TargetCatalogConfig.builder() .catalogTableIdentifier( targetCatalogTableIdentifier.getCatalogTableIdentifier()) - .catalogConfig( - getCatalogConfig( - catalogsByName.get(targetCatalogTableIdentifier.getCatalogId()))) + .catalogConfig(catalogsById.get(targetCatalogTableIdentifier.getCatalogId())) .build()); } ConversionConfig conversionConfig = @@ -208,19 +204,6 @@ public static void main(String[] args) throws Exception { } } - static ExternalCatalogConfig getCatalogConfig(DatasetConfig.Catalog catalog) { - if (!StringUtils.isEmpty(catalog.getCatalogType())) { - return ExternalCatalogConfigFactory.fromCatalogType( - catalog.getCatalogType(), catalog.getCatalogId(), catalog.getCatalogProperties()); - } else { - return ExternalCatalogConfig.builder() - .catalogId(catalog.getCatalogId()) - .catalogImpl(catalog.getCatalogImpl()) - .catalogOptions(catalog.getCatalogProperties()) - .build(); - } - } - static Map getConversionSourceProviders( List tableFormats, RunSync.TableFormatConverters tableFormatConverters, @@ -252,36 +235,17 @@ public static class DatasetConfig { * Configuration of the source catalog from which XTable will read. It must contain all the * necessary connection and access details for describing and listing tables */ - private Catalog sourceCatalog; + private ExternalCatalogConfig sourceCatalog; /** * Defines configuration one or more target catalogs, to which XTable will write or update * tables. Unlike the source, these catalogs must be writable */ - private List targetCatalogs; + private List targetCatalogs; /** A list of datasets that specify how a source table maps to one or more target tables. */ private List datasets; /** Configuration for catalog. */ - @Data - public static class Catalog { - /** A user defined unique identifier for the catalog. */ - private String catalogId; - /** - * The type of the source catalog. This might be a specific type understood by XTable, such as - * Hive, Glue etc. - */ - private String catalogType; - /** - * (Optional) A fully qualified class name that implements the interfaces for - * CatalogSyncClient, it can be used if the implementation for catalogType doesn't exist in - * XTable. This is an optional field. - */ - private String catalogImpl; - /** - * A collection of configs used to configure access or connection properties for the catalog. - */ - private Map catalogProperties; - } + ExternalCatalogConfig catalogConfig; @Data public static class Dataset { @@ -306,8 +270,8 @@ public static class SourceTableIdentifier { @Data public static class TargetTableIdentifier { /** - * The user defined unique identifier of the target {@link Catalog} where the table will be - * created or updated + * The user defined unique identifier of the target catalog where the table will be created or + * updated */ String catalogId; /** diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml b/xtable-utilities/src/test/resources/catalogConfig.yaml index 185f47b34..84773235d 100644 --- a/xtable-utilities/src/test/resources/catalogConfig.yaml +++ b/xtable-utilities/src/test/resources/catalogConfig.yaml @@ -16,26 +16,27 @@ # sourceCatalog: catalogId: "source-1" - catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" + catalogConversionSourceImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogConversionSourceImpl" + catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key01: "value1" key02: "value2" key03: "value3" targetCatalogs: - catalogId: "target-1" - catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" + catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key11: "value1" key12: "value2" key13: "value3" - catalogId: "target-2" - catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" + catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key21: "value1" key22: "value2" key23: "value3" - catalogId: "target-3" - catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" + catalogSyncClientImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key31: "value1" key32: "value2"