Skip to content

node desc

caojingwei edited this page Jun 27, 2017 · 87 revisions

本文档主要说明当前系统支持的任务类型及其相关协议。

当前支持的任务类型包括:

  • MR: map-reduce
  • HQL: hive-sql
  • PRESTO
  • SPARK: spark 批任务
  • SPARK_STREAMING: spark 流任务
  • SHELL
  • IMPEXP: 导入导出任务,将外部数据导入数仓或将数仓数据导出到外部。
  • VIRTUAL: 无实际意义,用于做节点间的依赖同步
  • 特殊API,把一个本地文件上传到hive
  • 特殊API,把一个本地文件上传到hdfs

1.MR 任务的参数信息

说明:这里描述了 map-reduce 对应的配置参数信息。

参数 类型 是否必选 描述 说明
mainClass string 主函数的 class 使用 class 的完全限定名
mainJar jsonObject 主 jar 包 jar 包文件名称
args string 命令行参数信息 这里支持变量替换
properties jsonArray 配置参数信息 这里对应 -D
files jsonArray 引用的文件名列表 这里对应 -files
archives jsonArray 引用的压缩包资源名列表 这里对应 -archives
libJars jsonArray 引用的 jar 包列表 这里对应 -libjars

mainJar 及 libJars 数组中每个元素说明:

参数 类型 是否必选 描述 说明
scope enum 资源域 可取值有 PROJECT-表示项目级别的资源,WORKFLOW-表示工作流级别的资源,默认为 PROJECT
res string 资源名称 如果 scopePROJECT, 会根据 res 取项目级别的资源,如果是 WORKFLOW,直接引用的是本地的文件

properties 说明:

参数 类型 是否必选 描述 说明
prop string 参数 key
value string 参数 value

files/archives 说明:

由于其为 jsonArray 格式,我们这里描述的是其每个元素的定义。

参数 类型 是否必选 描述 说明
scope enum 资源域 参考 mainJar 中说明
res string 资源名称 参考 mainJar 中说明
alias string 资源别名

为了说明系统功能,有两个假设:

  1. 假设我们上传了项目级资源 wordcount-examples.jar, ABC.conf, JOB.zip (注意项目级资源是没有路径信息的)
  2. 假设在该节点所在的工作流中,有工作流级资源 lib/tokenizer-0.1.jar, conf/HEL.conf (注意工作流级资源是通过路径进行引用的)

配置示例:

{
  "mainClass": "com.baifendian.mr.WordCount",
  "mainJar": {
      "scope": "PROJECT", // project level
      "res": "wordcount-examples.jar"
  },
  "args": "/user/joe/wordcount/input /user/joe/wordcount/output",
  "properties": [{
      "prop": "wordcount.case.sensitive",
      "value": "true"
    }, {
      "prop": "stopwords", 
      "value": "the,who,a,then"
    }
  ],
  "files": [{
      "res": "ABC.conf",
      "alias": "aa"
    }, {
      "scope": "WORKFLOW", // workflow level
      "res": "conf/HEL.conf",
      "alias": "hh"
    }
  ],
  "archives": [{
      "res": "JOB.zip",
      "alias": "jj"
    }
  ],
  "libJars": [{
      "scope": "WORKFLOW", 
      "res": "lib/tokenizer-0.1.jar"
    }
  ]
}

对上面的配置,系统会下载项目的文件以及工作流的文件到本地(如果有冲突,优先取工作流级的资源),这里会下载:

ABC.conf
conf/HEL.conf
lib/tokenizer-0.1.jar
JOB.zip

上面的配置生成的语句为:

hadoop jar wordcount-examples.jar com.baifendian.mr.WordCount -Dwordcount.case.sensitive=true -Dstopwords=the,who,a,then -files ABC.conf#aa,conf/HEL.conf#hh -libjars lib/tokenizer-0.1.jar -archives JOB.zip#jj /user/joe/wordcount/input /user/joe/wordcount/output

2.HQL 任务的参数信息

说明:描述一个 hive-sql 的参数信息。

参数 类型 是否必选 描述 说明
sql string sql 语句原始内容 这里支持变量替换
udfs jsonArray 自定义函数描述 可能用到自定义函数,这里描述了用到的自定义函数

udfs 描述:

由于其为 jsonArray 格式,我们这里描述的是其每个元素的定义。

参数 类型 是否必选 描述 说明
func string 函数名称
className string 类名称
libJars jsonArray 依赖的 jar 包 结构和上面的 mainJar 一致

示例如下:

{
  "sql": "xxx", 
  "udfs": [{
      "func": "upper",
      "className": "com.baifendian.example.UpperUtils",
      "libJars": [{
        "scope": "PROJECT",
        "res": "upper-0.1.jar"
      }]
    }]
}

3.PRESTO 任务的参数信息

待定

4.SPARK/SPARK_STREAMING 任务的参数信息

说明:spark 是完成批处理任务的,spark-streaming 是完成流式任务的,它们的参数配置信息是一样的。

参数 类型 是否必选 描述 说明
mainClass string MR
mainJar jsonObject MR
args string MR 这里支持变量替换
properties jsonObject MR 这里对应 --conf
files jsonArray MR 这里对应 --files
archives jsonArray MR 这里对应 --archives
libJars jsonArray MR 这里对应 --jars
deployMode enum 部署模式 可选值只有 CLUSTER,默认也是 CLUSTER
driverMemory string driver 的内存大小,如 512M 对应 --driver-memory
driverCores int driver 的 cores 数目 默认为:1,对应 --driver-cores
numExecutors int executor 的个数 默认为:2,对应 --num-executors
executorMemory string executor 的内存大小, 如 1024M 对应 --num-executors
executorCores int executor 的 core 数 默认:2,对应 --executor-cores

配置示例:

{
  "mainClass": "com.baifendian.spark.WordCount",
  "mainJar": {
      "scope": "PROJECT", // project level
      "res": "spark-wc-examples.jar"
  },
  "args": "/user/joe/wordcount/input /user/joe/wordcount/output",
  "properties": [{
      "prop": "wordcount.case.sensitive",
      "value": "true"
    }, {
      "prop": "stopwords", 
      "value": "the,who,a,then"
    }
  ],
  "files": [{
      "res": "ABC.conf",
      "alias": "aa"
    }, {
      "scope": "WORKFLOW", // workflow level
      "res": "conf/HEL.conf",
      "alias": "hh"
    }
  ],
  "archives": [{
      "res": "JOB.zip",
      "alias": "jj"
    }
  ],
  "libJars": [{
      "scope": "WORKFLOW", 
      "res": "lib/tokenizer-0.1.jar"
    }
  ],
  "driverCores": 2,
  "driverMemory": "2048M",
  "numExecutors": 2,
  "executorMemory": "4096M",
  "executorCores": 2
}

上面的配置生成的语句为:

spark-submit --class com.baifendian.spark.WordCount \
             --master yarn \
             --deploy-mode CLUSTER \
             --driver-cores 2 \
             --driver-memory 2048M \
             --num-executors 2 \
             --executor-cores 2 \
             --executor-memory 4096M \
             --jars lib/tokenizer-0.1.jar \
             --files ABC.conf#aa,conf/HEL.conf#hh \
             --archives JOB.zip#jj \
             --conf "wordcount.case.sensitive=true" \
             --conf "stopwords=the,who,a,then" \
             spark-wc-examples.jar /user/joe/wordcount/input /user/joe/wordcount/output

5.SHELL 任务的参数信息

说明:shell 任务是一种非常灵活的任务类型,用户可以在 shell 中完成完成很多原生没有提供的工作,比如运行一个 java/python 查询,甚至是完成一个 mr 或者 spark 任务的提交。

参数 类型 是否必选 描述 说明
script string shell 脚本信息 这里支持变量替换
resources jsonArray<string> 该脚本依赖的资源,注意,这里仅仅需要注明项目基本的资源 列表中的每个元素表明依赖的项目级资源名称

配置示例:

{
  "script": "#!/bin/bash\npwd\njava -classpath examples.jar com.baifendian.example.HelloWorld",
  "resources": [{
      "scope": "PROJECT", // project level
      "res": "examples.jar"
    }
  ]
}

6.IMPEXP 任务的参数信息

6.1.MYSQL 导入到 HIVE

说明:从 mysql 导入到 hive 表中。

参数 类型 是否必选 描述 说明
type string 导入导出的类型 必须是 "MYSQL_TO_HIVE"
reader jsonObject 读规则参数
writer jsonObject 写规则参数
setting jsonObject 其他导入规则

reader 中每个元素说明如下:

参数 类型 是否必选 描述 说明
datasource string 数据源名称
table array 需要导入的表列表,可以是多张表,但是必须相同的schema结构,支持动态规则 test_$[yyyyMMdd],关于自定义参数相关内容,详见:[[参数说明 parameter desc]]
splitPk string 根据指定字段进行数据分片 目前只支持整形的分片,建议使用主键
where string 筛选条件 不需要要写where,不支持limit等,只支持合法的where语句
querySql string 查询语句 如果需要做复杂查询,这里可以指定查询语句
column array 需要读取的字段 用户需要按照Mysql SQL语法格式: ["id", "table", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] id为普通列名,table为包含保留在的列名,1为整形数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。

writer 中每个元素说明如下: 注意:当前导入不支持hive中的高级类型。

参数 类型 是否必选 描述 说明
database string 需要导入的数据库名
table string 需要导入的表名,支持动态规则 test_$[yyyyMMdd],关于自定义参数相关内容,详见:[[参数说明 parameter desc]]
writeMode enum 写入模式 支持 append 追加,和overwrite 覆盖
column jsonArray 需要写入的字段 与reader的中的字段按照数组顺序一一对应

writer.column 中每个字段说明如下:

参数 类型 是否必选 描述 说明
name string 字段名称
type enum 字段类型 目前仅支持与以下 Hive 数据类型: 数值型:TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE 字符串类型:STRING,VARCHAR,CHAR 布尔类型:BOOLEAN 时间类型:DATE,TIMESTAMP,目前不支持:decimal、binary、arrays、maps、structs、union 类型

mysql 类型到 hive 类型的转换支持列表

mysql 类型 支持的 hive 类型 说明
int, tinyint, smallint, mediumint, int, bigint TINYINT,SMALLINT,INT,BIGINT
float, double, decimal FLOAT,DOUBLE
varchar, char, tinytext, text, mediumtext, longtext, year STRING,VARCHAR,CHAR
date, datetime, timestamp, time DATE,TIMESTAMP
bit, bool BOOLEAN
tinyblob, mediumblob, blob, longblob, varbinary 不支持

setting 字段配置说明:

参数 类型 是否必选 描述 说明
speed jsonObject 流量控制
errorLimit jsonObject 脏数据控制

setting.speed 字段配置说明:

参数 类型 是否必选 描述 说明
channel int 控制并发数
byte int 同步时的速度

setting.errorLimit 字段配置说明:

参数 类型 是否必选 描述 说明
record int 脏数据阈值
percentage int 脏数据占百分比阈值

配置示例:

{
  "reader": {
    "column": [
      "`id`",
      "`name`",
      "`email`",
      "`desc`",
      "`phone`",
      "`password`",
      "`role`",
      "`proxy_users`",
      "`create_time`",
      "`modify_time`"
    ],
    "datasource": "test11",
    "table": [
      "table1",
      "table2"
    ],
    "splitPk": false,
    "where": "id > 0"
  },
  "writer": {
    "database": "test",
    "table": "table",
    "writeMode": "append",
    "column": [
      {
        "name": "id",
        "type": "VARCHAR"
      },
      {
        "name": "name",
        "type": "VARCHAR"
      },
      {
        "name": "email",
        "type": "VARCHAR"
      },
      {
        "name": "desc",
        "type": "VARCHAR"
      },
      {
        "name": "phone",
        "type": "VARCHAR"
      },
      {
        "name": "password",
        "type": "VARCHAR"
      },
      {
        "name": "role",
        "type": "VARCHAR"
      },
      {
        "name": "proxy_users",
        "type": "VARCHAR"
      },
      {
        "name": "create_time",
        "type": "VARCHAR"
      },
      {
        "name": "modify_time",
        "type": "VARCHAR"
      }
    ]
  },
  "setting": {
    "speed": {
      "channel": 1,
      "byte": 104857600
    },
    "errorLimit": {
      "record": 3,
      "percentage": 0.05
    }
  }
}

6.2.MYSQL 导入到 HDFS

说明:从 mysql 导入到 HDFS。

参数 类型 是否必选 描述 说明
type string 导入导出的类型 必须是 "MYSQL_TO_HDFS"
reader jsonObject 读规则参数
writer jsonObject 写规则参数
setting jsonObject 其他导入规则

reader 参考mysql 导出到 hdfs

writer 中每个元素说明如下:

参数 类型 是否必选 描述 说明
path string HDFS路径名
fileName string 需要导入的文件名,支持动态规则 test_$[yyyyMMdd],关于自定义参数相关内容,详见:[[参数说明 parameter desc]]
writeMode enum 写入模式 支持 APPEND 追加,和OVERWRITE 覆盖,如果选择OVERWRITE 如果目录下有fileName前缀的文件,直接报错
fieldDelimiter string 字段分隔符 当文件类型为TEXT必须要指定,不可以是换行符。
fileType enum 文件格式 目前只支持用户配置为"TEXT"或"ORC",TEXT表示textfile文件格式,ORC表示orcfile文件格式
column jsonArray 需要写入的字段 与reader的中的字段按照数组顺序一一对应

writer.column 中每个字段说明如下:

参数 类型 是否必选 描述 说明
name string 字段名称
type enum 字段类型 HDFS字段类型与hive对应,目前仅支持与以下Hive数据类型: 数值型:TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE 字符串类型:STRING,VARCHAR,CHAR 布尔类型:BOOLEAN 时间类型:DATE,TIMESTAMP,目前不支持:decimal、binary、arrays、maps、structs、union 类型

setting 参考mysql 导入到 hive

支持的字段映射关系参考mysql 导入 hive。

配置示例:

{
 "reader": {
   "column": [
     "`id`",
     "`name`",
     "`email`",
     "`desc`",
     "`phone`",
     "`password`",
     "`role`",
     "`proxy_users`",
     "`create_time`",
     "`modify_time`"
   ],
   "datasource": "test11",
   "table": [
     "table1",
     "table2"
   ],
   "splitPk": false,
   "where": "id > 0"
 },
 "writer": {
   "path": "/test/temp/here",
   "fileName": "filetest",
   "writeMode": "append",
   "fileType": "ORC",
   "column": [
     {
       "name": "id",
       "type": "VARCHAR"
     },
     {
       "name": "name",
       "type": "VARCHAR"
     },
     {
       "name": "email",
       "type": "VARCHAR"
     },
     {
       "name": "desc",
       "type": "VARCHAR"
     },
     {
       "name": "phone",
       "type": "VARCHAR"
     },
     {
       "name": "password",
       "type": "VARCHAR"
     },
     {
       "name": "role",
       "type": "VARCHAR"
     },
     {
       "name": "proxy_users",
       "type": "VARCHAR"
     },
     {
       "name": "create_time",
       "type": "VARCHAR"
     },
     {
       "name": "modify_time",
       "type": "VARCHAR"
     }
   ]
 },
 "setting": {
   "speed": {
     "channel": 1,
     "byte": 104857600
   },
   "errorLimit": {
     "record": 3,
     "percentage": 0.05
   }
 }
}

6.3 HIVE 导出到 MYSQL

说明:从 HIVE 导出到 mysql 表中。

参数 类型 是否必选 描述 说明
type string 导入导出的类型 必须是 "HIVE_TO_MYSQL"
reader jsonObject 读规则参数
writer jsonObject 写规则参数
setting jsonObject 其他导入规则

reader 中的字段说明:

参数 类型 是否必选 描述 说明
database string 数据库名称
table string 读取得表,支持动态规则 test_$[yyyyMMdd],关于自定义参数相关内容,详见:[[参数说明 parameter desc]]
where string 筛选条件 不需要要写where,不支持limit等,只支持合法的where语句
querySql string 查询语句 如果需要做复杂查询,这里可以指定查询语句
column array 需要读取的字段 用户需要按照hive SQL语法格式: ["id", "table", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] id为普通列名,table为包含保留在的列名,1为整形数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。

writer 中的字段说明:

参数 类型 是否必选 描述 说明
datasource string 数据源名称
table string 写入表,支持动态规则 test_$[yyyyMMdd],关于自定义参数相关内容,详见:[[参数说明 parameter desc]]
session array DataX在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
preSql string 写入前置操作 写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。
postSql string 写入后置操作 参考前置操作。
writeMode string 写入模式 支持 INSERT(insert into)/REPLACE (replace into)/UPDATE(ON DUPLICATE KEY UPDATE)
batchSize long 批量提交记录的大小 一次性批量提交的记录数大小,该值可以极大减少DataX与Mysql的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成DataX运行进程OOM情况。
column array 需要读取的字段 用户需要按照hive SQL语法格式: ["id", "table", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] id为普通列名,table为包含保留在的列名,1为整形数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。

支持的字段映射关系

mysql 类型 支持的 hive 类型 说明
int, tinyint, smallint, mediumint, int, bigint TINYINT,SMALLINT,INT,BIGINT
float, double, decimal FLOAT,DOUBLE
varchar, char, tinytext, text, mediumtext, longtext, year STRING,VARCHAR,CHAR
date, datetime, timestamp, time DATE,TIMESTAMP
bit, bool BOOLEAN
tinyblob, mediumblob, blob, longblob, varbinary 不支持

配置示例:

{
  "name": "HIVE_TO_MYSQL",
  "reader": {
    "column": [
      "`id`",
      "`name`",
      "`email`",
      "`desc`",
      "`phone`",
      "`password`",
      "`role`",
      "`proxy_users`",
      "`create_time`",
      "`modify_time`"
    ],
    "datasource": "test11",
    "table": "table1",
    "where": "id > 0"
  },
  "writer": {
    "datasource": "database1",
    "table": "table",
    "writeMode": "INSERT",
    "column": [
      "`id`",
      "`name`",
      "`email`",
      "`desc`",
      "`phone`",
      "`password`",
      "`role`",
      "`proxy_users`",
      "`create_time`",
      "`modify_time`"
    ]
  },
  "setting": {
    "speed": {
      "channel": 1,
      "byte": 104857600
    },
    "errorLimit": {
      "record": 3,
      "percentage": 0.05
    }
  }
}

6.4 HIVE 导出到 MONGODB

说明:从 HIVE 导出到 MONGODB 表中。

参数 类型 是否必选 描述 说明
type string 导入导出的类型 必须是 "HIVE_TO_MONGODB"
reader jsonObject 读规则参数
writer jsonObject 写规则参数
setting jsonObject 其他导入规则

reader 配置参考 hive 到 mysql

writer 配置字段说明:

参数 类型 是否必选 描述 说明
datasource string 数据源名称
table string 写入表,支持动态规则 test_$[yyyyMMdd],关于自定义参数相关内容,详见:[[参数说明 parameter desc]]
writeMode enum 写入模式 支持3种写入模式,1追加插入INSERT,2更新UPDATE,3更新或追加UPSET,注意选择2.3模式需要制定upsetKey
column array MongoDB的文档列名
upsetKey string 更新的依据字段 字段值相同的会做更新操作,字段可以为“name”,"user.name"等

column 配置字段说明:

参数 类型 是否必选 描述 说明
name boolean 列名 列名支持所有mongo支持的指定字段写法,例如“name”,"user.name","user[0].name"
type string 类型 支持几乎所有mongodb的类型:DOUBLE,STRING,OBJECT(只接受json格式的string),BINARY,OBJECT_ID(所有的object_id值均当做string处理),BOOLEAN,DATE,INT,TIMESTAMP,LONG,DECIMAL,ARRAY

支持的字段转换

mongodb 类型 支持的 hive 类型 说明
int, Long TINYINT,SMALLINT,INT,BIGINT
double FLOAT,DOUBLE
string STRING,VARCHAR,CHAR
date DATE,TIMESTAMP
boolean BOOLEAN
bytes binary
object MAP,STRUCT
array ARRAY

配置示例:

{
  "job": {
    "content": [
      {
        "reader": {
          "column": [
            "`name`",
            "`desc`",
            "`create_time`"
          ],
          "datasource": "import",
          "table": "data_test",
          "where": "id > 0"
        },
        "writer": {
          "name": "mongodbwriter",
          "parameter": {
            "address": [
              "172.18.1.22:27028"
            ],
            "collectionName": "user2",
            "column": [
              {
                "name": "user.name",
                "type": "STRING"
              },
              {
                "name": "user.info",
                "type": "OBJECT"
              }
            ],
            "dbName": "datax_test",
            "writeMode": "UPSET",
            "upsertKey": "user.name"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}

7.VIRTUAL 任务的参数信息

说明:该任务类型实际上是作为依赖使用,比如在一个 DAG 中,任务 D、E 需要依赖 A、B、C 完成,这个时候可以配置一个虚拟结点 V 依赖 A、B、C,而 D、E 只需要依赖 V 即可。

该任务类型没有参数信息。

8.把一个本地文件上传到hive

说明:把一个本地文件直接上传到hdfs中,不做其他任何处理。

POST    /projects/:project-name/workflows/file-to-hive
Parameters: data={data}&file={file}
内容上传的 Content-Type 使用:multipart/form-data
 
Response:
Status: 201 Created

请求参数:

参数 类型 是否必选 描述 说明
data object 上传到hive的详细配置
file string 需要上传的文件 只支持 txt 或 csv 格式文件,并且必须是一张逻辑上的二维表

data配置参数详解

参数 类型 是否必选 描述 说明
type string 导入导出的类型 必须是 "FILE_TO_HIVE"
reader jsonObject 读文件规则参数
writer jsonObject 写hive规则参数

reader配置参数详解

参数 类型 是否必选 描述 说明
column array 需要导入的字段下标
length int 文件字段的长度

write配置参考mysql写hive

9.把一个本地文件上传到hdfs

说明:把一个本地文件直接上传到hdfs中,不做其他任何处理。

POST    /projects/:project-name/workflows/file-to-hdfs
Parameters: hdfsPath={hdfsPath}&file={file}
内容上传的 Content-Type 使用:multipart/form-data
 
Response:
Status: 201 Created

请求参数:

参数 类型 是否必选 描述 说明
hdfsPath string hdfs路径
file string 需要上传的文件
Clone this wiki locally