Skip to content
This repository has been archived by the owner on Jan 31, 2025. It is now read-only.

Commit

Permalink
Fix SSH related issue in CDC tests (#2966) (#2967)
Browse files Browse the repository at this point in the history
* Disable SSH when using JDBC with MySQL

* Fix database names

* Make checkstyle happy

Co-authored-by: Jozsef Bartok <[email protected]>
  • Loading branch information
olukas and jbartok authored Mar 8, 2021
1 parent d461147 commit e4163f6
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@
import org.testcontainers.containers.GenericContainer;

import javax.annotation.Nonnull;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -148,4 +152,17 @@ protected <T> T namedTestContainer(GenericContainer<?> container) {
});
}

protected static Connection getMySqlConnection(String url, String user, String password) throws SQLException {
Properties properties = new Properties();
properties.put("user", user);
properties.put("password", password);
properties.put("useSSL", "false");

return DriverManager.getConnection(url, properties);
}

protected static Connection getPostgreSqlConnection(String url, String user, String password) throws SQLException {
return DriverManager.getConnection(url, user, password);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.testcontainers.containers.PostgreSQLContainer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -94,7 +93,7 @@ public void mysql() throws Exception {
assertEqualsEventually(() -> jet.getMap("results").size(), 4);

//when
try (Connection connection = DriverManager.getConnection(container.withDatabaseName("inventory").getJdbcUrl(),
try (Connection connection = getMySqlConnection(container.withDatabaseName("inventory").getJdbcUrl(),
container.getUsername(), container.getPassword())) {
Statement statement = connection.createStatement();
statement.addBatch("UPDATE customers SET first_name='Anne Marie' WHERE id=1004");
Expand Down Expand Up @@ -271,7 +270,7 @@ public void postgres() throws Exception {
assertEqualsEventually(() -> jet.getMap("results").size(), 4);

//when
try (Connection connection = DriverManager.getConnection(container.getJdbcUrl(), container.getUsername(),
try (Connection connection = getPostgreSqlConnection(container.getJdbcUrl(), container.getUsername(),
container.getPassword())) {
connection.setSchema("inventory");
Statement statement = connection.createStatement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.testcontainers.containers.MySQLContainer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

Expand All @@ -50,12 +49,20 @@ protected MySqlCdcSources.Builder sourceBuilder(String name) {

protected void createDb(String database) throws SQLException {
String jdbcUrl = "jdbc:mysql://" + mysql.getContainerIpAddress() + ":" + mysql.getMappedPort(MYSQL_PORT) + "/";
try (Connection connection = DriverManager.getConnection(jdbcUrl, "root", "mysqlpw")) {
try (Connection connection = getMySqlConnection(jdbcUrl, "root", "mysqlpw")) {
Statement statement = connection.createStatement();
statement.addBatch("CREATE DATABASE " + database);
statement.addBatch("GRANT ALL PRIVILEGES ON " + database + ".* TO 'mysqluser'@'%'");
statement.executeBatch();
}
}

static Connection getConnection(MySQLContainer<?> mysql, String database) throws SQLException {
return getMySqlConnection(
mysql.withDatabaseName(database).getJdbcUrl(),
mysql.getUsername(),
mysql.getPassword()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

import javax.annotation.Nonnull;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Date;
Expand Down Expand Up @@ -91,8 +90,7 @@ public void customers() throws Exception {
assertEqualsEventually(() -> jet.getMap("results").size(), 4);

//when
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName("inventory").getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = getConnection(mysql, "inventory")) {
Statement statement = connection.createStatement();
statement.addBatch("UPDATE customers SET first_name='Anne Marie' WHERE id=1004");
statement.addBatch("INSERT INTO customers VALUES (1005, 'Jason', 'Bourne', '[email protected]')");
Expand Down Expand Up @@ -203,8 +201,7 @@ public void restart() throws Exception {
JetTestSupport.assertJobStatusEventually(job, JobStatus.RUNNING);

//then update a record
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName("inventory").getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = getConnection(mysql, "inventory")) {
Statement statement = connection.createStatement();
statement.addBatch("UPDATE customers SET first_name='Anne Marie' WHERE id=1004");
statement.addBatch("INSERT INTO customers VALUES (1005, 'Jason', 'Bourne', '[email protected]')");
Expand Down Expand Up @@ -251,8 +248,7 @@ public void cdcMapSink() throws Exception {
//when
job.restart();
JetTestSupport.assertJobStatusEventually(job, JobStatus.RUNNING);
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName("inventory").getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = getConnection(mysql, "inventory")) {
Statement statement = connection.createStatement();
statement.addBatch("UPDATE customers SET first_name='Anne Marie' WHERE id=1004");
statement.addBatch("INSERT INTO customers VALUES (1005, 'Jason', 'Bourne', '[email protected]')");
Expand All @@ -270,8 +266,7 @@ public void cdcMapSink() throws Exception {
);

//when
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName("inventory").getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = getConnection(mysql, "inventory")) {
connection.createStatement().execute("DELETE FROM customers WHERE id=1005");
}
//then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.junit.experimental.categories.Category;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
Expand Down Expand Up @@ -149,8 +148,7 @@ public void listenBeforeColumnExists() throws Exception {
}

private void createTableWithData(String database, String table) throws SQLException {
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName(database).getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = getConnection(mysql, database)) {
Statement statement = connection.createStatement();
statement.addBatch("CREATE TABLE " + table + " (\n"
+ " id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,\n"
Expand All @@ -177,16 +175,14 @@ private void insertToTable(String database, String table, int id, String val1, S
statement.append(", '").append(val3).append("'");
}
statement.append(")");
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName(database).getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = getConnection(mysql, database)) {
connection.createStatement().execute(statement.toString());

}
}

private void addColumnToTable(String database, String table, String column) throws SQLException {
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName(database).getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = getConnection(mysql, database)) {
connection.createStatement()
.execute("ALTER TABLE " + table + " ADD COLUMN " + column + " VARCHAR(255);");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
Expand Down Expand Up @@ -366,8 +365,7 @@ private static ToxiproxyContainer.ContainerProxy initProxy(ToxiproxyContainer to
}

private static void insertRecords(MySQLContainer<?> mysql, int... ids) throws SQLException {
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName("inventory").getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = AbstractMySqlCdcIntegrationTest.getConnection(mysql, "inventory")) {
connection.setAutoCommit(false);
Statement statement = connection.createStatement();
for (int id : ids) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.junit.experimental.categories.Category;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
Expand Down Expand Up @@ -282,8 +281,7 @@ private Pipeline pipeline(StreamSource<ChangeRecord> source) {
private void createDbWithData(int dbSuffix) throws SQLException {
String database = DB_PREFIX + dbSuffix;
createDb(database);
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName(database).getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = getConnection(mysql, database)) {
int dbId = dbSuffix * 1000;
for (int i = 0; i < 3; i++) {
String table = "table" + i;
Expand Down Expand Up @@ -313,8 +311,7 @@ private void createDbWithData(int dbSuffix) throws SQLException {

private void executeStatementsOnDb(int dbSuffix) throws SQLException {
String database = DB_PREFIX + dbSuffix;
try (Connection connection = DriverManager.getConnection(mysql.withDatabaseName(database).getJdbcUrl(),
mysql.getUsername(), mysql.getPassword())) {
try (Connection connection = getConnection(mysql, database)) {
int id = dbSuffix * 1000 + 1;
Statement statement = connection.createStatement();
statement.addBatch("UPDATE table0 SET value_1='new_" + database + "_table0_val1_0' WHERE id=" + id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Date;
import java.util.Objects;
Expand Down Expand Up @@ -54,12 +53,15 @@ protected PostgresCdcSources.Builder sourceBuilder(String name) {
}

protected void createSchema(String schema) throws SQLException {
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
connection.createStatement().execute("CREATE SCHEMA " + schema);
}
}

static Connection getConnection(PostgreSQLContainer<?> postgres) throws SQLException {
return getPostgreSqlConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword());
}

protected static class Customer implements Serializable {

@JsonProperty("id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Date;
Expand Down Expand Up @@ -97,8 +96,7 @@ record -> (Integer) record.value().toMap().get("purchaser"),
assertEqualsEventually(() -> getIMapContent(jet, CACHE), expected);

//when
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
connection.setSchema("inventory");
Statement statement = connection.createStatement();
for (int i = 1; i <= REPEATS; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import javax.annotation.Nonnull;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -92,8 +91,7 @@ public void customers() throws Exception {
assertEqualsEventually(() -> jet.getMap("results").size(), 4);

//when
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
connection.setSchema("inventory");
Statement statement = connection.createStatement();
statement.addBatch("UPDATE customers SET first_name='Anne Marie' WHERE id=1004");
Expand Down Expand Up @@ -201,8 +199,7 @@ public void restart() throws Exception {
JetTestSupport.assertJobStatusEventually(job, JobStatus.RUNNING);

//then update a record
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
connection.setSchema("inventory");
Statement statement = connection.createStatement();
statement.addBatch("UPDATE customers SET first_name='Anne Marie' WHERE id=1004");
Expand Down Expand Up @@ -250,8 +247,7 @@ public void cdcMapSink() throws Exception {
//when
job.restart();
JetTestSupport.assertJobStatusEventually(job, JobStatus.RUNNING);
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
connection.setSchema("inventory");
Statement statement = connection.createStatement();
statement.addBatch("UPDATE customers SET first_name='Anne Marie' WHERE id=1004");
Expand All @@ -270,8 +266,7 @@ public void cdcMapSink() throws Exception {
);

//when
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
connection.setSchema("inventory");
connection
.prepareStatement("DELETE FROM customers WHERE id=1005")
Expand Down Expand Up @@ -337,8 +332,7 @@ public void dataLoss() throws Exception {
assertJobStatusEventually(job, JobStatus.RUNNING);

//when
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
connection.setSchema("inventory");
Statement statement = connection.createStatement();
for (int i = offset; i < offset + length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.junit.experimental.categories.Category;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -156,8 +155,7 @@ public void listenBeforeColumnExists() throws Exception {
}

private void createTableWithData(String schema, String table) throws SQLException {
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(),
postgres.getUsername(), postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
Statement statement = connection.createStatement();
statement.addBatch("SET search_path TO " + schema);
statement.addBatch("CREATE TABLE " + table + " (\n"
Expand Down Expand Up @@ -187,8 +185,7 @@ private void insertIntoTable(String schema, String table, int id, String val1, S
}
insertSql.append(")");

try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
connection.setSchema(schema);
Statement statement = connection.createStatement();
statement.addBatch("SET search_path TO " + schema);
Expand All @@ -198,8 +195,7 @@ private void insertIntoTable(String schema, String table, int id, String val1, S
}

private void addColumnToTable(String schema, String table, String column) throws SQLException {
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
Statement statement = connection.createStatement();
statement.addBatch("SET search_path TO " + schema);
statement.addBatch("ALTER TABLE " + table + " ADD COLUMN " + column + " VARCHAR(255);");
Expand Down Expand Up @@ -232,8 +228,7 @@ private Pipeline pipeline(StreamSource<ChangeRecord> source) {

private void assertReplicationSlotActive(String slotName) {
assertTrueEventually(() -> {
try (Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(),
postgres.getPassword())) {
try (Connection connection = getConnection(postgres)) {
PreparedStatement preparedStatement = connection.prepareStatement(
"SELECT * FROM pg_replication_slots WHERE slot_name=?;");
preparedStatement.setString(1, slotName);
Expand Down
Loading

0 comments on commit e4163f6

Please sign in to comment.