Skip to content

Commit

Permalink
add datastream demo
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 2, 2024
1 parent b7f3383 commit 962b249
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public void testSink() throws Exception {
String family1 = "family1";
String family2 = "family2";

String url = String.format("%s&database=%s", CONFIG_URL, obServer.getDatabaseName());
String fullUsername = obServer.getUsername() + "#" + CLUSTER_NAME;
String url = String.format("%s&database=%s", CONFIG_URL, OB_SERVER.getDatabaseName());
String fullUsername = OB_SERVER.getUsername() + "#" + CLUSTER_NAME;

tEnv.executeSql(
String.format(
Expand All @@ -80,9 +80,9 @@ public void testSink() throws Exception {
url,
hTable,
fullUsername,
obServer.getPassword(),
obServer.getSysUsername(),
obServer.getSysPassword()));
OB_SERVER.getPassword(),
OB_SERVER.getSysUsername(),
OB_SERVER.getSysPassword()));

tEnv.executeSql(
String.format(
Expand All @@ -96,9 +96,9 @@ public void testSink() throws Exception {
Configuration conf = new Configuration();
conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, url);
conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, fullUsername);
conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, obServer.getPassword());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, obServer.getSysUsername());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, obServer.getSysPassword());
conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, OB_SERVER.getPassword());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, OB_SERVER.getSysUsername());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, OB_SERVER.getSysPassword());

OHTableClient client = new OHTableClient(hTable, conf);
client.init();
Expand Down
3 changes: 2 additions & 1 deletion flink-connector-obkv-hbase/src/test/resources/sql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
-- specific language governing permissions and limitations
-- under the License.

use test;
CREATE DATABASE IF NOT EXISTS test;
USE test;

CREATE TABLE `htable$family1`
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@

import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;

import java.util.List;
Expand All @@ -37,29 +35,17 @@ public class OceanBaseTestBase extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestBase.class);

protected OceanBaseContainer obServer;
protected String imageTag = "4.2.1_bp2";

@Before
public void before() {
obServer =
new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME + ":" + imageTag)
.withNetworkMode("host")
.withSysPassword("123456")
.withCopyFileToContainer(
MountableFile.forClasspathResource("sql/init.sql"),
"/root/boot/init.d/init.sql")
.withLogConsumer(new Slf4jLogConsumer(LOG));

Startables.deepStart(obServer).join();
}

@After
public void after() {
if (obServer != null) {
obServer.close();
}
}
public static final String IMAGE_TAG = "4.2.1_bp2";

@ClassRule
public static final OceanBaseContainer OB_SERVER =
new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME + ":" + IMAGE_TAG)
.withNetworkMode("host")
.withSysPassword("123456")
.withCopyFileToContainer(
MountableFile.forClasspathResource("sql/init.sql"),
"/root/boot/init.d/init.sql")
.withLogConsumer(new Slf4jLogConsumer(LOG));

public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@

package com.oceanbase.connector.flink;

import com.oceanbase.connector.flink.connection.OceanBaseConnectionPool;
import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider;
import com.oceanbase.connector.flink.connection.OceanBaseTableSchema;
import com.oceanbase.connector.flink.sink.OceanBaseSink;
import com.oceanbase.connector.flink.sink.OceanBaseStatementExecutor;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

import org.junit.After;
import org.junit.Test;

import java.sql.Connection;
Expand All @@ -30,19 +43,83 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OceanBaseConnectorITCase extends OceanBaseTestBase {

private final String tableName = "products";

@After
public void after() throws Exception {
try (Connection connection = getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DELETE FROM " + tableName);
}
}

@Test
public void testDataStreamSink() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

ResolvedSchema physicalSchema =
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT().notNull()),
Column.physical("name", DataTypes.STRING().notNull()),
Column.physical("description", DataTypes.STRING().notNull()),
Column.physical("weight", DataTypes.DECIMAL(20, 10).notNull())),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
List<RowData> data =
Arrays.asList(
GenericRowData.of(101, "scooter", "Small 2-wheel scooter", 3.14),
GenericRowData.of(102, "car battery", "12V car battery", 8.1),
GenericRowData.of(
103,
"12-pack drill bits",
"12-pack of drill bits with sizes ranging from #40 to #3",
0.8),
GenericRowData.of(104, "hammer", "12oz carpenter's hammer", 0.75),
GenericRowData.of(105, "hammer", "14oz carpenter's hammer", 0.875),
GenericRowData.of(106, "hammer", "16oz carpenter's hammer", 1.0),
GenericRowData.of(107, "rocks", "box of assorted rocks", 5.3),
GenericRowData.of(108, "jacket", "water resistent black wind breaker", 0.1),
GenericRowData.of(109, "spare tire", "24 inch spare tire", 22.2));

Map<String, String> options = new HashMap<>();
options.put("url", OB_SERVER.getJdbcUrl());
options.put("schema-name", OB_SERVER.getDatabaseName());
options.put("username", OB_SERVER.getUsername());
options.put("table-name", tableName);
options.put("password", OB_SERVER.getPassword());
options.put("compatible-mode", "mysql");
options.put("connection-pool-properties", "druid.initialSize=4;druid.maxActive=20;");
OceanBaseConnectorOptions connectorOptions = new OceanBaseConnectorOptions(options);

OceanBaseConnectionProvider connectionProvider =
new OceanBaseConnectionPool(connectorOptions.getConnectionOptions());
OceanBaseTableSchema tableSchema = new OceanBaseTableSchema(physicalSchema);
OceanBaseStatementExecutor statementExecutor =
new OceanBaseStatementExecutor(
connectorOptions.getStatementOptions(), tableSchema, connectionProvider);
OceanBaseSink sink =
new OceanBaseSink(connectorOptions.getWriterOptions(), statementExecutor);
env.fromCollection(data).sinkTo(sink);

validateSinkResults();
}

@Test
public void testSink() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
execEnv, EnvironmentSettings.newInstance().inStreamingMode().build());

String tableName = "products";
env, EnvironmentSettings.newInstance().inStreamingMode().build());

tEnv.executeSql(
String.format(
Expand All @@ -62,11 +139,11 @@ public void testSink() throws Exception {
+ " 'compatible-mode'='mysql',"
+ " 'connection-pool-properties'='druid.initialSize=4;druid.maxActive=20;'"
+ ");",
obServer.getJdbcUrl(),
obServer.getDatabaseName(),
OB_SERVER.getJdbcUrl(),
OB_SERVER.getDatabaseName(),
tableName,
obServer.getUsername(),
obServer.getPassword()));
OB_SERVER.getUsername(),
OB_SERVER.getPassword()));

tEnv.executeSql(
"INSERT INTO target "
Expand All @@ -81,6 +158,10 @@ public void testSink() throws Exception {
+ " (109, 'spare tire', '24 inch spare tire', 22.2);")
.await();

validateSinkResults();
}

private void validateSinkResults() throws SQLException {
List<String> expected =
Arrays.asList(
"101,scooter,Small 2-wheel scooter,3.1400000000",
Expand All @@ -93,18 +174,14 @@ public void testSink() throws Exception {
"108,jacket,water resistent black wind breaker,0.1000000000",
"109,spare tire,24 inch spare tire,22.2000000000");

List<String> actual = queryTable(tableName);
List<String> actual = queryTable();

assertEqualsInAnyOrder(expected, actual);
}

public List<String> queryTable(String tableName) throws SQLException {
public List<String> queryTable() throws SQLException {
List<String> result = new ArrayList<>();
try (Connection connection =
DriverManager.getConnection(
obServer.getJdbcUrl(),
obServer.getUsername(),
obServer.getPassword());
try (Connection connection = getConnection();
Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT * FROM " + tableName);
ResultSetMetaData metaData = rs.getMetaData();
Expand All @@ -122,4 +199,9 @@ public List<String> queryTable(String tableName) throws SQLException {
}
return result;
}

protected Connection getConnection() throws SQLException {
return DriverManager.getConnection(
OB_SERVER.getJdbcUrl(), OB_SERVER.getUsername(), OB_SERVER.getPassword());
}
}
3 changes: 2 additions & 1 deletion flink-connector-oceanbase/src/test/resources/sql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
-- specific language governing permissions and limitations
-- under the License.

use test;
CREATE DATABASE IF NOT EXISTS test;
USE test;

CREATE TABLE products
(
Expand Down

0 comments on commit 962b249

Please sign in to comment.