From 3bbba6071e101b6fee9d749f6d4505b5b8f1214c Mon Sep 17 00:00:00 2001 From: He Wang Date: Tue, 2 Jan 2024 19:40:35 +0800 Subject: [PATCH] add datastream demo --- .../flink/OBKVHBaseConnectorITCase.java | 46 +++--- .../src/test/resources/sql/init.sql | 3 +- .../connector/flink/OceanBaseTestBase.java | 69 +++++--- .../flink/OceanBaseConnectorITCase.java | 156 ++++++++++++++---- .../src/test/resources/sql/init.sql | 3 +- 5 files changed, 193 insertions(+), 84 deletions(-) diff --git a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java index 66adc6cf..c009d469 100644 --- a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java +++ b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java @@ -44,6 +44,21 @@ public class OBKVHBaseConnectorITCase extends OceanBaseTestBase { public static final String CONFIG_URL = "http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=" + CLUSTER_NAME; + @Override + protected String getUrl() { + return String.format("%s&database=%s", CONFIG_URL, OB_SERVER.getDatabaseName()); + } + + @Override + protected String getUsername() { + return OB_SERVER.getUsername() + "#" + CLUSTER_NAME; + } + + @Override + protected String getTestTable() { + return "htable"; + } + @Test public void testSink() throws Exception { StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -52,13 +67,9 @@ public void testSink() throws Exception { StreamTableEnvironment.create( execEnv, EnvironmentSettings.newInstance().inStreamingMode().build()); - String hTable = "htable"; String family1 = "family1"; String family2 = "family2"; - String url = String.format("%s&database=%s", CONFIG_URL, obServer.getDatabaseName()); - String fullUsername = obServer.getUsername() + "#" + CLUSTER_NAME; - tEnv.executeSql( String.format( "CREATE TEMPORARY TABLE target (" @@ -68,21 +79,14 @@ public void testSink() throws Exception { + " PRIMARY KEY (rowkey) NOT ENFORCED" + ") with (" + " 'connector'='obkv-hbase'," - + " 'url'='%s'," - + " 'table-name'='%s'," - + " 'username'='%s'," - + " 'password'='%s'," + " 'sys.username'='%s'," - + " 'sys.password'='%s'" + + " 'sys.password'='%s'," + + getCommonOptionsString() + ");", family1, family2, - url, - hTable, - fullUsername, - obServer.getPassword(), - obServer.getSysUsername(), - obServer.getSysPassword())); + OB_SERVER.getSysUsername(), + OB_SERVER.getSysPassword())); tEnv.executeSql( String.format( @@ -94,13 +98,13 @@ public void testSink() throws Exception { .await(); Configuration conf = new Configuration(); - conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, url); - conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, fullUsername); - conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, obServer.getPassword()); - conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, obServer.getSysUsername()); - conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, obServer.getSysPassword()); + conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, getUrl()); + conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, getUsername()); + conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, getPassword()); + conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, OB_SERVER.getSysUsername()); + conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, OB_SERVER.getSysPassword()); - OHTableClient client = new OHTableClient(hTable, conf); + OHTableClient client = new OHTableClient(getTestTable(), conf); client.init(); Function valueFunc = diff --git a/flink-connector-obkv-hbase/src/test/resources/sql/init.sql b/flink-connector-obkv-hbase/src/test/resources/sql/init.sql index f35cf3c4..e0b81a2a 100644 --- a/flink-connector-obkv-hbase/src/test/resources/sql/init.sql +++ b/flink-connector-obkv-hbase/src/test/resources/sql/init.sql @@ -11,7 +11,8 @@ -- specific language governing permissions and limitations -- under the License. -use test; +CREATE DATABASE IF NOT EXISTS test; +USE test; CREATE TABLE `htable$family1` ( diff --git a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java index 68f8c271..f92d30e9 100644 --- a/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java +++ b/flink-connector-oceanbase-base/src/test/java/com/oceanbase/connector/flink/OceanBaseTestBase.java @@ -18,48 +18,36 @@ import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.Before; +import org.junit.ClassRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.MountableFile; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class OceanBaseTestBase extends TestLogger { +public abstract class OceanBaseTestBase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestBase.class); - protected OceanBaseContainer obServer; - protected String imageTag = "4.2.1_bp2"; - - @Before - public void before() { - obServer = - new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME + ":" + imageTag) - .withNetworkMode("host") - .withSysPassword("123456") - .withCopyFileToContainer( - MountableFile.forClasspathResource("sql/init.sql"), - "/root/boot/init.d/init.sql") - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - Startables.deepStart(obServer).join(); - } + public static final String IMAGE_TAG = "4.2.1_bp2"; - @After - public void after() { - if (obServer != null) { - obServer.close(); - } - } + @ClassRule + public static final OceanBaseContainer OB_SERVER = + new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME + ":" + IMAGE_TAG) + .withNetworkMode("host") + .withSysPassword("123456") + .withCopyFileToContainer( + MountableFile.forClasspathResource("sql/init.sql"), + "/root/boot/init.d/init.sql") + .withLogConsumer(new Slf4jLogConsumer(LOG)); public static void assertEqualsInAnyOrder(List expected, List actual) { assertTrue(expected != null && actual != null); @@ -68,6 +56,35 @@ public static void assertEqualsInAnyOrder(List expected, List ac actual.stream().sorted().collect(Collectors.toList())); } + protected String getUrl() { + return OB_SERVER.getJdbcUrl(); + } + + protected abstract String getTestTable(); + + protected String getUsername() { + return OB_SERVER.getUsername(); + } + + protected String getPassword() { + return OB_SERVER.getPassword(); + } + + protected Map getCommonOptions() { + Map options = new HashMap<>(); + options.put("url", getUrl()); + options.put("table-name", getTestTable()); + options.put("username", getUsername()); + options.put("password", getPassword()); + return options; + } + + protected String getCommonOptionsString() { + return getCommonOptions().entrySet().stream() + .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue())) + .collect(Collectors.joining(",")); + } + public static void assertEqualsInOrder(List expected, List actual) { assertTrue(expected != null && actual != null); assertEquals(expected.size(), actual.size()); diff --git a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java index 2ced2669..6b01a548 100644 --- a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java +++ b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java @@ -16,12 +16,28 @@ package com.oceanbase.connector.flink; +import com.oceanbase.connector.flink.connection.OceanBaseConnectionPool; +import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider; +import com.oceanbase.connector.flink.connection.OceanBaseTableSchema; +import com.oceanbase.connector.flink.sink.OceanBaseSink; +import com.oceanbase.connector.flink.sink.OceanBaseStatementExecutor; + import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.junit.After; import org.junit.Test; +import java.math.BigDecimal; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -30,43 +46,109 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; public class OceanBaseConnectorITCase extends OceanBaseTestBase { + @Override + protected String getTestTable() { + return "products"; + } + + @Override + protected Map getCommonOptions() { + Map options = super.getCommonOptions(); + options.put("compatible-mode", "mysql"); + options.put("schema-name", "test"); + options.put("connection-pool-properties", "druid.initialSize=4;druid.maxActive=20;"); + return options; + } + + @After + public void after() throws Exception { + try (Connection connection = getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("DELETE FROM " + getTestTable()); + } + } + + @Test + public void testDataStreamSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + ResolvedSchema physicalSchema = + new ResolvedSchema( + Arrays.asList( + Column.physical("id", DataTypes.INT().notNull()), + Column.physical("name", DataTypes.STRING().notNull()), + Column.physical("description", DataTypes.STRING().notNull()), + Column.physical("weight", DataTypes.DECIMAL(20, 10).notNull())), + Collections.emptyList(), + UniqueConstraint.primaryKey("pk", Collections.singletonList("id"))); + + List dataSet = + Arrays.asList( + rowData(101, "scooter", "Small 2-wheel scooter", 3.14), + rowData(102, "car battery", "12V car battery", 8.1), + rowData( + 103, + "12-pack drill bits", + "12-pack of drill bits with sizes ranging from #40 to #3", + 0.8), + rowData(104, "hammer", "12oz carpenter's hammer", 0.75), + rowData(105, "hammer", "14oz carpenter's hammer", 0.875), + rowData(106, "hammer", "16oz carpenter's hammer", 1.0), + rowData(107, "rocks", "box of assorted rocks", 5.3), + rowData(108, "jacket", "water resistent black wind breaker", 0.1), + rowData(109, "spare tire", "24 inch spare tire", 22.2)); + + OceanBaseConnectorOptions connectorOptions = + new OceanBaseConnectorOptions(getCommonOptions()); + + OceanBaseConnectionProvider connectionProvider = + new OceanBaseConnectionPool(connectorOptions.getConnectionOptions()); + OceanBaseTableSchema tableSchema = new OceanBaseTableSchema(physicalSchema); + OceanBaseStatementExecutor statementExecutor = + new OceanBaseStatementExecutor( + connectorOptions.getStatementOptions(), tableSchema, connectionProvider); + OceanBaseSink sink = + new OceanBaseSink(connectorOptions.getWriterOptions(), statementExecutor); + env.fromCollection(dataSet).sinkTo(sink); + env.execute(); + + validateSinkResults(); + } + + private RowData rowData(int id, String name, String description, double weight) { + return GenericRowData.of( + id, + StringData.fromString(name), + StringData.fromString(description), + DecimalData.fromBigDecimal(new BigDecimal(weight), 20, 10)); + } + @Test public void testSink() throws Exception { - StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - execEnv.setParallelism(1); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create( - execEnv, EnvironmentSettings.newInstance().inStreamingMode().build()); - - String tableName = "products"; + env, EnvironmentSettings.newInstance().inStreamingMode().build()); tEnv.executeSql( - String.format( - "CREATE TEMPORARY TABLE target (" - + " `id` INT NOT NULL," - + " name STRING," - + " description STRING," - + " weight DECIMAL(20, 10)," - + " PRIMARY KEY (`id`) NOT ENFORCED" - + ") with (" - + " 'connector'='oceanbase'," - + " 'url'='%s'," - + " 'schema-name'='%s'," - + " 'table-name'='%s'," - + " 'username'='%s'," - + " 'password'='%s'," - + " 'compatible-mode'='mysql'," - + " 'connection-pool-properties'='druid.initialSize=4;druid.maxActive=20;'" - + ");", - obServer.getJdbcUrl(), - obServer.getDatabaseName(), - tableName, - obServer.getUsername(), - obServer.getPassword())); + "CREATE TEMPORARY TABLE target (" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " PRIMARY KEY (`id`) NOT ENFORCED" + + ") with (" + + " 'connector'='oceanbase'," + + getCommonOptionsString() + + ");"); tEnv.executeSql( "INSERT INTO target " @@ -81,6 +163,10 @@ public void testSink() throws Exception { + " (109, 'spare tire', '24 inch spare tire', 22.2);") .await(); + validateSinkResults(); + } + + private void validateSinkResults() throws SQLException { List expected = Arrays.asList( "101,scooter,Small 2-wheel scooter,3.1400000000", @@ -93,20 +179,16 @@ public void testSink() throws Exception { "108,jacket,water resistent black wind breaker,0.1000000000", "109,spare tire,24 inch spare tire,22.2000000000"); - List actual = queryTable(tableName); + List actual = queryTable(); assertEqualsInAnyOrder(expected, actual); } - public List queryTable(String tableName) throws SQLException { + public List queryTable() throws SQLException { List result = new ArrayList<>(); - try (Connection connection = - DriverManager.getConnection( - obServer.getJdbcUrl(), - obServer.getUsername(), - obServer.getPassword()); + try (Connection connection = getConnection(); Statement statement = connection.createStatement()) { - ResultSet rs = statement.executeQuery("SELECT * FROM " + tableName); + ResultSet rs = statement.executeQuery("SELECT * FROM " + getTestTable()); ResultSetMetaData metaData = rs.getMetaData(); while (rs.next()) { @@ -122,4 +204,8 @@ public List queryTable(String tableName) throws SQLException { } return result; } + + protected Connection getConnection() throws SQLException { + return DriverManager.getConnection(getUrl(), getUsername(), getPassword()); + } } diff --git a/flink-connector-oceanbase/src/test/resources/sql/init.sql b/flink-connector-oceanbase/src/test/resources/sql/init.sql index 04dc4e71..53ee6c31 100644 --- a/flink-connector-oceanbase/src/test/resources/sql/init.sql +++ b/flink-connector-oceanbase/src/test/resources/sql/init.sql @@ -11,7 +11,8 @@ -- specific language governing permissions and limitations -- under the License. -use test; +CREATE DATABASE IF NOT EXISTS test; +USE test; CREATE TABLE products (