diff --git a/docs/sink/flink-connector-obkv-hbase.md b/docs/sink/flink-connector-obkv-hbase.md index 73a63f8c..ff494e0d 100644 --- a/docs/sink/flink-connector-obkv-hbase.md +++ b/docs/sink/flink-connector-obkv-hbase.md @@ -113,6 +113,8 @@ public class Main { Once executed, the records should have been written to OceanBase. +For more information please refer to [OBKVHBaseConnectorITCase.java](../../flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java). + #### Flink SQL Demo Put the JAR files of dependencies to the 'lib' directory of Flink, and then create the destination table with Flink SQL through the sql client. diff --git a/docs/sink/flink-connector-obkv-hbase_cn.md b/docs/sink/flink-connector-obkv-hbase_cn.md index 122e37fc..7cd74914 100644 --- a/docs/sink/flink-connector-obkv-hbase_cn.md +++ b/docs/sink/flink-connector-obkv-hbase_cn.md @@ -113,6 +113,8 @@ public class Main { 执行完成后,即可在 OceanBase 中检索验证。 +更多信息请参考 [OBKVHBaseConnectorITCase.java](../../flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java)。 + #### Flink SQL 示例 将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。 diff --git a/docs/sink/flink-connector-oceanbase.md b/docs/sink/flink-connector-oceanbase.md index 8588699c..7dfb076a 100644 --- a/docs/sink/flink-connector-oceanbase.md +++ b/docs/sink/flink-connector-oceanbase.md @@ -95,8 +95,9 @@ public class Main { + ") with (" + " 'connector' = 'oceanbase'," + " 'url' = 'jdbc:oceanbase://127.0.0.1:2881/test'," + + " 'schema-name'= 'test'," + " 'table-name' = 't_sink'," - + " 'username' = 'root@test'," + + " 'username' = 'root@test#obcluster'," + " 'password' = 'pswd'," + " 'compatible-mode' = 'mysql'," + " 'connection-pool-properties' = 'druid.initialSize=10;druid.maxActive=100'," @@ -115,10 +116,13 @@ public class Main { .await(); } } + ``` Once executed, the records should have been written to OceanBase. +For more information please refer to [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java). + #### Flink SQL Demo Put the JAR files of dependencies to the 'lib' directory of Flink, and then create the destination table with Flink SQL through the sql client. @@ -133,8 +137,6 @@ CREATE TABLE t_sink ) with ( 'connector' = 'oceanbase', 'url' = 'jdbc:oceanbase://127.0.0.1:2881/test', - 'cluster-name' = 'obcluster', - 'tenant-name' = 'test', 'schema-name' = 'test', 'table-name' = 't_sink', 'username' = 'root@test#obcluster', diff --git a/docs/sink/flink-connector-oceanbase_cn.md b/docs/sink/flink-connector-oceanbase_cn.md index d943187a..a113e221 100644 --- a/docs/sink/flink-connector-oceanbase_cn.md +++ b/docs/sink/flink-connector-oceanbase_cn.md @@ -96,8 +96,9 @@ public class Main { + ") with (" + " 'connector' = 'oceanbase'," + " 'url' = 'jdbc:oceanbase://127.0.0.1:2881/test'," + + " 'schema-name'= 'test'," + " 'table-name' = 't_sink'," - + " 'username' = 'root@test'," + + " 'username' = 'root@test#obcluster'," + " 'password' = 'pswd'," + " 'compatible-mode' = 'mysql'," + " 'connection-pool-properties' = 'druid.initialSize=10;druid.maxActive=100'," @@ -116,10 +117,13 @@ public class Main { .await(); } } + ``` 执行完成后,即可在 OceanBase 中检索验证。 +更多信息请参考 [OceanBaseConnectorITCase.java](../../flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java)。 + #### Flink SQL 示例 将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。 @@ -135,8 +139,6 @@ CREATE TABLE t_sink with ( 'connector' = 'oceanbase', 'url' = 'jdbc:oceanbase://127.0.0.1:2881/test', - 'cluster-name' = 'obcluster', - 'tenant-name' = 'test', 'schema-name' = 'test', 'table-name' = 't_sink', 'username' = 'root@test#obcluster', diff --git a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java index 66adc6cf..6816cb65 100644 --- a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java +++ b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java @@ -16,17 +16,32 @@ package com.oceanbase.connector.flink; +import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionProvider; +import com.oceanbase.connector.flink.connection.OBKVHBaseTableSchema; +import com.oceanbase.connector.flink.sink.OBKVHBaseStatementExecutor; +import com.oceanbase.connector.flink.sink.OceanBaseSink; + import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import com.alipay.oceanbase.hbase.OHTableClient; import com.alipay.oceanbase.hbase.constants.OHConstants; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -34,7 +49,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.function.Function; +import java.util.Map; import static org.junit.Assert.assertTrue; @@ -44,6 +59,118 @@ public class OBKVHBaseConnectorITCase extends OceanBaseTestBase { public static final String CONFIG_URL = "http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=" + CLUSTER_NAME; + @Override + protected String getTestTable() { + return "htable"; + } + + @Override + protected String getUrl() { + return String.format("%s&database=%s", CONFIG_URL, OB_SERVER.getDatabaseName()); + } + + @Override + protected String getUsername() { + return OB_SERVER.getUsername() + "#" + CLUSTER_NAME; + } + + @Override + protected Map getOptions() { + Map options = super.getOptions(); + options.put("sys.username", OB_SERVER.getSysUsername()); + options.put("sys.password", OB_SERVER.getSysPassword()); + return options; + } + + private OHTableClient client; + + @Before + public void before() throws Exception { + Configuration conf = new Configuration(); + conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, getUrl()); + conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, getUsername()); + conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, getPassword()); + conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, OB_SERVER.getSysUsername()); + conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, OB_SERVER.getSysPassword()); + client = new OHTableClient(getTestTable(), conf); + client.init(); + } + + @After + public void after() throws Exception { + client.delete( + Arrays.asList( + deleteFamily("1", "family1"), + deleteFamily("1", "family2"), + deleteFamily("2", "family2"), + deleteFamily("3", "family1"), + deleteFamily("4", "family1"), + deleteFamily("4", "family2"))); + client.close(); + client = null; + } + + private Delete deleteFamily(String rowKey, String family) { + return new Delete(Bytes.toBytes(rowKey)).deleteFamily(Bytes.toBytes(family)); + } + + @Test + public void testDataStreamSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + ResolvedSchema physicalSchema = + new ResolvedSchema( + Arrays.asList( + Column.physical("rowkey", DataTypes.STRING().notNull()), + Column.physical( + "family1", + DataTypes.ROW( + DataTypes.FIELD( + "q1", DataTypes.INT().nullable())) + .notNull()), + Column.physical( + "family2", + DataTypes.ROW( + DataTypes.FIELD( + "q2", + DataTypes.STRING().nullable()), + DataTypes.FIELD( + "q3", DataTypes.INT().nullable())) + .notNull())), + Collections.emptyList(), + UniqueConstraint.primaryKey("pk", Collections.singletonList("rowkey"))); + + List dataSet = + Arrays.asList( + rowData("1", 1, "1", 1), + rowData("2", null, "2", null), + rowData("3", 3, null, null), + rowData("4", 4, "4", null)); + + OBKVHBaseConnectorOptions connectorOptions = new OBKVHBaseConnectorOptions(getOptions()); + OBKVHBaseConnectionProvider connectionProvider = + new OBKVHBaseConnectionProvider(connectorOptions.getConnectionOptions()); + OBKVHBaseStatementExecutor statementExecutor = + new OBKVHBaseStatementExecutor( + connectorOptions.getStatementOptions(), + new OBKVHBaseTableSchema(physicalSchema), + connectionProvider); + OceanBaseSink sink = + new OceanBaseSink(connectorOptions.getWriterOptions(), statementExecutor); + env.fromCollection(dataSet).sinkTo(sink); + env.execute(); + + validateSinkResults(); + } + + private RowData rowData(String rowKey, Integer q1, String q2, Integer q3) { + return GenericRowData.of( + StringData.fromString(rowKey), + GenericRowData.of(q1), + GenericRowData.of(StringData.fromString(q2), q3)); + } + @Test public void testSink() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -52,37 +179,16 @@ public void testSink() throws Exception { StreamTableEnvironment.create( execEnv, EnvironmentSettings.newInstance().inStreamingMode().build()); - String hTable = "htable"; - String family1 = "family1"; - String family2 = "family2"; - - String url = String.format("%s&database=%s", CONFIG_URL, obServer.getDatabaseName()); - String fullUsername = obServer.getUsername() + "#" + CLUSTER_NAME; - tEnv.executeSql( - String.format( - "CREATE TEMPORARY TABLE target (" - + " rowkey STRING," - + " %s ROW," - + " %s ROW," - + " PRIMARY KEY (rowkey) NOT ENFORCED" - + ") with (" - + " 'connector'='obkv-hbase'," - + " 'url'='%s'," - + " 'table-name'='%s'," - + " 'username'='%s'," - + " 'password'='%s'," - + " 'sys.username'='%s'," - + " 'sys.password'='%s'" - + ");", - family1, - family2, - url, - hTable, - fullUsername, - obServer.getPassword(), - obServer.getSysUsername(), - obServer.getSysPassword())); + "CREATE TEMPORARY TABLE target (" + + " rowkey STRING," + + " family1 ROW," + + " family2 ROW," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") with (" + + " 'connector'='obkv-hbase'," + + getOptionsString() + + ");"); tEnv.executeSql( String.format( @@ -93,50 +199,28 @@ public void testSink() throws Exception { row("4", 4, "4", null))) .await(); - Configuration conf = new Configuration(); - conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, url); - conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, fullUsername); - conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, obServer.getPassword()); - conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, obServer.getSysUsername()); - conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, obServer.getSysPassword()); - - OHTableClient client = new OHTableClient(hTable, conf); - client.init(); - - Function valueFunc = - kv -> { - String column = Bytes.toString(kv.getQualifier()); - if ("q2".equals(column)) { - return Bytes.toString(kv.getValue()); - } else { - return String.valueOf(Bytes.toInt(kv.getValue())); - } - }; + validateSinkResults(); + } + private void validateSinkResults() throws Exception { assertEqualsInAnyOrder( - Collections.singletonList("1,q1,1"), queryHTable(client, family1, "1", valueFunc)); - assertTrue(queryHTable(client, family1, "2", valueFunc).isEmpty()); + Collections.singletonList("1,q1,1"), queryHTable(client, "family1", "1")); + assertTrue(queryHTable(client, "family1", "2").isEmpty()); assertEqualsInAnyOrder( - Collections.singletonList("3,q1,3"), queryHTable(client, family1, "3", valueFunc)); + Collections.singletonList("3,q1,3"), queryHTable(client, "family1", "3")); assertEqualsInAnyOrder( - Collections.singletonList("4,q1,4"), queryHTable(client, family1, "4", valueFunc)); + Collections.singletonList("4,q1,4"), queryHTable(client, "family1", "4")); assertEqualsInAnyOrder( - Arrays.asList("1,q2,1", "1,q3,1"), queryHTable(client, family2, "1", valueFunc)); + Arrays.asList("1,q2,1", "1,q3,1"), queryHTable(client, "family2", "1")); assertEqualsInAnyOrder( - Collections.singletonList("2,q2,2"), queryHTable(client, family2, "2", valueFunc)); - assertTrue(queryHTable(client, family2, "3", valueFunc).isEmpty()); + Collections.singletonList("2,q2,2"), queryHTable(client, "family2", "2")); + assertTrue(queryHTable(client, "family2", "3").isEmpty()); assertEqualsInAnyOrder( - Collections.singletonList("4,q2,4"), queryHTable(client, family2, "4", valueFunc)); - - client.close(); + Collections.singletonList("4,q2,4"), queryHTable(client, "family2", "4")); } - private List queryHTable( - OHTableClient client, - String family, - String rowKey, - Function valueStringFunction) + private List queryHTable(OHTableClient client, String family, String rowKey) throws IOException { List result = new ArrayList<>(); Get get = new Get(Bytes.toBytes(rowKey)); @@ -146,12 +230,15 @@ private List queryHTable( return result; } for (KeyValue kv : r.list()) { + String column = Bytes.toString(kv.getQualifier()); result.add( String.format( "%s,%s,%s", rowKey, - Bytes.toString(kv.getQualifier()), - valueStringFunction.apply(kv))); + column, + "q2".equals(column) + ? Bytes.toString(kv.getValue()) + : String.valueOf(Bytes.toInt(kv.getValue())))); } return result; } diff --git a/flink-connector-obkv-hbase/src/test/resources/sql/init.sql b/flink-connector-obkv-hbase/src/test/resources/sql/init.sql index f35cf3c4..e0b81a2a 100644 --- a/flink-connector-obkv-hbase/src/test/resources/sql/init.sql +++ b/flink-connector-obkv-hbase/src/test/resources/sql/init.sql @@ -11,7 +11,8 @@ -- specific language governing permissions and limitations -- under the License. -use test; +CREATE DATABASE IF NOT EXISTS test; +USE test; CREATE TABLE `htable$family1` ( diff --git a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java index 68f8c271..0c5809d8 100644 --- a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java +++ b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java @@ -18,47 +18,64 @@ import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.Before; +import org.junit.ClassRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.MountableFile; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class OceanBaseTestBase extends TestLogger { +public abstract class OceanBaseTestBase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestBase.class); - protected OceanBaseContainer obServer; - protected String imageTag = "4.2.1_bp2"; - - @Before - public void before() { - obServer = - new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME + ":" + imageTag) - .withNetworkMode("host") - .withSysPassword("123456") - .withCopyFileToContainer( - MountableFile.forClasspathResource("sql/init.sql"), - "/root/boot/init.d/init.sql") - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - Startables.deepStart(obServer).join(); + public static final String IMAGE_TAG = "4.2.1_bp2"; + + @ClassRule + public static final OceanBaseContainer OB_SERVER = + new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME + ":" + IMAGE_TAG) + .withNetworkMode("host") + .withSysPassword("123456") + .withCopyFileToContainer( + MountableFile.forClasspathResource("sql/init.sql"), + "/root/boot/init.d/init.sql") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + protected String getUrl() { + return OB_SERVER.getJdbcUrl(); + } + + protected abstract String getTestTable(); + + protected String getUsername() { + return OB_SERVER.getUsername(); + } + + protected String getPassword() { + return OB_SERVER.getPassword(); + } + + protected Map getOptions() { + Map options = new HashMap<>(); + options.put("url", getUrl()); + options.put("table-name", getTestTable()); + options.put("username", getUsername()); + options.put("password", getPassword()); + return options; } - @After - public void after() { - if (obServer != null) { - obServer.close(); - } + protected String getOptionsString() { + return getOptions().entrySet().stream() + .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(",")); } public static void assertEqualsInAnyOrder(List expected, List actual) { diff --git a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java index 2ced2669..5ff7aafe 100644 --- a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java +++ b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java @@ -16,12 +16,28 @@ package com.oceanbase.connector.flink; +import com.oceanbase.connector.flink.connection.OceanBaseConnectionPool; +import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider; +import com.oceanbase.connector.flink.connection.OceanBaseTableSchema; +import com.oceanbase.connector.flink.sink.OceanBaseSink; +import com.oceanbase.connector.flink.sink.OceanBaseStatementExecutor; + import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.junit.After; import org.junit.Test; +import java.math.BigDecimal; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -30,43 +46,108 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; public class OceanBaseConnectorITCase extends OceanBaseTestBase { + @Override + protected String getTestTable() { + return "products"; + } + + @Override + protected Map getOptions() { + Map options = super.getOptions(); + options.put("compatible-mode", "mysql"); + options.put("schema-name", "test"); + options.put("connection-pool-properties", "druid.initialSize=4;druid.maxActive=20;"); + return options; + } + + @After + public void after() throws Exception { + try (Connection connection = getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("DELETE FROM " + getTestTable()); + } + } + + @Test + public void testDataStreamSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + ResolvedSchema physicalSchema = + new ResolvedSchema( + Arrays.asList( + Column.physical("id", DataTypes.INT().notNull()), + Column.physical("name", DataTypes.STRING().notNull()), + Column.physical("description", DataTypes.STRING().notNull()), + Column.physical("weight", DataTypes.DECIMAL(20, 10).notNull())), + Collections.emptyList(), + UniqueConstraint.primaryKey("pk", Collections.singletonList("id"))); + + List dataSet = + Arrays.asList( + rowData(101, "scooter", "Small 2-wheel scooter", 3.14), + rowData(102, "car battery", "12V car battery", 8.1), + rowData( + 103, + "12-pack drill bits", + "12-pack of drill bits with sizes ranging from #40 to #3", + 0.8), + rowData(104, "hammer", "12oz carpenter's hammer", 0.75), + rowData(105, "hammer", "14oz carpenter's hammer", 0.875), + rowData(106, "hammer", "16oz carpenter's hammer", 1.0), + rowData(107, "rocks", "box of assorted rocks", 5.3), + rowData(108, "jacket", "water resistent black wind breaker", 0.1), + rowData(109, "spare tire", "24 inch spare tire", 22.2)); + + OceanBaseConnectorOptions connectorOptions = new OceanBaseConnectorOptions(getOptions()); + OceanBaseConnectionProvider connectionProvider = + new OceanBaseConnectionPool(connectorOptions.getConnectionOptions()); + OceanBaseStatementExecutor statementExecutor = + new OceanBaseStatementExecutor( + connectorOptions.getStatementOptions(), + new OceanBaseTableSchema(physicalSchema), + connectionProvider); + OceanBaseSink sink = + new OceanBaseSink(connectorOptions.getWriterOptions(), statementExecutor); + env.fromCollection(dataSet).sinkTo(sink); + env.execute(); + + validateSinkResults(); + } + + private RowData rowData(int id, String name, String description, double weight) { + return GenericRowData.of( + id, + StringData.fromString(name), + StringData.fromString(description), + DecimalData.fromBigDecimal(new BigDecimal(weight), 20, 10)); + } + @Test public void testSink() throws Exception { - StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - execEnv.setParallelism(1); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create( - execEnv, EnvironmentSettings.newInstance().inStreamingMode().build()); - - String tableName = "products"; + env, EnvironmentSettings.newInstance().inStreamingMode().build()); tEnv.executeSql( - String.format( - "CREATE TEMPORARY TABLE target (" - + " `id` INT NOT NULL," - + " name STRING," - + " description STRING," - + " weight DECIMAL(20, 10)," - + " PRIMARY KEY (`id`) NOT ENFORCED" - + ") with (" - + " 'connector'='oceanbase'," - + " 'url'='%s'," - + " 'schema-name'='%s'," - + " 'table-name'='%s'," - + " 'username'='%s'," - + " 'password'='%s'," - + " 'compatible-mode'='mysql'," - + " 'connection-pool-properties'='druid.initialSize=4;druid.maxActive=20;'" - + ");", - obServer.getJdbcUrl(), - obServer.getDatabaseName(), - tableName, - obServer.getUsername(), - obServer.getPassword())); + "CREATE TEMPORARY TABLE target (" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") with (" + + " 'connector'='oceanbase'," + + getOptionsString() + + ");"); tEnv.executeSql( "INSERT INTO target " @@ -81,6 +162,10 @@ public void testSink() throws Exception { + " (109, 'spare tire', '24 inch spare tire', 22.2);") .await(); + validateSinkResults(); + } + + private void validateSinkResults() throws SQLException { List expected = Arrays.asList( "101,scooter,Small 2-wheel scooter,3.1400000000", @@ -93,20 +178,16 @@ public void testSink() throws Exception { "108,jacket,water resistent black wind breaker,0.1000000000", "109,spare tire,24 inch spare tire,22.2000000000"); - List actual = queryTable(tableName); + List actual = queryTable(); assertEqualsInAnyOrder(expected, actual); } - public List queryTable(String tableName) throws SQLException { + public List queryTable() throws SQLException { List result = new ArrayList<>(); - try (Connection connection = - DriverManager.getConnection( - obServer.getJdbcUrl(), - obServer.getUsername(), - obServer.getPassword()); + try (Connection connection = getConnection(); Statement statement = connection.createStatement()) { - ResultSet rs = statement.executeQuery("SELECT * FROM " + tableName); + ResultSet rs = statement.executeQuery("SELECT * FROM " + getTestTable()); ResultSetMetaData metaData = rs.getMetaData(); while (rs.next()) { @@ -122,4 +203,8 @@ public List queryTable(String tableName) throws SQLException { } return result; } + + protected Connection getConnection() throws SQLException { + return DriverManager.getConnection(getUrl(), getUsername(), getPassword()); + } } diff --git a/flink-connector-oceanbase/src/test/resources/sql/init.sql b/flink-connector-oceanbase/src/test/resources/sql/init.sql index 04dc4e71..53ee6c31 100644 --- a/flink-connector-oceanbase/src/test/resources/sql/init.sql +++ b/flink-connector-oceanbase/src/test/resources/sql/init.sql @@ -11,7 +11,8 @@ -- specific language governing permissions and limitations -- under the License. -use test; +CREATE DATABASE IF NOT EXISTS test; +USE test; CREATE TABLE products (