Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx committed Jan 22, 2025
1 parent a05ba93 commit e88e234
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,64 +17,14 @@

package org.apache.seatunnel.core.starter.execution;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryException;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;

import java.net.URL;
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;

/** The util used for Spark/Flink to create to SeaTunnelSource etc. */
@SuppressWarnings("rawtypes")
public class PluginUtil {

protected static final String ENGINE_TYPE = "seatunnel";

public static Optional<? extends Factory> createTransformFactory(
SeaTunnelFactoryDiscovery factoryDiscovery,
SeaTunnelTransformPluginDiscovery transformPluginDiscovery,
Config transformConfig,
List<URL> pluginJars) {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(
ENGINE_TYPE, "transform", transformConfig.getString(PLUGIN_NAME.key()));
pluginJars.addAll(
transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
try {
return factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
} catch (FactoryException e) {
return Optional.empty();
}
}

public static Optional<? extends Factory> createSinkFactory(
SeaTunnelFactoryDiscovery factoryDiscovery,
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery,
Config sinkConfig,
List<URL> pluginJars) {
PluginIdentifier pluginIdentifier =
PluginIdentifier.of(ENGINE_TYPE, "sink", sinkConfig.getString(PLUGIN_NAME.key()));
pluginJars.addAll(
sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
try {
return factoryDiscovery.createOptionalPluginInstance(pluginIdentifier);
} catch (FactoryException e) {
return Optional.empty();
}
}

public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource source) {
if (jobContext.getJobMode() == JobMode.BATCH
&& source.getBoundedness()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;

Expand All @@ -56,6 +54,7 @@

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;

@SuppressWarnings({"unchecked", "rawtypes"})
@Slf4j
Expand All @@ -73,18 +72,13 @@ protected SinkExecuteProcessor(
@Override
protected List<Optional<? extends Factory>> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
return pluginConfigs.stream()
.map(
sinkConfig ->
PluginUtil.createSinkFactory(
factoryDiscovery,
sinkPluginDiscovery,
sinkConfig,
jarPaths))
discoverOptionalFactory(
classLoader,
TableSinkFactory.class,
sinkConfig.getString(PLUGIN_NAME.key())))
.distinct()
.collect(Collectors.toList());
}
Expand All @@ -95,7 +89,6 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
for (int i = 0; i < plugins.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
protected JobContext jobContext;
protected final List<T> plugins;
protected final Config envConfig;
protected final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

protected FlinkAbstractPluginExecuteProcessor(
List<URL> jarPaths,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;

Expand All @@ -57,6 +55,7 @@

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;

@Slf4j
@SuppressWarnings("unchecked,rawtypes")
Expand All @@ -74,18 +73,13 @@ protected SinkExecuteProcessor(
@Override
protected List<Optional<? extends Factory>> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
return pluginConfigs.stream()
.map(
sinkConfig ->
PluginUtil.createSinkFactory(
factoryDiscovery,
sinkPluginDiscovery,
sinkConfig,
jarPaths))
discoverOptionalFactory(
classLoader,
TableSinkFactory.class,
sinkConfig.getString(PLUGIN_NAME.key())))
.distinct()
.collect(Collectors.toList());
}
Expand All @@ -96,7 +90,6 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
for (int i = 0; i < plugins.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected List<SourceTableInfo> initializePlugins(
Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>> source =
FactoryUtil.createAndPrepareSource(
ReadonlyConfig.fromConfig(sourceConfig),
Thread.currentThread().getContextClassLoader(),
classLoader,
pluginIdentifier.getPluginName(),
fallbackCreateSource,
(TableSourceFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
Expand All @@ -49,7 +46,9 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;

@SuppressWarnings("unchecked,rawtypes")
public class TransformExecuteProcessor
Expand All @@ -66,23 +65,16 @@ protected TransformExecuteProcessor(
@Override
protected List<TableTransformFactory> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {

SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER);
SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
new SeaTunnelTransformPluginDiscovery();
return pluginConfigs.stream()
.map(
transformConfig ->
PluginUtil.createTransformFactory(
factoryDiscovery,
transformPluginDiscovery,
transformConfig,
jarPaths))
discoverOptionalFactory(
classLoader,
TableTransformFactory.class,
transformConfig.getString(PLUGIN_NAME.key())))
.distinct()
.filter(Optional::isPresent)
.map(Optional::get)
.map(e -> (TableTransformFactory) e)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
Expand All @@ -56,10 +53,10 @@

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;

public class SinkExecuteProcessor
extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
private static final String PLUGIN_TYPE = PluginType.SINK.getType();

protected SinkExecuteProcessor(
SparkRuntimeEnvironment sparkRuntimeEnvironment,
Expand All @@ -71,19 +68,15 @@ protected SinkExecuteProcessor(
@Override
protected List<Optional<? extends Factory>> initializePlugins(
List<? extends Config> pluginConfigs) {
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
List<Optional<? extends Factory>> sinks =
pluginConfigs.stream()
.map(
sinkConfig ->
PluginUtil.createSinkFactory(
factoryDiscovery,
sinkPluginDiscovery,
sinkConfig,
pluginJars))
discoverOptionalFactory(
classLoader,
TableSinkFactory.class,
sinkConfig.getString(PLUGIN_NAME.key())))
.distinct()
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
Expand All @@ -94,7 +87,6 @@ protected List<Optional<? extends Factory>> initializePlugins(
public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams)
throws TaskExecuteException {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo;
import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
Expand All @@ -59,11 +56,11 @@

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;

@Slf4j
public class SinkExecuteProcessor
extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
private static final String PLUGIN_TYPE = PluginType.SINK.getType();

protected SinkExecuteProcessor(
SparkRuntimeEnvironment sparkRuntimeEnvironment,
Expand All @@ -75,19 +72,15 @@ protected SinkExecuteProcessor(
@Override
protected List<Optional<? extends Factory>> initializePlugins(
List<? extends Config> pluginConfigs) {
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableSinkFactory.class);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
List<URL> pluginJars = new ArrayList<>();
List<Optional<? extends Factory>> sinks =
pluginConfigs.stream()
.map(
sinkConfig ->
PluginUtil.createSinkFactory(
factoryDiscovery,
sinkPluginDiscovery,
sinkConfig,
new ArrayList<>()))
discoverOptionalFactory(
classLoader,
TableSinkFactory.class,
sinkConfig.getString(PLUGIN_NAME.key())))
.distinct()
.collect(Collectors.toList());
sparkRuntimeEnvironment.registerPlugin(pluginJars);
Expand All @@ -98,7 +91,6 @@ protected List<Optional<? extends Factory>> initializePlugins(
public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams)
throws TaskExecuteException {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
DatasetTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected List<SourceTableInfo> initializePlugins(List<? extends Config> pluginC
Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>> source =
FactoryUtil.createAndPrepareSource(
ReadonlyConfig.fromConfig(sourceConfig),
Thread.currentThread().getContextClassLoader(),
classLoader,
pluginIdentifier.getPluginName(),
fallbackCreateSource,
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public abstract class SparkAbstractPluginExecuteProcessor<T>
protected final JobContext jobContext;
protected final List<T> plugins;
protected static final String ENGINE_TYPE = "seatunnel";
protected final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

protected SparkAbstractPluginExecuteProcessor(
SparkRuntimeEnvironment sparkRuntimeEnvironment,
Expand Down
Loading

0 comments on commit e88e234

Please sign in to comment.