Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx committed Jan 23, 2025
1 parent 24e64aa commit 180b031
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
Expand All @@ -37,6 +38,7 @@
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.ExceptionUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -277,12 +279,10 @@ public static <T extends Factory> Optional<T> discoverOptionalFactory(
ClassLoader classLoader,
Class<T> factoryClass,
String factoryIdentifier,
Function<PluginIdentifier, T> transformFactoryFunction) {
Function<String, T> transformFactoryFunction) {

if (transformFactoryFunction != null) {
return Optional.of(
transformFactoryFunction.apply(
PluginIdentifier.of("seatunnel", "transform", factoryIdentifier)));
return Optional.of(transformFactoryFunction.apply(factoryIdentifier));
}
return discoverOptionalFactory(classLoader, factoryClass, factoryIdentifier);
}
Expand Down Expand Up @@ -450,4 +450,14 @@ private static <T extends Factory> boolean isFallback(
}
return false;
}

public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource source) {
if (jobContext.getJobMode() == JobMode.BATCH
&& source.getBoundedness()
== org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
throw new UnsupportedOperationException(
String.format(
"'%s' source don't support off-line job.", source.getPluginName()));
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.core.starter.flink.execution;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.CommonOptions;
Expand All @@ -35,8 +36,10 @@
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;

Expand Down Expand Up @@ -72,13 +75,36 @@ protected SinkExecuteProcessor(
@Override
protected List<Optional<? extends Factory>> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
Function<String, TableSinkFactory> discoverOptionalFactory =
pluginName ->
(TableSinkFactory)
factoryDiscovery.createPluginInstance(
PluginIdentifier.of(
ENGINE_TYPE,
PluginType.SINK.getType(),
pluginName));

return pluginConfigs.stream()
.map(
sinkConfig ->
discoverOptionalFactory(
classLoader,
TableSinkFactory.class,
sinkConfig.getString(PLUGIN_NAME.key())))
sinkConfig -> {
jarPaths.addAll(
sinkPluginDiscovery.getPluginJarPaths(
Lists.newArrayList(
PluginIdentifier.of(
ENGINE_TYPE,
PluginType.SINK.getType(),
sinkConfig.getString(
PLUGIN_NAME.key())))));
return discoverOptionalFactory(
classLoader,
TableSinkFactory.class,
sinkConfig.getString(PLUGIN_NAME.key()),
discoverOptionalFactory);
})
.distinct()
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.core.starter.flink.execution;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.CommonOptions;
Expand All @@ -35,8 +36,10 @@
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;

Expand Down Expand Up @@ -73,13 +76,37 @@ protected SinkExecuteProcessor(
@Override
protected List<Optional<? extends Factory>> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs) {

SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
Function<String, TableSinkFactory> discoverOptionalFactory =
pluginName ->
(TableSinkFactory)
factoryDiscovery.createPluginInstance(
PluginIdentifier.of(
ENGINE_TYPE,
PluginType.SINK.getType(),
pluginName));

return pluginConfigs.stream()
.map(
sinkConfig ->
discoverOptionalFactory(
classLoader,
TableSinkFactory.class,
sinkConfig.getString(PLUGIN_NAME.key())))
sinkConfig -> {
jarPaths.addAll(
sinkPluginDiscovery.getPluginJarPaths(
Lists.newArrayList(
PluginIdentifier.of(
ENGINE_TYPE,
PluginType.SINK.getType(),
sinkConfig.getString(
PLUGIN_NAME.key())))));
return discoverOptionalFactory(
classLoader,
TableSinkFactory.class,
sinkConfig.getString(PLUGIN_NAME.key()),
discoverOptionalFactory);
})
.distinct()
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
import static org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch;

@Slf4j
@SuppressWarnings("unchecked,rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
Expand All @@ -48,6 +49,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
Expand All @@ -73,6 +75,11 @@ protected List<TableTransformFactory> initializePlugins(
new SeaTunnelTransformPluginDiscovery();
SeaTunnelFactoryDiscovery factoryDiscovery =
new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER);
Function<String, TableTransformFactory> discoverOptionalFactory =
pluginName ->
(TableTransformFactory)
factoryDiscovery.createPluginInstance(
PluginIdentifier.of(ENGINE_TYPE, "transform", pluginName));
return pluginConfigs.stream()
.map(
transformConfig -> {
Expand All @@ -81,21 +88,14 @@ protected List<TableTransformFactory> initializePlugins(
Lists.newArrayList(
PluginIdentifier.of(
ENGINE_TYPE,
"transform",
PluginType.TRANSFORM.getType(),
transformConfig.getString(
PLUGIN_NAME.key())))));
return discoverOptionalFactory(
classLoader,
TableTransformFactory.class,
transformConfig.getString(PLUGIN_NAME.key()),
() ->
factoryDiscovery.loadPluginInstance(
PluginIdentifier.of(
ENGINE_TYPE,
"transform",
transformConfig.getString(
PLUGIN_NAME.key())),
classLoader));
discoverOptionalFactory);
})
.distinct()
.filter(Optional::isPresent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_OUTPUT;
import static org.apache.seatunnel.core.starter.execution.PluginUtil.ensureJobModeMatch;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.ensureJobModeMatch;

@SuppressWarnings("rawtypes")
public class SourceExecuteProcessor extends SparkAbstractPluginExecuteProcessor<SourceTableInfo> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
Expand Down Expand Up @@ -386,7 +385,7 @@ public Tuple2<String, List<Tuple2<CatalogTable, Action>>> parseSource(
String actionName = JobConfigParser.createSourceActionName(configIndex, factoryId);
SeaTunnelSource<Object, SourceSplit, Serializable> source = tuple2._1();
source.setJobContext(jobConfig.getJobContext());
PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
FactoryUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
SourceAction<Object, SourceSplit, Serializable> action =
new SourceAction<>(id, actionName, tuple2._1(), factoryUrls, new HashSet<>());
action.setParallelism(parallelism);
Expand Down

0 comments on commit 180b031

Please sign in to comment.