From ea325d755b7c3dd6d82f38a9ebf44e2502f63293 Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Wed, 18 Dec 2024 02:02:59 -0800 Subject: [PATCH] [590] Add RunCatalogSync utility for synchronizing tables across catalogs --- .../xtable/conversion/ConversionConfig.java | 10 +- .../conversion/ExternalCatalogConfig.java | 42 +++ .../conversion/TargetCatalogConfig.java | 42 +++ .../apache/xtable/conversion/TargetTable.java | 4 + .../catalog/CatalogConversionFactory.java | 60 ++++ .../catalog/ExternalCatalogConfigFactory.java | 38 +++ .../conversion/ConversionController.java | 199 +++++++++--- .../xtable/conversion/ConversionUtils.java | 33 ++ .../xtable/iceberg/IcebergCatalogConfig.java | 8 +- .../apache/xtable/TestSparkDeltaTable.java | 8 - .../catalog/TestCatalogConversionFactory.java | 122 ++++++++ .../conversion/TestConversionController.java | 172 ++++++++++- .../xtable/utilities/RunCatalogSync.java | 292 ++++++++++++++++++ .../org/apache/xtable/utilities/RunSync.java | 2 +- .../xtable/utilities/TestRunCatalogSync.java | 99 ++++++ .../src/test/resources/catalogConfig.yaml | 75 +++++ 16 files changed, 1143 insertions(+), 63 deletions(-) create mode 100644 xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java create mode 100644 xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java create mode 100644 xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java create mode 100644 xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java create mode 100644 xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java create mode 100644 xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java create mode 100644 xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java create mode 100644 xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java create mode 100644 
xtable-utilities/src/test/resources/catalogConfig.yaml diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 73e2628db..8ef527418 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -19,6 +19,7 @@ package org.apache.xtable.conversion; import java.util.List; +import java.util.Map; import lombok.Builder; import lombok.NonNull; @@ -34,14 +35,21 @@ public class ConversionConfig { @NonNull SourceTable sourceTable; // One or more targets to sync the table metadata to List targetTables; + // Each target table can be synced to multiple target catalogs; this is a map from + // targetTableIdentifier to target catalogs. + Map> targetCatalogs; // The mode, incremental or snapshot SyncMode syncMode; @Builder ConversionConfig( - @NonNull SourceTable sourceTable, List targetTables, SyncMode syncMode) { + @NonNull SourceTable sourceTable, + List targetTables, + Map> targetCatalogs, + SyncMode syncMode) { this.sourceTable = sourceTable; this.targetTables = targetTables; + this.targetCatalogs = targetCatalogs; Preconditions.checkArgument( targetTables != null && !targetTables.isEmpty(), "Please provide at-least one format to sync"); diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java new file mode 100644 index 000000000..16785ec6f --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.Collections; +import java.util.Map; + +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +/** Defines the configuration for an external catalog. */ +@Value +@Builder +public class ExternalCatalogConfig implements CatalogConfig { + /** The name of the catalog, it also acts as a unique identifier for each catalog */ + @NonNull String catalogName; + + /** The implementation class path for the catalog */ + @NonNull String catalogImpl; + + /** + * The properties for each catalog, used for providing any custom behaviour during catalog sync + */ + @NonNull @Builder.Default Map catalogOptions = Collections.emptyMap(); +} diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java new file mode 100644 index 000000000..ca6cec2dc --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetCatalogConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +import org.apache.xtable.model.catalog.CatalogTableIdentifier; + +/** + * TargetCatalogConfig contains the parameters that are required when syncing {@link TargetTable} to + * a catalog. + */ +@Value +@Builder +public class TargetCatalogConfig { + /** + * The tableIdentifiers(databaseName, tableName) that will be used when syncing {@link + * TargetTable} to the catalog. + */ + @NonNull CatalogTableIdentifier catalogTableIdentifier; + + /** Configuration for the catalog. */ + @NonNull ExternalCatalogConfig catalogConfig; +} diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java index 6256da2c6..7f503b755 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java @@ -44,4 +44,8 @@ public TargetTable( this.metadataRetention = metadataRetention == null ? 
Duration.of(7, ChronoUnit.DAYS) : metadataRetention; } + + public String getId() { + return String.format("%s#%s", sanitizeBasePath(this.basePath), formatName); + } } diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java new file mode 100644 index 000000000..2a4e5d2ee --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogConversionFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.catalog; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.reflection.ReflectionUtils; +import org.apache.xtable.spi.extractor.CatalogConversionSource; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +public class CatalogConversionFactory { + private static final CatalogConversionFactory INSTANCE = new CatalogConversionFactory(); + + public static CatalogConversionFactory getInstance() { + return INSTANCE; + } + + /** + * Returns an implementation class for {@link CatalogConversionSource} that's used for converting + * table definitions in the catalog to {@link org.apache.xtable.conversion.SourceTable} object. + * + * @param sourceCatalogConfig configuration for the source catalog + * @param configuration hadoop configuration + */ + public static CatalogConversionSource createCatalogConversionSource( + ExternalCatalogConfig sourceCatalogConfig, Configuration configuration) { + return ReflectionUtils.createInstanceOfClass( + sourceCatalogConfig.getCatalogImpl(), sourceCatalogConfig, configuration); + } + + /** + * Returns an implementation class for {@link CatalogSyncClient} that's used for syncing {@link + * org.apache.xtable.conversion.TargetTable} to a catalog. 
+ * + * @param targetCatalogConfig configuration for the target catalog + * @param configuration hadoop configuration + */ + public CatalogSyncClient createCatalogSyncClient( + ExternalCatalogConfig targetCatalogConfig, Configuration configuration) { + return ReflectionUtils.createInstanceOfClass( + targetCatalogConfig.getCatalogImpl(), targetCatalogConfig, configuration); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java new file mode 100644 index 000000000..09bf1566e --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import java.util.Map; + +import org.apache.xtable.conversion.ExternalCatalogConfig; + +/** A factory class which returns {@link ExternalCatalogConfig} based on catalogType. */ +public class ExternalCatalogConfigFactory { + + public static ExternalCatalogConfig fromCatalogType( + String catalogType, String catalogName, Map properties) { + // TODO: Choose existing implementation based on catalogType. 
+ String catalogImpl = ""; + return ExternalCatalogConfig.builder() + .catalogImpl(catalogImpl) + .catalogName(catalogName) + .catalogOptions(properties) + .build(); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index 222652a6d..df72ff5c1 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -18,6 +18,8 @@ package org.apache.xtable.conversion; +import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable; + import java.io.IOException; import java.time.Instant; import java.util.Collection; @@ -37,15 +39,19 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.xtable.catalog.CatalogConversionFactory; import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.IncrementalTableChanges; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.model.sync.SyncResult; import org.apache.xtable.spi.extractor.ConversionSource; import org.apache.xtable.spi.extractor.ExtractFromSource; +import org.apache.xtable.spi.sync.CatalogSync; +import org.apache.xtable.spi.sync.CatalogSyncClient; import org.apache.xtable.spi.sync.ConversionTarget; import org.apache.xtable.spi.sync.TableFormatSync; @@ -64,10 +70,17 @@ public class ConversionController { private final Configuration conf; private final ConversionTargetFactory conversionTargetFactory; + private final CatalogConversionFactory catalogConversionFactory; private final TableFormatSync tableFormatSync; + private final CatalogSync catalogSync; public 
ConversionController(Configuration conf) { - this(conf, ConversionTargetFactory.getInstance(), TableFormatSync.getInstance()); + this( + conf, + ConversionTargetFactory.getInstance(), + CatalogConversionFactory.getInstance(), + TableFormatSync.getInstance(), + CatalogSync.getInstance()); } /** @@ -89,57 +102,133 @@ public Map sync( try (ConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) { ExtractFromSource source = ExtractFromSource.of(conversionSource); + return syncTableFormats(config, source, config.getSyncMode()); + } catch (IOException ioException) { + throw new ReadException("Failed to close source converter", ioException); + } + } - Map conversionTargetByFormat = - config.getTargetTables().stream() - .collect( - Collectors.toMap( - TargetTable::getFormatName, - targetTable -> conversionTargetFactory.createForFormat(targetTable, conf))); - // State for each TableFormat - Map> lastSyncMetadataByFormat = - conversionTargetByFormat.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, entry -> entry.getValue().getTableMetadata())); - Map formatsToSyncIncrementally = - getFormatsToSyncIncrementally( - config, - conversionTargetByFormat, - lastSyncMetadataByFormat, - source.getConversionSource()); - Map formatsToSyncBySnapshot = - conversionTargetByFormat.entrySet().stream() - .filter(entry -> !formatsToSyncIncrementally.containsKey(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - SyncResultForTableFormats syncResultForSnapshotSync = - formatsToSyncBySnapshot.isEmpty() - ? SyncResultForTableFormats.builder().build() - : syncSnapshot(formatsToSyncBySnapshot, source); - SyncResultForTableFormats syncResultForIncrementalSync = - formatsToSyncIncrementally.isEmpty() - ? 
SyncResultForTableFormats.builder().build() - : syncIncrementalChanges( - formatsToSyncIncrementally, lastSyncMetadataByFormat, source); - Map syncResultsMerged = - new HashMap<>(syncResultForIncrementalSync.getLastSyncResult()); - syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult()); - String successfulSyncs = - getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.SUCCESS); - if (!successfulSyncs.isEmpty()) { - log.info("Sync is successful for the following formats {}", successfulSyncs); - } - String failedSyncs = - getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.ERROR); - if (!failedSyncs.isEmpty()) { - log.error("Sync failed for the following formats {}", failedSyncs); + /** + * Synchronizes the source table in conversion config to multiple target catalogs. If the + * configuration for the target table uses a different table format, synchronizes the table format + * first before syncing it to the target catalog. + * + * @param config A per table level config containing source table, target tables, target catalogs + * and syncMode. + * @param conversionSourceProvider A provider for the {@link ConversionSource} instance for each + * tableFormat, {@link ConversionSourceProvider#init(Configuration)} must be called before + * calling this method. + * @return a map from each table format to its sync result, obtained by running the sync for the table + * with the provided per table level configuration. 
+ */ + public Map syncTableAcrossCatalogs( + ConversionConfig config, Map conversionSourceProvider) { + if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) { + throw new IllegalArgumentException("Please provide at-least one format to sync"); + } + try (ConversionSource conversionSource = + conversionSourceProvider + .get(config.getSourceTable().getFormatName()) + .getConversionSourceInstance(config.getSourceTable())) { + ExtractFromSource source = ExtractFromSource.of(conversionSource); + Map tableFormatSyncResults = + syncTableFormats(config, source, config.getSyncMode()); + Map catalogSyncResults = new HashMap<>(); + for (TargetTable targetTable : config.getTargetTables()) { + Map catalogSyncClients = + config.getTargetCatalogs().get(targetTable.getId()).stream() + .collect( + Collectors.toMap( + TargetCatalogConfig::getCatalogTableIdentifier, + targetCatalog -> + catalogConversionFactory.createCatalogSyncClient( + targetCatalog.getCatalogConfig(), conf))); + catalogSyncResults.put( + targetTable.getFormatName(), + syncCatalogsForTargetTable( + targetTable, + catalogSyncClients, + conversionSourceProvider.get(targetTable.getFormatName()))); } - return syncResultsMerged; + mergeSyncResults(tableFormatSyncResults, catalogSyncResults); + return tableFormatSyncResults; } catch (IOException ioException) { throw new ReadException("Failed to close source converter", ioException); } } + private Map syncTableFormats( + ConversionConfig config, ExtractFromSource source, SyncMode syncMode) { + Map conversionTargetByFormat = + config.getTargetTables().stream() + .filter( + targetTable -> + !targetTable.getFormatName().equals(config.getSourceTable().getFormatName())) + .collect( + Collectors.toMap( + TargetTable::getFormatName, + targetTable -> conversionTargetFactory.createForFormat(targetTable, conf))); + + Map> lastSyncMetadataByFormat = + conversionTargetByFormat.entrySet().stream() + .collect( + Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().getTableMetadata())); + Map formatsToSyncIncrementally = + getFormatsToSyncIncrementally( + syncMode, + conversionTargetByFormat, + lastSyncMetadataByFormat, + source.getConversionSource()); + Map formatsToSyncBySnapshot = + conversionTargetByFormat.entrySet().stream() + .filter(entry -> !formatsToSyncIncrementally.containsKey(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + SyncResultForTableFormats syncResultForSnapshotSync = + formatsToSyncBySnapshot.isEmpty() + ? SyncResultForTableFormats.builder().build() + : syncSnapshot(formatsToSyncBySnapshot, source); + SyncResultForTableFormats syncResultForIncrementalSync = + formatsToSyncIncrementally.isEmpty() + ? SyncResultForTableFormats.builder().build() + : syncIncrementalChanges(formatsToSyncIncrementally, lastSyncMetadataByFormat, source); + Map syncResultsMerged = + new HashMap<>(syncResultForIncrementalSync.getLastSyncResult()); + syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult()); + String successfulSyncs = + getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.SUCCESS); + if (!successfulSyncs.isEmpty()) { + log.info("Sync is successful for the following formats {}", successfulSyncs); + } + String failedSyncs = + getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.ERROR); + if (!failedSyncs.isEmpty()) { + log.error("Sync failed for the following formats {}", failedSyncs); + } + return syncResultsMerged; + } + + /** + * Synchronizes the target table to multiple target catalogs. + * + * @param targetTable target table that needs to be synced. + * @param catalogSyncClients Collection of catalog sync clients along with their table identifiers + * for each target catalog. + * @param conversionSourceProvider A provider for the {@link ConversionSource} instance for the + * table format of targetTable. 
+ */ + private SyncResult syncCatalogsForTargetTable( + TargetTable targetTable, + Map catalogSyncClients, + ConversionSourceProvider conversionSourceProvider) { + return catalogSync.syncTable( + catalogSyncClients, + // We get the latest state of InternalTable for TargetTable + // and then synchronize it to catalogSyncClients. + conversionSourceProvider + .getConversionSourceInstance(convertToSourceTable(targetTable)) + .getCurrentTable()); + } + private static String getFormatsWithStatusCode( Map syncResultsMerged, SyncResult.SyncStatusCode statusCode) { return syncResultsMerged.entrySet().stream() @@ -149,11 +238,11 @@ private static String getFormatsWithStatusCode( } private Map getFormatsToSyncIncrementally( - ConversionConfig conversionConfig, + SyncMode syncMode, Map conversionTargetByFormat, Map> lastSyncMetadataByFormat, ConversionSource conversionSource) { - if (conversionConfig.getSyncMode() == SyncMode.FULL) { + if (syncMode == SyncMode.FULL) { // Full sync requested by config, hence no incremental sync. 
return Collections.emptyMap(); } @@ -268,6 +357,22 @@ private InstantsForIncrementalSync getMostOutOfSyncCommitAndPendingCommits( .build(); } + private void mergeSyncResults( + Map syncResultsMerged, Map catalogSyncResults) { + catalogSyncResults.forEach( + (tableFormat, catalogSyncResult) -> { + syncResultsMerged.computeIfPresent( + tableFormat, + (k, syncResult) -> + syncResult.toBuilder() + .syncDuration( + syncResult.getSyncDuration().plus(catalogSyncResult.getSyncDuration())) + .catalogSyncStatusList(catalogSyncResult.getCatalogSyncStatusList()) + .build()); + syncResultsMerged.computeIfAbsent(tableFormat, k -> catalogSyncResult); + }); + } + @Value @Builder private static class SyncResultForTableFormats { diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java new file mode 100644 index 000000000..f21be6702 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.conversion; + +public class ConversionUtils { + + public static SourceTable convertToSourceTable(TargetTable table) { + return new SourceTable( + table.getName(), + table.getFormatName(), + table.getBasePath(), + table.getBasePath(), + table.getNamespace(), + table.getCatalogConfig(), + table.getAdditionalProperties()); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java index d5d7a3c50..b678bf004 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergCatalogConfig.java @@ -27,10 +27,16 @@ import org.apache.xtable.conversion.CatalogConfig; +/** + * Iceberg requires a catalog to perform any operation; if no catalog is provided the default + * catalog (HadoopCatalog or storage based catalog) is used. For syncing iceberg to multiple + * catalogs, you can use {@link org.apache.xtable.conversion.ExternalCatalogConfig} instead which + * allows syncing the latest version of iceberg metadata to multiple catalogs. 
+ */ @Value @Builder public class IcebergCatalogConfig implements CatalogConfig { - @NonNull String catalogImpl; @NonNull String catalogName; + @NonNull String catalogImpl; @NonNull @Builder.Default Map catalogOptions = Collections.emptyMap(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java index ee5b1ccdd..a458070b3 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java @@ -134,15 +134,7 @@ public void upsertRows(List upsertRows) { } @SneakyThrows - @Override public void deleteRows(List deleteRows) { - String idsToDelete = - deleteRows.stream().map(row -> row.get(0).toString()).collect(Collectors.joining(", ")); - deltaTable.delete("id in (" + idsToDelete + ")"); - } - - @SneakyThrows - public void mergeDeleteRows(List deleteRows) { List deletes = testDeltaHelper.transformForUpsertsOrDeletes(deleteRows, false); Dataset deleteDataset = sparkSession.createDataFrame(deletes, testDeltaHelper.getTableStructSchema()); diff --git a/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java new file mode 100644 index 000000000..5fd6523f4 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/catalog/TestCatalogConversionFactory.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Collections; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; + +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetCatalogConfig; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.spi.extractor.CatalogConversionSource; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +class TestCatalogConversionFactory { + + @Test + void createSourceForConfig() { + ExternalCatalogConfig sourceCatalog = + ExternalCatalogConfig.builder() + .catalogName("catalogName") + .catalogImpl(TestCatalogImpl.class.getName()) + .catalogOptions(Collections.emptyMap()) + .build(); + CatalogConversionSource catalogConversionSource = + CatalogConversionFactory.createCatalogConversionSource(sourceCatalog, new Configuration()); + assertEquals(catalogConversionSource.getClass().getName(), TestCatalogImpl.class.getName()); + } + + @Test + void createForCatalog() { + TargetCatalogConfig targetCatalogConfig = + TargetCatalogConfig.builder() + .catalogConfig( + ExternalCatalogConfig.builder() + .catalogName("catalogName") + .catalogImpl(TestCatalogImpl.class.getName()) + .catalogOptions(Collections.emptyMap()) + .build()) + .catalogTableIdentifier( + CatalogTableIdentifier.builder() + .databaseName("target-database") + 
.tableName("target-tableName") + .build()) + .build(); + CatalogSyncClient catalogSyncClient = + CatalogConversionFactory.getInstance() + .createCatalogSyncClient(targetCatalogConfig.getCatalogConfig(), new Configuration()); + assertEquals(catalogSyncClient.getClass().getName(), TestCatalogImpl.class.getName()); + } + + public static class TestCatalogImpl + implements CatalogSyncClient, CatalogConversionSource { + + public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {} + + @Override + public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) { + return null; + } + + @Override + public String getCatalogName() { + return null; + } + + @Override + public String getStorageDescriptorLocation(Object o) { + return null; + } + + @Override + public boolean hasDatabase(String databaseName) { + return false; + } + + @Override + public void createDatabase(String databaseName) {} + + @Override + public Object getTable(CatalogTableIdentifier tableIdentifier) { + return null; + } + + @Override + public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void refreshTable( + InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {} + + @Override + public void close() throws Exception {} + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index caba80468..f7635d91d 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -18,8 +18,12 @@ package org.apache.xtable.conversion; 
+import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable; +import static org.apache.xtable.model.storage.TableFormat.DELTA; import static org.apache.xtable.model.storage.TableFormat.HUDI; +import static org.apache.xtable.model.storage.TableFormat.ICEBERG; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -43,17 +47,23 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatcher; +import com.google.common.collect.ImmutableMap; + +import org.apache.xtable.catalog.CatalogConversionFactory; import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.IncrementalTableChanges; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.TableChange; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.model.sync.SyncResult; import org.apache.xtable.spi.extractor.ConversionSource; +import org.apache.xtable.spi.sync.CatalogSync; +import org.apache.xtable.spi.sync.CatalogSyncClient; import org.apache.xtable.spi.sync.ConversionTarget; import org.apache.xtable.spi.sync.TableFormatSync; @@ -62,12 +72,22 @@ public class TestConversionController { private final Configuration mockConf = mock(Configuration.class); private final ConversionSourceProvider mockConversionSourceProvider = mock(ConversionSourceProvider.class); + private final ConversionSourceProvider mockConversionSourceProvider2 = + mock(ConversionSourceProvider.class); + private final ConversionSourceProvider mockConversionSourceProvider3 = + 
mock(ConversionSourceProvider.class); + private final ConversionSource mockConversionSource = mock(ConversionSource.class); private final ConversionTargetFactory mockConversionTargetFactory = mock(ConversionTargetFactory.class); + private final CatalogConversionFactory mockCatalogConversionFactory = + mock(CatalogConversionFactory.class); private final TableFormatSync tableFormatSync = mock(TableFormatSync.class); + private final CatalogSync catalogSync = mock(CatalogSync.class); private final ConversionTarget mockConversionTarget1 = mock(ConversionTarget.class); private final ConversionTarget mockConversionTarget2 = mock(ConversionTarget.class); + private final CatalogSyncClient mockCatalogSyncClient1 = mock(CatalogSyncClient.class); + private final CatalogSyncClient mockCatalogSyncClient2 = mock(CatalogSyncClient.class); @Test void testAllSnapshotSyncAsPerConfig() { @@ -96,7 +116,12 @@ void testAllSnapshotSyncAsPerConfig() { eq(internalSnapshot))) .thenReturn(perTableResults); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(perTableResults, result); @@ -182,7 +207,12 @@ void testAllIncrementalSyncAsPerConfigAndNoFallbackNecessary() { expectedSyncResult.put(TableFormat.ICEBERG, getLastSyncResult(icebergSyncResults)); expectedSyncResult.put(TableFormat.DELTA, getLastSyncResult(deltaSyncResults)); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map result = conversionController.sync(conversionConfig, mockConversionSourceProvider); 
assertEquals(expectedSyncResult, result); @@ -226,7 +256,12 @@ void testIncrementalSyncFallBackToSnapshotForAllFormats() { eq(internalSnapshot))) .thenReturn(syncResults); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(syncResults, result); @@ -310,7 +345,12 @@ void testIncrementalSyncFallbackToSnapshotForOnlySingleFormat() { expectedSyncResult.put(TableFormat.ICEBERG, syncResult); expectedSyncResult.put(TableFormat.DELTA, getLastSyncResult(deltaSyncResults)); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); @@ -368,16 +408,101 @@ void incrementalSyncWithNoPendingInstantsForAllFormats() { // Iceberg and Delta have no commits to sync Map expectedSyncResult = Collections.emptyMap(); ConversionController conversionController = - new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); Map result = conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); } + @Test + void testNoTableFormatConversionWithMultipleCatalogSync() { + SyncMode syncMode = SyncMode.INCREMENTAL; + List targetCatalogs = + Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2")); + InternalTable internalTable = 
getInternalTable(); + InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1"); + // Conversion source and target mocks. + ConversionConfig conversionConfig = + getTableSyncConfig( + Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode, targetCatalogs); + when(mockConversionSourceProvider.getConversionSourceInstance( + conversionConfig.getSourceTable())) + .thenReturn(mockConversionSource); + when(mockConversionSourceProvider.getConversionSourceInstance( + convertToSourceTable(conversionConfig.getTargetTables().get(0)))) + .thenReturn(mockConversionSource); + when(mockConversionSourceProvider.getConversionSourceInstance( + convertToSourceTable(conversionConfig.getTargetTables().get(1)))) + .thenReturn(mockConversionSource); + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(0), mockConf)) + .thenReturn(mockConversionTarget1); + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(1), mockConf)) + .thenReturn(mockConversionTarget2); + when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot); + when(mockConversionSource.getCurrentTable()).thenReturn(getInternalTable()); + // Mocks for tableFormatSync. + Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1)); + SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour, Duration.ofSeconds(1)); + Map tableFormatSyncResults = + buildPerTableResult(Arrays.asList(ICEBERG, DELTA), syncResult); + when(tableFormatSync.syncSnapshot( + argThat(containsAll(Arrays.asList(mockConversionTarget1, mockConversionTarget2))), + eq(internalSnapshot))) + .thenReturn(tableFormatSyncResults); + // Mocks for catalogSync. 
+ when(mockCatalogConversionFactory.createCatalogSyncClient( + targetCatalogs.get(0).getCatalogConfig(), mockConf)) + .thenReturn(mockCatalogSyncClient1); + when(mockCatalogConversionFactory.createCatalogSyncClient( + targetCatalogs.get(1).getCatalogConfig(), mockConf)) + .thenReturn(mockCatalogSyncClient2); + when(catalogSync.syncTable( + eq( + ImmutableMap.of( + targetCatalogs.get(0).getCatalogTableIdentifier(), mockCatalogSyncClient1, + targetCatalogs.get(1).getCatalogTableIdentifier(), mockCatalogSyncClient2)), + any())) + .thenReturn(buildSyncResult(syncMode, Instant.now(), Duration.ofSeconds(3))); + ConversionController conversionController = + new ConversionController( + mockConf, + mockConversionTargetFactory, + mockCatalogConversionFactory, + tableFormatSync, + catalogSync); + // Mocks for conversionSourceProviders. + Map conversionSourceProviders = new HashMap<>(); + conversionSourceProviders.put(HUDI, mockConversionSourceProvider); + conversionSourceProviders.put(ICEBERG, mockConversionSourceProvider); + conversionSourceProviders.put(DELTA, mockConversionSourceProvider); + // Assert results. 
+ Map mergedSyncResults = + buildPerTableResult( + Arrays.asList(ICEBERG, DELTA), + syncResult.toBuilder().syncDuration(Duration.ofSeconds(4)).build()); + Map result = + conversionController.syncTableAcrossCatalogs(conversionConfig, conversionSourceProviders); + assertEquals(mergedSyncResults, result); + } + private SyncResult getLastSyncResult(List syncResults) { return syncResults.get(syncResults.size() - 1); } + private Map buildPerTableResult( + List tableFormats, SyncResult syncResult) { + Map perTableResults = new HashMap<>(); + tableFormats.forEach(tableFormat -> perTableResults.put(tableFormat, syncResult)); + return perTableResults; + } + private List buildSyncResults(List instantList) { return instantList.stream() .map(instant -> buildSyncResult(SyncMode.INCREMENTAL, instant)) @@ -396,6 +521,17 @@ private SyncResult buildSyncResult(SyncMode syncMode, Instant lastSyncedInstant) .build(); } + private SyncResult buildSyncResult( + SyncMode syncMode, Instant lastSyncedInstant, Duration duration) { + return SyncResult.builder() + .mode(syncMode) + .lastInstantSynced(lastSyncedInstant) + .syncStartTime(Instant.now().minusSeconds(duration.getSeconds())) + .syncDuration(duration) + .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS) + .build(); + } + private InternalSnapshot buildSnapshot(InternalTable internalTable, String version) { return InternalSnapshot.builder().table(internalTable).version(version).build(); } @@ -413,6 +549,13 @@ private Instant getInstantAtLastNMinutes(Instant currentInstant, int n) { } private ConversionConfig getTableSyncConfig(List targetTableFormats, SyncMode syncMode) { + return getTableSyncConfig(targetTableFormats, syncMode, Collections.emptyList()); + } + + private ConversionConfig getTableSyncConfig( + List targetTableFormats, + SyncMode syncMode, + List targetCatalogs) { SourceTable sourceTable = SourceTable.builder() .name("tablename") @@ -434,10 +577,29 @@ private ConversionConfig getTableSyncConfig(List targetTableFormats, 
Syn return ConversionConfig.builder() .sourceTable(sourceTable) .targetTables(targetTables) + .targetCatalogs( + targetTables.stream() + .collect(Collectors.toMap(TargetTable::getId, k -> targetCatalogs))) .syncMode(syncMode) .build(); } + private TargetCatalogConfig getTargetCatalog(String suffix) { + return TargetCatalogConfig.builder() + .catalogConfig( + ExternalCatalogConfig.builder() + .catalogName("catalogName-" + suffix) + .catalogImpl("catalogImpl-" + suffix) + .catalogOptions(Collections.emptyMap()) + .build()) + .catalogTableIdentifier( + CatalogTableIdentifier.builder() + .databaseName("target-database" + suffix) + .tableName("target-tableName" + suffix) + .build()) + .build(); + } + private static ArgumentMatcher> containsAll(Collection expected) { return actual -> actual.size() == expected.size() && actual.containsAll(expected); } diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java new file mode 100644 index 000000000..f69d75f54 --- /dev/null +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.utilities; + +import static org.apache.xtable.utilities.RunSync.getCustomConfigurations; +import static org.apache.xtable.utilities.RunSync.loadHadoopConf; +import static org.apache.xtable.utilities.RunSync.loadTableFormatConversionConfigs; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import lombok.Data; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import org.apache.xtable.catalog.CatalogConversionFactory; +import org.apache.xtable.catalog.ExternalCatalogConfigFactory; +import org.apache.xtable.conversion.ConversionConfig; +import org.apache.xtable.conversion.ConversionController; +import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetCatalogConfig; +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.sync.SyncMode; +import org.apache.xtable.reflection.ReflectionUtils; +import 
org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier;
+import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdentifier;
+
+/**
+ * Provides a standalone process for reading tables from a source catalog and synchronizing their
+ * state into target catalogs. Table format conversion is supported as well when a target table
+ * chooses a different format from the source table.
+ *
+ * <p>The run is driven by a YAML file that is deserialized into {@link DatasetConfig}. A failure
+ * while syncing one dataset is logged and the remaining datasets are still processed.
+ */
+@Log4j2
+public class RunCatalogSync {
+  public static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
+  private static final String CATALOG_SOURCE_AND_TARGET_CONFIG_PATH = "catalogConfig";
+  private static final String HADOOP_CONFIG_PATH = "hadoopConfig";
+  private static final String CONVERTERS_CONFIG_PATH = "convertersConfig";
+  private static final String HELP_OPTION = "h";
+  // Already-initialised source providers keyed by table format name. Filled lazily by
+  // getConversionSourceProviders so that a provider is created at most once per format.
+  private static final Map<String, ConversionSourceProvider> CONVERSION_SOURCE_PROVIDERS =
+      new HashMap<>();
+
+  private static final Options OPTIONS =
+      new Options()
+          .addRequiredOption(
+              CATALOG_SOURCE_AND_TARGET_CONFIG_PATH,
+              "catalogSyncConfig",
+              true,
+              "The path to a yaml file containing source and target tables catalog configurations "
+                  + "along with the table identifiers that need to be synced")
+          .addOption(
+              HADOOP_CONFIG_PATH,
+              "hadoopConfig",
+              true,
+              "Hadoop config xml file path containing configs necessary to access the "
+                  + "file system. These configs will override the default configs.")
+          .addOption(
+              CONVERTERS_CONFIG_PATH,
+              "convertersConfig",
+              true,
+              "The path to a yaml file containing InternalTable converter configurations. "
+                  + "These configs will override the default")
+          .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");
+
+  /**
+   * Entry point: parses the command line, loads the dataset YAML plus the optional Hadoop and
+   * converter overrides, and syncs every configured dataset to its target catalogs.
+   *
+   * @param args command line arguments; {@code catalogConfig} is required
+   * @throws Exception if a configuration file cannot be read or parsed
+   * @throws IllegalArgumentException if a dataset refers to a target catalog name that is not
+   *     declared under {@code targetCatalogs}
+   */
+  public static void main(String[] args) throws Exception {
+    CommandLineParser parser = new DefaultParser();
+    CommandLine cmd;
+    try {
+      cmd = parser.parse(OPTIONS, args);
+    } catch (ParseException e) {
+      new HelpFormatter().printHelp("xtable.jar", OPTIONS, true);
+      return;
+    }
+
+    if (cmd.hasOption(HELP_OPTION)) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("RunCatalogSync", OPTIONS);
+      return;
+    }
+
+    DatasetConfig datasetConfig = new DatasetConfig();
+    try (InputStream inputStream =
+        Files.newInputStream(
+            Paths.get(cmd.getOptionValue(CATALOG_SOURCE_AND_TARGET_CONFIG_PATH)))) {
+      ObjectReader objectReader = YAML_MAPPER.readerForUpdating(datasetConfig);
+      objectReader.readValue(inputStream);
+    }
+
+    byte[] customConfig = getCustomConfigurations(cmd, HADOOP_CONFIG_PATH);
+    Configuration hadoopConf = loadHadoopConf(customConfig);
+
+    customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
+    RunSync.TableFormatConverters tableFormatConverters =
+        loadTableFormatConversionConfigs(customConfig);
+
+    Map<String, DatasetConfig.Catalog> catalogsByName =
+        datasetConfig.getTargetCatalogs().stream()
+            .collect(Collectors.toMap(DatasetConfig.Catalog::getCatalogName, Function.identity()));
+    ExternalCatalogConfig sourceCatalogConfig = getCatalogConfig(datasetConfig.getSourceCatalog());
+    CatalogConversionSource catalogConversionSource =
+        CatalogConversionFactory.createCatalogConversionSource(sourceCatalogConfig, hadoopConf);
+    ConversionController conversionController = new ConversionController(hadoopConf);
+    for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) {
+      SourceTable sourceTable = resolveSourceTable(dataset, catalogConversionSource);
+      List<TargetTable> targetTables = new ArrayList<>();
+      Map<String, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>();
+      for (TargetTableIdentifier targetCatalogTableIdentifier :
+          dataset.getTargetCatalogTableIdentifiers()) {
+        // Fail with a readable message instead of a NullPointerException when the dataset points
+        // at a catalog that was never declared.
+        String targetCatalogName = targetCatalogTableIdentifier.getCatalogName();
+        DatasetConfig.Catalog targetCatalog = catalogsByName.get(targetCatalogName);
+        if (targetCatalog == null) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Target catalog %s is not defined under targetCatalogs. Known catalogs are %s",
+                  targetCatalogName, catalogsByName.keySet()));
+        }
+        TargetTable targetTable =
+            TargetTable.builder()
+                .name(sourceTable.getName())
+                .basePath(sourceTable.getBasePath())
+                .namespace(sourceTable.getNamespace())
+                .formatName(targetCatalogTableIdentifier.getTableFormat())
+                .build();
+        List<TargetCatalogConfig> catalogsForTable = targetCatalogs.get(targetTable.getId());
+        if (catalogsForTable == null) {
+          // Several identifiers can resolve to the same target table id (the catalog list per id
+          // exists for exactly that case). Register the table itself only once so that the same
+          // target is not listed twice in the conversion config.
+          targetTables.add(targetTable);
+          catalogsForTable = new ArrayList<>();
+          targetCatalogs.put(targetTable.getId(), catalogsForTable);
+        }
+        catalogsForTable.add(
+            TargetCatalogConfig.builder()
+                .catalogTableIdentifier(targetCatalogTableIdentifier.getCatalogTableIdentifier())
+                .catalogConfig(getCatalogConfig(targetCatalog))
+                .build());
+      }
+      ConversionConfig conversionConfig =
+          ConversionConfig.builder()
+              .sourceTable(sourceTable)
+              .targetTables(targetTables)
+              .targetCatalogs(targetCatalogs)
+              .syncMode(SyncMode.INCREMENTAL)
+              .build();
+      // Source format first, then every target format, without duplicates: a provider is needed
+      // for each of them.
+      List<String> tableFormats =
+          new ArrayList<>(Collections.singleton(sourceTable.getFormatName()));
+      tableFormats.addAll(
+          targetTables.stream().map(TargetTable::getFormatName).collect(Collectors.toList()));
+      tableFormats = tableFormats.stream().distinct().collect(Collectors.toList());
+      try {
+        conversionController.syncTableAcrossCatalogs(
+            conversionConfig,
+            getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
+      } catch (Exception e) {
+        // Best effort: one failing dataset must not stop the remaining ones.
+        log.error(String.format("Error running sync for %s", sourceTable.getBasePath()), e);
+      }
+    }
+  }
+
+  /**
+   * Resolves the source table of a dataset: built directly from the storage identifier when one is
+   * configured, otherwise looked up in the source catalog.
+   */
+  private static SourceTable resolveSourceTable(
+      DatasetConfig.Dataset dataset, CatalogConversionSource catalogConversionSource) {
+    DatasetConfig.SourceTableIdentifier sourceIdentifier =
+        dataset.getSourceCatalogTableIdentifier();
+    StorageIdentifier storageIdentifier = sourceIdentifier.getStorageIdentifier();
+    if (storageIdentifier == null) {
+      return catalogConversionSource.getSourceTable(sourceIdentifier.getCatalogTableIdentifier());
+    }
+    String namespace = storageIdentifier.getNamespace();
+    return SourceTable.builder()
+        .name(storageIdentifier.getTableName())
+        .basePath(storageIdentifier.getTableBasePath())
+        // The namespace is configured as a dot separated string.
+        .namespace(namespace == null ? null : namespace.split("\\."))
+        .dataPath(storageIdentifier.getTableDataPath())
+        .formatName(storageIdentifier.getTableFormat())
+        .build();
+  }
+
+  /**
+   * Converts a YAML catalog entry into an {@link ExternalCatalogConfig}.
+   *
+   * @param catalog the catalog entry; must not be null
+   * @return the config derived from {@code catalogType} when it is set, otherwise a config built
+   *     from the explicit {@code catalogImpl}
+   */
+  static ExternalCatalogConfig getCatalogConfig(DatasetConfig.Catalog catalog) {
+    if (!StringUtils.isEmpty(catalog.getCatalogType())) {
+      return ExternalCatalogConfigFactory.fromCatalogType(
+          catalog.getCatalogType(), catalog.getCatalogName(), catalog.getCatalogProperties());
+    } else {
+      return ExternalCatalogConfig.builder()
+          .catalogName(catalog.getCatalogName())
+          .catalogImpl(catalog.getCatalogImpl())
+          .catalogOptions(catalog.getCatalogProperties())
+          .build();
+    }
+  }
+
+  /**
+   * Makes sure an initialised {@link ConversionSourceProvider} exists for each requested format.
+   *
+   * @param tableFormats format names that are needed for the current sync
+   * @param tableFormatConverters known converters, keyed by format name
+   * @param hadoopConf configuration handed to every newly created provider
+   * @return the shared provider map; it may also hold providers created by earlier calls
+   * @throws IllegalArgumentException if a requested format has no converter configuration
+   */
+  static Map<String, ConversionSourceProvider> getConversionSourceProviders(
+      List<String> tableFormats,
+      RunSync.TableFormatConverters tableFormatConverters,
+      Configuration hadoopConf) {
+    for (String tableFormat : tableFormats) {
+      if (CONVERSION_SOURCE_PROVIDERS.containsKey(tableFormat)) {
+        continue;
+      }
+      RunSync.TableFormatConverters.ConversionConfig sourceConversionConfig =
+          tableFormatConverters.getTableFormatConverters().get(tableFormat);
+      if (sourceConversionConfig == null) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Source format %s is not supported. Known source and target formats are %s",
+                tableFormat, tableFormatConverters.getTableFormatConverters().keySet()));
+      }
+      String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass;
+      ConversionSourceProvider conversionSourceProvider =
+          ReflectionUtils.createInstanceOfClass(sourceProviderClass);
+      conversionSourceProvider.init(hadoopConf);
+      CONVERSION_SOURCE_PROVIDERS.put(tableFormat, conversionSourceProvider);
+    }
+    return CONVERSION_SOURCE_PROVIDERS;
+  }
+
+  /** Root of the YAML document passed through the {@code catalogConfig} option. */
+  @Data
+  public static class DatasetConfig {
+    private Catalog sourceCatalog;
+    private List<Catalog> targetCatalogs;
+    private List<Dataset> datasets;
+
+    /** A catalog definition; {@code catalogType} takes precedence over {@code catalogImpl}. */
+    @Data
+    public static class Catalog {
+      private String catalogName;
+      private String catalogType;
+      private String catalogImpl;
+      private Map<String, String> catalogProperties;
+    }
+
+    /** Describes a source table by its storage location instead of a catalog entry. */
+    @Data
+    public static class StorageIdentifier {
+      String tableFormat;
+      String tableBasePath;
+      String tableDataPath;
+      String tableName;
+      // Deserialized from the YAML, but not read anywhere in RunCatalogSync.
+      String partitionSpec;
+      // Dot separated; split into parts when the source table is built.
+      String namespace;
+    }
+
+    /** Source of a dataset; the storage identifier wins when both members are set. */
+    @Data
+    public static class SourceTableIdentifier {
+      CatalogTableIdentifier catalogTableIdentifier;
+      StorageIdentifier storageIdentifier;
+    }
+
+    /** One sync target: the catalog (by name), the table format and the table identifier. */
+    @Data
+    public static class TargetTableIdentifier {
+      String catalogName;
+      String tableFormat;
+      CatalogTableIdentifier catalogTableIdentifier;
+    }
+
+    /** A source table together with all the targets it is synced to. */
+    @Data
+    public static class Dataset {
+      private SourceTableIdentifier sourceCatalogTableIdentifier;
+      private List<TargetTableIdentifier> targetCatalogTableIdentifiers;
+    }
+  }
+}
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index c84753de5..9475b8296 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -195,7 +195,7 @@ public static void main(String[] args) throws IOException {
     }
   }
 
-  private static byte[] getCustomConfigurations(CommandLine cmd, String option) throws IOException {
+  static byte[]
getCustomConfigurations(CommandLine cmd, String option) throws IOException { byte[] customConfig = null; if (cmd.hasOption(option)) { customConfig = Files.readAllBytes(Paths.get(cmd.getOptionValue(option))); diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java new file mode 100644 index 000000000..7b77214f3 --- /dev/null +++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.xtable.utilities;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.net.URL;
+import java.nio.file.Paths;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+class TestRunCatalogSync {
+
+  /**
+   * Runs {@link RunCatalogSync#main} against {@code catalogConfig.yaml} from the test resources
+   * and only checks that the run completes without throwing.
+   */
+  @SneakyThrows
+  @Test
+  void testMain() {
+    URL catalogConfigYaml =
+        TestRunCatalogSync.class.getClassLoader().getResource("catalogConfig.yaml");
+    assertNotNull(catalogConfigYaml, "catalogConfig.yaml must be on the test classpath");
+    // URL#getPath keeps percent-encoding (a space stays "%20"), which is not a usable file
+    // system path; going through the URI yields the real path.
+    String catalogConfigYamlPath = Paths.get(catalogConfigYaml.toURI()).toString();
+    String[] args = {"-catalogConfig", catalogConfigYamlPath};
+    // Ensure yaml gets parsed and no op-sync implemented in TestCatalogImpl is called.
+    assertDoesNotThrow(() -> RunCatalogSync.main(args));
+  }
+
+  /**
+   * Catalog test double named as {@code catalogImpl} in {@code catalogConfig.yaml}. It hands out a
+   * fixed source table; every sync-side operation is a no-op.
+   */
+  public static class TestCatalogImpl implements CatalogConversionSource, CatalogSyncClient {
+
+    // Both arguments are ignored; presumably this signature is what the catalog factory expects
+    // when it instantiates the class named in the YAML (confirm against CatalogConversionFactory).
+    public TestCatalogImpl(ExternalCatalogConfig catalogConfig, Configuration hadoopConf) {}
+
+    /** Returns the same ICEBERG table regardless of the identifier asked for. */
+    @Override
+    public SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier) {
+      return SourceTable.builder()
+          .name("source_table_name")
+          .basePath("file://base_path/v1/")
+          .formatName("ICEBERG")
+          .build();
+    }
+
+    @Override
+    public String getCatalogName() {
+      return null;
+    }
+
+    @Override
+    public String getStorageDescriptorLocation(Object o) {
+      return null;
+    }
+
+    @Override
+    public boolean hasDatabase(String databaseName) {
+      return false;
+    }
+
+    @Override
+    public void createDatabase(String databaseName) {}
+
+    @Override
+    public Object getTable(CatalogTableIdentifier tableIdentifier) {
+      return null;
+    }
+
+    @Override
+    public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void refreshTable(
+        InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) {}
+
+    @Override
+    public void close() throws Exception {}
+  }
+}
diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml b/xtable-utilities/src/test/resources/catalogConfig.yaml
new file mode 100644
index 000000000..6a4df0f04
--- /dev/null
+++ b/xtable-utilities/src/test/resources/catalogConfig.yaml
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+sourceCatalog: # queried for datasets whose source is given as a catalogTableIdentifier
+  catalogName: "source-1"
+  catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl" # used because no catalogType is set
+  catalogProperties:
+    key01: "value1"
+    key02: "value2"
+    key03: "value3"
+targetCatalogs: # datasets refer to these entries by catalogName
+  - catalogName: "target-1"
+    catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
+    catalogProperties:
+      key11: "value1"
+      key12: "value2"
+      key13: "value3"
+  - catalogName: "target-2"
+    catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
+    catalogProperties:
+      key21: "value1"
+      key22: "value2"
+      key23: "value3"
+  - catalogName: "target-3"
+    catalogImpl: "org.apache.xtable.utilities.TestRunCatalogSync$TestCatalogImpl"
+    catalogProperties:
+      key31: "value1"
+      key32: "value2"
+      key33: "value3"
+datasets: # one entry per source table, each with the targets it is synced to
+  - sourceCatalogTableIdentifier:
+      catalogTableIdentifier: # no storageIdentifier, so the table is looked up in sourceCatalog
+        databaseName: "source-database-1"
+        tableName: "source-1"
+    targetCatalogTableIdentifiers:
+      - catalogName: "target-1" # must match a catalogName under targetCatalogs
+        tableFormat: "DELTA"
+        catalogTableIdentifier:
+          databaseName: "target-database-1"
+          tableName: "target-tableName-1"
+      - catalogName: "target-2"
+        tableFormat: "HUDI"
+        catalogTableIdentifier:
+          databaseName: "target-database-2"
+          tableName: "target-tableName-2-delta" # only a label; the format synced is HUDI (above)
+  - sourceCatalogTableIdentifier:
+      storageIdentifier: # source table described by its storage location instead of a catalog entry
+        tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales
+        tableName: catalog_sales
+        partitionSpec: cs_sold_date_sk:VALUE # parsed into the config; RunCatalogSync.main does not read it
+        tableFormat: "HUDI"
+    targetCatalogTableIdentifiers:
+      - catalogName: "target-2"
+        tableFormat: "ICEBERG"
+        catalogTableIdentifier:
+          databaseName: "target-database-2"
+          tableName: "target-tableName-2"
+      - catalogName: "target-3"
+        tableFormat: "HUDI"
+        catalogTableIdentifier:
+          databaseName: "target-database-3"
+          tableName: "target-tableName-3"