refactor: rename tools module to cli and modify package structure (#118)
* remove redundant wrap in test

* change module name tools-cdc to cli

* move properties to cli which are used in cli module only

* fix github ci

* use flink cdc 3.2.0 and update docs
whhe authored Jan 16, 2025
1 parent 9caf977 commit ad5e4df
Showing 49 changed files with 80 additions and 213 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/push_pr.yml
@@ -30,10 +30,10 @@ jobs:
with:
module: flink-connector-oceanbase-directload

flink-connector-oceanbase-tools-cdc:
flink-connector-oceanbase-cli:
uses: ./.github/workflows/test.yml
with:
module: flink-connector-oceanbase-tools-cdc
module: flink-connector-oceanbase-cli

flink-connector-oceanbase-e2e-tests:
strategy:
@@ -1,41 +1,12 @@
# Flink Connector OceanBase By Tools CDC
# Flink Connector OceanBase CLI

English | [简体中文](flink-connector-oceanbase-tools-cdc_cn.md)
English | [简体中文](flink-connector-oceanbase-cli_cn.md)

This project is a flink command line tool that supports the synchronization of CDC tasks to oceanbase through the Flink command line, which greatly simplifies the command writing of data synchronization to oceanbase through flink.
This project is a set of CLI (command-line interface) tools that support submitting Flink jobs to migrate data from other data sources to OceanBase.

## Getting Started

You can get the release packages at [Releases Page](https://github.com/oceanbase/flink-connector-oceanbase/releases) or [Maven Central](https://central.sonatype.com/artifact/com.oceanbase/flink-connector-oceanbase-directload).

```xml
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-tools-cdc</artifactId>
<version>${project.version}</version>
</dependency>
```

If you want to use the latest snapshot version, you can specify by configuring the Maven snapshot repository:

```xml
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-tools-cdc</artifactId>
<version>${project.version}</version>
</dependency>

<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
```
You can get the release packages at [Releases Page](https://github.com/oceanbase/flink-connector-oceanbase/releases) or [Maven Central](https://central.sonatype.com/artifact/com.oceanbase/flink-connector-oceanbase-cli), or get the latest snapshot packages at [Sonatype Snapshot](https://s01.oss.sonatype.org/content/repositories/snapshots/com/oceanbase/flink-connector-oceanbase-cli).

You can also manually build it from the source code.

@@ -45,17 +16,23 @@ cd flink-connector-oceanbase
mvn clean package -DskipTests
```

### Notes:
### Using Flink CDC as Source

#### Dependencies

This project is based on the SQL Client JAR of [Flink CDC Source Connector](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/overview/).

We do not provide the Flink CDC Source Connector in the JAR package of this project, so you need to manually download the Flink CDC SQL JAR you use. Note that this project requires Flink CDC 3.2.0 or later.

If you're using Flink Oracle CDC as the source, you also need to download the dependencies of the source connector; see the *Dependencies* chapter of [Oracle CDC Connector](https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/oracle-cdc/#sql-client-jar).

* Currently, the project supports using Flink CDC to access multiple tables or the entire database. During synchronization, you need to add the corresponding Flink CDC dependency in the `$FLINK_HOME/lib` directory, such as flink-sql-connector-mysql-cdc-\${version}.jar, flink-sql-connector-oracle-cdc-\${version}.jar, flink-sql-connector-sqlserver-cdc-\${version}.jar.
* The dependent Flink CDC version needs to be above 3.1. If you need to use Flink CDC to synchronize MySQL and Oracle, you also need to add the relevant JDBC driver under `$FLINK_HOME/lib`.
* If you synchronize data to OceanBase, you must use oceanBase or mysql as the protocol name for the URL connection string of OceanBase.
#### Demo: Migrate from Flink MySQL CDC to OceanBase

### MySQL Synchronous OceanBase Example
##### Preparation

#### Getting Ready
Add the CLI JAR `flink-connector-oceanbase-cli-xxx.jar` and dependency JAR `flink-sql-connector-mysql-cdc-xxx.jar` to `$FLINK_HOME/lib`.
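
The copy step above can be scripted. The following is a minimal sketch, not part of the project: `install_jars` is a hypothetical helper name, and it assumes `FLINK_HOME` points at a Flink installation and that the JAR paths you pass in are wherever you downloaded or built them.

```shell
# Hypothetical helper (not shipped with the project):
# copy the given JAR files into $FLINK_HOME/lib, stopping at the first problem.
# Usage: install_jars <jar>...
install_jars() {
  # Refuse to run when FLINK_HOME is unset or has no lib directory.
  if [ -z "${FLINK_HOME:-}" ] || [ ! -d "$FLINK_HOME/lib" ]; then
    echo "FLINK_HOME/lib not found" >&2
    return 1
  fi
  for jar in "$@"; do
    # Report a missing input instead of copying a partial set silently.
    if [ ! -f "$jar" ]; then
      echo "missing JAR: $jar" >&2
      return 1
    fi
    cp "$jar" "$FLINK_HOME/lib/" || return 1
  done
}
```

For the demo you would call it with the two files named above, for example `install_jars flink-connector-oceanbase-cli-xxx.jar flink-sql-connector-mysql-cdc-xxx.jar`, replacing `xxx` with the versions you actually use.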

Create a table test_history_strt_sink in a MySql database test_db library, test_history_text.
Then prepare tables and data in the MySQL database.

```mysql
use test_db;
@@ -73,18 +50,21 @@ CREATE TABLE test_history_text (
ns integer DEFAULT '0' NOT NULL,
PRIMARY KEY (itemid,clock,ns)
);

INSERT INTO test_db.test_history_text (itemid,clock,value,ns) VALUES
(1,21131,'ces1',21321);
```

#### Build A Flink Task
##### Submit Job via CLI

##### An example of the Flink command line
Replace the placeholders in the following command with your real database information, then run it to submit a Flink job.

```shell
$FLINK_HOME/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.oceanbase.connector.flink.tools.cdc.CdcTools \
lib/flink-connector-oceanbase-tools-cdc-${version}.jar \
lib/flink-connector-oceanbase-cli-xxx.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=xxxx \
@@ -98,66 +78,26 @@ $FLINK_HOME/bin/flink run \
--sink-conf url=jdbc:mysql://xxxx:xxxx
```

Replace the above database information with your real database information, and when a message similar to the following appears, the task is successfully built and submitted.
##### Check and Verify

```shell
Job has been submitted with JobID 0177b201a407045a17445aa288f0f111
```
Check the target OceanBase database; you should find these two tables and one row of data.
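
One way to run this check from a terminal is sketched below. It assumes a MySQL-compatible `mysql` client is installed, that OceanBase is reachable over the MySQL protocol (as the `jdbc:mysql://` sink URL suggests), and that the synchronized tables keep their source names in `test_db`. The function name `check_oceanbase` is hypothetical.

```shell
# Hypothetical helper: list the tables and show the synchronized rows
# in the sink database through a MySQL-compatible client.
# Usage: check_oceanbase <host> <port> <user> <database>
check_oceanbase() {
  host=$1 port=$2 user=$3 database=$4
  # -p followed by a space makes the client prompt for the password;
  # the next argument is then taken as the database name.
  mysql -h "$host" -P "$port" -u "$user" -p "$database" \
    -e "SHOW TABLES; SELECT * FROM test_history_text;"
}
```

For example, `check_oceanbase 127.0.0.1 2881 <your-user> test_db` should list both tables and print the row inserted during preparation.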

The tool automatically parses the information on the command line and creates a table, which can be queried and verified in OceanBase.

MySQL to insert test data
You can go on to insert more test data into the MySQL database as below:

```sql
INSERT INTO test_db.test_history_str (itemid,clock,value,ns) VALUES
(1,2,'ces1',1123);
INSERT INTO test_db.test_history_text (itemid,clock,value,ns) VALUES
(1,21131,'ces1',21321),
(2,21321,'ces2',12321);
```

Since it is a CDC task, after data is inserted in MySQL, you can query and verify the synchronized data in OceanBase.

### Parameter parsing

This configuration is the program configuration information of flink

```shell
-Dexecution.checkpointing.interval=10s
-Dparallelism.default=1
```

Specify the JAR package of the program and the entry of the program

```shell
-c com.oceanbase.connector.flink.tools.cdc.CdcTools \
lib/flink-connector-oceanbase-tools-cdc-${version}.jar \
```

The name of the database

```shell
--database test_db
```

This name is customized, meaning the name given to this database, and ultimately serves as the naming rule for flink tasks.

## Configuration Items

#### Supported data sources

| Data source identifier | Data source |
|-------------------------|----------------------|
| mysql-sync-database | mysql datasource |
| oracle-sync-database | oracle datasource |
| postgres-sync-database | postgres datasource |
| sqlserver-sync-database | sqlserver datasource |
| db2-sync-database | db2 datasource |

#### Configuration Items
#### Options

| Configuration Items | Comment |
| Option | Comment |
|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| {identifier} | Data source identifier, only mysql type `mysql-sync-database` is verified now. |
| --job-name | Flink task name, optional. |
| --database | Database name synchronized to OceanBase. |
| --table-prefix | OceanBase table prefix name, such as --table-prefix ods_. |
@@ -173,13 +113,13 @@ This name is customized, meaning the name given to this database, and ultimately
| --ignore-default-value | Turn off synchronization of MySQL table structures by default. It is suitable for the situation when synchronizing MySQL data to oceanbase, the field has a default value, but the actual inserted data is null. |
| --create-table-only | Whether to only synchronize the structure of the table. |

#### Configuration items of sink-conf
`--sink-conf` option

| Configuration Items | Default Value | Required | Comment |
|---------------------|---------------|----------|-------------------------------------------------------------------|
| url | -- | N | jdbc connection information, such as: jdbc:mysql://127.0.0.1:2881 |
| username | -- | Y | Username to access oceanbase |
| password | -- | Y | Password to access oceanbase |
| Option | Default Value | Required | Comment |
|----------|---------------|----------|-------------------------------------------------------------------|
| url | -- | N | jdbc connection information, such as: jdbc:mysql://127.0.0.1:2881 |
| username | -- | Y | Username to access oceanbase |
| password | -- | Y | Password to access oceanbase |
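
As an illustration of the table above, the sketch below turns `key=value` pairs into `--sink-conf` arguments and rejects input that lacks the two required options. It is an assumption that the flag is repeated once per option, based on the `--sink-conf url=...` form in the demo command; `sink_conf_args` is a hypothetical helper, not part of the CLI.

```shell
# Hypothetical helper: print one "--sink-conf key=value" argument per line.
# Assumes the CLI accepts the flag once per option.
# Usage: sink_conf_args <key=value>...
sink_conf_args() {
  has_user=0
  has_pass=0
  for kv in "$@"; do
    case $kv in
      username=*) has_user=1 ;;
      password=*) has_pass=1 ;;
    esac
  done
  # username and password are marked Required in the table; url is not.
  if [ "$has_user" -eq 0 ] || [ "$has_pass" -eq 0 ]; then
    echo "username and password are required" >&2
    return 1
  fi
  for kv in "$@"; do
    printf '%s\n' "--sink-conf $kv"
  done
}
```

Printing the arguments first makes it easy to review them before pasting them into the `flink run` command.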

## Reference information
