Skip to content

Commit

Permalink
update mysql-cdc it case
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 6, 2025
1 parent 590cbfe commit 4548482
Showing 1 changed file with 37 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.oceanbase.cdc.tools.tests;

import com.oceanbase.cdc.tools.tests.container.MySqlContainer;
import com.oceanbase.connector.flink.OceanBaseConnectorOptions;
import com.oceanbase.connector.flink.OceanBaseMySQLTestBase;
import com.oceanbase.connector.flink.tools.cdc.DatabaseSync;
Expand All @@ -29,36 +28,33 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
import org.testcontainers.utility.DockerLoggerFactory;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

public class CdcMysqlSyncDatabaseITCase extends OceanBaseMySQLTestBase {
private static final Logger LOG = LoggerFactory.getLogger(CdcMysqlSyncDatabaseITCase.class);

private static final String MYSQL_HOST = "localhost";
private static final Integer MYSQL_PORT = 3306;
private static final String MYSQL_USER_NAME = "root";
private static final String MYSQL_USER_PASSWORD = "mysqlpw";
private static final String MYSQL_DATABASE = "test";
private static final String MYSQL_TABLE_NAME = "test_history_text";
static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

private static final MySQLContainer<?> MYSQL_CONTAINER =
new MySQLContainer<>("mysql:8.0.20")
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withInitScript("sql/mysql-cdc.sql")
.withNetwork(NETWORK)
.withExposedPorts(3306)
.withDatabaseName("test")
.withUsername("root")
.withPassword("mysqlpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));

@BeforeClass
public static void setup() {
Expand All @@ -75,48 +71,25 @@ public static void tearDown() {
Stream.of(CONTAINER, MYSQL_CONTAINER).forEach(GenericContainer::stop);
}

private static final MySqlContainer MYSQL_CONTAINER =
new MySqlContainer()
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withSetupSQL("sql/cdc.sql")
.withNetwork(NETWORK)
.withNetworkAliases(MYSQL_HOST)
.withExposedPorts(MYSQL_PORT)
.withDatabaseName(MYSQL_DATABASE)
.withPassword(MYSQL_USER_PASSWORD)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger("mysql-docker-image")));

@Test
public void testCdcMysqlSyncOceanBase() throws Exception {
extractedCdcSync();
Thread.sleep(30000);
checkResult();
env.close();
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

private static void extractedCdcSync() throws Exception {
// env.setParallelism(1);
Map<String, String> flinkMap = new HashMap<>();
flinkMap.put("execution.checkpointing.interval", "10s");
flinkMap.put("pipeline.operator-chaining", "false");
flinkMap.put("parallelism.default", "1");

Configuration configuration = Configuration.fromMap(flinkMap);
env.configure(configuration);

String tablePrefix = "";
String tableSuffix = "";
Map<String, String> mysqlConfig = new HashMap<>();
mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), MYSQL_DATABASE);
mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), MYSQL_HOST);
mysqlConfig.put(MySqlSourceOptions.DATABASE_NAME.key(), MYSQL_CONTAINER.getDatabaseName());
mysqlConfig.put(MySqlSourceOptions.HOSTNAME.key(), MYSQL_CONTAINER.getHost());
mysqlConfig.put(
MySqlSourceOptions.PORT.key(),
String.valueOf(MYSQL_CONTAINER.getMappedPort(MYSQL_PORT)));
mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), MYSQL_USER_NAME);
mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), MYSQL_USER_PASSWORD);
// add jdbc properties for MySQL
String.valueOf(MYSQL_CONTAINER.getMappedPort(MySQLContainer.MYSQL_PORT)));
mysqlConfig.put(MySqlSourceOptions.USERNAME.key(), MYSQL_CONTAINER.getUsername());
mysqlConfig.put(MySqlSourceOptions.PASSWORD.key(), MYSQL_CONTAINER.getPassword());
mysqlConfig.put("jdbc.properties.use_ssl", "false");
Configuration config = Configuration.fromMap(mysqlConfig);

Expand All @@ -127,83 +100,32 @@ private static void extractedCdcSync() throws Exception {
sinkConfig.put("sink.enable-delete", "false");
Configuration sinkConf = Configuration.fromMap(sinkConfig);

String includingTables = "test.*";
String excludingTables = "";
boolean ignoreDefaultValue = false;
boolean ignoreIncompatible = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync
.setEnv(env)
.setDatabase(MYSQL_DATABASE)
.setDatabase(MYSQL_CONTAINER.getDatabaseName())
.setConfig(config)
.setTablePrefix(tablePrefix)
.setTableSuffix(tableSuffix)
.setIncludingTables(includingTables)
.setExcludingTables(excludingTables)
.setIgnoreDefaultValue(ignoreDefaultValue)
.setTablePrefix(null)
.setTableSuffix(null)
.setIncludingTables(MYSQL_CONTAINER.getDatabaseName() + ".*")
.setExcludingTables(null)
.setIgnoreDefaultValue(false)
.setSinkConfig(sinkConf)
.setCreateTableOnly(false)
.create();
databaseSync.build();
env.executeAsync(String.format("MySQL-OceanBase Database Sync: %s", MYSQL_DATABASE));
}

static void checkResult() {
String sourceSql = String.format("select * from %s order by 1", MYSQL_TABLE_NAME);
String sinkSql = String.format("select * from %s order by 1", MYSQL_TABLE_NAME);
try (Statement sourceStatement =
getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_USER_NAME,
MYSQL_USER_PASSWORD)
.createStatement(
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
Statement sinkStatement =
getConnection(
CONTAINER.getJdbcUrl(),
CONTAINER.getUsername(),
CONTAINER.getPassword())
.createStatement(
ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql)) {
Assertions.assertEquals(
sourceResultSet.getMetaData().getColumnCount(),
sinkResultSet.getMetaData().getColumnCount());
while (sourceResultSet.next()) {
if (sinkResultSet.next()) {
for (String column : getFieldNames()) {
Object source = sourceResultSet.getObject(column);
Object sink = sinkResultSet.getObject(column);
if (!Objects.deepEquals(source, sink)) {
InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column);
InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column);
String sourceValue =
IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
String sinkValue =
IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
Assertions.assertEquals(sourceValue, sinkValue);
}
}
}
}
sourceResultSet.last();
sinkResultSet.last();
} catch (Exception e) {
throw new RuntimeException("Compare result error", e);
}
}
env.executeAsync(
String.format(
"MySQL-OceanBase Database Sync: %s", MYSQL_CONTAINER.getDatabaseName()));

static String[] getFieldNames() {
return new String[] {
"itemid", "clock", "value", "ns",
};
}
List<String> expected = Arrays.asList("1,21131,ces1,21321", "2,21321,ces2,12321");

waitingAndAssertTableCount(MYSQL_TABLE_NAME, expected.size());

List<String> actual =
queryTable(MYSQL_TABLE_NAME, Arrays.asList("itemid", "clock", "value", "ns"));

public static Connection getConnection(String jdbcUrl, String userName, String password)
throws SQLException {
return DriverManager.getConnection(jdbcUrl, userName, password);
assertEqualsInAnyOrder(expected, actual);
}
}

0 comments on commit 4548482

Please sign in to comment.