Skip to content

Commit

Permalink
chore: update cli module shaded config and add e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 22, 2025
1 parent 5f219c1 commit ff3182f
Show file tree
Hide file tree
Showing 20 changed files with 401 additions and 183 deletions.
15 changes: 7 additions & 8 deletions docs/cli/flink-cdc/flink-cdc-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,19 @@ Replace the following command with your real database information, and execute i
$FLINK_HOME/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.oceanbase.connector.flink.CdcCli \
lib/flink-connector-oceanbase-cli-xxx.jar \
mysql-cdc \
--database test_db \
--source-type mysql-cdc \
--source-conf hostname=xxxx \
--source-conf port=3306 \
--source-conf username=root \
--source-conf password=xxxx \
--source-conf database-name=test_db \
--source-conf table-name=.* \
--including-tables ".*" \
--sink-conf username=xxxx \
--sink-conf password=xxxx \
--sink-conf url=jdbc:mysql://xxxx:xxxx
--sink-conf url=jdbc:mysql://xxxx:xxxx \
--database test_db \
--including-tables ".*"
```

### Check and Verify
Expand All @@ -101,18 +100,18 @@ You can go on insert test data to MySQL database, since it is a CDC task, after
</thead>
<tbody>
<tr>
<td>${job-type}</td>
<td>--source-type</td>
<td>Yes</td>
<td>Enumeration value</td>
<td style="word-wrap: break-word;"></td>
<td>Job type, can be <code>mysql-cdc</code>.</td>
<td>Source type, can be <code>mysql-cdc</code>.</td>
</tr>
<tr>
<td>--source-conf</td>
<td>Yes</td>
<td>Multi-value parameter</td>
<td style="word-wrap: break-word;"></td>
<td>Configurations of specific Flink CDC Source.</td>
<td>Configurations of the specific source.</td>
</tr>
<tr>
<td>--sink-conf</td>
Expand Down
15 changes: 7 additions & 8 deletions docs/cli/flink-cdc/flink-cdc-source_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,19 @@ VALUES (default, "Sally", "Thomas", "[email protected]"),
$FLINK_HOME/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c com.oceanbase.connector.flink.CdcCli \
lib/flink-connector-oceanbase-cli-xxx.jar \
mysql-cdc \
--database test_db \
--source-type mysql-cdc \
--source-conf hostname=xxxx \
--source-conf port=3306 \
--source-conf username=root \
--source-conf password=xxxx \
--source-conf database-name=test_db \
--source-conf table-name=.* \
--including-tables ".*" \
--sink-conf username=xxxx \
--sink-conf password=xxxx \
--sink-conf url=jdbc:mysql://xxxx:xxxx
--sink-conf url=jdbc:mysql://xxxx:xxxx \
--database test_db \
--including-tables ".*"
```

请将以上的数据库信息替换为您真实的数据库信息,当出现类似于以下的信息时,任务构建成功并提交。
Expand All @@ -103,18 +102,18 @@ $FLINK_HOME/bin/flink run \
</thead>
<tbody>
<tr>
<td>${job-type}</td>
<td>--source-type</td>
<td>是</td>
<td>枚举值</td>
<td style="word-wrap: break-word;"></td>
<td>任务类型,可以是 <code>mysql-cdc</code>。</td>
<td>源端类型,可以是 <code>mysql-cdc</code>。</td>
</tr>
<tr>
<td>--source-conf</td>
<td>是</td>
<td>多值参数</td>
<td style="word-wrap: break-word;"></td>
<td>指定类型的 Flink CDC 源端连接器的配置参数。</td>
<td>指定类型的源端的配置参数。</td>
</tr>
<tr>
<td>--sink-conf</td>
Expand Down
93 changes: 31 additions & 62 deletions flink-connector-oceanbase-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,10 @@ under the License.
<artifactId>flink-connector-oceanbase-cli</artifactId>
<packaging>jar</packaging>

<properties>
<flink.cdc.version>3.2.1</flink.cdc.version>
<ojdbc.version>19.3.0.0</ojdbc.version>
</properties>

<dependencies>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase</artifactId>
<artifactId>flink-sql-connector-oceanbase</artifactId>
<version>${project.version}</version>
</dependency>

Expand All @@ -49,62 +44,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-db2-cdc</artifactId>
<version>${flink.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.oracle.ojdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.oceanbase</groupId>
Expand All @@ -121,4 +60,34 @@ under the License.
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.oceanbase.connector.flink.CLI</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.oceanbase.connector.flink;

import com.oceanbase.connector.flink.source.cdc.CdcSync;
import com.oceanbase.connector.flink.source.cdc.CdcSyncConfig;
import com.oceanbase.connector.flink.config.CLIConfig;
import com.oceanbase.connector.flink.process.Sync;
import com.oceanbase.connector.flink.source.cdc.mysql.MysqlCdcSync;

import org.apache.flink.api.java.utils.MultipleParameterTool;
Expand All @@ -32,46 +32,48 @@
import java.util.HashMap;
import java.util.Map;

public class CdcCli {
private static final Logger LOG = LoggerFactory.getLogger(CdcCli.class);
public class CLI {
private static final Logger LOG = LoggerFactory.getLogger(CLI.class);

public static void main(String[] args) throws Exception {
LOG.info("Starting CdcCli with args: {}", Arrays.toString(args));
LOG.info("Starting CLI with args: {}", Arrays.toString(args));

String jobType = args[0];
String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
String sourceType = params.get(CLIConfig.SOURCE_TYPE);
if (sourceType == null) {
throw new IllegalArgumentException("'--source-type' is required");
}

CdcSync cdcSync;
switch (jobType.trim().toLowerCase()) {
case CdcSyncConfig.MYSQL_CDC:
cdcSync = new MysqlCdcSync();
Sync sync;
switch (sourceType.trim().toLowerCase()) {
case CLIConfig.MYSQL_CDC:
sync = new MysqlCdcSync();
break;
default:
throw new RuntimeException("Unsupported job type: " + jobType);
throw new RuntimeException("Unsupported source type: " + sourceType);
}

Map<String, String> sourceConfigMap = getConfigMap(params, CdcSyncConfig.SOURCE_CONF);
Map<String, String> sourceConfigMap = getConfigMap(params, CLIConfig.SOURCE_CONF);
Configuration sourceConfig = Configuration.fromMap(sourceConfigMap);

Map<String, String> sinkConfigMap = getConfigMap(params, CdcSyncConfig.SINK_CONF);
Map<String, String> sinkConfigMap = getConfigMap(params, CLIConfig.SINK_CONF);
Configuration sinkConfig = Configuration.fromMap(sinkConfigMap);

String jobName = params.get(CdcSyncConfig.JOB_NAME);
String database = params.get(CdcSyncConfig.DATABASE);
String tablePrefix = params.get(CdcSyncConfig.TABLE_PREFIX);
String tableSuffix = params.get(CdcSyncConfig.TABLE_SUFFIX);
String includingTables = params.get(CdcSyncConfig.INCLUDING_TABLES);
String excludingTables = params.get(CdcSyncConfig.EXCLUDING_TABLES);
String multiToOneOrigin = params.get(CdcSyncConfig.MULTI_TO_ONE_ORIGIN);
String multiToOneTarget = params.get(CdcSyncConfig.MULTI_TO_ONE_TARGET);
String jobName = params.get(CLIConfig.JOB_NAME);
String database = params.get(CLIConfig.DATABASE);
String tablePrefix = params.get(CLIConfig.TABLE_PREFIX);
String tableSuffix = params.get(CLIConfig.TABLE_SUFFIX);
String includingTables = params.get(CLIConfig.INCLUDING_TABLES);
String excludingTables = params.get(CLIConfig.EXCLUDING_TABLES);
String multiToOneOrigin = params.get(CLIConfig.MULTI_TO_ONE_ORIGIN);
String multiToOneTarget = params.get(CLIConfig.MULTI_TO_ONE_TARGET);

boolean createTableOnly = params.has(CdcSyncConfig.CREATE_TABLE_ONLY);
boolean ignoreDefaultValue = params.has(CdcSyncConfig.IGNORE_DEFAULT_VALUE);
boolean ignoreIncompatible = params.has(CdcSyncConfig.IGNORE_INCOMPATIBLE);
boolean createTableOnly = params.has(CLIConfig.CREATE_TABLE_ONLY);
boolean ignoreDefaultValue = params.has(CLIConfig.IGNORE_DEFAULT_VALUE);
boolean ignoreIncompatible = params.has(CLIConfig.IGNORE_INCOMPATIBLE);

cdcSync.setEnv(env)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
sync.setEnv(env)
.setSourceConfig(sourceConfig)
.setSinkConfig(sinkConfig)
.setDatabase(database)
Expand All @@ -87,7 +89,7 @@ public static void main(String[] args) throws Exception {
.build();

if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
jobName = String.format("%s Sync", jobType);
jobName = String.format("%s Sync", sourceType);
}
env.execute(jobName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
* limitations under the License.
*/

package com.oceanbase.connector.flink.source.cdc;
package com.oceanbase.connector.flink.config;

public class CdcSyncConfig {
public class CLIConfig {

/** Option key for source type. */
public static final String SOURCE_TYPE = "source-type";

/** Option key for cdc source. */
public static final String SOURCE_CONF = "source-conf";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@
* limitations under the License.
*/

package com.oceanbase.connector.flink.source.cdc;

import com.oceanbase.connector.flink.source.TableNameConverter;
package com.oceanbase.connector.flink.process;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;
Expand Down
Loading

0 comments on commit ff3182f

Please sign in to comment.