You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
publicstaticLocalStreamEnvironmentcreateLocalEnvironment(intparallelism, Configurationconfiguration) {
finalLocalStreamEnvironmentcurrentEnvironment;
// 创建本地执行环境,传入空配置,并将execution.target设置为localcurrentEnvironment = newLocalStreamEnvironment(configuration);
currentEnvironment.setParallelism(parallelism);
returncurrentEnvironment;
}
## 本地运行环境webUIpublicstaticStreamExecutionEnvironmentcreateLocalEnvironmentWithWebUI(Configurationconf) {
checkNotNull(conf, "conf");
if (!conf.contains(RestOptions.PORT)) {
// explicitly set this option so that it's not set to 0 laterconf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
}
returncreateLocalEnvironment(defaultLocalParallelism, conf);
}
## 远程执行环境
privatestaticConfigurationgetEffectiveConfiguration(
finalConfigurationbaseConfiguration,
finalStringhost,
finalintport,
finalString[] jars,
finalList<URL> classpaths,
finalSavepointRestoreSettingssavepointRestoreSettings) {
// 将客户端传入配置合并finalConfigurationeffectiveConfiguration = newConfiguration(baseConfiguration);
// 设置jobManager配置RemoteEnvironmentConfigUtils.setJobManagerAddressToConfig(host, port, effectiveConfiguration);
// 设置执行jar包路径RemoteEnvironmentConfigUtils.setJarURLsToConfig(jars, effectiveConfiguration);
ConfigUtils.encodeCollectionToConfig(effectiveConfiguration, PipelineOptions.CLASSPATHS, classpaths, URL::toString);
if (savepointRestoreSettings != null) {
// 设置savepoint配置SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, effectiveConfiguration);
} else {
SavepointRestoreSettings.toConfiguration(SavepointRestoreSettings.none(), effectiveConfiguration);
}
// these should be set in the end to overwrite any values from the client config provided in the constructor.effectiveConfiguration.setString(DeploymentOptions.TARGET, "remote");
effectiveConfiguration.setBoolean(DeploymentOptions.ATTACHED, true);
returneffectiveConfiguration;
}
privatestaticExecutionEnvironmentFactorycontextEnvironmentFactory = null;
/** The ThreadLocal used to store {@link ExecutionEnvironmentFactory}. */privatestaticfinalThreadLocal<ExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = newThreadLocal<>();
/** The default parallelism used by local environments. */privatestaticintdefaultLocalDop = Runtime.getRuntime().availableProcessors();
// sink算子数组privatefinalList<DataSink<?>> sinks = newArrayList<>();
privatefinalList<Tuple2<String, DistributedCacheEntry>> cacheFile = newArrayList<>();
privatefinalExecutionConfigconfig = newExecutionConfig();
/** Result from the latest execution, to make it retrievable when using eager execution methods. */protectedJobExecutionResultlastJobExecutionResult;
/** Flag to indicate whether sinks have been cleared in previous executions. */privatebooleanwasExecuted = false;
privatefinalPipelineExecutorServiceLoaderexecutorServiceLoader;
privatefinalConfigurationconfiguration;
privatefinalClassLoaderuserClassloader;
privatefinalList<JobListener> jobListeners = newArrayList<>();
# dataSinks转换PlanpublicPlancreateProgramPlan(StringjobName, booleanclearSinks) {
checkNotNull(jobName);
if (this.sinks.isEmpty()) {
if (wasExecuted) {
thrownewRuntimeException("No new data sinks have been defined since the " +
"last execution. The last execution refers to the latest call to " +
"'execute()', 'count()', 'collect()', or 'print()'.");
} else {
thrownewRuntimeException("No data sinks have been created yet. " +
"A program needs at least one sink that consumes data. " +
"Examples are writing the data set or printing it.");
}
}
finalPlanGeneratorgenerator = newPlanGenerator(
sinks, config, getParallelism(), cacheFile, jobName);
finalPlanplan = generator.generate();
// clear all the sinks such that the next execution does not redo everythingif (clearSinks) {
this.sinks.clear();
wasExecuted = true;
}
returnplan;
}
## PlanGenerator
publicclassPlanGenerator {
privatestaticfinalLoggerLOG = LoggerFactory.getLogger(PlanGenerator.class);
privatefinalList<DataSink<?>> sinks;
privatefinalExecutionConfigconfig;
privatefinalintdefaultParallelism;
privatefinalList<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile;
privatefinalStringjobName;
publicPlanGenerator(
List<DataSink<?>> sinks,
ExecutionConfigconfig,
intdefaultParallelism,
List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile,
StringjobName) {
this.sinks = checkNotNull(sinks);
this.config = checkNotNull(config);
this.cacheFile = checkNotNull(cacheFile);
this.jobName = checkNotNull(jobName);
this.defaultParallelism = defaultParallelism;
}
publicPlangenerate() {
finalPlanplan = createPlan();
registerGenericTypeInfoIfConfigured(plan);
registerCachedFiles(plan);
logTypeRegistrationDetails();
returnplan;
}
/** * Create plan. * * @return the generated plan. */privatePlancreatePlan() {
finalOperatorTranslationtranslator = newOperatorTranslation();
finalPlanplan = translator.translateToPlan(sinks, jobName);
if (defaultParallelism > 0) {
plan.setDefaultParallelism(defaultParallelism);
}
plan.setExecutionConfig(config);
returnplan;
}
/** * Check plan for GenericTypeInfo's and register the types at the serializers. * * @param plan the generated plan. */privatevoidregisterGenericTypeInfoIfConfigured(Planplan) {
if (!config.isAutoTypeRegistrationDisabled()) {
plan.accept(newVisitor<Operator<?>>() {
privatefinalSet<Class<?>> registeredTypes = newHashSet<>();
privatefinalSet<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = newHashSet<>();
@OverridepublicbooleanpreVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
if (!visitedOperators.add(visitable)) {
returnfalse;
}
OperatorInformation<?> opInfo = visitable.getOperatorInfo();
Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
returntrue;
}
@OverridepublicvoidpostVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
}
});
}
}
privatevoidregisterCachedFiles(Planplan) {
try {
registerCachedFilesWithPlan(plan);
} catch (Exceptione) {
thrownewRuntimeException("Error while registering cached files: " + e.getMessage(), e);
}
}
/** * Registers all files that were registered at this execution environment's cache registry of the * given plan's cache registry. * * @param p The plan to register files at. * @throws IOException Thrown if checks for existence and sanity fail. */privatevoidregisterCachedFilesWithPlan(Planp) throwsIOException {
for (Tuple2<String, DistributedCache.DistributedCacheEntry> entry : cacheFile) {
p.registerCachedFile(entry.f0, entry.f1);
}
}
privatevoidlogTypeRegistrationDetails() {
intregisteredTypes = getNumberOfRegisteredTypes();
intdefaultKryoSerializers = getNumberOfDefaultKryoSerializers();
LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);
if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer for serializing POJOs");
} elseif (config.isForceKryoEnabled()) {
LOG.info("Using KryoSerializer for serializing POJOs");
} elseif (config.isForceAvroEnabled()) {
LOG.info("Using AvroSerializer for serializing POJOs");
}
if (LOG.isDebugEnabled()) {
logDebuggingTypeDetails();
}
}
privateintgetNumberOfRegisteredTypes() {
returnconfig.getRegisteredKryoTypes().size() +
config.getRegisteredPojoTypes().size() +
config.getRegisteredTypesWithKryoSerializerClasses().size() +
config.getRegisteredTypesWithKryoSerializers().size();
}
privateintgetNumberOfDefaultKryoSerializers() {
returnconfig.getDefaultKryoSerializers().size() +
config.getDefaultKryoSerializerClasses().size();
}
privatevoidlogDebuggingTypeDetails() {
LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
LOG.debug("Registered Kryo with Serializers types: {}",
config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
LOG.debug("Registered Kryo with Serializer Classes types: {}",
config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
LOG.debug("Registered Kryo default Serializers: {}",
config.getDefaultKryoSerializers().entrySet().toString());
LOG.debug("Registered Kryo default Serializers Classes {}",
config.getDefaultKryoSerializerClasses().entrySet().toString());
LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());
// print information about static code analysisLOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
}
}
publicvoidloadModule(StringmoduleName, Modulemodule) {
moduleManager.loadModule(moduleName, module);
}
publicvoidloadModule(Stringname, Modulemodule) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string");
checkNotNull(module, "module cannot be null");
// 类似于catalog操作if (!modules.containsKey(name)) {
modules.put(name, module);
LOG.info("Loaded module {} from class {}", name, module.getClass().getName());
} else {
thrownewValidationException(
String.format("A module with name %s already exists", name));
}
}
## createTemporarySystemFunction
publicvoidcreateTemporarySystemFunction(Stringname, UserDefinedFunctionfunctionInstance) {
// 注册临时系统函数functionCatalog.registerTemporarySystemFunction(
name,
functionInstance,
false);
}
privatevoidregisterTemporarySystemFunction(
Stringname,
CatalogFunctionfunction,
booleanignoreIfExists) {
// 将functionName转换为全小写finalStringnormalizedName = FunctionIdentifier.normalizeName(name);
try {
// 校验函数validateAndPrepareFunction(function);
} catch (Throwablet) {
thrownewValidationException(
String.format(
"Could not register temporary system function '%s' due to implementation errors.",
name),
t);
}
if (!tempSystemFunctions.containsKey(normalizedName)) {
tempSystemFunctions.put(normalizedName, function);
} elseif (!ignoreIfExists) {
thrownewValidationException(
String.format(
"Could not register temporary system function. A function named '%s' does already exist.",
name));
}
}
## createFunction
publicvoidcreateFunction(Stringpath, Class<? extendsUserDefinedFunction> functionClass, booleanignoreIfExists) {
finalUnresolvedIdentifierunresolvedIdentifier = parser.parseIdentifier(path);
functionCatalog.registerCatalogFunction(
unresolvedIdentifier,
functionClass,
ignoreIfExists);
}
publicvoidregisterCatalogFunction(
UnresolvedIdentifierunresolvedIdentifier,
Class<? extendsUserDefinedFunction> functionClass,
booleanignoreIfExists) {
finalObjectIdentifieridentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
finalObjectIdentifiernormalizedIdentifier = FunctionIdentifier.normalizeObjectIdentifier(identifier);
try {
UserDefinedFunctionHelper.validateClass(functionClass);
} catch (Throwablet) {
thrownewValidationException(
String.format(
"Could not register catalog function '%s' due to implementation errors.",
identifier.asSummaryString()),
t);
}
finalCatalogcatalog = catalogManager.getCatalog(normalizedIdentifier.getCatalogName())
.orElseThrow(IllegalStateException::new);
finalObjectPathpath = identifier.toObjectPath();
// we force users to deal with temporary catalog functions first// 判断内存中是否存在if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
if (ignoreIfExists) {
return;
}
thrownewValidationException(
String.format(
"Could not register catalog function. A temporary function '%s' does already exist. " +
"Please drop the temporary function first.",
identifier.asSummaryString()));
}
// 判断该catalog是否存在if (catalog.functionExists(path)) {
if (ignoreIfExists) {
return;
}
thrownewValidationException(
String.format(
"Could not register catalog function. A function '%s' does already exist.",
identifier.asSummaryString()));
}
finalCatalogFunctioncatalogFunction = newCatalogFunctionImpl(
functionClass.getName(),
FunctionLanguage.JAVA);
try {
// 调用catalog创建函数catalog.createFunction(path, catalogFunction, ignoreIfExists);
} catch (Throwablet) {
thrownewTableException(
String.format(
"Could not register catalog function '%s'.",
identifier.asSummaryString()),
t);
}
}
## dropFunction
publicbooleandropCatalogFunction(
UnresolvedIdentifierunresolvedIdentifier,
booleanignoreIfNotExist) {
finalObjectIdentifieridentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
finalObjectIdentifiernormalizedIdentifier = FunctionIdentifier.normalizeObjectIdentifier(identifier);
finalCatalogcatalog = catalogManager.getCatalog(normalizedIdentifier.getCatalogName())
.orElseThrow(IllegalStateException::new);
finalObjectPathpath = identifier.toObjectPath();
// we force users to deal with temporary catalog functions first// 优先处理内存中的临时catalog函数if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
thrownewValidationException(
String.format(
"Could not drop catalog function. A temporary function '%s' does already exist. " +
"Please drop the temporary function first.",
identifier.asSummaryString()));
}
if (!catalog.functionExists(path)) {
if (ignoreIfNotExist) {
returnfalse;
}
thrownewValidationException(
String.format(
"Could not drop catalog function. A function '%s' doesn't exist.",
identifier.asSummaryString()));
}
try {
catalog.dropFunction(path, ignoreIfNotExist);
} catch (Throwablet) {
thrownewTableException(
String.format(
"Could not drop catalog function '%s'.",
identifier.asSummaryString()),
t);
}
returntrue;
}
## createTemporaryFunction
publicvoidregisterTemporaryCatalogFunction(
UnresolvedIdentifierunresolvedIdentifier,
CatalogFunctioncatalogFunction,
booleanignoreIfExists) {
// 处理函数标识符finalObjectIdentifieridentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
finalObjectIdentifiernormalizedIdentifier = FunctionIdentifier.normalizeObjectIdentifier(identifier);
try {
// 校验和前置处理函数validateAndPrepareFunction(catalogFunction);
} catch (Throwablet) {
thrownewValidationException(
String.format(
"Could not register temporary catalog function '%s' due to implementation errors.",
identifier.asSummaryString()),
t);
}
// 放入tempCatalogFunctions内存map中if (!tempCatalogFunctions.containsKey(normalizedIdentifier)) {
tempCatalogFunctions.put(normalizedIdentifier, catalogFunction);
} elseif (!ignoreIfExists) {
thrownewValidationException(
String.format(
"Could not register temporary catalog function. A function '%s' does already exist.",
identifier.asSummaryString()));
}
}
## from
//from->scanInternalpublicOptional<TableLookupResult> getTable(ObjectIdentifierobjectIdentifier) {
Preconditions.checkNotNull(schemaResolver, "schemaResolver should not be null");
// 获取临时表不存在从catalog中获取CatalogBaseTabletemporaryTable = temporaryTables.get(objectIdentifier);
if (temporaryTable != null) {
TableSchemaresolvedSchema = resolveTableSchema(temporaryTable);
returnOptional.of(TableLookupResult.temporary(temporaryTable, resolvedSchema));
} else {
returngetPermanentTable(objectIdentifier);
}
}
## sqlQuery
@OverridepublicTablesqlQuery(Stringquery) {
// 解析query sql语句转换为Operation集合List<Operation> operations = parser.parse(query);
if (operations.size() != 1) {
thrownewValidationException(
"Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
}
Operationoperation = operations.get(0);
// 判断是否为QueryOperationif (operationinstanceofQueryOperation && !(operationinstanceofModifyOperation)) {
// 创建TablereturncreateTable((QueryOperation) operation);
} else {
thrownewValidationException(
"Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
"SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
}
}
protectedTableImplcreateTable(QueryOperationtableOperation) {
returnTableImpl.createTable(
this,
tableOperation,
operationTreeBuilder,
functionCatalog.asLookup(parser::parseIdentifier));
}