From ff3182f96c9717b6018e93e880d72d8ba993aef5 Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 22 Jan 2025 12:02:11 +0800 Subject: [PATCH] chore: update cli module shaded config and add e2e test --- docs/cli/flink-cdc/flink-cdc-source.md | 15 ++- docs/cli/flink-cdc/flink-cdc-source_cn.md | 15 ++- flink-connector-oceanbase-cli/pom.xml | 93 +++++++------------ .../connector/flink/{CdcCli.java => CLI.java} | 60 ++++++------ .../CLIConfig.java} | 7 +- .../ParsingProcessFunction.java | 8 +- .../cdc/CdcSync.java => process/Sync.java} | 39 ++++---- .../TableNameConverter.java | 2 +- .../OceanBaseJsonDeserializationSchema.java | 10 +- .../flink/source/cdc/mysql/MysqlCdcSync.java | 6 +- .../source/cdc/mysql/MysqlDateConverter.java | 40 ++++---- .../OceanBaseJsonSerializationSchema.java | 7 +- .../connector/flink/MysqlCdcSyncITCase.java | 14 +-- .../src/test/resources/sql/mysql-cdc.sql | 4 - flink-connector-oceanbase-e2e-tests/pom.xml | 24 +++++ .../flink/MysqlCdcSyncE2eITCase.java | 84 +++++++++++++++++ .../utils/FlinkContainerTestEnvironment.java | 28 ++++++ .../src/test/resources/docker/mysql/my.cnf | 65 +++++++++++++ .../src/test/resources/sql/mysql-cdc.sql | 49 ++++++++++ pom.xml | 14 +++ 20 files changed, 401 insertions(+), 183 deletions(-) rename flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{CdcCli.java => CLI.java} (65%) rename flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{source/cdc/CdcSyncConfig.java => config/CLIConfig.java} (95%) rename flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{source/cdc => process}/ParsingProcessFunction.java (92%) rename flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{source/cdc/CdcSync.java => process/Sync.java} (88%) rename flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/{source => process}/TableNameConverter.java (98%) create mode 100644 
flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncE2eITCase.java create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/resources/docker/mysql/my.cnf create mode 100644 flink-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql-cdc.sql diff --git a/docs/cli/flink-cdc/flink-cdc-source.md b/docs/cli/flink-cdc/flink-cdc-source.md index bea8d180..9498ed27 100644 --- a/docs/cli/flink-cdc/flink-cdc-source.md +++ b/docs/cli/flink-cdc/flink-cdc-source.md @@ -64,20 +64,19 @@ Replace the following command with your real database information, and execute i $FLINK_HOME/bin/flink run \ -Dexecution.checkpointing.interval=10s \ -Dparallelism.default=1 \ - -c com.oceanbase.connector.flink.CdcCli \ lib/flink-connector-oceanbase-cli-xxx.jar \ - mysql-cdc \ - --database test_db \ + --source-type mysql-cdc \ --source-conf hostname=xxxx \ --source-conf port=3306 \ --source-conf username=root \ --source-conf password=xxxx \ --source-conf database-name=test_db \ --source-conf table-name=.* \ - --including-tables ".*" \ --sink-conf username=xxxx \ --sink-conf password=xxxx \ - --sink-conf url=jdbc:mysql://xxxx:xxxx + --sink-conf url=jdbc:mysql://xxxx:xxxx \ + --database test_db \ + --including-tables ".*" ``` ### Check and Verify @@ -101,18 +100,18 @@ You can go on insert test data to MySQL database, since it is a CDC task, after - ${job-type} + --source-type Yes Enumeration value - Job type, can be mysql-cdc. + Source type, can be mysql-cdc. --source-conf Yes Multi-value parameter - Configurations of specific Flink CDC Source. + Configurations of the specific source. 
--sink-conf diff --git a/docs/cli/flink-cdc/flink-cdc-source_cn.md b/docs/cli/flink-cdc/flink-cdc-source_cn.md index f9c51d0f..9bfe426f 100644 --- a/docs/cli/flink-cdc/flink-cdc-source_cn.md +++ b/docs/cli/flink-cdc/flink-cdc-source_cn.md @@ -64,20 +64,19 @@ VALUES (default, "Sally", "Thomas", "sally.thomas@acme.com"), $FLINK_HOME/bin/flink run \ -Dexecution.checkpointing.interval=10s \ -Dparallelism.default=1 \ - -c com.oceanbase.connector.flink.CdcCli \ lib/flink-connector-oceanbase-cli-xxx.jar \ - mysql-cdc \ - --database test_db \ + --source-type mysql-cdc \ --source-conf hostname=xxxx \ --source-conf port=3306 \ --source-conf username=root \ --source-conf password=xxxx \ --source-conf database-name=test_db \ --source-conf table-name=.* \ - --including-tables ".*" \ --sink-conf username=xxxx \ --sink-conf password=xxxx \ - --sink-conf url=jdbc:mysql://xxxx:xxxx + --sink-conf url=jdbc:mysql://xxxx:xxxx \ + --database test_db \ + --including-tables ".*" ``` 请将以上的数据库信息替换为您真实的数据库信息,当出现类似于以下的信息时,任务构建成功并提交。 @@ -103,18 +102,18 @@ $FLINK_HOME/bin/flink run \ - ${job-type} + --source-type 是 枚举值 - 任务类型,可以是 mysql-cdc。 + 源端类型,可以是 mysql-cdc。 --source-conf 是 多值参数 - 指定类型的 Flink CDC 源端连接器的配置参数。 + 指定类型的源端的配置参数。 --sink-conf diff --git a/flink-connector-oceanbase-cli/pom.xml b/flink-connector-oceanbase-cli/pom.xml index cd270110..3d58cb53 100644 --- a/flink-connector-oceanbase-cli/pom.xml +++ b/flink-connector-oceanbase-cli/pom.xml @@ -25,15 +25,10 @@ under the License. flink-connector-oceanbase-cli jar - - 3.2.1 - 19.3.0.0 - - com.oceanbase - flink-connector-oceanbase + flink-sql-connector-oceanbase ${project.version} @@ -49,62 +44,6 @@ under the License. 
- - org.apache.flink - flink-sql-connector-oracle-cdc - ${flink.cdc.version} - provided - - - org.apache.flink - flink-shaded-guava - - - - - org.apache.flink - flink-sql-connector-postgres-cdc - ${flink.cdc.version} - provided - - - org.apache.flink - flink-shaded-guava - - - - - - org.apache.flink - flink-sql-connector-sqlserver-cdc - ${flink.cdc.version} - provided - - - org.apache.flink - flink-shaded-guava - - - - - org.apache.flink - flink-sql-connector-db2-cdc - ${flink.cdc.version} - provided - - - org.apache.flink - flink-shaded-guava - - - - - - com.oracle.ojdbc - ojdbc8 - ${ojdbc.version} - provided - com.oceanbase @@ -121,4 +60,34 @@ under the License. + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + + shade + + package + + + + *:* + + + + + com.oceanbase.connector.flink.CLI + + + + + + + + + diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/CdcCli.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/CLI.java similarity index 65% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/CdcCli.java rename to flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/CLI.java index 97b852e5..7f85ec7d 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/CdcCli.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/CLI.java @@ -16,8 +16,8 @@ package com.oceanbase.connector.flink; -import com.oceanbase.connector.flink.source.cdc.CdcSync; -import com.oceanbase.connector.flink.source.cdc.CdcSyncConfig; +import com.oceanbase.connector.flink.config.CLIConfig; +import com.oceanbase.connector.flink.process.Sync; import com.oceanbase.connector.flink.source.cdc.mysql.MysqlCdcSync; import org.apache.flink.api.java.utils.MultipleParameterTool; @@ -32,46 +32,48 @@ import java.util.HashMap; import java.util.Map; -public class CdcCli { - private static final Logger LOG = 
LoggerFactory.getLogger(CdcCli.class); +public class CLI { + private static final Logger LOG = LoggerFactory.getLogger(CLI.class); public static void main(String[] args) throws Exception { - LOG.info("Starting CdcCli with args: {}", Arrays.toString(args)); + LOG.info("Starting CLI with args: {}", Arrays.toString(args)); - String jobType = args[0]; - String[] opArgs = Arrays.copyOfRange(args, 1, args.length); - MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + String sourceType = params.get(CLIConfig.SOURCE_TYPE); + if (sourceType == null) { + throw new IllegalArgumentException("'--source-type' is required"); + } - CdcSync cdcSync; - switch (jobType.trim().toLowerCase()) { - case CdcSyncConfig.MYSQL_CDC: - cdcSync = new MysqlCdcSync(); + Sync sync; + switch (sourceType.trim().toLowerCase()) { + case CLIConfig.MYSQL_CDC: + sync = new MysqlCdcSync(); break; default: - throw new RuntimeException("Unsupported job type: " + jobType); + throw new RuntimeException("Unsupported source type: " + sourceType); } - Map sourceConfigMap = getConfigMap(params, CdcSyncConfig.SOURCE_CONF); + Map sourceConfigMap = getConfigMap(params, CLIConfig.SOURCE_CONF); Configuration sourceConfig = Configuration.fromMap(sourceConfigMap); - Map sinkConfigMap = getConfigMap(params, CdcSyncConfig.SINK_CONF); + Map sinkConfigMap = getConfigMap(params, CLIConfig.SINK_CONF); Configuration sinkConfig = Configuration.fromMap(sinkConfigMap); - String jobName = params.get(CdcSyncConfig.JOB_NAME); - String database = params.get(CdcSyncConfig.DATABASE); - String tablePrefix = params.get(CdcSyncConfig.TABLE_PREFIX); - String tableSuffix = params.get(CdcSyncConfig.TABLE_SUFFIX); - String includingTables = params.get(CdcSyncConfig.INCLUDING_TABLES); - String excludingTables = params.get(CdcSyncConfig.EXCLUDING_TABLES); - String 
multiToOneOrigin = params.get(CdcSyncConfig.MULTI_TO_ONE_ORIGIN); - String multiToOneTarget = params.get(CdcSyncConfig.MULTI_TO_ONE_TARGET); + String jobName = params.get(CLIConfig.JOB_NAME); + String database = params.get(CLIConfig.DATABASE); + String tablePrefix = params.get(CLIConfig.TABLE_PREFIX); + String tableSuffix = params.get(CLIConfig.TABLE_SUFFIX); + String includingTables = params.get(CLIConfig.INCLUDING_TABLES); + String excludingTables = params.get(CLIConfig.EXCLUDING_TABLES); + String multiToOneOrigin = params.get(CLIConfig.MULTI_TO_ONE_ORIGIN); + String multiToOneTarget = params.get(CLIConfig.MULTI_TO_ONE_TARGET); - boolean createTableOnly = params.has(CdcSyncConfig.CREATE_TABLE_ONLY); - boolean ignoreDefaultValue = params.has(CdcSyncConfig.IGNORE_DEFAULT_VALUE); - boolean ignoreIncompatible = params.has(CdcSyncConfig.IGNORE_INCOMPATIBLE); + boolean createTableOnly = params.has(CLIConfig.CREATE_TABLE_ONLY); + boolean ignoreDefaultValue = params.has(CLIConfig.IGNORE_DEFAULT_VALUE); + boolean ignoreIncompatible = params.has(CLIConfig.IGNORE_INCOMPATIBLE); - cdcSync.setEnv(env) + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + sync.setEnv(env) .setSourceConfig(sourceConfig) .setSinkConfig(sinkConfig) .setDatabase(database) @@ -87,7 +89,7 @@ public static void main(String[] args) throws Exception { .build(); if (StringUtils.isNullOrWhitespaceOnly(jobName)) { - jobName = String.format("%s Sync", jobType); + jobName = String.format("%s Sync", sourceType); } env.execute(jobName); } diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSyncConfig.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/config/CLIConfig.java similarity index 95% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSyncConfig.java rename to 
flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/config/CLIConfig.java index 286a0d86..2904575c 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSyncConfig.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/config/CLIConfig.java @@ -14,9 +14,12 @@ * limitations under the License. */ -package com.oceanbase.connector.flink.source.cdc; +package com.oceanbase.connector.flink.config; -public class CdcSyncConfig { +public class CLIConfig { + + /** Option key for source type. */ + public static final String SOURCE_TYPE = "source-type"; /** Option key for cdc source. */ public static final String SOURCE_CONF = "source-conf"; diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/ParsingProcessFunction.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/ParsingProcessFunction.java similarity index 92% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/ParsingProcessFunction.java rename to flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/ParsingProcessFunction.java index 287f3a98..65253cd6 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/ParsingProcessFunction.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/ParsingProcessFunction.java @@ -14,17 +14,15 @@ * limitations under the License. 
*/ -package com.oceanbase.connector.flink.source.cdc; - -import com.oceanbase.connector.flink.source.TableNameConverter; +package com.oceanbase.connector.flink.process; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import java.util.HashMap; import java.util.Map; diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSync.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/Sync.java similarity index 88% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSync.java rename to flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/Sync.java index 63aa717a..47808c02 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/CdcSync.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/Sync.java @@ -13,13 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.oceanbase.connector.flink.source.cdc; +package com.oceanbase.connector.flink.process; import com.oceanbase.connector.flink.OceanBaseConnectorOptions; import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider; import com.oceanbase.connector.flink.sink.OceanBaseRecordFlusher; import com.oceanbase.connector.flink.sink.OceanBaseSink; -import com.oceanbase.connector.flink.source.TableNameConverter; import com.oceanbase.connector.flink.source.TableSchema; import com.oceanbase.connector.flink.table.DataChangeRecord; import com.oceanbase.connector.flink.table.OceanBaseJsonSerializationSchema; @@ -53,8 +52,8 @@ import static com.oceanbase.connector.flink.utils.OceanBaseCatalogUtils.tableExists; import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; -public abstract class CdcSync { - private static final Logger LOG = LoggerFactory.getLogger(CdcSync.class); +public abstract class Sync { + private static final Logger LOG = LoggerFactory.getLogger(Sync.class); protected StreamExecutionEnvironment env; protected Configuration sourceConfig; @@ -71,67 +70,67 @@ public abstract class CdcSync { protected boolean ignoreDefaultValue; protected boolean ignoreIncompatible; - public CdcSync setEnv(StreamExecutionEnvironment env) { + public Sync setEnv(StreamExecutionEnvironment env) { this.env = env; return this; } - public CdcSync setSourceConfig(Configuration sourceConfig) { + public Sync setSourceConfig(Configuration sourceConfig) { this.sourceConfig = sourceConfig; return this; } - public CdcSync setSinkConfig(Configuration sinkConfig) { + public Sync setSinkConfig(Configuration sinkConfig) { this.sinkConfig = sinkConfig; return this; } - public CdcSync setDatabase(String database) { + public Sync setDatabase(String database) { this.database = database; return this; } - public CdcSync setTablePrefix(String tablePrefix) { + public Sync setTablePrefix(String tablePrefix) { this.tablePrefix = tablePrefix; return this; } - 
public CdcSync setTableSuffix(String tableSuffix) { + public Sync setTableSuffix(String tableSuffix) { this.tableSuffix = tableSuffix; return this; } - public CdcSync setIncludingTables(String includingTables) { + public Sync setIncludingTables(String includingTables) { this.includingTables = includingTables; return this; } - public CdcSync setExcludingTables(String excludingTables) { + public Sync setExcludingTables(String excludingTables) { this.excludingTables = excludingTables; return this; } - public CdcSync setMultiToOneOrigin(String multiToOneOrigin) { + public Sync setMultiToOneOrigin(String multiToOneOrigin) { this.multiToOneOrigin = multiToOneOrigin; return this; } - public CdcSync setMultiToOneTarget(String multiToOneTarget) { + public Sync setMultiToOneTarget(String multiToOneTarget) { this.multiToOneTarget = multiToOneTarget; return this; } - public CdcSync setCreateTableOnly(boolean createTableOnly) { + public Sync setCreateTableOnly(boolean createTableOnly) { this.createTableOnly = createTableOnly; return this; } - public CdcSync setIgnoreDefaultValue(boolean ignoreDefaultValue) { + public Sync setIgnoreDefaultValue(boolean ignoreDefaultValue) { this.ignoreDefaultValue = ignoreDefaultValue; return this; } - public CdcSync setIgnoreIncompatible(boolean ignoreIncompatible) { + public Sync setIgnoreIncompatible(boolean ignoreIncompatible) { this.ignoreIncompatible = ignoreIncompatible; return this; } @@ -174,7 +173,7 @@ protected boolean isSyncNeeded(String tableName) { protected abstract List getTableSchemas(); - protected abstract DataStreamSource buildCdcSource(); + protected abstract DataStreamSource buildSource(); public void build() { this.includingPattern = includingTables == null ? 
null : Pattern.compile(includingTables); @@ -213,9 +212,9 @@ public void build() { return; } - DataStreamSource cdcSource = buildCdcSource(); + DataStreamSource source = buildSource(); SingleOutputStreamOperator parsedStream = - cdcSource.process(new ParsingProcessFunction(tableNameConverter)); + source.process(new ParsingProcessFunction(tableNameConverter)); for (Tuple2 dbTbl : targetTables) { OutputTag recordOutputTag = ParsingProcessFunction.createRecordOutputTag(dbTbl.f1); diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/TableNameConverter.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/TableNameConverter.java similarity index 98% rename from flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/TableNameConverter.java rename to flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/TableNameConverter.java index 7a7e512d..f5084217 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/TableNameConverter.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/process/TableNameConverter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.oceanbase.connector.flink.source; +package com.oceanbase.connector.flink.process; import org.apache.flink.util.StringUtils; diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/OceanBaseJsonDeserializationSchema.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/OceanBaseJsonDeserializationSchema.java index e72aaaef..c5789365 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/OceanBaseJsonDeserializationSchema.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/OceanBaseJsonDeserializationSchema.java @@ -26,11 +26,11 @@ import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.util.Collector; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import java.math.BigDecimal; import java.nio.ByteBuffer; diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlCdcSync.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlCdcSync.java index 4fff93b6..5d7a935f 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlCdcSync.java +++ 
b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlCdcSync.java @@ -16,9 +16,9 @@ package com.oceanbase.connector.flink.source.cdc.mysql; +import com.oceanbase.connector.flink.process.Sync; import com.oceanbase.connector.flink.source.FieldSchema; import com.oceanbase.connector.flink.source.TableSchema; -import com.oceanbase.connector.flink.source.cdc.CdcSync; import com.oceanbase.connector.flink.source.cdc.OceanBaseJsonDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -55,7 +55,7 @@ import static org.apache.flink.cdc.debezium.utils.JdbcUrlUtils.PROPERTIES_PREFIX; -public class MysqlCdcSync extends CdcSync { +public class MysqlCdcSync extends Sync { private static final Logger LOG = LoggerFactory.getLogger(MysqlCdcSync.class); public static final String JDBC_URL_PATTERN = @@ -134,7 +134,7 @@ protected List getTableSchemas() { } @Override - protected DataStreamSource buildCdcSource() { + protected DataStreamSource buildSource() { String databaseName = sourceConfig.get(MySqlSourceOptions.DATABASE_NAME); MySqlSourceBuilder sourceBuilder = MySqlSource.builder(); sourceBuilder diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlDateConverter.java b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlDateConverter.java index fd080089..76ee1265 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlDateConverter.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/source/cdc/mysql/MysqlDateConverter.java @@ -16,7 +16,7 @@ package com.oceanbase.connector.flink.source.cdc.mysql; -import com.oceanbase.connector.flink.source.cdc.CdcSyncConfig; +import com.oceanbase.connector.flink.config.CLIConfig; import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; @@ -49,37 +49,29 @@ 
public class MysqlDateConverter implements CustomConverter dateFormatter = DateTimeFormatter.ofPattern(p)); + props, CLIConfig.FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p)); readProps( - props, - CdcSyncConfig.FORMAT_TIME, - p -> timeFormatter = DateTimeFormatter.ofPattern(p)); + props, CLIConfig.FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p)); readProps( props, - CdcSyncConfig.FORMAT_DATETIME, + CLIConfig.FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p)); readProps( props, - CdcSyncConfig.FORMAT_TIMESTAMP, + CLIConfig.FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p)); - readProps(props, CdcSyncConfig.FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z)); + readProps(props, CLIConfig.FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z)); } private void readProps(Properties properties, String settingKey, Consumer consumer) { @@ -101,19 +93,19 @@ public void converterFor( String sqlType = column.typeName().toUpperCase(); SchemaBuilder schemaBuilder = null; Converter converter = null; - if (CdcSyncConfig.UPPERCASE_DATE.equals(sqlType)) { + if (CLIConfig.UPPERCASE_DATE.equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional(); converter = this::convertDate; } - if (CdcSyncConfig.TIME.equals(sqlType)) { + if (CLIConfig.TIME.equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional(); converter = this::convertTime; } - if (CdcSyncConfig.DATETIME.equals(sqlType)) { + if (CLIConfig.DATETIME.equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional(); converter = this::convertDateTime; } - if (CdcSyncConfig.TIMESTAMP.equals(sqlType)) { + if (CLIConfig.TIMESTAMP.equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional(); converter = this::convertTimestamp; } diff --git a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/table/OceanBaseJsonSerializationSchema.java 
b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/table/OceanBaseJsonSerializationSchema.java index 75c8f7a3..2fe07a42 100644 --- a/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/table/OceanBaseJsonSerializationSchema.java +++ b/flink-connector-oceanbase-cli/src/main/java/com/oceanbase/connector/flink/table/OceanBaseJsonSerializationSchema.java @@ -27,9 +27,10 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connector-oceanbase-cli/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncITCase.java b/flink-connector-oceanbase-cli/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncITCase.java index 7fad110a..0e13141a 100644 --- a/flink-connector-oceanbase-cli/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncITCase.java +++ b/flink-connector-oceanbase-cli/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncITCase.java @@ -16,7 +16,7 @@ package com.oceanbase.connector.flink; -import com.oceanbase.connector.flink.source.cdc.CdcSync; +import com.oceanbase.connector.flink.process.Sync; import com.oceanbase.connector.flink.source.cdc.mysql.MysqlCdcSync; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; @@ -52,17 +52,13 @@ public class MysqlCdcSyncITCase extends OceanBaseMySQLTestBase { @BeforeClass public static void setup() { - CONFIG_SERVER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); - CONTAINER - 
.withEnv("OB_CONFIGSERVER_ADDRESS", getConfigServerAddress()) - .withLogConsumer(new Slf4jLogConsumer(LOG)) - .start(); + CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start(); MYSQL_CONTAINER.start(); } @AfterClass public static void tearDown() { - Stream.of(CONFIG_SERVER, CONTAINER, MYSQL_CONTAINER).forEach(GenericContainer::stop); + Stream.of(CONTAINER, MYSQL_CONTAINER).forEach(GenericContainer::stop); } @Test @@ -98,8 +94,8 @@ public void testMysqlCdcSync() throws Exception { sinkConfig.put("sink.enable-delete", "false"); Configuration sinkConf = Configuration.fromMap(sinkConfig); - CdcSync cdcSync = new MysqlCdcSync(); - cdcSync.setEnv(env) + Sync sync = new MysqlCdcSync(); + sync.setEnv(env) .setSourceConfig(sourceConfig) .setSinkConfig(sinkConf) .setDatabase(CONTAINER.getDatabaseName()) diff --git a/flink-connector-oceanbase-cli/src/test/resources/sql/mysql-cdc.sql b/flink-connector-oceanbase-cli/src/test/resources/sql/mysql-cdc.sql index ed7cedd3..c76db64e 100644 --- a/flink-connector-oceanbase-cli/src/test/resources/sql/mysql-cdc.sql +++ b/flink-connector-oceanbase-cli/src/test/resources/sql/mysql-cdc.sql @@ -11,10 +11,6 @@ -- specific language governing permissions and limitations -- under the License. --- ---------------------------------------------------------------------------------------------------------------- --- DATABASE: inventory --- ---------------------------------------------------------------------------------------------------------------- - -- Create and populate our products using a single insert with many rows CREATE TABLE products ( diff --git a/flink-connector-oceanbase-e2e-tests/pom.xml b/flink-connector-oceanbase-e2e-tests/pom.xml index 75acdff6..8a6ec9d5 100644 --- a/flink-connector-oceanbase-e2e-tests/pom.xml +++ b/flink-connector-oceanbase-e2e-tests/pom.xml @@ -30,12 +30,14 @@ under the License. 
mysql-connector-java test + com.oceanbase flink-connector-oceanbase-base ${project.version} test + com.oceanbase flink-connector-oceanbase-base @@ -43,6 +45,12 @@ under the License. test-jar test + + + org.testcontainers + mysql + test + @@ -108,6 +116,22 @@ under the License. jar ${project.build.directory}/dependencies + + com.oceanbase + flink-connector-oceanbase-cli + ${project.version} + flink-connector-oceanbase-cli.jar + jar + ${project.build.directory}/dependencies + + + org.apache.flink + flink-sql-connector-mysql-cdc + ${flink.cdc.version} + flink-sql-connector-mysql-cdc.jar + jar + ${project.build.directory}/dependencies + diff --git a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncE2eITCase.java b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncE2eITCase.java new file mode 100644 index 00000000..4d0b47b1 --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/MysqlCdcSyncE2eITCase.java @@ -0,0 +1,84 @@ +/* + * Copyright 2024 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.oceanbase.connector.flink;
+
+import com.oceanbase.connector.flink.utils.FlinkContainerTestEnvironment;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.util.Collections;
+import java.util.stream.Stream;
+
+public class MysqlCdcSyncE2eITCase extends FlinkContainerTestEnvironment { // e2e: runs the shaded CLI jar via 'flink run'
+
+    private static final Logger LOG = LoggerFactory.getLogger(MysqlCdcSyncE2eITCase.class);
+
+    private static final MySQLContainer<?> MYSQL_CONTAINER = // source database, seeded by the init script below
+            new MySQLContainer<>("mysql:8.0.20")
+                    .withConfigurationOverride("docker/mysql")
+                    .withInitScript("sql/mysql-cdc.sql")
+                    .withNetwork(NETWORK)
+                    .withExposedPorts(3306)
+                    .withDatabaseName("test")
+                    .withUsername("root")
+                    .withPassword("mysqlpw")
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    @BeforeClass
+    public static void setup() {
+        CONTAINER.withLogConsumer(new Slf4jLogConsumer(LOG)).start();
+        MYSQL_CONTAINER.start();
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        Stream.of(CONTAINER, MYSQL_CONTAINER).forEach(GenericContainer::stop);
+    }
+
+    @Test
+    public void testMysqlCdcSync() throws Exception {
+        submitJob(
+                Collections.singletonList(getResource("flink-sql-connector-mysql-cdc.jar")),
+                getResource("flink-connector-oceanbase-cli.jar"),
+                new String[] {
+                    "--source-type mysql-cdc", // CLI.main reads 'source-type' and throws if it is missing
+                    "--source-conf hostname=" + MYSQL_CONTAINER.getHost(), // NOTE(review): host-side address; confirm it is reachable from the Flink containers
+                    "--source-conf port="
+                            + MYSQL_CONTAINER.getMappedPort(MySQLContainer.MYSQL_PORT),
+                    "--source-conf username=" + MYSQL_CONTAINER.getUsername(),
+                    "--source-conf password=" + MYSQL_CONTAINER.getPassword(),
+                    "--source-conf database-name=" + MYSQL_CONTAINER.getDatabaseName(),
+                    "--source-conf table-name='.*'", // quoted: submitJob passes the joined args through 'bash -c'
+                    "--job-name test-mysql-cdc-sync",
+                    "--database " + CONTAINER.getDatabaseName(),
+                    "--including-tables '.*'", // quoted so bash does not glob-expand the regex
+                    "--sink-conf url=" + CONTAINER.getJdbcUrl(),
+                    "--sink-conf username=" + CONTAINER.getUsername(),
+                    "--sink-conf password=" + CONTAINER.getPassword(),
+                });
+
+        waitingAndAssertTableCount("products", 9);
+        waitingAndAssertTableCount("customers", 4);
+    }
+}
diff --git a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/utils/FlinkContainerTestEnvironment.java b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/utils/FlinkContainerTestEnvironment.java
index 30609bfb..4069bff7 100644
--- a/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/utils/FlinkContainerTestEnvironment.java
+++ b/flink-connector-oceanbase-e2e-tests/src/test/java/com/oceanbase/connector/flink/utils/FlinkContainerTestEnvironment.java
@@ -212,4 +212,32 @@ private String copyAndGetContainerPath(GenericContainer container, String fil
         container.copyFileToContainer(MountableFile.forHostPath(path), containerPath);
         return containerPath;
     }
+
+    public void submitJob(List<Path> dependencies, Path jar, String[] args) // runs 'flink run' inside the jobManager container
+            throws IOException, InterruptedException {
+        final List<String> commands = new ArrayList<>();
+        commands.add(FLINK_BIN + "/flink run");
+        if (dependencies != null && !dependencies.isEmpty()) {
+            String dependencyJars =
+                    dependencies.stream()
+                            .map(
+                                    dep ->
+                                            "file://"
+                                                    + copyAndGetContainerPath(
+                                                            jobManager,
+                                                            dep.toAbsolutePath().toString()))
+                            .collect(Collectors.joining(";"));
+            commands.add("-Dpipeline.jars '" + dependencyJars + "'"); // quoted: an unquoted ';' would end the bash command
+        }
+        commands.add(copyAndGetContainerPath(jobManager, jar.toAbsolutePath().toString()));
+        commands.addAll(Arrays.asList(args)); // each element is shell-split by 'bash -c' below
+
+        Container.ExecResult execResult =
+                jobManager.execInContainer("bash", "-c", String.join(" ", commands));
+        LOG.info(execResult.getStdout());
+        LOG.error(execResult.getStderr());
+        if (execResult.getExitCode() != 0) { // a non-zero exit from 'flink run' fails the test
+            throw new AssertionError("Failed when submitting the job.");
+        }
+    }
 }
diff
--git a/flink-connector-oceanbase-e2e-tests/src/test/resources/docker/mysql/my.cnf b/flink-connector-oceanbase-e2e-tests/src/test/resources/docker/mysql/my.cnf new file mode 100644 index 00000000..a3908978 --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/resources/docker/mysql/my.cnf @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/8.0/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable the binary log and set its prefix, expiration, and format. +# The prefix is arbitrary; expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# A server ID is required, but it will vary on production systems. NOTE(review): expire_logs_days is deprecated as of MySQL 8.0 in favor of binlog_expire_logs_seconds. +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file diff --git a/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql-cdc.sql b/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql-cdc.sql new file mode 100644 index 00000000..c76db64e --- /dev/null +++ b/flink-connector-oceanbase-e2e-tests/src/test/resources/sql/mysql-cdc.sql @@ -0,0 +1,49 @@ +-- Copyright 2024 OceanBase. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License.
+
+-- Products fixture: ids start at 101 and all rows are loaded by one multi-row insert.
+CREATE TABLE products
+(
+    id          INTEGER      NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    name        VARCHAR(255) NOT NULL DEFAULT 'flink',
+    description VARCHAR(512),
+    weight      FLOAT
+) AUTO_INCREMENT = 101;
+
+-- id is omitted so that AUTO_INCREMENT assigns it (101 through 109).
+INSERT INTO products (name, description, weight)
+VALUES ('scooter', 'Small 2-wheel scooter', 3.14),
+       ('car battery', '12V car battery', 8.1),
+       ('12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8),
+       ('hammer', '12oz carpenter''s hammer', 0.75),
+       ('hammer', '14oz carpenter''s hammer', 0.875),
+       ('hammer', '16oz carpenter''s hammer', 1.0),
+       ('rocks', 'box of assorted rocks', 5.3),
+       ('jacket', 'water resistent black wind breaker', 0.1),
+       ('spare tire', '24 inch spare tire', 22.2);
+
+-- Customers fixture: ids start at 1001.
+CREATE TABLE customers
+(
+    id         INTEGER      NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    first_name VARCHAR(255) NOT NULL,
+    last_name  VARCHAR(255) NOT NULL,
+    email      VARCHAR(255) NOT NULL UNIQUE KEY
+) AUTO_INCREMENT = 1001;
+
+-- id is omitted so that AUTO_INCREMENT assigns it (1001 through 1004).
+INSERT INTO customers (first_name, last_name, email)
+VALUES ('Sally', 'Thomas', 'sally.thomas@acme.com'),
+       ('George', 'Bailey', 'gbailey@foobar.com'),
+       ('Edward', 'Walker', 'ed@walker.com'),
+       ('Anne', 'Kretchmar', 'annek@noanswer.org');
diff --git a/pom.xml b/pom.xml index 6b57caf1..a16d574c 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,8 @@ under the License. 1.18 2.12 com.oceanbase.connector.flink.shaded + + 3.2.1 @@ -145,24 +147,35 @@ under the License. ${flink.version} provided + + + org.apache.flink + flink-core + ${flink.version} + provided + + org.apache.flink flink-connector-base ${flink.version} provided + org.apache.flink flink-test-utils ${flink.version} test + org.apache.flink flink-table-planner_${scala.binary.version} ${flink.version} test + org.apache.flink flink-table-planner_${scala.binary.version} @@ -170,6 +183,7 @@ under the License.
test-jar test + org.testcontainers oceanbase