diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java index b2b47854e30..b7b500de9f7 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java @@ -17,64 +17,14 @@ package org.apache.seatunnel.core.starter.execution; -import org.apache.seatunnel.shade.com.google.common.collect.Lists; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.common.PluginIdentifier; import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.FactoryException; import org.apache.seatunnel.common.constants.JobMode; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; - -import java.net.URL; -import java.util.List; -import java.util.Optional; - -import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; /** The util used for Spark/Flink to create to SeaTunnelSource etc. */ @SuppressWarnings("rawtypes") public class PluginUtil { - protected static final String ENGINE_TYPE = "seatunnel"; - - public static Optional createTransformFactory( - SeaTunnelFactoryDiscovery factoryDiscovery, - SeaTunnelTransformPluginDiscovery transformPluginDiscovery, - Config transformConfig, - List pluginJars) { - PluginIdentifier pluginIdentifier = - PluginIdentifier.of( - ENGINE_TYPE, "transform", transformConfig.getString(PLUGIN_NAME.key())); - pluginJars.addAll( - transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - try { - return factoryDiscovery.createOptionalPluginInstance(pluginIdentifier); - } catch (FactoryException e) { - return Optional.empty(); - } - } - - public static Optional createSinkFactory( - SeaTunnelFactoryDiscovery factoryDiscovery, - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery, - Config sinkConfig, - List pluginJars) { - PluginIdentifier pluginIdentifier = - PluginIdentifier.of(ENGINE_TYPE, "sink", sinkConfig.getString(PLUGIN_NAME.key())); - pluginJars.addAll( - sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - try { - return factoryDiscovery.createOptionalPluginInstance(pluginIdentifier); - } catch (FactoryException e) { - return Optional.empty(); - } - } - public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource source) { if (jobContext.getJobMode() == JobMode.BATCH && source.getBoundedness() diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 44668445367..f1e631b1ace 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -37,8 +37,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.flink.sink.FlinkSink; @@ -56,6 +54,7 @@ import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; @SuppressWarnings({"unchecked", "rawtypes"}) @Slf4j @@ -73,18 +72,13 @@ protected SinkExecuteProcessor( @Override protected List> initializePlugins( List jarPaths, List pluginConfigs) { - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER); - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = - new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); return pluginConfigs.stream() .map( sinkConfig -> - PluginUtil.createSinkFactory( - factoryDiscovery, - sinkPluginDiscovery, - sinkConfig, - jarPaths)) + discoverOptionalFactory( + classLoader, + TableSinkFactory.class, + sinkConfig.getString(PLUGIN_NAME.key()))) .distinct() .collect(Collectors.toList()); } @@ -95,7 +89,6 @@ public List execute(List upstreamDataS SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Function fallbackCreateSink = sinkPluginDiscovery::createPluginInstance; for (int i = 0; i < plugins.size(); i++) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java index 00614b1d880..d82c4c23bde 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java @@ -57,6 +57,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor protected JobContext jobContext; protected final List plugins; protected final Config envConfig; + protected final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); protected FlinkAbstractPluginExecuteProcessor( List jarPaths, diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index a82d2392e65..1826bbd0794 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -37,8 +37,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.flink.sink.FlinkSink; @@ -57,6 +55,7 @@ import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; @Slf4j @SuppressWarnings("unchecked,rawtypes") @@ -74,18 +73,13 @@ protected SinkExecuteProcessor( @Override protected List> initializePlugins( List jarPaths, List pluginConfigs) { - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER); - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = - new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); return pluginConfigs.stream() .map( sinkConfig -> - PluginUtil.createSinkFactory( - factoryDiscovery, - sinkPluginDiscovery, - sinkConfig, - jarPaths)) + discoverOptionalFactory( + classLoader, + TableSinkFactory.class, + sinkConfig.getString(PLUGIN_NAME.key()))) .distinct() .collect(Collectors.toList()); } @@ -96,7 +90,6 @@ public List execute(List upstreamDataS SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER); DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Function fallbackCreateSink = sinkPluginDiscovery::createPluginInstance; for (int i = 0; i < plugins.size(); i++) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index d637f2256b7..35a8597e79f 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -120,7 +120,7 @@ protected List initializePlugins( Tuple2, List> source = FactoryUtil.createAndPrepareSource( ReadonlyConfig.fromConfig(sourceConfig), - Thread.currentThread().getContextClassLoader(), + classLoader, pluginIdentifier.getPluginName(), fallbackCreateSource, (TableSourceFactory) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java index 615876c4173..79b89a2c8a3 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java @@ -29,9 +29,6 @@ import org.apache.seatunnel.api.transform.SeaTunnelMapTransform; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import org.apache.commons.collections.CollectionUtils; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -49,7 +46,9 @@ import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; @SuppressWarnings("unchecked,rawtypes") public class TransformExecuteProcessor @@ -66,23 +65,16 @@ protected TransformExecuteProcessor( @Override protected List initializePlugins( List jarPaths, List pluginConfigs) { - - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER); - SeaTunnelTransformPluginDiscovery transformPluginDiscovery = - new SeaTunnelTransformPluginDiscovery(); return pluginConfigs.stream() .map( transformConfig -> - PluginUtil.createTransformFactory( - factoryDiscovery, - transformPluginDiscovery, - transformConfig, - jarPaths)) + discoverOptionalFactory( + classLoader, + TableTransformFactory.class, + transformConfig.getString(PLUGIN_NAME.key()))) .distinct() .filter(Optional::isPresent) .map(Optional::get) - .map(e -> (TableTransformFactory) e) .collect(Collectors.toList()); } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 6e22576c911..bec8e11dbd8 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -34,10 +34,7 @@ import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector; @@ -56,10 +53,10 @@ import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; public class SinkExecuteProcessor extends SparkAbstractPluginExecuteProcessor> { - private static final String PLUGIN_TYPE = PluginType.SINK.getType(); protected SinkExecuteProcessor( SparkRuntimeEnvironment sparkRuntimeEnvironment, @@ -71,19 +68,15 @@ protected SinkExecuteProcessor( @Override protected List> initializePlugins( List pluginConfigs) { - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableSinkFactory.class); - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); List pluginJars = new ArrayList<>(); List> sinks = pluginConfigs.stream() .map( sinkConfig -> - PluginUtil.createSinkFactory( - factoryDiscovery, - sinkPluginDiscovery, - sinkConfig, - pluginJars)) + discoverOptionalFactory( + classLoader, + TableSinkFactory.class, + sinkConfig.getString(PLUGIN_NAME.key()))) .distinct() .collect(Collectors.toList()); sparkRuntimeEnvironment.registerPlugin(pluginJars); @@ -94,7 +87,6 @@ protected List> initializePlugins( public List execute(List upstreamDataStreams) throws TaskExecuteException { SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); Function fallbackCreateSink = sinkPluginDiscovery::createPluginInstance; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 74b0e1c1f1d..afaa4ed2061 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -34,10 +34,7 @@ import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector; @@ -59,11 +56,11 @@ import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; @Slf4j public class SinkExecuteProcessor extends SparkAbstractPluginExecuteProcessor> { - private static final String PLUGIN_TYPE = PluginType.SINK.getType(); protected SinkExecuteProcessor( SparkRuntimeEnvironment sparkRuntimeEnvironment, @@ -75,19 +72,15 @@ protected SinkExecuteProcessor( @Override protected List> initializePlugins( List pluginConfigs) { - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableSinkFactory.class); - SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); List pluginJars = new ArrayList<>(); List> sinks = pluginConfigs.stream() .map( sinkConfig -> - PluginUtil.createSinkFactory( - factoryDiscovery, - sinkPluginDiscovery, - sinkConfig, - new ArrayList<>())) + discoverOptionalFactory( + classLoader, + TableSinkFactory.class, + sinkConfig.getString(PLUGIN_NAME.key()))) .distinct() .collect(Collectors.toList()); sparkRuntimeEnvironment.registerPlugin(pluginJars); @@ -98,7 +91,6 @@ protected List> initializePlugins( public List execute(List upstreamDataStreams) throws TaskExecuteException { SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1); Function fallbackCreateSink = sinkPluginDiscovery::createPluginInstance; diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java index 50b3a88814b..4469987c918 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java @@ -130,7 +130,7 @@ protected List initializePlugins(List pluginC Tuple2, List> source = FactoryUtil.createAndPrepareSource( ReadonlyConfig.fromConfig(sourceConfig), - Thread.currentThread().getContextClassLoader(), + classLoader, pluginIdentifier.getPluginName(), fallbackCreateSource, null); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java index d9de0164469..8663dfb1cb6 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java @@ -49,6 +49,7 @@ public abstract class SparkAbstractPluginExecuteProcessor protected final JobContext jobContext; protected final List plugins; protected static final String ENGINE_TYPE = "seatunnel"; + protected final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); protected SparkAbstractPluginExecuteProcessor( SparkRuntimeEnvironment sparkRuntimeEnvironment, diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java index 492af1ad73d..19f77be115a 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java @@ -30,9 +30,6 @@ import org.apache.seatunnel.api.transform.SeaTunnelMapTransform; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; -import org.apache.seatunnel.core.starter.execution.PluginUtil; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; -import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; import org.apache.seatunnel.translation.spark.execution.MultiTableManager; @@ -56,7 +53,9 @@ import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory; @Slf4j public class TransformExecuteProcessor @@ -71,25 +70,19 @@ protected TransformExecuteProcessor( @Override protected List initializePlugins(List pluginConfigs) { - SeaTunnelTransformPluginDiscovery transformPluginDiscovery = - new SeaTunnelTransformPluginDiscovery(); - SeaTunnelFactoryDiscovery factoryDiscovery = - new SeaTunnelFactoryDiscovery(TableTransformFactory.class); List pluginJars = new ArrayList<>(); List transforms = pluginConfigs.stream() .map( transformConfig -> - PluginUtil.createTransformFactory( - factoryDiscovery, - transformPluginDiscovery, - transformConfig, - new ArrayList<>())) + discoverOptionalFactory( + classLoader, + TableTransformFactory.class, + transformConfig.getString(PLUGIN_NAME.key()))) .distinct() .filter(Optional::isPresent) .map(Optional::get) - .map(e -> (TableTransformFactory) e) .collect(Collectors.toList()); sparkRuntimeEnvironment.registerPlugin(pluginJars); return transforms;