From 079d16126dee2dafd15eeac4cb4507888d92ed65 Mon Sep 17 00:00:00 2001
From: Anqi
Date: Thu, 19 Oct 2023 16:30:22 +0800
Subject: [PATCH] support to get template config file according to the datasource (#170)

---
 README-CN.md                                  |  10 +-
 README.md                                     | 103 ++++++++++++------
 exchange-common/pom.xml                       |  10 ++
 .../vesoft/exchange/common/FileMigrate.java   |  67 ++++++++++++
 .../main/resources/config_template/csv.conf   | 100 +++++++++++++++++
 .../main/resources/config_template/hbase.conf |  95 ++++++++++++++++
 .../main/resources/config_template/hive.conf  |  88 +++++++++++++++
 .../main/resources/config_template/jdbc.conf  |  96 ++++++++++++++++
 .../main/resources/config_template/json.conf  |  95 ++++++++++++++++
 .../main/resources/config_template/kafka.conf |  90 +++++++++++++++
 .../main/resources/config_template/neo4j.conf |  94 ++++++++++++++++
 .../main/resources/config_template/orc.conf   |  95 ++++++++++++++++
 .../resources/config_template/parquet.conf    |  95 ++++++++++++++++
 .../common/GenerateConfigTemplate.scala       |  73 +++++++++++++
 .../common/utils/ConfigTemplateUtils.scala    |  43 ++++++++
 15 files changed, 1117 insertions(+), 37 deletions(-)
 create mode 100644 exchange-common/src/main/java/com/vesoft/exchange/common/FileMigrate.java
 create mode 100644 exchange-common/src/main/resources/config_template/csv.conf
 create mode 100644 exchange-common/src/main/resources/config_template/hbase.conf
 create mode 100644 exchange-common/src/main/resources/config_template/hive.conf
 create mode 100644 exchange-common/src/main/resources/config_template/jdbc.conf
 create mode 100644 exchange-common/src/main/resources/config_template/json.conf
 create mode 100644 exchange-common/src/main/resources/config_template/kafka.conf
 create mode 100644 exchange-common/src/main/resources/config_template/neo4j.conf
 create mode 100644 exchange-common/src/main/resources/config_template/orc.conf
 create mode 100644 exchange-common/src/main/resources/config_template/parquet.conf
 create mode 100644 exchange-common/src/main/scala/com/vesoft/exchange/common/GenerateConfigTemplate.scala
 create mode 100644 exchange-common/src/main/scala/com/vesoft/exchange/common/utils/ConfigTemplateUtils.scala

diff --git a/README-CN.md b/README-CN.md
index d806ae04..a966361d 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -18,8 +18,6 @@ Exchange 支持的 Spark 版本包括 2.2、2.4 和
 ，或参考 Exchange 1.0 的使用文档[NebulaExchange 用户手册](https://docs.nebula-graph.com.cn/nebula-exchange/about-exchange/ex-ug-what-is-exchange/ "点击前往 Nebula Graph 网站")。
 
-> 注意：3.4.0版本不支持 kafka 和 pulsar, 若需将 kafka 或 pulsar 数据导入 NebulaGraph，请使用 3.0.0 或
-> 3.3.0 或 3.5.0 版本。
 
 ## 如何获取
@@ -53,6 +51,14 @@
 进入[GitHub Actions Artifacts](https://github.com/vesoft-inc/nebula-exchange/actions/workflows/snapshot.yml)
 页面点击任意 workflow 后，从 Artifacts 中，根据需求下载。
 
+## 自动生成示例配置文件
+
+通过如下命令，指定要导入的数据源，即可获得该数据源所对应的配置文件示例。
+
+```bash
+java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
+  com.vesoft.exchange.common.GenerateConfigTemplate -s {source} -p {target-path-to-save-config-file}
+```
 
 ## 版本匹配
 
diff --git a/README.md b/README.md
index d8cc40ba..b0e6a535 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,20 @@
 # NebulaGraph Exchange
-[中文版](https://github.com/vesoft-inc/nebula-exchange/blob/master/README-CN.md)
-NebulaGraph Exchange (referred to as Exchange) is an Apache Spark™ application used to migrate data in bulk from different sources to NebulaGraph in a distributed way(Spark). It supports a variety of batch or streaming data sources and allows direct writing to NebulaGraph through side-loading (SST Files).
+[中文版](https://github.com/vesoft-inc/nebula-exchange/blob/master/README-CN.md)
-Exchange supports Spark versions 2.2, 2.4, and 3.0 along with their respective toolkits named: `nebula-exchange_spark_2.2`, `nebula-exchange_spark_2.4`, and `nebula-exchange_spark_3.0`.
+
+NebulaGraph Exchange (referred to as Exchange) is an Apache Spark™ application used to migrate data
+in bulk from different sources to NebulaGraph in a distributed way (Spark). It supports a variety of
+batch or streaming data sources and allows direct writing to NebulaGraph through side-loading (SST
+Files).
+
+Exchange supports Spark versions 2.2, 2.4, and 3.0 along with their respective toolkits,
+named `nebula-exchange_spark_2.2`, `nebula-exchange_spark_2.4`, and `nebula-exchange_spark_3.0`.
 
-> Note:
-> - Exchange 3.4.0 does not support Apache Kafka and Apache Pulsar. Please use Exchange of version 3.0.0, 3.3.0, or 3.5.0 to load data from Apache Kafka or Apache Pulsar to NebulaGraph for now.
-> - This repo covers only NebulaGraph 2.x and 3.x, for NebulaGraph v1.x, please use [NebulaGraph Exchange v1.0](https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/exchange).
+> Note:
+> - Exchange 3.4.0 does not support Apache Kafka and Apache Pulsar. Please use Exchange version
+>   3.0.0, 3.3.0, or 3.5.0 to load data from Apache Kafka or Apache Pulsar to NebulaGraph for now.
+> - This repo covers only NebulaGraph 2.x and 3.x. For NebulaGraph v1.x, please
+>   use [NebulaGraph Exchange v1.0](https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/exchange).
 
 ## Build or Download Exchange
 
@@ -21,13 +28,16 @@ $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true
 ```
 
-   After packaging, the newly generated JAR files can be found in the following path:
-   - nebula-exchange/nebula-exchange_spark_2.2/target/ contains nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar
-   - nebula-exchange/nebula-exchange_spark_2.4/target/ contains nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar
-   - nebula-exchange/nebula-exchange_spark_3.0/target/ contains nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar
+   After packaging, the newly generated JAR files can be found in the following paths:
+   - nebula-exchange/nebula-exchange_spark_2.2/target/ contains
+     nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar
+   - nebula-exchange/nebula-exchange_spark_2.4/target/ contains
+     nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar
+   - nebula-exchange/nebula-exchange_spark_3.0/target/ contains
+     nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar
 
 3. Download from the GitHub artifact
-
+
    **Released Version:**
    [GitHub Releases](https://github.com/vesoft-inc/nebula-exchange/releases)
 
    **Snapshot Version:**
    [GitHub Actions Artifacts](https://github.com/vesoft-inc/nebula-exchange/actions/workflows/snapshot.yml)
@@ -63,7 +73,8 @@
 $SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange \
 --master "spark://127.0.0.1:7077" \
 nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
 -c application.conf
 ```
 
-Note: When using Exchange to generate SST files, please add `spark.sql.shuffle.partition` in `--conf` for Spark's shuffle operation:
+Note: When using Exchange to generate SST files, please add `spark.sql.shuffle.partitions`
+in `--conf` for Spark's shuffle operation:
 
 ```
 $SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange \
 --master "spark://127.0.0.1:7077" \
 nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
 -c application.conf
 ```
@@ -73,38 +84,60 @@
-For more details, please refer to [NebulaGraph Exchange Docs](https://docs.nebula-graph.io/master/nebula-exchange/about-exchange/ex-ug-what-is-exchange/)
+For more details, please refer
+to [NebulaGraph Exchange Docs](https://docs.nebula-graph.io/master/nebula-exchange/about-exchange/ex-ug-what-is-exchange/)
+
+## How to get the config file
+
+You can get the template config file for your data source with the following command:
+
+```bash
+java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
+  com.vesoft.exchange.common.GenerateConfigTemplate -s {source} -p {target-path-to-save-config-file}
+```
+
+For example, if your data source is csv and you want to save the template config file in /tmp, run:
+
+```bash
+java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar com.vesoft.exchange.common.GenerateConfigTemplate -s csv -p /tmp
+```
 
 ## Version Compatibility Matrix
 
 Here is the version correspondence between Exchange and NebulaGraph:
 
-| Exchange Version | Nebula Version | Spark Version |
-|:------------------------------------------:|:--------------:|:--------------:|
-| nebula-exchange-2.0.0.jar | 2.0.0, 2.0.1 |2.4.*|
-| nebula-exchange-2.0.1.jar | 2.0.0, 2.0.1 |2.4.*|
-| nebula-exchange-2.1.0.jar | 2.0.0, 2.0.1 |2.4.*|
-| nebula-exchange-2.5.0.jar | 2.5.0, 2.5.1 |2.4.*|
-| nebula-exchange-2.5.1.jar | 2.5.0, 2.5.1 |2.4.*|
-| nebula-exchange-2.5.2.jar | 2.5.0, 2.5.1 |2.4.*|
-| nebula-exchange-2.6.0.jar | 2.6.0, 2.6.1 |2.4.*|
-| nebula-exchange-2.6.1.jar | 2.6.0, 2.6.1 |2.4.*|
-| nebula-exchange-2.6.2.jar | 2.6.0, 2.6.1 |2.4.*|
-| nebula-exchange-2.6.3.jar | 2.6.0, 2.6.1 |2.4.*|
-| nebula-exchange_spark_2.2-3.x.x.jar | 3.x.x |2.2.*|
-| nebula-exchange_spark_2.4-3.x.x.jar | 3.x.x |2.4.*|
-| nebula-exchange_spark_3.0-3.x.x.jar | 3.x.x |`3.0.*`,`3.1.*`,`3.2.*`,`3.3.*`|
-| nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar | nightly |2.2.*|
-| nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar | nightly |2.4.*|
-| nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar | nightly |`3.0.*`,`3.1.*`,`3.2.*`,`3.3.*`|
+| Exchange Version                           | Nebula Version |          Spark Version          |
+|:------------------------------------------:|:--------------:|:-------------------------------:|
+| nebula-exchange-2.0.0.jar                  | 2.0.0, 2.0.1   |              2.4.*              |
+| nebula-exchange-2.0.1.jar                  | 2.0.0, 2.0.1   |              2.4.*              |
+| nebula-exchange-2.1.0.jar                  | 2.0.0, 2.0.1   |              2.4.*              |
+| nebula-exchange-2.5.0.jar                  | 2.5.0, 2.5.1   |              2.4.*              |
+| nebula-exchange-2.5.1.jar                  | 2.5.0, 2.5.1   |              2.4.*              |
+| nebula-exchange-2.5.2.jar                  | 2.5.0, 2.5.1   |              2.4.*              |
+| nebula-exchange-2.6.0.jar                  | 2.6.0, 2.6.1   |              2.4.*              |
+| nebula-exchange-2.6.1.jar                  | 2.6.0, 2.6.1   |              2.4.*              |
+| nebula-exchange-2.6.2.jar                  | 2.6.0, 2.6.1   |              2.4.*              |
+| nebula-exchange-2.6.3.jar                  | 2.6.0, 2.6.1   |              2.4.*              |
+| nebula-exchange_spark_2.2-3.x.x.jar        | 3.x.x          |              2.2.*              |
+| nebula-exchange_spark_2.4-3.x.x.jar        | 3.x.x          |              2.4.*              |
+| nebula-exchange_spark_3.0-3.x.x.jar        | 3.x.x          | `3.0.*`,`3.1.*`,`3.2.*`,`3.3.*` |
+| nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar | nightly        |              2.2.*              |
+| nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar | nightly        |              2.4.*              |
+| nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar | nightly        | `3.0.*`,`3.1.*`,`3.2.*`,`3.3.*` |
 
 ## Feature History
 
 1. *Since 2.0* Exchange allows for the import of vertex data with both String and Integer type IDs.
-2. *Since 2.0* Exchange also supports importing data of various types, including Null, Date, DateTime (using UTC instead of local time), and Time.
-3. *Since 2.0* In addition to Hive on Spark, Exchange can import data from other Hive sources as well.
-4. *Since 2.0* If there are failures during the data import process, Exchange supports recording and retrying the INSERT statement.
-5. *Since 2.5* While SST import is supported by Exchange, property default values are not yet supported.
+2. *Since 2.0* Exchange also supports importing data of various types, including Null, Date,
+   DateTime (using UTC instead of local time), and Time.
+3. *Since 2.0* In addition to Hive on Spark, Exchange can import data from other Hive sources as
+   well.
+4. *Since 2.0* If there are failures during the data import process, Exchange supports recording and
+   retrying the INSERT statement.
+5. *Since 2.5* While SST import is supported by Exchange, property default values are not yet
+   supported.
 6. *Since 3.0* Exchange is compatible with Spark 2.2, Spark 2.4, and Spark 3.0.
 
-Refer to [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf) as an example to edit the configuration file.
+Refer
+to [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf)
+as an example to edit the configuration file.
diff --git a/exchange-common/pom.xml b/exchange-common/pom.xml
index cad9b8a8..84829e29 100644
--- a/exchange-common/pom.xml
+++ b/exchange-common/pom.xml
@@ -65,6 +65,16 @@
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
         </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.16</version>
+        </dependency>
     </dependencies>
diff --git a/exchange-common/src/main/java/com/vesoft/exchange/common/FileMigrate.java b/exchange-common/src/main/java/com/vesoft/exchange/common/FileMigrate.java
new file mode 100644
index 00000000..e92dd609
--- /dev/null
+++ b/exchange-common/src/main/java/com/vesoft/exchange/common/FileMigrate.java
@@ -0,0 +1,67 @@
+/* Copyright (c) 2023 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+package com.vesoft.exchange.common;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class FileMigrate {
+
+    /**
+     * copy the template config file from the classpath resources to the target path
+     *
+     * @param sourceFile template config file
+     * @param path       target path to save the config info
+     */
+    public void saveConfig(String sourceFile, String path) {
+        InputStream inputStream =
+                this.getClass().getClassLoader().getResourceAsStream(sourceFile);
+        if (inputStream == null) {
+            System.out.println("Failed to find the template config file: " + sourceFile);
+            System.exit(-1);
+        }
+        File file = new File(path);
+        if (file.exists()) {
+            file.delete();
+        }
+        FileWriter writer = null;
+        BufferedWriter bufferedWriter = null;
+        BufferedReader reader = null;
+        try {
+            writer = new FileWriter(path);
+            bufferedWriter = new BufferedWriter(writer);
+
+            reader = new BufferedReader(new InputStreamReader(inputStream));
+            String line = null;
+            while ((line = reader.readLine()) != null) {
+                bufferedWriter.write(line);
+                bufferedWriter.write("\n");
+            }
+        } catch (IOException e) {
+            System.out.println("Failed to migrate the template conf file:" + e.getMessage());
+            e.printStackTrace();
+        } finally {
+            try {
+                if (bufferedWriter != null) {
+                    bufferedWriter.close();
+                }
+                if (reader != null) {
+                    reader.close();
+                }
+            } catch (IOException e) {
+                System.out.println("Failed to close the writer or reader:" + e.getMessage());
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git a/exchange-common/src/main/resources/config_template/csv.conf b/exchange-common/src/main/resources/config_template/csv.conf
new file mode 100644
index 00000000..ab5ccce2
--- /dev/null
+++ b/exchange-common/src/main/resources/config_template/csv.conf
@@ -0,0 +1,100 @@
+# Use the command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --total-executor-cores=60 \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c csv.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address: {
+      graph: ["127.0.0.1:9669","127.0.0.2:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures, if the number of failures is bigger than max, then exit the application.
+      max: 32
+      # failed data will be recorded in output path, format with ngql
+      output: "hdfs://127.0.0.1:9000/tmp/errors"
+    }
+
+    # use google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, then give up the request.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name
+      type: {
+        source: csv
+        sink: client
+      }
+      # if your file is not in HDFS, config "file:///path/test.csv"
+      path: "hdfs://ip:port/path/test.csv"
+      # if your csv file has no header, then use _c0, _c1, _c2, ... to indicate the fields
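+      # a minimal sketch of a header-less mapping (hypothetical column meanings:
+      # _c0 = id, _c1 = name, _c2 = age):
+      #   fields: [_c1, _c2]
+      #   nebula.fields: [name, age]
+      #   vertex: {field: _c0}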
+      fields: [csv-field-1, csv-field-2, csv-field-3]
+      nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
+      vertex: {
+        field: csv-field-0
+      }
+      separator: ","
+      header: true
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # process edges
+  edges: [
+    {
+      name: edge-name
+      type: {
+        source: csv
+        sink: client
+      }
+      path: "hdfs://ip:port/path/test.csv"
+      fields: [csv-field-2, csv-field-3, csv-field-4]
+      nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
+      source: {
+        field: csv-field-0
+      }
+      target: {
+        field: csv-field-1
+      }
+      #ranking: csv-field-2
+      separator: ","
+      header: true
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/exchange-common/src/main/resources/config_template/hbase.conf b/exchange-common/src/main/resources/config_template/hbase.conf
new file mode 100644
index 00000000..5e3199ae
--- /dev/null
+++ b/exchange-common/src/main/resources/config_template/hbase.conf
@@ -0,0 +1,95 @@
+# Use the command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --total-executor-cores=60 \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c hbase.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address: {
+      graph: ["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures, if the number of failures is bigger than max, then exit the application.
+      max: 32
+      # failed data will be recorded in output path, format with ngql
+      output: /tmp/errors
+    }
+
+    # use google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, then give up the request.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: hbase
+        sink: client
+      }
+      host: 127.0.0.1
+      port: 2181
+      table: hbase-table
+      columnFamily: hbase-table-columnfamily
+      fields: [hbase-field-0, hbase-field-1, hbase-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      # if fields or vertex contains rowkey, please configure it as "rowkey".
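+      # e.g. a hypothetical mapping that also stores the row key as a property:
+      #   fields: [rowkey, hbase-field-0]
+      #   nebula.fields: [nebula-rowkey-field, nebula-field-0]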
+      vertex: rowkey
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # process edges
+  edges: [
+    {
+      name: edge-name-1
+      type: {
+        source: hbase
+        sink: client
+      }
+      host: 127.0.0.1
+      port: 2181
+      table: hbase-table
+      columnFamily: hbase-table-columnfamily
+      fields: [hbase-field-0, hbase-field-1, hbase-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      source: hbase-field-0
+      target: hbase-field-1
+      ranking: hbase-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/exchange-common/src/main/resources/config_template/hive.conf b/exchange-common/src/main/resources/config_template/hive.conf
new file mode 100644
index 00000000..fc059f24
--- /dev/null
+++ b/exchange-common/src/main/resources/config_template/hive.conf
@@ -0,0 +1,88 @@
+# Use the command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --executor-cores=20 \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c hive.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address: {
+      graph: ["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures, if the number of failures is bigger than max, then exit the application.
+      max: 32
+      # failed data will be recorded in output path, format with ngql
+      output: /tmp/errors
+    }
+
+    # use google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, then give up the request.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: hive
+        sink: client
+      }
+      exec: "select hive-field-0, hive-field-1, hive-field-2 from database.table"
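+      # note: the exec statement is executed through Spark SQL, so the Hive table must be
+      # visible to the Spark session that runs Exchange (assumption: Hive support is enabled)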
+      fields: [hive-field-0, hive-field-1, hive-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: hive-field-0
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # process edges
+  edges: [
+    {
+      name: edge-name-1
+      type: {
+        source: hive
+        sink: client
+      }
+      exec: "select hive-field-0, hive-field-1, hive-field-2 from database.table"
+      fields: [hive-field-0, hive-field-1, hive-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      source: hive-field-0
+      target: hive-field-1
+      ranking: hive-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/exchange-common/src/main/resources/config_template/jdbc.conf b/exchange-common/src/main/resources/config_template/jdbc.conf
new file mode 100644
index 00000000..7a56eddf
--- /dev/null
+++ b/exchange-common/src/main/resources/config_template/jdbc.conf
@@ -0,0 +1,96 @@
+# Use the command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --executor-cores=20 \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c jdbc.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address: {
+      graph: ["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures, if the number of failures is bigger than max, then exit the application.
+      max: 32
+      # failed data will be recorded in output path, format with ngql
+      output: /tmp/errors
+    }
+
+    # use google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, then give up the request.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: jdbc
+        sink: client
+      }
+      url: "jdbc:oracle:thin:@host:1521:db"
+      driver: "oracle.jdbc.driver.OracleDriver"
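+      # a hypothetical MySQL equivalent of the two settings above:
+      #   url: "jdbc:mysql://host:3306/db"
+      #   driver: "com.mysql.cj.jdbc.Driver"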
+      user: "root"
+      password: "nebula"
+      sentence: "select db-field-0, db-field-1, db-field-2 from table"
+      fields: [db-field-0, db-field-1, db-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: db-field-0
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # process edges
+  edges: [
+    {
+      name: edge-name-1
+      type: {
+        source: jdbc
+        sink: client
+      }
+      url: "jdbc:oracle:thin:@host:1521:db"
+      driver: "oracle.jdbc.driver.OracleDriver"
+      user: "root"
+      password: "nebula"
+      sentence: "select db-field-0, db-field-1, db-field-2 from table"
+      fields: [db-field-0, db-field-1, db-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      source: db-field-0
+      target: db-field-1
+      #ranking: db-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/exchange-common/src/main/resources/config_template/json.conf b/exchange-common/src/main/resources/config_template/json.conf
new file mode 100644
index 00000000..c4f59822
--- /dev/null
+++ b/exchange-common/src/main/resources/config_template/json.conf
@@ -0,0 +1,95 @@
+# Use the command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --total-executor-cores=60 \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c json.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address: {
+      graph: ["127.0.0.1:9669","127.0.0.2:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures, if the number of failures is bigger than max, then exit the application.
+      max: 32
+      # failed data will be recorded in output path, format with ngql
+      output: "hdfs://127.0.0.1:9000/tmp/errors"
+    }
+
+    # use google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, then give up the request.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name
+      type: {
+        source: json
+        sink: client
+      }
+      # if your file is not in HDFS, config "file:///path/test.json"
+      path: "hdfs://ip:port/path/test.json"
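+      # note: Spark's JSON reader expects newline-delimited JSON by default, i.e. one
+      # object per line, e.g. {"json-field-0": 1, "json-field-1": "Tom"} (illustrative values)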
+      fields: [json-field-1, json-field-2, json-field-3]
+      nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
+      vertex: {
+        field: json-field-0
+      }
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # process edges
+  edges: [
+    {
+      name: edge-name
+      type: {
+        source: json
+        sink: client
+      }
+      path: "hdfs://ip:port/path/test.json"
+      fields: [json-field-2, json-field-3, json-field-4]
+      nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
+      source: {
+        field: json-field-0
+      }
+      target: {
+        field: json-field-1
+      }
+      #ranking: json-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/exchange-common/src/main/resources/config_template/kafka.conf b/exchange-common/src/main/resources/config_template/kafka.conf
new file mode 100644
index 00000000..6753dbc4
--- /dev/null
+++ b/exchange-common/src/main/resources/config_template/kafka.conf
@@ -0,0 +1,90 @@
+# Use the command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --executor-cores=20 \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c kafka.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address: {
+      graph: ["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures, if the number of failures is bigger than max, then exit the application.
+      max: 32
+      # failed data will be recorded in output path, format with ngql
+      output: /tmp/errors
+    }
+
+    # use google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, then give up the request.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: kafka
+        sink: client
+      }
+      service: "kafka.service.address"
+      topic: "topic-name"
+      fields: [kafka-field-0, kafka-field-1, kafka-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: kafka-field-0
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # process edges
+  edges: [
+    {
+      name: edge-name-1
+      type: {
+        source: kafka
+        sink: client
+      }
+      service: "kafka.service.address"
+      topic: "topic-name"
+      fields: [kafka-field-3, kafka-field-4, kafka-field-5]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      source: kafka-field-0
+      target: kafka-field-1
+      #ranking: kafka-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/exchange-common/src/main/resources/config_template/neo4j.conf b/exchange-common/src/main/resources/config_template/neo4j.conf
new file mode 100644
index 00000000..8965368f
--- /dev/null
+++ b/exchange-common/src/main/resources/config_template/neo4j.conf
@@ -0,0 +1,94 @@
+# Use the command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --executor-cores=20 \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c neo4j.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address: {
+      graph: ["127.0.0.1:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures, if the number of failures is bigger than max, then exit the application.
+      max: 32
+      # failed data will be recorded in output path, format with ngql
+      output: /tmp/errors
+    }
+
+    # use google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, then give up the request.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name-1
+      type: {
+        source: neo4j
+        sink: client
+      }
+      server: "bolt://127.0.0.1:7687"
+      user: neo4j
+      password: neo4j
+      exec: "match (v:vertex_label) return v.neo4j-field-0 as neo4j-field-0, v.neo4j-field-1 as neo4j-field-1, v.neo4j-field-2 as neo4j-field-2 order by id(v)"
+      fields: [neo4j-field-0, neo4j-field-1, neo4j-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      vertex: neo4j-field-0
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # process edges
+  edges: [
+    {
+      name: edge-name-1
+      type: {
+        source: neo4j
+        sink: client
+      }
+      server: "bolt://127.0.0.1:7687"
+      user: neo4j
+      password: neo4j
+      exec: "match (a:vertex_label)-[r:edge_label]->(b:vertex_label) return a.neo4j-source-field, b.neo4j-target-field, r.neo4j-field-0 as neo4j-field-0, r.neo4j-field-1 as neo4j-field-1 order by id(r)"
+      fields: [neo4j-field-0, neo4j-field-1, neo4j-field-2]
+      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+      source: neo4j-field-0
+      target: neo4j-field-1
+      #ranking: neo4j-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/exchange-common/src/main/resources/config_template/orc.conf b/exchange-common/src/main/resources/config_template/orc.conf
new file mode 100644
index 00000000..ac25ef26
--- /dev/null
+++ b/exchange-common/src/main/resources/config_template/orc.conf
@@ -0,0 +1,95 @@
+# Use the command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --total-executor-cores=60 \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c orc.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address: {
+      graph: ["127.0.0.1:9669","127.0.0.2:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures, if the number of failures is bigger than max, then exit the application.
+      max: 32
+      # failed data will be recorded in output path, format with ngql
+      output: "hdfs://127.0.0.1:9000/tmp/errors"
+    }
+
+    # use google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, then give up the request.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name
+      type: {
+        source: orc
+        sink: client
+      }
+      # if your file is not in HDFS, config "file:///path/test.orc"
+      path: "hdfs://ip:port/path/test.orc"
+      fields: [orc-field-1, orc-field-2, orc-field-3]
+      nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
+      vertex: {
+        field: orc-field-0
+      }
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # process edges
+  edges: [
+    {
+      name: edge-name
+      type: {
+        source: orc
+        sink: client
+      }
+      path: "hdfs://ip:port/path/test.orc"
+      fields: [orc-field-2, orc-field-3, orc-field-4]
+      nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
+      source: {
+        field: orc-field-0
+      }
+      target: {
+        field: orc-field-1
+      }
+      #ranking: orc-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/exchange-common/src/main/resources/config_template/parquet.conf b/exchange-common/src/main/resources/config_template/parquet.conf
new file mode 100644
index 00000000..b9bd43cd
--- /dev/null
+++ b/exchange-common/src/main/resources/config_template/parquet.conf
@@ -0,0 +1,95 @@
+# Use the command to submit the exchange job:
+
+# spark-submit \
+# --master "spark://master_ip:7077" \
+# --driver-memory=2G --executor-memory=30G \
+# --num-executors=3 --total-executor-cores=60 \
+# --class com.vesoft.nebula.exchange.Exchange \
+# nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c parquet.conf
+
+{
+  # Spark config
+  spark: {
+    app: {
+      name: NebulaGraph Exchange
+    }
+  }
+
+  # Nebula Graph config
+  nebula: {
+    address: {
+      graph: ["127.0.0.1:9669","127.0.0.2:9669"]
+      # if your NebulaGraph server is in a virtual network like k8s, please config the leader address of meta.
+      # use `SHOW meta leader` to see your meta leader's address
+      meta: ["127.0.0.1:9559"]
+    }
+    user: root
+    pswd: nebula
+    space: test
+
+    # nebula client connection parameters
+    connection {
+      # socket connect & execute timeout, unit: millisecond
+      timeout: 30000
+    }
+
+    error: {
+      # max number of failures, if the number of failures is bigger than max, then exit the application.
+      max: 32
+      # failed data will be recorded in output path, format with ngql
+      output: "hdfs://127.0.0.1:9000/tmp/errors"
+    }
+
+    # use google's RateLimiter to limit the requests sent to NebulaGraph
+    rate: {
+      # the stable throughput of RateLimiter
+      limit: 1024
+      # Acquires a permit from RateLimiter, unit: MILLISECONDS
+      # if it can't be obtained within the specified timeout, then give up the request.
+      timeout: 1000
+    }
+  }
+
+  # Processing tags
+  tags: [
+    {
+      name: tag-name
+      type: {
+        source: parquet
+        sink: client
+      }
+      # if your file is not in HDFS, config "file:///path/test.parquet"
+      path: "hdfs://ip:port/path/test.parquet"
+      fields: [parquet-field-1, parquet-field-2, parquet-field-3]
+      nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
+      vertex: {
+        field: parquet-field-0
+      }
+      batch: 2000
+      partition: 60
+    }
+  ]
+
+  # process edges
+  edges: [
+    {
+      name: edge-name
+      type: {
+        source: parquet
+        sink: client
+      }
+      path: "hdfs://ip:port/path/test.parquet"
+      fields: [parquet-field-2, parquet-field-3, parquet-field-4]
+      nebula.fields: [nebula-field-1, nebula-field-2, nebula-field-3]
+      source: {
+        field: parquet-field-0
+      }
+      target: {
+        field: parquet-field-1
+      }
+      #ranking: parquet-field-2
+      batch: 2000
+      partition: 60
+    }
+  ]
+}
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/GenerateConfigTemplate.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/GenerateConfigTemplate.scala
new file mode 100644
index 00000000..694fafe3
--- /dev/null
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/GenerateConfigTemplate.scala
@@ -0,0 +1,73 @@
+/* Copyright (c) 2023 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+package com.vesoft.exchange.common
+
+import com.vesoft.exchange.common.utils.ConfigTemplateUtils
+import org.apache.commons.cli.{
+  CommandLine,
+  CommandLineParser,
+  HelpFormatter,
+  Option,
+  Options,
+  ParseException,
+  PosixParser
+}
+
+object GenerateConfigTemplate {
+
+  def main(args: Array[String]): Unit = {
+    val sourceOption = new Option("s", "dataSource", true, "data source type")
+    sourceOption.setRequired(true)
+
+    val pathOption = new Option("p", "path", true, "target path to save the template config file")
+    pathOption.setRequired(true)
+
+    val options = new Options
+    options.addOption(sourceOption)
+    options.addOption(pathOption)
+
+    var cli: CommandLine             = null
+    val cliParser: CommandLineParser = new PosixParser()
+    val helpFormatter                = new HelpFormatter
+    try {
+      cli = cliParser.parse(options, args)
+    } catch {
+      case e: ParseException =>
+        helpFormatter.printHelp(">>>> options", options)
+        e.printStackTrace()
+        System.exit(1)
+    }
+    val source: String = cli.getOptionValue("s")
+    val path: String   = cli.getOptionValue("p")
+
+    getConfigTemplate(source, path)
+  }
+
+  def getConfigTemplate(source: String, path: String): Unit = {
+    // delegate to ConfigTemplateUtils so the per-source template mapping is
+    // maintained in one place (the utils version also covers hbase and kafka)
+    ConfigTemplateUtils.getConfigTemplate(source, path)
+  }
+
+}
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/utils/ConfigTemplateUtils.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/utils/ConfigTemplateUtils.scala
new file mode 100644
index 00000000..5756bf36
--- /dev/null
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/utils/ConfigTemplateUtils.scala
@@ -0,0 +1,43 @@
+/* Copyright (c) 2023 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+package com.vesoft.exchange.common.utils
+
+import com.vesoft.exchange.common.FileMigrate
+import com.vesoft.exchange.common.config.SourceCategory
+
+object ConfigTemplateUtils {
+
+  def getConfigTemplate(source: String, path: String): Unit = {
+    val sourceCategory = SourceCategory.withName(source.trim.toUpperCase)
+
+    val fileMigrate = new FileMigrate
+    sourceCategory match {
+      case SourceCategory.CSV =>
+        fileMigrate.saveConfig("config_template/csv.conf", path + "/csv.conf")
+      case SourceCategory.JSON =>
+        fileMigrate.saveConfig("config_template/json.conf", path + "/json.conf")
+      case SourceCategory.ORC =>
+        fileMigrate.saveConfig("config_template/orc.conf", path + "/orc.conf")
+      case SourceCategory.PARQUET =>
+        fileMigrate.saveConfig("config_template/parquet.conf", path + "/parquet.conf")
+      case SourceCategory.HIVE =>
+        fileMigrate.saveConfig("config_template/hive.conf", path + "/hive.conf")
+      case SourceCategory.HBASE =>
+        fileMigrate.saveConfig("config_template/hbase.conf", path + "/hbase.conf")
+      // ORC is already handled above, so it is not repeated in the JDBC branch
+      case SourceCategory.JDBC | SourceCategory.MYSQL | SourceCategory.CLICKHOUSE |
+          SourceCategory.MAXCOMPUTE | SourceCategory.POSTGRESQL =>
+        fileMigrate.saveConfig("config_template/jdbc.conf", path + "/jdbc.conf")
+      case SourceCategory.NEO4J =>
+        fileMigrate.saveConfig("config_template/neo4j.conf", path + "/neo4j.conf")
+      // pulsar shares the kafka template for now
+      case SourceCategory.KAFKA | SourceCategory.PULSAR =>
+        fileMigrate.saveConfig("config_template/kafka.conf", path + "/kafka.conf")
+      case _ => throw new IllegalArgumentException(s"does not support data source $sourceCategory")
+    }
+  }
+
+}