diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifier.java similarity index 98% rename from seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifier.java index 85400636989..4cdbfe27e40 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginIdentifier.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/PluginIdentifier.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.plugin.discovery; +package org.apache.seatunnel.api.common; import org.apache.commons.lang3.StringUtils; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index e11afd1d19e..30e7b008643 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.api.table.factory; import org.apache.seatunnel.api.common.CommonOptions; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.configuration.util.OptionRule; @@ -36,11 +37,13 @@ import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import scala.Tuple2; import java.io.Serializable; @@ -51,12 +54,17 @@ import java.util.Optional; import java.util.ServiceConfigurationError; import java.util.ServiceLoader; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; + /** * Use SPI to create {@link TableSourceFactory}, {@link TableSinkFactory} and {@link * CatalogFactory}. */ +@Slf4j public final class FactoryUtil { private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class); @@ -65,8 +73,13 @@ public final class FactoryUtil { public static Tuple2, List> createAndPrepareSource( - ReadonlyConfig options, ClassLoader classLoader, String factoryIdentifier) { - return restoreAndPrepareSource(options, classLoader, factoryIdentifier, null); + ReadonlyConfig options, + ClassLoader classLoader, + String factoryIdentifier, + Function fallbackCreateSource, + TableSourceFactory factory) { + return restoreAndPrepareSource( + options, classLoader, factoryIdentifier, null, fallbackCreateSource, factory); } public static @@ -74,22 +87,46 @@ Tuple2, List> restoreAndPrepare ReadonlyConfig options, ClassLoader classLoader, String factoryIdentifier, - ChangeStreamTableSourceCheckpoint checkpoint) { + ChangeStreamTableSourceCheckpoint checkpoint, + Function fallbackCreateSource, + TableSourceFactory factory) { try { - final TableSourceFactory factory = - discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier); + SeaTunnelSource source; - if (factory instanceof ChangeStreamTableSourceFactory && checkpoint != null) { - ChangeStreamTableSourceFactory changeStreamTableSourceFactory = - (ChangeStreamTableSourceFactory) factory; - ChangeStreamTableSourceState state = - changeStreamTableSourceFactory.deserializeTableSourceState(checkpoint); + final String factoryId = options.get(PLUGIN_NAME); + + boolean fallback = + isFallback( + classLoader, + TableSourceFactory.class, + factoryId, + (sourceFactory) -> sourceFactory.createSource(null)); + + if (fallback) { source = - restoreAndPrepareSource( - changeStreamTableSourceFactory, options, classLoader, state); + fallbackCreateSource.apply( + PluginIdentifier.of("seatunnel", "source", factoryId)); + source.prepare(options.toConfig()); + } else { - source = createAndPrepareSource(factory, options, classLoader); + if (factory == null) { + factory = + discoverFactory( + classLoader, TableSourceFactory.class, factoryIdentifier); + } + + if (factory instanceof ChangeStreamTableSourceFactory && checkpoint != null) { + ChangeStreamTableSourceFactory changeStreamTableSourceFactory = + (ChangeStreamTableSourceFactory) factory; + ChangeStreamTableSourceState state = + changeStreamTableSourceFactory.deserializeTableSourceState(checkpoint); + source = + restoreAndPrepareSource( + changeStreamTableSourceFactory, options, classLoader, state); + } else { + source = createAndPrepareSource(factory, options, classLoader); + } } List catalogTables; try { @@ -115,6 +152,7 @@ Tuple2, List> restoreAndPrepare catalogTables.add(catalogTable); } return new Tuple2<>(source, catalogTables); + } catch (Throwable t) { throw new FactoryException( String.format( @@ -150,17 +188,42 @@ SeaTunnelSink createAndPrepareSi CatalogTable catalogTable, ReadonlyConfig config, ClassLoader classLoader, - String factoryIdentifier) { + String factoryIdentifier, + Function fallbackCreateSink, + TableSinkFactory + tableSinkFactory) { try { - TableSinkFactory factory = - discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier); + final String factoryId = config.get(PLUGIN_NAME); + + boolean fallback = + isFallback( + classLoader, + TableSinkFactory.class, + factoryId, + (sinkFactory) -> sinkFactory.createSink(null)); + + if (fallback) { + SeaTunnelSink sink = + fallbackCreateSink.apply( + PluginIdentifier.of("seatunnel", "sink", factoryId)); + sink.prepare(config.toConfig()); + sink.setTypeInfo(catalogTable.getSeaTunnelRowType()); + + return sink; + } + + if (tableSinkFactory == null) { + tableSinkFactory = + discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier); + } + TableSinkFactoryContext context = TableSinkFactoryContext.replacePlaceholderAndCreate( catalogTable, config, classLoader, - factory.excludeTablePlaceholderReplaceKeys()); - ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); + tableSinkFactory.excludeTablePlaceholderReplaceKeys()); + ConfigValidator.of(context.getOptions()).validate(tableSinkFactory.optionRule()); LOG.info( "Create sink '{}' with upstream input catalog-table[database: {}, schema: {}, table: {}]", @@ -168,7 +231,7 @@ SeaTunnelSink createAndPrepareSi catalogTable.getTablePath().getDatabaseName(), catalogTable.getTablePath().getSchemaName(), catalogTable.getTablePath().getTableName()); - return factory.createSink(context).createSink(); + return tableSinkFactory.createSink(context).createSink(); } catch (Throwable t) { throw new FactoryException( String.format( @@ -351,4 +414,26 @@ public static SeaTunnelTransform createAndPrepareMultiTableTransform( ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); return factory.createTransform(context).createTransform(); } + + private static boolean isFallback( + ClassLoader classLoader, + Class factoryClass, + String factoryId, + Consumer virtualCreator) { + Optional factory = discoverOptionalFactory(classLoader, factoryClass, factoryId); + if (!factory.isPresent()) { + return true; + } + try { + virtualCreator.accept(factory.get()); + } catch (Exception e) { + if (e instanceof UnsupportedOperationException + && "The Factory has not been implemented and the deprecated Plugin will be used." + .equals(e.getMessage())) { + return true; + } + log.debug(ExceptionUtils.getMessage(e)); + } + return false; + } } diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java index a8a245dd9bb..b2b47854e30 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java @@ -20,40 +20,21 @@ import org.apache.seatunnel.shade.com.google.common.collect.Lists; import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.configuration.util.ConfigValidator; -import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryException; -import org.apache.seatunnel.api.table.factory.FactoryUtil; -import org.apache.seatunnel.api.table.factory.TableSinkFactory; -import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; -import org.apache.seatunnel.api.table.factory.TableSourceFactory; -import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.constants.JobMode; -import org.apache.seatunnel.core.starter.enums.PluginType; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import java.net.URL; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; -import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID; /** The util used for Spark/Flink to create to SeaTunnelSource etc. */ @SuppressWarnings("rawtypes") @@ -61,75 +42,6 @@ public class PluginUtil { protected static final String ENGINE_TYPE = "seatunnel"; - public static SourceTableInfo createSource( - SeaTunnelFactoryDiscovery factoryDiscovery, - SeaTunnelSourcePluginDiscovery sourcePluginDiscovery, - PluginIdentifier pluginIdentifier, - Config pluginConfig, - JobContext jobContext) { - // get current thread classloader - ClassLoader classLoader = - Thread.currentThread() - .getContextClassLoader(); // try to find factory of this plugin - - final ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig); - // try to find table source factory - final Optional sourceFactory = - factoryDiscovery.createOptionalPluginInstance(pluginIdentifier); - final boolean fallback = isFallback(sourceFactory); - SeaTunnelSource source; - if (fallback) { - source = fallbackCreate(sourcePluginDiscovery, pluginIdentifier, pluginConfig); - } else { - // create source with source factory - TableSourceFactoryContext context = - new TableSourceFactoryContext(readonlyConfig, classLoader); - ConfigValidator.of(context.getOptions()).validate(sourceFactory.get().optionRule()); - TableSource tableSource = - ((TableSourceFactory) sourceFactory.get()).createSource(context); - source = tableSource.createSource(); - } - source.setJobContext(jobContext); - ensureJobModeMatch(jobContext, source); - List catalogTables; - try { - catalogTables = source.getProducedCatalogTables(); - } catch (UnsupportedOperationException e) { - // TODO remove it when all connector use `getProducedCatalogTables` - SeaTunnelDataType seaTunnelDataType = source.getProducedType(); - final String tableId = - readonlyConfig.getOptional(CommonOptions.PLUGIN_OUTPUT).orElse(DEFAULT_ID); - catalogTables = - CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId); - } - return new SourceTableInfo(source, catalogTables); - } - - private static boolean isFallback(Optional factory) { - if (!factory.isPresent()) { - return true; - } - try { - ((TableSourceFactory) factory.get()).createSource(null); - } catch (Exception e) { - if (e instanceof UnsupportedOperationException - && "The Factory has not been implemented and the deprecated Plugin will be used." - .equals(e.getMessage())) { - return true; - } - } - return false; - } - - private static SeaTunnelSource fallbackCreate( - SeaTunnelSourcePluginDiscovery sourcePluginDiscovery, - PluginIdentifier pluginIdentifier, - Config pluginConfig) { - SeaTunnelSource source = sourcePluginDiscovery.createPluginInstance(pluginIdentifier); - source.prepare(pluginConfig); - return source; - } - public static Optional createTransformFactory( SeaTunnelFactoryDiscovery factoryDiscovery, SeaTunnelTransformPluginDiscovery transformPluginDiscovery, @@ -163,87 +75,6 @@ public static Optional createSinkFactory( } } - public static SeaTunnelSink createSink( - Optional factory, - Config sinkConfig, - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery, - JobContext jobContext, - List catalogTables, - ClassLoader classLoader) { - boolean fallBack = !factory.isPresent() || isFallback(factory.get()); - if (fallBack) { - SeaTunnelSink sink = - fallbackCreateSink( - sinkPluginDiscovery, - PluginIdentifier.of( - ENGINE_TYPE, - PluginType.SINK.getType(), - sinkConfig.getString(PLUGIN_NAME.key())), - sinkConfig); - sink.setJobContext(jobContext); - sink.setTypeInfo(catalogTables.get(0).getSeaTunnelRowType()); - return sink; - } else { - if (catalogTables.size() > 1) { - Map sinks = new HashMap<>(); - ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig); - catalogTables.forEach( - catalogTable -> { - TableSinkFactoryContext context = - TableSinkFactoryContext.replacePlaceholderAndCreate( - catalogTable, - ReadonlyConfig.fromConfig(sinkConfig), - classLoader, - ((TableSinkFactory) factory.get()) - .excludeTablePlaceholderReplaceKeys()); - ConfigValidator.of(context.getOptions()) - .validate(factory.get().optionRule()); - SeaTunnelSink action = - ((TableSinkFactory) factory.get()) - .createSink(context) - .createSink(); - action.setJobContext(jobContext); - sinks.put(catalogTable.getTablePath(), action); - }); - return FactoryUtil.createMultiTableSink(sinks, readonlyConfig, classLoader); - } - TableSinkFactoryContext context = - TableSinkFactoryContext.replacePlaceholderAndCreate( - catalogTables.get(0), - ReadonlyConfig.fromConfig(sinkConfig), - classLoader, - ((TableSinkFactory) factory.get()) - .excludeTablePlaceholderReplaceKeys()); - ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); - SeaTunnelSink sink = - ((TableSinkFactory) factory.get()).createSink(context).createSink(); - sink.setJobContext(jobContext); - return sink; - } - } - - public static boolean isFallback(Factory factory) { - try { - ((TableSinkFactory) factory).createSink(null); - } catch (Exception e) { - if (e instanceof UnsupportedOperationException - && "The Factory has not been implemented and the deprecated Plugin will be used." - .equals(e.getMessage())) { - return true; - } - } - return false; - } - - public static SeaTunnelSink fallbackCreateSink( - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery, - PluginIdentifier pluginIdentifier, - Config pluginConfig) { - SeaTunnelSink source = sinkPluginDiscovery.createPluginInstance(pluginIdentifier); - source.prepare(pluginConfig); - return source; - } - public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource source) { if (jobContext.getJobMode() == JobMode.BATCH && source.getBoundedness() diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 6f24e1c3fef..44668445367 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -21,8 +21,8 @@ import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper; import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; @@ -34,14 +34,10 @@ import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; -import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.flink.sink.FlinkSink; @@ -55,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; @@ -65,8 +62,6 @@ public class SinkExecuteProcessor extends FlinkAbstractPluginExecuteProcessor> { - private static final String PLUGIN_TYPE = PluginType.SINK.getType(); - protected SinkExecuteProcessor( List jarPaths, Config envConfig, @@ -101,48 +96,27 @@ public List execute(List upstreamDataS new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + Function fallbackCreateSink = + sinkPluginDiscovery::createPluginInstance; for (int i = 0; i < plugins.size(); i++) { + Optional factory = plugins.get(i); Config sinkConfig = pluginConfigs.get(i); DataStreamTableInfo stream = fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input); - Optional factory = plugins.get(i); - boolean fallBack = !factory.isPresent() || isFallback(factory.get()); Map sinks = new HashMap<>(); - if (fallBack) { - for (CatalogTable catalogTable : stream.getCatalogTables()) { - SeaTunnelSink fallBackSink = - fallbackCreateSink( - sinkPluginDiscovery, - PluginIdentifier.of( - ENGINE_TYPE, - PLUGIN_TYPE, - sinkConfig.getString(PLUGIN_NAME.key())), - sinkConfig); - fallBackSink.setJobContext(jobContext); - SeaTunnelRowType sourceType = catalogTable.getSeaTunnelRowType(); - fallBackSink.setTypeInfo(sourceType); - handleSaveMode(fallBackSink); - TableIdentifier tableId = catalogTable.getTableId(); - sinks.put(tableId.toTablePath(), fallBackSink); - } - } else { - for (CatalogTable catalogTable : stream.getCatalogTables()) { - SeaTunnelSink seaTunnelSink; - TableSinkFactoryContext context = - TableSinkFactoryContext.replacePlaceholderAndCreate( - catalogTable, - ReadonlyConfig.fromConfig(sinkConfig), - classLoader, - ((TableSinkFactory) factory.get()) - .excludeTablePlaceholderReplaceKeys()); - ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); - seaTunnelSink = - ((TableSinkFactory) factory.get()).createSink(context).createSink(); - seaTunnelSink.setJobContext(jobContext); - handleSaveMode(seaTunnelSink); - TableIdentifier tableId = catalogTable.getTableId(); - sinks.put(tableId.toTablePath(), seaTunnelSink); - } + for (CatalogTable catalogTable : stream.getCatalogTables()) { + SeaTunnelSink sink = + FactoryUtil.createAndPrepareSink( + catalogTable, + ReadonlyConfig.fromConfig(sinkConfig), + classLoader, + sinkConfig.getString(PLUGIN_NAME.key()), + fallbackCreateSink, + ((TableSinkFactory) (factory.orElse(null)))); + sink.setJobContext(jobContext); + handleSaveMode(sink); + TableIdentifier tableId = catalogTable.getTableId(); + sinks.put(tableId.toTablePath(), sink); } SeaTunnelSink sink = tryGenerateMultiTableSink( @@ -178,28 +152,6 @@ public SeaTunnelSink tryGenerateMultiTableSink( return FactoryUtil.createMultiTableSink(sinks, sinkConfig, classLoader); } - public boolean isFallback(Factory factory) { - try { - ((TableSinkFactory) factory).createSink(null); - } catch (Exception e) { - if (e instanceof UnsupportedOperationException - && "The Factory has not been implemented and the deprecated Plugin will be used." - .equals(e.getMessage())) { - return true; - } - } - return false; - } - - public SeaTunnelSink fallbackCreateSink( - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery, - PluginIdentifier pluginIdentifier, - Config pluginConfig) { - SeaTunnelSink source = sinkPluginDiscovery.createPluginInstance(pluginIdentifier); - source.prepare(pluginConfig); - return source; - } - public void handleSaveMode(SeaTunnelSink seaTunnelSink) { if (seaTunnelSink instanceof SupportSaveMode) { SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index d41bfe34ce1..a82d2392e65 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -21,8 +21,8 @@ import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper; import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; @@ -34,14 +34,10 @@ import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; -import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.flink.sink.FlinkSink; @@ -56,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; @@ -66,8 +63,6 @@ public class SinkExecuteProcessor extends FlinkAbstractPluginExecuteProcessor> { - private static final String PLUGIN_TYPE = PluginType.SINK.getType(); - protected SinkExecuteProcessor( List jarPaths, Config envConfig, @@ -102,48 +97,27 @@ public List execute(List upstreamDataS new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + Function fallbackCreateSink = + sinkPluginDiscovery::createPluginInstance; for (int i = 0; i < plugins.size(); i++) { + Optional factory = plugins.get(i); Config sinkConfig = pluginConfigs.get(i); DataStreamTableInfo stream = fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input); - Optional factory = plugins.get(i); - boolean fallBack = !factory.isPresent() || isFallback(factory.get()); Map sinks = new HashMap<>(); - if (fallBack) { - for (CatalogTable catalogTable : stream.getCatalogTables()) { - SeaTunnelSink fallBackSink = - fallbackCreateSink( - sinkPluginDiscovery, - PluginIdentifier.of( - ENGINE_TYPE, - PLUGIN_TYPE, - sinkConfig.getString(PLUGIN_NAME.key())), - sinkConfig); - fallBackSink.setJobContext(jobContext); - SeaTunnelRowType sourceType = catalogTable.getSeaTunnelRowType(); - fallBackSink.setTypeInfo(sourceType); - handleSaveMode(fallBackSink); - TableIdentifier tableId = catalogTable.getTableId(); - sinks.put(tableId.toTablePath(), fallBackSink); - } - } else { - for (CatalogTable catalogTable : stream.getCatalogTables()) { - SeaTunnelSink seaTunnelSink; - TableSinkFactoryContext context = - TableSinkFactoryContext.replacePlaceholderAndCreate( - catalogTable, - ReadonlyConfig.fromConfig(sinkConfig), - classLoader, - ((TableSinkFactory) factory.get()) - .excludeTablePlaceholderReplaceKeys()); - ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); - seaTunnelSink = - ((TableSinkFactory) factory.get()).createSink(context).createSink(); - seaTunnelSink.setJobContext(jobContext); - handleSaveMode(seaTunnelSink); - TableIdentifier tableId = catalogTable.getTableId(); - sinks.put(tableId.toTablePath(), seaTunnelSink); - } + for (CatalogTable catalogTable : stream.getCatalogTables()) { + SeaTunnelSink sink = + FactoryUtil.createAndPrepareSink( + catalogTable, + ReadonlyConfig.fromConfig(sinkConfig), + classLoader, + sinkConfig.getString(PLUGIN_NAME.key()), + fallbackCreateSink, + ((TableSinkFactory) (factory.orElse(null)))); + sink.setJobContext(jobContext); + handleSaveMode(sink); + TableIdentifier tableId = catalogTable.getTableId(); + sinks.put(tableId.toTablePath(), sink); } SeaTunnelSink sink = tryGenerateMultiTableSink( @@ -184,28 +158,6 @@ public SeaTunnelSink tryGenerateMultiTableSink( return FactoryUtil.createMultiTableSink(sinks, sinkConfig, classLoader); } - public boolean isFallback(Factory factory) { - try { - ((TableSinkFactory) factory).createSink(null); - } catch (Exception e) { - if (e instanceof UnsupportedOperationException - && "The Factory has not been implemented and the deprecated Plugin will be used." - .equals(e.getMessage())) { - return true; - } - } - return false; - } - - public SeaTunnelSink fallbackCreateSink( - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery, - PluginIdentifier pluginIdentifier, - Config pluginConfig) { - SeaTunnelSink source = sinkPluginDiscovery.createPluginInstance(pluginIdentifier); - source.prepare(pluginConfig); - return source; - } - public void handleSaveMode(SeaTunnelSink seaTunnelSink) { if (seaTunnelSink instanceof SupportSaveMode) { SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index 4e1de7c95da..d637f2256b7 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -22,14 +22,16 @@ import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.core.starter.enums.PluginType; -import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.core.starter.execution.SourceTableInfo; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; import org.apache.seatunnel.translation.flink.source.FlinkSource; @@ -39,15 +41,19 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import lombok.extern.slf4j.Slf4j; +import scala.Tuple2; +import java.io.Serializable; import java.net.URL; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Function; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT; +import static org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch; @Slf4j @SuppressWarnings("unchecked,rawtypes") @@ -95,11 +101,12 @@ public List execute(List upstreamDataS @Override protected List initializePlugins( List jarPaths, List pluginConfigs) { - SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = - new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER); - SeaTunnelFactoryDiscovery factoryDiscovery = new SeaTunnelFactoryDiscovery(TableSourceFactory.class, ADD_URL_TO_CLASSLOADER); + SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = + new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER); + Function fallbackCreateSource = + sourcePluginDiscovery::createPluginInstance; List sources = new ArrayList<>(); Set jars = new HashSet<>(); @@ -109,14 +116,22 @@ protected List initializePlugins( ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME.key())); jars.addAll( sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - SourceTableInfo source = - PluginUtil.createSource( - factoryDiscovery, - sourcePluginDiscovery, - pluginIdentifier, - sourceConfig, - jobContext); - sources.add(source); + + Tuple2, List> source = + FactoryUtil.createAndPrepareSource( + ReadonlyConfig.fromConfig(sourceConfig), + Thread.currentThread().getContextClassLoader(), + pluginIdentifier.getPluginName(), + fallbackCreateSource, + (TableSourceFactory) + factoryDiscovery + .createOptionalPluginInstance(pluginIdentifier) + .orElse(null)); + + source._1().setJobContext(jobContext); + ensureJobModeMatch(jobContext, source._1()); + + sources.add(new SourceTableInfo(source._1(), source._2())); } jarPaths.addAll(jars); return sources; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java index f2c20e84081..e9ee482e9c9 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; @@ -29,7 +30,6 @@ import org.apache.seatunnel.core.starter.utils.CommandLineUtils; import org.apache.seatunnel.core.starter.utils.CompressionUtils; import org.apache.seatunnel.core.starter.utils.ConfigBuilder; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index d4f99d65f54..6e22576c911 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -21,6 +21,8 @@ import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper; import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; @@ -29,12 +31,12 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; @@ -45,11 +47,14 @@ import java.net.URL; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; public class SinkExecuteProcessor @@ -91,6 +96,8 @@ public List execute(List upstreamDataStreams SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); + Function fallbackCreateSink = + sinkPluginDiscovery::createPluginInstance; for (int i = 0; i < plugins.size(); i++) { Config sinkConfig = pluginConfigs.get(i); DatasetTableInfo datasetTableInfo = @@ -110,15 +117,25 @@ public List execute(List upstreamDataStreams CommonOptions.PARALLELISM.defaultValue()); } dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism); - Optional factory = plugins.get(i); + Map sinks = new HashMap<>(); + datasetTableInfo.getCatalogTables().stream() + .forEach( + catalogTable -> { + SeaTunnelSink sink = + FactoryUtil.createAndPrepareSink( + catalogTable, + ReadonlyConfig.fromConfig(sinkConfig), + classLoader, + sinkConfig.getString(PLUGIN_NAME.key()), + fallbackCreateSink, + null); + sink.setJobContext(jobContext); + sinks.put(catalogTable.getTableId().toTablePath(), sink); + }); + SeaTunnelSink sink = - PluginUtil.createSink( - factory, - sinkConfig, - sinkPluginDiscovery, - jobContext, - datasetTableInfo.getCatalogTables(), - classLoader); + tryGenerateMultiTableSink( + sinks, ReadonlyConfig.fromConfig(sinkConfig), classLoader); // TODO modify checkpoint location handleSaveMode(sink); String applicationId = @@ -134,28 +151,6 @@ public List execute(List upstreamDataStreams return null; } - public boolean isFallback(Factory factory) { - try { - ((TableSinkFactory) factory).createSink(null); - } catch (Exception e) { - if (e instanceof UnsupportedOperationException - && "The Factory has not been implemented and the deprecated Plugin will be used." - .equals(e.getMessage())) { - return true; - } - } - return false; - } - - public SeaTunnelSink fallbackCreateSink( - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery, - PluginIdentifier pluginIdentifier, - Config pluginConfig) { - SeaTunnelSink source = sinkPluginDiscovery.createPluginInstance(pluginIdentifier); - source.prepare(pluginConfig); - return source; - } - public void handleSaveMode(SeaTunnelSink sink) { if (sink instanceof SupportSaveMode) { Optional saveModeHandler = diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java index 790a20191d9..dcd6a804b26 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; @@ -29,7 +30,6 @@ import org.apache.seatunnel.core.starter.utils.CommandLineUtils; import org.apache.seatunnel.core.starter.utils.CompressionUtils; import org.apache.seatunnel.core.starter.utils.ConfigBuilder; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index b66aaf7d868..74b0e1c1f1d 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -21,6 +21,8 @@ import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper; import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; @@ -29,12 +31,12 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; @@ -44,15 +46,21 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; +import lombok.extern.slf4j.Slf4j; + import java.net.URL; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +@Slf4j public class SinkExecuteProcessor extends SparkAbstractPluginExecuteProcessor> { private static final String PLUGIN_TYPE = PluginType.SINK.getType(); @@ -92,6 +100,8 @@ public List execute(List upstreamDataStreams SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); + Function fallbackCreateSink = + sinkPluginDiscovery::createPluginInstance; for (int i = 0; i < plugins.size(); i++) { Config sinkConfig = pluginConfigs.get(i); DatasetTableInfo datasetTableInfo = @@ -110,15 +120,24 @@ public List execute(List upstreamDataStreams CommonOptions.PARALLELISM.defaultValue()); } dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism); - Optional factory = plugins.get(i); + Map sinks = new HashMap<>(); + datasetTableInfo.getCatalogTables().stream() + .forEach( + catalogTable -> { + SeaTunnelSink sink = + FactoryUtil.createAndPrepareSink( + catalogTable, + ReadonlyConfig.fromConfig(sinkConfig), + classLoader, + sinkConfig.getString(PLUGIN_NAME.key()), + fallbackCreateSink, + null); + sink.setJobContext(jobContext); + sinks.put(catalogTable.getTableId().toTablePath(), sink); + }); SeaTunnelSink sink = - PluginUtil.createSink( - factory, - sinkConfig, - sinkPluginDiscovery, - jobContext, - datasetTableInfo.getCatalogTables(), - classLoader); + tryGenerateMultiTableSink( + sinks, ReadonlyConfig.fromConfig(sinkConfig), classLoader); // TODO modify checkpoint location handleSaveMode(sink); String applicationId = @@ -135,28 +154,6 @@ public List execute(List upstreamDataStreams return null; } - public boolean isFallback(Factory factory) { - try { - ((TableSinkFactory) factory).createSink(null); - } catch (Exception e) { - if (e instanceof UnsupportedOperationException - && "The Factory has not been implemented and the deprecated Plugin will be used." - .equals(e.getMessage())) { - return true; - } - } - return false; - } - - public SeaTunnelSink fallbackCreateSink( - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery, - PluginIdentifier pluginIdentifier, - Config pluginConfig) { - SeaTunnelSink source = sinkPluginDiscovery.createPluginInstance(pluginIdentifier); - source.prepare(pluginConfig); - return source; - } - public void handleSaveMode(SeaTunnelSink sink) { if (sink instanceof SupportSaveMode) { Optional saveModeHandler = diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java index 5f4a583d84e..50b3a88814b 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java @@ -23,21 +23,24 @@ import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.utils.SerializationUtils; -import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.core.starter.execution.SourceTableInfo; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import scala.Tuple2; + +import java.io.Serializable; import java.net.URL; import java.util.ArrayList; import java.util.HashMap; @@ -45,9 +48,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT; +import static org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch; @SuppressWarnings("rawtypes") public class SourceExecuteProcessor extends SparkAbstractPluginExecuteProcessor { @@ -110,8 +115,9 @@ public List execute(List upstreamDataStreams @Override protected List initializePlugins(List pluginConfigs) { SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery(); - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableSourceFactory.class); + + Function fallbackCreateSource = + sourcePluginDiscovery::createPluginInstance; List sources = new ArrayList<>(); Set jars = new HashSet<>(); @@ -121,14 +127,17 @@ protected List initializePlugins(List pluginC ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME.key())); jars.addAll( sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - SourceTableInfo source = - PluginUtil.createSource( - factoryDiscovery, - sourcePluginDiscovery, - pluginIdentifier, - sourceConfig, - jobContext); - sources.add(source); + Tuple2, List> source = + FactoryUtil.createAndPrepareSource( + ReadonlyConfig.fromConfig(sourceConfig), + Thread.currentThread().getContextClassLoader(), + pluginIdentifier.getPluginName(), + fallbackCreateSource, + null); + + source._1().setJobContext(jobContext); + ensureJobModeMatch(jobContext, source._1()); + sources.add(new SourceTableInfo(source._1(), source._2())); } sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars)); return sources; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java index e85e2c56eb0..d9de0164469 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java @@ -21,6 +21,10 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; @@ -28,12 +32,16 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import lombok.extern.slf4j.Slf4j; + import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_INPUT; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT; +@Slf4j public abstract class SparkAbstractPluginExecuteProcessor implements PluginExecuteProcessor { protected SparkRuntimeEnvironment sparkRuntimeEnvironment; @@ -101,6 +109,19 @@ protected Optional fromSourceTable( pluginInputIdentifier)); } + // if not support multi table, rollback + protected SeaTunnelSink tryGenerateMultiTableSink( + Map sinks, + ReadonlyConfig sinkConfig, + ClassLoader classLoader) { + if (sinks.values().stream().anyMatch(sink -> !(sink instanceof SupportMultiTableSink))) { + log.info("Unsupported multi table sink api, rollback to sink template"); + // choose the first sink + return sinks.values().iterator().next(); + } + return FactoryUtil.createMultiTableSink(sinks, sinkConfig, classLoader); + } + private void registerTempView(String tableName, Dataset ds) { ds.createOrReplaceTempView(tableName); } diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java index 1a2514fdcab..b7b9adbccb5 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ConnectorCheckCommand.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.core.starter.seatunnel.command; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.common.constants.PluginType; @@ -25,7 +26,6 @@ import org.apache.seatunnel.core.starter.exception.ConfigCheckException; import org.apache.seatunnel.core.starter.seatunnel.args.ConnectorCheckCommandArgs; import org.apache.seatunnel.plugin.discovery.PluginDiscovery; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf index bb22567bcf9..86f52b5f54a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf @@ -54,6 +54,10 @@ sink { max_transaction_retry_time = 3 max_connection_timeout = 1 + queryParamPosition = { + string = 0 + int = 1 + } query = "unwind $batch as row create(n:BatchLabel) set n.name = row.name,n.age = row.age" } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java index affc90283be..87b1a4e7cdd 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/ConnectorInstanceLoader.java @@ -21,12 +21,12 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.common.constants.CollectionConstants; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java index 2ec19cabc9b..02a56b3b835 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java @@ -17,42 +17,15 @@ package org.apache.seatunnel.engine.core.parse; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.sink.SeaTunnelSink; -import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.constants.CollectionConstants; -import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.utils.IdGenerator; -import org.apache.seatunnel.engine.core.dag.actions.Action; -import org.apache.seatunnel.engine.core.dag.actions.SinkAction; -import org.apache.seatunnel.engine.core.dag.actions.SourceAction; - -import org.apache.commons.lang3.tuple.ImmutablePair; import com.hazelcast.logging.ILogger; import com.hazelcast.logging.Logger; import lombok.Data; import lombok.NonNull; -import scala.Serializable; -import scala.Tuple2; import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals; @Data public class JobConfigParser { @@ -73,125 +46,10 @@ public JobConfigParser( this.isStartWithSavePoint = isStartWithSavePoint; } - public Tuple2 parseSource( - Config config, JobConfig jobConfig, String tableId, int parallelism) { - ImmutablePair> tuple = - ConnectorInstanceLoader.loadSourceInstance( - config, jobConfig.getJobContext(), commonPluginJars); - final SeaTunnelSource source = tuple.getLeft(); - // old logic: prepare(initialization) -> set job context - source.prepare(config); - source.setJobContext(jobConfig.getJobContext()); - PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source); - String actionName = - createSourceActionName(0, config.getString(CollectionConstants.PLUGIN_NAME)); - SourceAction action = - new SourceAction( - idGenerator.getNextId(), - actionName, - tuple.getLeft(), - tuple.getRight(), - new HashSet<>()); - action.setParallelism(parallelism); - SeaTunnelRowType producedType = (SeaTunnelRowType) tuple.getLeft().getProducedType(); - CatalogTable catalogTable = CatalogTableUtil.getCatalogTable(tableId, producedType); - return new Tuple2<>(catalogTable, action); - } - - public List> parseSinks( - int configIndex, - List>> inputVertices, - Config sinkConfig, - JobConfig jobConfig) { - List> sinkActions = new ArrayList<>(); - int spareParallelism = inputVertices.get(0).get(0)._2().getParallelism(); - if (inputVertices.size() > 1) { - // union - Set inputActions = - inputVertices.stream() - .flatMap(Collection::stream) - .map(Tuple2::_2) - .collect(Collectors.toCollection(LinkedHashSet::new)); - checkProducedTypeEquals(inputActions); - SinkAction sinkAction = - parseSink( - configIndex, - sinkConfig, - jobConfig, - spareParallelism, - inputVertices - .get(0) - .get(0) - ._1() - .getTableSchema() - .toPhysicalRowDataType(), - inputActions); - sinkActions.add(sinkAction); - } else { - // sink template - for (Tuple2 tableTuple : inputVertices.get(0)) { - CatalogTable catalogTable = tableTuple._1(); - Action inputAction = tableTuple._2(); - int parallelism = inputAction.getParallelism(); - SinkAction sinkAction = - parseSink( - configIndex, - sinkConfig, - jobConfig, - parallelism, - catalogTable.getTableSchema().toPhysicalRowDataType(), - Collections.singleton(inputAction)); - sinkActions.add(sinkAction); - } - } - return sinkActions; - } - - private SinkAction parseSink( - int configIndex, - Config config, - JobConfig jobConfig, - int parallelism, - SeaTunnelRowType rowType, - Set inputActions) { - final ImmutablePair< - SeaTunnelSink, - Set> - tuple = - ConnectorInstanceLoader.loadSinkInstance( - config, jobConfig.getJobContext(), commonPluginJars); - final SeaTunnelSink sink = - tuple.getLeft(); - // old logic: prepare(initialization) -> set job context -> set row type (There is a logical - // judgment that depends on before and after, not a simple set) - sink.prepare(config); - sink.setJobContext(jobConfig.getJobContext()); - sink.setTypeInfo(rowType); - if (!isStartWithSavePoint) { - multipleTableJobConfigParser.handleSaveMode(sink); - } - final String actionName = - createSinkActionName(configIndex, tuple.getLeft().getPluginName()); - final SinkAction action = - new SinkAction<>( - idGenerator.getNextId(), - actionName, - new ArrayList<>(inputActions), - sink, - tuple.getRight(), - new HashSet<>()); - action.setParallelism(parallelism); - return action; - } - static String createSourceActionName(int configIndex, String pluginName) { return String.format("Source[%s]-%s", configIndex, pluginName); } - static String createSinkActionName(int configIndex, String pluginName) { - return String.format("Sink[%s]-%s", configIndex, pluginName); - } - static String createSinkActionName(int configIndex, String pluginName, String table) { return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table); } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 61df9abcfe8..903db02f804 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.CommonOptions; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.env.EnvCommonOptions; import org.apache.seatunnel.api.sink.SaveModeExecuteLocation; @@ -35,10 +36,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceCheckpoint; -import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.FactoryUtil; -import org.apache.seatunnel.api.table.factory.TableSinkFactory; -import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.common.Constants; @@ -48,7 +46,6 @@ import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.core.starter.utils.ConfigBuilder; import org.apache.seatunnel.engine.common.config.JobConfig; @@ -64,7 +61,6 @@ import org.apache.seatunnel.engine.core.dag.actions.TransformAction; import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier; import org.apache.seatunnel.engine.core.job.JobPipelineCheckpointData; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; @@ -95,7 +91,6 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -116,7 +111,6 @@ public class MultipleTableJobConfigParser { private final ReadonlyConfig envOptions; - private final JobConfigParser fallbackParser; private final boolean isStartWithSavePoint; private final List pipelineCheckpoints; @@ -166,8 +160,6 @@ public MultipleTableJobConfigParser( this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath), variables); this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); - this.fallbackParser = - new JobConfigParser(idGenerator, commonPluginJars, this, isStartWithSavePoint); this.pipelineCheckpoints = pipelineCheckpoints; } @@ -184,8 +176,6 @@ public MultipleTableJobConfigParser( this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = seaTunnelJobConfig; this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); - this.fallbackParser = - new JobConfigParser(idGenerator, commonPluginJars, this, isStartWithSavePoint); this.pipelineCheckpoints = pipelineCheckpoints; } @@ -347,31 +337,6 @@ private void fillJobConfigAndCommonJars() { log.info("add common jar in plugins :{}", commonPluginJars); } - private static boolean isFallback( - ClassLoader classLoader, - Class factoryClass, - String factoryId, - Consumer virtualCreator) { - Optional factory = - FactoryUtil.discoverOptionalFactory(classLoader, factoryClass, factoryId); - if (!factory.isPresent()) { - return true; - } - try { - virtualCreator.accept(factory.get()); - } catch (Exception e) { - if (e instanceof UnsupportedOperationException - && "The Factory has not been implemented and the deprecated Plugin will be used." - .equals(e.getMessage())) { - log.warn( - "The Factory has not been implemented and the deprecated Plugin will be used."); - return true; - } - log.debug(ExceptionUtils.getMessage(e)); - } - return false; - } - private int getParallelism(ReadonlyConfig config) { return Math.max( 1, @@ -388,18 +353,12 @@ public Tuple2>> parseSource( final int parallelism = getParallelism(readonlyConfig); - boolean fallback = - isFallback( - classLoader, - TableSourceFactory.class, - factoryId, - (factory) -> factory.createSource(null)); - - if (fallback) { - Tuple2 tuple = - fallbackParser.parseSource(sourceConfig, jobConfig, tableId, parallelism); - return new Tuple2<>(tableId, Collections.singletonList(tuple)); - } + Function fallbackCreateSource = + pluginIdentifier -> { + SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = + new SeaTunnelSourcePluginDiscovery(); + return sourcePluginDiscovery.createPluginInstance(pluginIdentifier); + }; Tuple2, List> tuple2; if (isStartWithSavePoint && pipelineCheckpoints != null && !pipelineCheckpoints.isEmpty()) { @@ -407,9 +366,16 @@ public Tuple2>> parseSource( getSourceCheckpoint(configIndex, factoryId); tuple2 = FactoryUtil.restoreAndPrepareSource( - readonlyConfig, classLoader, factoryId, checkpoint); + readonlyConfig, + classLoader, + factoryId, + checkpoint, + fallbackCreateSource, + null); } else { - tuple2 = FactoryUtil.createAndPrepareSource(readonlyConfig, classLoader, factoryId); + tuple2 = + FactoryUtil.createAndPrepareSource( + readonlyConfig, classLoader, factoryId, fallbackCreateSource, null); } Set factoryUrls = new HashSet<>(); @@ -591,16 +557,6 @@ private static T findLast(LinkedHashMap map) { } } - boolean fallback = - isFallback( - classLoader, - TableSinkFactory.class, - factoryId, - (factory) -> factory.createSink(null)); - if (fallback) { - return fallbackParser.parseSinks(configIndex, inputVertices, sinkConfig, jobConfig); - } - // get jar urls Set jarUrls = new HashSet<>(); jarUrls.addAll(getSinkPluginJarPaths(sinkConfig)); @@ -702,9 +658,22 @@ private static T findLast(LinkedHashMap map) { String factoryId, int parallelism, int configIndex) { + + Function fallbackCreateSink = + pluginIdentifier -> { + SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = + new SeaTunnelSinkPluginDiscovery(); + return sinkPluginDiscovery.createPluginInstance(pluginIdentifier); + }; + SeaTunnelSink sink = FactoryUtil.createAndPrepareSink( - catalogTable, readonlyConfig, classLoader, factoryId); + catalogTable, + readonlyConfig, + classLoader, + factoryId, + fallbackCreateSink, + null); sink.setJobContext(jobConfig.getJobContext()); SinkConfig actionConfig = new SinkConfig(catalogTable.getTableId().toTablePath()); long id = idGenerator.getNextId(); diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java index 4b62895f18c..a946700c75a 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions; import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.common.PluginIdentifierInterface; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.util.OptionRule; diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java index e765822af39..0d631745eba 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/PluginDiscovery.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.plugin.discovery; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.util.OptionRule; diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java index 9fe87174881..a536f3a4466 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelFactoryDiscovery.java @@ -17,9 +17,9 @@ package org.apache.seatunnel.plugin.discovery.seatunnel; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.commons.lang3.StringUtils; diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java index cef4f42ab5f..145b0a04b44 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSinkPluginDiscovery.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.plugin.discovery.seatunnel; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.sink.SeaTunnelSink; @@ -24,7 +25,6 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.commons.lang3.tuple.ImmutableTriple; diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java index e1fbce74bbf..a224066f2dd 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscovery.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.plugin.discovery.seatunnel; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; @@ -24,7 +25,6 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.commons.lang3.tuple.ImmutableTriple; diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java index 606cd0d7cae..88b586d1379 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.plugin.discovery.seatunnel; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.factory.TableTransformFactory; @@ -24,7 +25,6 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.apache.commons.lang3.tuple.ImmutableTriple; diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java index e4cffe87802..5e8f4001aab 100644 --- a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java +++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.plugin.discovery; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.common.constants.PluginType; diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java index 1bc62981e81..e8511d524b3 100644 --- a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java +++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java @@ -19,10 +19,10 @@ import org.apache.seatunnel.shade.com.google.common.collect.Lists; +import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.plugin.discovery.PluginIdentifier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions;