diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 8ef527418..8d1e76974 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -36,8 +36,8 @@ public class ConversionConfig { // One or more targets to sync the table metadata to List targetTables; // Each target table can be synced to multiple target catalogs, this is map from - // targetTableIdentifier to target catalogs. - Map> targetCatalogs; + // targetTable to target catalogs. + Map> targetCatalogs; // The mode, incremental or snapshot SyncMode syncMode; @@ -45,7 +45,7 @@ public class ConversionConfig { ConversionConfig( @NonNull SourceTable sourceTable, List targetTables, - Map> targetCatalogs, + Map> targetCatalogs, SyncMode syncMode) { this.sourceTable = sourceTable; this.targetTables = targetTables; diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java index 7f503b755..6256da2c6 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java @@ -44,8 +44,4 @@ public TargetTable( this.metadataRetention = metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention; } - - public String getId() { - return String.format("%s#%s", sanitizeBasePath(this.basePath), formatName); - } } diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml index f277495e7..80de22991 100644 --- a/xtable-core/pom.xml +++ b/xtable-core/pom.xml @@ -174,4 +174,24 @@ test + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index df72ff5c1..be0b7168f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -136,7 +136,7 @@ public Map syncTableAcrossCatalogs( Map catalogSyncResults = new HashMap<>(); for (TargetTable targetTable : config.getTargetTables()) { Map catalogSyncClients = - config.getTargetCatalogs().get(targetTable.getId()).stream() + config.getTargetCatalogs().get(targetTable).stream() .collect( Collectors.toMap( TargetCatalogConfig::getCatalogTableIdentifier, diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java index 5fd6523f4..6a6a55926 100644 --- a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java @@ -26,12 +26,11 @@ import org.junit.jupiter.api.Test; import org.apache.xtable.conversion.ExternalCatalogConfig; -import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.conversion.TargetCatalogConfig; -import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.spi.extractor.CatalogConversionSource; import org.apache.xtable.spi.sync.CatalogSyncClient; +import org.apache.xtable.testutil.ITTestUtils.TestCatalogImpl; class TestCatalogConversionFactory { @@ -69,54 +68,4 @@ void createForCatalog() { .createCatalogSyncClient(targetCatalogConfig.getCatalogConfig(), new Configuration()); assertEquals(catalogSyncClient.getClass().getName(), TestCatalogImpl.class.getName()); } - - public static class TestCatalogImpl - implements CatalogSyncClient, CatalogConversionSource { - - public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {} - - @Override - public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { - return null; - } - - @Override - public String getCatalogName() { - return null; - } - - @Override - public String getStorageDescriptorLocation(Object o) { - return null; - } - - @Override - public boolean hasDatabase(String databaseName) { - return false; - } - - @Override - public void createDatabase(String databaseName) {} - - @Override - public Object getTable(CatalogTableIdentifier tableIdentifier) { - return null; - } - - @Override - public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void refreshTable( - InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void close() throws Exception {} - } } diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index f7635d91d..11dee7604 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -579,7 +580,7 @@ private ConversionConfig getTableSyncConfig( .targetTables(targetTables) .targetCatalogs( targetTables.stream() - .collect(Collectors.toMap(TargetTable::getId, k -> targetCatalogs))) + .collect(Collectors.toMap(Function.identity(), k -> targetCatalogs))) .syncMode(syncMode) .build(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java index 281e61fe1..4a113272c 100644 --- a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java +++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java @@ -20,12 +20,18 @@ import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Assertions; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.spi.extractor.CatalogConversionSource; +import org.apache.xtable.spi.sync.CatalogSyncClient; public class ITTestUtils { @@ -44,4 +50,57 @@ public static void validateTable( Assertions.assertEquals(basePath, internalTable.getBasePath()); Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields()); } + + public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient { + + public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {} + + @Override + public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { + return SourceTable.builder() + .name("source_table_name") + .basePath("file://base_path/v1/") + .formatName("ICEBERG") + .build(); + } + + @Override + public String getCatalogName() { + return null; + } + + @Override + public String getStorageLocation(Object o) { + return null; + } + + @Override + public boolean hasDatabase(String databaseName) { + return false; + } + + @Override + public void createDatabase(String databaseName) {} + + @Override + public Object getTable(CatalogTableIdentifier tableIdentifier) { + return null; + } + + @Override + public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void refreshTable( + InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void close() throws Exception {} + } } diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml index 8191af3c0..25d559730 100644 --- a/xtable-utilities/pom.xml +++ b/xtable-utilities/pom.xml @@ -35,6 +35,15 @@ ${project.version} + + org.apache.xtable + xtable-core_${scala.binary.version} + ${project.version} + tests + test-jar + test + + commons-cli diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index f69d75f54..28e26dd93 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -161,7 +161,7 @@ public static void main(String[] args) throws Exception { dataset.getSourceCatalogTableIdentifier().getCatalogTableIdentifier()); } List targetTables = new ArrayList<>(); - Map> targetCatalogs = new HashMap<>(); + Map> targetCatalogs = new HashMap<>(); for (TargetTableIdentifier targetCatalogTableIdentifier : dataset.getTargetCatalogTableIdentifiers()) { TargetTable targetTable = @@ -172,11 +172,11 @@ public static void main(String[] args) throws Exception { .formatName(targetCatalogTableIdentifier.getTableFormat()) .build(); targetTables.add(targetTable); - if (!targetCatalogs.containsKey(targetTable.getId())) { - targetCatalogs.put(targetTable.getId(), new ArrayList<>()); + if (!targetCatalogs.containsKey(targetTable)) { + targetCatalogs.put(targetTable, new ArrayList<>()); } targetCatalogs - .get(targetTable.getId()) + .get(targetTable) .add( TargetCatalogConfig.builder() .catalogTableIdentifier( @@ -248,45 +248,86 @@ static Map getConversionSourceProviders( @Data public static class DatasetConfig { + /** + * Configuration of the source catalog from which XTable will read. It must contain all the + * necessary connection and access details for describing and listing tables + */ private Catalog sourceCatalog; + /** + * Defines configuration one or more target catalogs, to which XTable will write or update + * tables. Unlike the source, these catalogs must be writable + */ private List targetCatalogs; + /** A list of datasets that specify how a source table maps to one or more target tables. */ private List datasets; + /** Configuration for catalog. */ @Data public static class Catalog { + /** A unique name for the catalog. */ private String catalogName; + /** + * The type of the source catalog. This might be a specific type understood by XTable, such as + * Hive, Glue etc. + */ private String catalogType; + /** + * (Optional) A fully qualified class name that implements the interfaces for + * CatalogSyncClient, it can be used if the implementation for catalogType doesn't exist in + * XTable. This is an optional field. + */ private String catalogImpl; + /** + * A collection of configs used to configure access or connection properties for the catalog. + */ private Map catalogProperties; } @Data - public static class StorageIdentifier { - String tableFormat; - String tableBasePath; - String tableDataPath; - String tableName; - String partitionSpec; - String namespace; + public static class Dataset { + /** Identifies the source table in sourceCatalog. */ + private SourceTableIdentifier sourceCatalogTableIdentifier; + /** A list of one or more targets that this source table should be written to. */ + private List targetCatalogTableIdentifiers; } @Data public static class SourceTableIdentifier { + /** Specifies the table identifier in the source catalog. */ CatalogTableIdentifier catalogTableIdentifier; + /** + * (Optional) Provides direct storage details such as a table’s base path (like an S3 + * location) and the partition specification. This allows reading from a source even if it is + * not strictly registered in a catalog, as long as the format and location are known + */ StorageIdentifier storageIdentifier; } @Data public static class TargetTableIdentifier { + /** name of the target catalog where the table will be created or updated */ String catalogName; + /** + * The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how the data will be + * stored at the target. + */ String tableFormat; + /** Specifies the table identifier in the target catalog. */ CatalogTableIdentifier catalogTableIdentifier; } + /** + * Configuration in storage for table. This is an optional field in {@link + * SourceTableIdentifier}. + */ @Data - public static class Dataset { - private SourceTableIdentifier sourceCatalogTableIdentifier; - private List targetCatalogTableIdentifiers; + public static class StorageIdentifier { + String tableFormat; + String tableBasePath; + String tableDataPath; + String tableName; + String partitionSpec; + String namespace; } } } diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java index 7b77214f3..243261b5c 100644 --- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java +++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java @@ -22,16 +22,8 @@ import lombok.SneakyThrows; -import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; -import org.apache.xtable.conversion.ExternalCatalogConfig; -import org.apache.xtable.conversion.SourceTable; -import org.apache.xtable.model.InternalTable; -import org.apache.xtable.model.catalog.CatalogTableIdentifier; -import org.apache.xtable.spi.extractor.CatalogConversionSource; -import org.apache.xtable.spi.sync.CatalogSyncClient; - class TestRunCatalogSync { @SneakyThrows @@ -43,57 +35,4 @@ void testMain() { // Ensure yaml gets parsed and no op-sync implemented in TestCatalogImpl is called. assertDoesNotThrow(() -> RunCatalogSync.main(args)); } - - public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient { - - public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {} - - @Override - public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { - return SourceTable.builder() - .name("source_table_name") - .basePath("file://base_path/v1/") - .formatName("ICEBERG") - .build(); - } - - @Override - public String getCatalogName() { - return null; - } - - @Override - public String getStorageDescriptorLocation(Object o) { - return null; - } - - @Override - public boolean hasDatabase(String databaseName) { - return false; - } - - @Override - public void createDatabase(String databaseName) {} - - @Override - public Object getTable(CatalogTableIdentifier tableIdentifier) { - return null; - } - - @Override - public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void refreshTable( - InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} - - @Override - public void close() throws Exception {} - } } diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml b/xtable-utilities/src/test/resources/catalogConfig.yaml index 6a4df0f04..f3f1757c1 100644 --- a/xtable-utilities/src/test/resources/catalogConfig.yaml +++ b/xtable-utilities/src/test/resources/catalogConfig.yaml @@ -16,26 +16,26 @@ # sourceCatalog: catalogName: "source-1" - catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl" + catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key01: "value1" key02: "value2" key03: "value3" targetCatalogs: - catalogName: "target-1" - catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl" + catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key11: "value1" key12: "value2" key13: "value3" - catalogName: "target-2" - catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl" + catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key21: "value1" key22: "value2" key23: "value3" - catalogName: "target-3" - catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl" + catalogImpl: "org.apache.xtable.testutil.ITTestUtils$TestCatalogImpl" catalogProperties: key31: "value1" key32: "value2"