diff --git a/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/CdcMysqlSyncDatabaseITCase.java b/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/CdcMysqlSyncDatabaseITCase.java index cd162db..5690bf1 100644 --- a/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/CdcMysqlSyncDatabaseITCase.java +++ b/flink-connector-oceanbase-tools-cdc/src/test/java/com/oceanbase/cdc/tools/tests/CdcMysqlSyncDatabaseITCase.java @@ -16,7 +16,6 @@ package com.oceanbase.cdc.tools.tests; -import com.oceanbase.cdc.tools.tests.container.MySqlContainer; import com.oceanbase.connector.flink.OceanBaseConnectorOptions; import com.oceanbase.connector.flink.OceanBaseMySQLTestBase; import com.oceanbase.connector.flink.tools.cdc.DatabaseSync; @@ -29,36 +28,33 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.shaded.org.apache.commons.io.IOUtils; -import org.testcontainers.utility.DockerLoggerFactory; - -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; + +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.stream.Stream; public class CdcMysqlSyncDatabaseITCase extends OceanBaseMySQLTestBase { private static final Logger LOG = LoggerFactory.getLogger(CdcMysqlSyncDatabaseITCase.class); - private static final String MYSQL_HOST = "localhost"; - private static final Integer MYSQL_PORT = 3306; - private static final String MYSQL_USER_NAME = "root"; - private static final String MYSQL_USER_PASSWORD = "mysqlpw"; - private static final String MYSQL_DATABASE = "test"; private static final String MYSQL_TABLE_NAME = "test_history_text"; - static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + private static final MySQLContainer MYSQL_CONTAINER = + new MySQLContainer<>("mysql:8.0.20") + .withConfigurationOverride("docker/server-gtids/my.cnf") + .withInitScript("sql/mysql-cdc.sql") + .withNetwork(NETWORK) + .withExposedPorts(3306) + .withDatabaseName("test") + .withUsername("root") + .withPassword("mysqlpw") + .withLogConsumer(new Slf4jLogConsumer(LOG)); @BeforeClass public static void setup() { @@ -75,48 +71,25 @@ public static void tearDown() { Stream.of(CONTAINER, MYSQL_CONTAINER).forEach(GenericContainer::stop); } - private static final MySqlContainer MYSQL_CONTAINER = - new MySqlContainer() - .withConfigurationOverride("docker/server-gtids/my.cnf") - .withSetupSQL("sql/cdc.sql") - .withNetwork(NETWORK) - .withNetworkAliases(MYSQL_HOST) - .withExposedPorts(MYSQL_PORT) - .withDatabaseName(MYSQL_DATABASE) - .withPassword(MYSQL_USER_PASSWORD) - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger("mysql-docker-image"))); - @Test public void testCdcMysqlSyncOceanBase() throws Exception { - extractedCdcSync(); - Thread.sleep(30000); - checkResult(); - env.close(); - } + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - private static void extractedCdcSync() throws Exception { - // env.setParallelism(1); Map flinkMap = new HashMap<>(); flinkMap.put("execution.checkpointing.interval", "10s"); flinkMap.put("pipeline.operator-chaining", "false"); flinkMap.put("parallelism.default", "1"); - Configuration configuration = Configuration.fromMap(flinkMap); env.configure(configuration); - String tablePrefix = ""; - String tableSuffix = ""; Map mysqlConfig = new HashMap<>(); - mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), MYSQL_DATABASE); - mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), MYSQL_HOST); + mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), MYSQL_CONTAINER.getDatabaseName()); + mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), MYSQL_CONTAINER.getHost()); mysqlConfig.put( MySqlSourceOptions.PORT.key(), - String.valueOf(MYSQL_CONTAINER.getMappedPort(MYSQL_PORT))); - mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), MYSQL_USER_NAME); - mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), MYSQL_USER_PASSWORD); - // add jdbc properties for MySQL + String.valueOf(MYSQL_CONTAINER.getMappedPort(MySQLContainer.MYSQL_PORT))); + mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), MYSQL_CONTAINER.getUsername()); + mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), MYSQL_CONTAINER.getPassword()); mysqlConfig.put("jdbc.properties.use_ssl", "false"); Configuration config = Configuration.fromMap(mysqlConfig); @@ -127,83 +100,32 @@ private static void extractedCdcSync() throws Exception { sinkConfig.put("sink.enable-delete", "false"); Configuration sinkConf = Configuration.fromMap(sinkConfig); - String includingTables = "test.*"; - String excludingTables = ""; - boolean ignoreDefaultValue = false; - boolean ignoreIncompatible = false; DatabaseSync databaseSync = new MysqlDatabaseSync(); databaseSync .setEnv(env) - .setDatabase(MYSQL_DATABASE) + .setDatabase(MYSQL_CONTAINER.getDatabaseName()) .setConfig(config) - .setTablePrefix(tablePrefix) - .setTableSuffix(tableSuffix) - .setIncludingTables(includingTables) - .setExcludingTables(excludingTables) - .setIgnoreDefaultValue(ignoreDefaultValue) + .setTablePrefix(null) + .setTableSuffix(null) + .setIncludingTables(MYSQL_CONTAINER.getDatabaseName() + ".*") + .setExcludingTables(null) + .setIgnoreDefaultValue(false) .setSinkConfig(sinkConf) .setCreateTableOnly(false) .create(); databaseSync.build(); - env.executeAsync(String.format("MySQL-OceanBase Database Sync: %s", MYSQL_DATABASE)); - } - static void checkResult() { - String sourceSql = String.format("select * from %s order by 1", MYSQL_TABLE_NAME); - String sinkSql = String.format("select * from %s order by 1", MYSQL_TABLE_NAME); - try (Statement sourceStatement = - getConnection( - MYSQL_CONTAINER.getJdbcUrl(), - MYSQL_USER_NAME, - MYSQL_USER_PASSWORD) - .createStatement( - ResultSet.TYPE_SCROLL_INSENSITIVE, - ResultSet.CONCUR_READ_ONLY); - Statement sinkStatement = - getConnection( - CONTAINER.getJdbcUrl(), - CONTAINER.getUsername(), - CONTAINER.getPassword()) - .createStatement( - ResultSet.TYPE_SCROLL_INSENSITIVE, - ResultSet.CONCUR_READ_ONLY); - ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); - ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql)) { - Assertions.assertEquals( - sourceResultSet.getMetaData().getColumnCount(), - sinkResultSet.getMetaData().getColumnCount()); - while (sourceResultSet.next()) { - if (sinkResultSet.next()) { - for (String column : getFieldNames()) { - Object source = sourceResultSet.getObject(column); - Object sink = sinkResultSet.getObject(column); - if (!Objects.deepEquals(source, sink)) { - InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); - InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); - String sourceValue = - IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); - String sinkValue = - IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); - Assertions.assertEquals(sourceValue, sinkValue); - } - } - } - } - sourceResultSet.last(); - sinkResultSet.last(); - } catch (Exception e) { - throw new RuntimeException("Compare result error", e); - } - } + env.executeAsync( + String.format( + "MySQL-OceanBase Database Sync: %s", MYSQL_CONTAINER.getDatabaseName())); - static String[] getFieldNames() { - return new String[] { - "itemid", "clock", "value", "ns", - }; - } + List expected = Arrays.asList("1,21131,ces1,21321", "2,21321,ces2,12321"); + + waitingAndAssertTableCount(MYSQL_TABLE_NAME, expected.size()); + + List actual = + queryTable(MYSQL_TABLE_NAME, Arrays.asList("itemid", "clock", "value", "ns")); - public static Connection getConnection(String jdbcUrl, String userName, String password) - throws SQLException { - return DriverManager.getConnection(jdbcUrl, userName, password); + assertEqualsInAnyOrder(expected, actual); } }