-
Notifications
You must be signed in to change notification settings - Fork 18
node desc
qifeng dai edited this page Aug 11, 2017
·
87 revisions
本文档主要说明当前系统支持的任务类型及其相关协议。
当前支持的任务类型包括:
- MR: map-reduce
- HQL: 执行引擎包括 hive, spark, phoenix
- SPARK: spark 批任务
- SPARK_STREAMING: spark 流任务
- STORM:storm 流任务
- SHELL:shell 任务
- IMPEXP: 导入导出任务,将外部数据导入数仓或将数仓数据导出到外部。
- VIRTUAL: 无实际意义,用于做节点间的依赖同步
说明:这里描述了 map-reduce 对应的配置参数信息。
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
mainClass | string | 是 | 主函数的 class | 使用 class 的完全限定名 |
mainJar | jsonObject | 是 | 主 jar 包 | jar 包文件名称 |
args | string | 否 | 命令行参数信息 | 这里支持变量替换 |
properties | jsonArray | 否 | 配置参数信息 | 这里对应 -D |
files | jsonArray | 否 | 引用的文件名列表 | 这里对应 -files |
archives | jsonArray | 否 | 引用的压缩包资源名列表 | 这里对应 -archives |
libJars | jsonArray | 否 | 引用的 jar 包列表 | 这里对应 -libjars |
mainJar 及 libJars 数组中每个元素说明:
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
scope | enum | 否 | 资源域 | 可取值有 PROJECT -表示项目级别的资源,WORKFLOW -表示工作流级别的资源,默认为 PROJECT
|
res | string | 是 | 资源名称 | 如果 scope 是 PROJECT , 会根据 res 取项目级别的资源,如果是 WORKFLOW ,直接引用的是本地的文件 |
properties 说明:
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
prop | string | 是 | 参数 key | |
value | string | 是 | 参数 value |
files/archives 说明:
由于其为 jsonArray 格式,我们这里描述的是其每个元素的定义。
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
scope | enum | 否 | 资源域 | 参考 mainJar 中说明 |
res | string | 是 | 资源名称 | 参考 mainJar 中说明 |
alias | string | 否 | 资源别名 |
为了说明系统功能,有两个假设:
- 假设我们上传了项目级资源
wordcount-examples.jar
,ABC.conf
,JOB.zip
(注意项目级资源是没有路径信息的) - 假设在该节点所在的工作流中,有工作流级资源
lib/tokenizer-0.1.jar
,conf/HEL.conf
(注意工作流级资源是通过路径进行引用的)
配置示例:
{
"mainClass": "com.baifendian.mr.WordCount",
"mainJar": {
"scope": "PROJECT", // project level
"res": "wordcount-examples.jar"
},
"args": "/user/joe/wordcount/input /user/joe/wordcount/output",
"properties": [{
"prop": "wordcount.case.sensitive",
"value": "true"
}, {
"prop": "stopwords",
"value": "the,who,a,then"
}
],
"files": [{
"res": "ABC.conf",
"alias": "aa"
}, {
"scope": "WORKFLOW", // workflow level
"res": "conf/HEL.conf",
"alias": "hh"
}
],
"archives": [{
"res": "JOB.zip",
"alias": "jj"
}
],
"libJars": [{
"scope": "WORKFLOW",
"res": "lib/tokenizer-0.1.jar"
}
]
}
对上面的配置,系统会下载项目的文件以及工作流的文件到本地(如果有冲突,优先取工作流级的资源),这里会下载:
ABC.conf
conf/HEL.conf
lib/tokenizer-0.1.jar
JOB.zip
上面的配置生成的语句为:
hadoop jar wordcount-examples.jar com.baifendian.mr.WordCount -Dwordcount.case.sensitive=true -Dstopwords=the,who,a,then -files ABC.conf#aa,conf/HEL.conf#hh -libjars lib/tokenizer-0.1.jar -archives JOB.zip#jj /user/joe/wordcount/input /user/joe/wordcount/output
说明:描述一个 sql 的参数信息, 注意这里不是特指 hive-sql。
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
sql | string | 是 | sql 语句原始内容 | 这里支持变量替换 |
type | enum | 否 | sql 执行引擎的类型, 支持 HIVE, PHOENIX, SPARK | 默认为 "HIVE" |
udfs | jsonArray | 否 | 自定义函数描述 | 可能用到自定义函数,这里描述了用到的自定义函数 |
udfs 描述:
由于其为 jsonArray 格式,我们这里描述的是其每个元素的定义。
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
func | string | 是 | 函数名称 | |
className | string | 是 | 类名称 | |
libJars | jsonArray | 是 | 依赖的 jar 包 | 结构和上面的 mainJar 一致 |
示例如下:
{
"sql": "xxx",
"type": "HIVE",
"udfs": [{
"func": "upper",
"className": "com.baifendian.example.UpperUtils",
"libJars": [{
"scope": "PROJECT",
"res": "upper-0.1.jar"
}]
}]
}
说明:spark 是完成批处理任务的,spark-streaming 是完成流式任务的,它们的参数配置信息是一样的。
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
mainClass | string | 是 | 同 MR
|
|
mainJar | jsonObject | 是 | 同 MR
|
|
args | string | 否 | 同 MR
|
这里支持变量替换 |
properties | jsonObject | 否 | 同 MR
|
这里对应 --conf |
files | jsonArray | 否 | 同 MR
|
这里对应 --files |
archives | jsonArray | 否 | 同 MR
|
这里对应 --archives |
libJars | jsonArray | 否 | 同 MR
|
这里对应 --jars |
deployMode | enum | 否 | 部署模式 | 可选值只有 CLUSTER ,默认也是 CLUSTER
|
driverMemory | string | 否 | driver 的内存大小,如 512M | 对应 --driver-memory |
driverCores | int | 否 | driver 的 cores 数目 | 默认为:1,对应 --driver-cores |
numExecutors | int | 否 | executor 的个数 | 默认为:2,对应 --num-executors |
executorMemory | string | 否 | executor 的内存大小, 如 1024M | 对应 --num-executors |
executorCores | int | 否 | executor 的 core 数 | 默认:2,对应 --executor-cores |
配置示例:
{
"mainClass": "com.baifendian.spark.WordCount",
"mainJar": {
"scope": "PROJECT", // project level
"res": "spark-wc-examples.jar"
},
"args": "/user/joe/wordcount/input /user/joe/wordcount/output",
"properties": [{
"prop": "wordcount.case.sensitive",
"value": "true"
}, {
"prop": "stopwords",
"value": "the,who,a,then"
}
],
"files": [{
"res": "ABC.conf",
"alias": "aa"
}, {
"scope": "WORKFLOW", // workflow level
"res": "conf/HEL.conf",
"alias": "hh"
}
],
"archives": [{
"res": "JOB.zip",
"alias": "jj"
}
],
"libJars": [{
"scope": "WORKFLOW",
"res": "lib/tokenizer-0.1.jar"
}
],
"driverCores": 2,
"driverMemory": "2048M",
"numExecutors": 2,
"executorMemory": "4096M",
"executorCores": 2
}
上面的配置生成的语句为:
spark-submit --class com.baifendian.spark.WordCount \
--master yarn \
--deploy-mode CLUSTER \
--driver-cores 2 \
--driver-memory 2048M \
--num-executors 2 \
--executor-cores 2 \
--executor-memory 4096M \
--jars lib/tokenizer-0.1.jar \
--files ABC.conf#aa,conf/HEL.conf#hh \
--archives JOB.zip#jj \
--conf "wordcount.case.sensitive=true" \
--conf "stopwords=the,who,a,then" \
spark-wc-examples.jar /user/joe/wordcount/input /user/joe/wordcount/output
说明:storm 流任务,支持 jar, sql, shell 类型的任务。
使用 jar 方式提交任务
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
type | enum | 是 | 必须为 'JAR' | |
topologyName | string | 是 | storm 任务名称 | |
stormParam | jsonObject | 是 | storm 任务配置 |
stormParam 每个参数说明:
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
mainClass | string | 是 | 同 MR
|
|
mainJar | jsonObject | 是 | 同 MR
|
|
args | string | 否 | 同 MR
|
这里支持变量替换 |
jars | jsonArray | 否 | 依赖第三方 jar 包 | 这里对应 --jars |
artifacts | string | 否 | 设置 mvn 依赖 | 这里对应 --artifacts |
artifactRepositories | string | 否 | 设置 mvn 中心仓库 | 这里对应 --artifactRepositories |
配置示例:
{
"mainClass": "com.baifendian.spark.WordCount",
"mainJar": {
"scope": "PROJECT",
"res": "storm-jar-examples.jar"
},
"args": "/user/joe/wordcount/input /user/joe/wordcount/output",
"jars": [
{
"scope": "PROJECT",
"res": "storm-utils.jar",
}
],
"artifacts": "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12",
"artifactRepositories": "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"
}
上面的配置生成的语句为:
./bin/storm jar storm-jar-examples.jar com.baifendian.spark.WordCount /user/joe/wordcount/input /user/joe/wordcount/output --jars "storm-utils.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
type | enum | 是 | 必须为 'SQL' | |
topologyName | string | 是 | storm 任务名称 | |
stormParam | jsonObject | 是 | storm 任务配置 |
stormParam 每个参数说明:
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
sqlFile | jsonObject | 是 | sql 文件 | |
jars | jsonArray | 否 | 依赖第三方 jar 包 | 这里对应 --jars |
artifacts | string | 否 | 设置 mvn 依赖 | 这里对应 --artifacts |
artifactRepositories | string | 否 | 设置 mvn 中心仓库 | 这里对应 --artifactRepositories |
配置示例:
{
"mainClass": "com.baifendian.spark.WordCount",
"sqlFile": {
"scope": "PROJECT",
"res": "storm.sql"
},
"jars": [
{
"scope": "PROJECT",
"res": "storm-udfs.jar",
}
],
"artifacts": "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12",
"artifactRepositories": "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"
}
上面的配置生成的语句为:
./bin/storm sql storm.sql --jars "storm-udfs.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"
说明:shell 任务是一种非常灵活的任务类型,用户可以在 shell 中完成完成很多原生没有提供的工作,比如运行一个 java/python 查询,甚至是完成一个 mr 或者 spark 任务的提交。
参数 | 类型 | 是否必选 | 描述 | 说明 |
---|---|---|---|---|
script | string | 是 | shell 脚本信息 | 这里支持变量替换 |
resources | jsonArray<string> | 否 | 该脚本依赖的资源,注意,这里仅仅需要注明项目基本的资源 | 列表中的每个元素表明依赖的项目级资源名称 |
配置示例:
{
"script": "#!/bin/bash\npwd\njava -classpath examples.jar com.baifendian.example.HelloWorld",
"resources": [{
"scope": "PROJECT", // project level
"res": "examples.jar"
}
]
}
参见: 导入导出
说明:该任务类型实际上是作为依赖使用,比如在一个 DAG 中,任务
D、E
需要依赖A、B、C
完成,这个时候可以配置一个虚拟结点V
依赖A、B、C
,而D、E
只需要依赖V
即可。
该任务类型没有参数信息。