Skip to content

Commit

Permalink
add datastream demo
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 2, 2024
1 parent b7f3383 commit 3bbba60
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ public class OBKVHBaseConnectorITCase extends OceanBaseTestBase {
public static final String CONFIG_URL =
"http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=" + CLUSTER_NAME;

@Override
protected String getUrl() {
return String.format("%s&database=%s", CONFIG_URL, OB_SERVER.getDatabaseName());
}

@Override
protected String getUsername() {
return OB_SERVER.getUsername() + "#" + CLUSTER_NAME;
}

@Override
protected String getTestTable() {
return "htable";
}

@Test
public void testSink() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Expand All @@ -52,13 +67,9 @@ public void testSink() throws Exception {
StreamTableEnvironment.create(
execEnv, EnvironmentSettings.newInstance().inStreamingMode().build());

String hTable = "htable";
String family1 = "family1";
String family2 = "family2";

String url = String.format("%s&database=%s", CONFIG_URL, obServer.getDatabaseName());
String fullUsername = obServer.getUsername() + "#" + CLUSTER_NAME;

tEnv.executeSql(
String.format(
"CREATE TEMPORARY TABLE target ("
Expand All @@ -68,21 +79,14 @@ public void testSink() throws Exception {
+ " PRIMARY KEY (rowkey) NOT ENFORCED"
+ ") with ("
+ " 'connector'='obkv-hbase',"
+ " 'url'='%s',"
+ " 'table-name'='%s',"
+ " 'username'='%s',"
+ " 'password'='%s',"
+ " 'sys.username'='%s',"
+ " 'sys.password'='%s'"
+ " 'sys.password'='%s',"
+ getCommonOptionsString()
+ ");",
family1,
family2,
url,
hTable,
fullUsername,
obServer.getPassword(),
obServer.getSysUsername(),
obServer.getSysPassword()));
OB_SERVER.getSysUsername(),
OB_SERVER.getSysPassword()));

tEnv.executeSql(
String.format(
Expand All @@ -94,13 +98,13 @@ public void testSink() throws Exception {
.await();

Configuration conf = new Configuration();
conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, url);
conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, fullUsername);
conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, obServer.getPassword());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, obServer.getSysUsername());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, obServer.getSysPassword());
conf.set(OHConstants.HBASE_OCEANBASE_PARAM_URL, getUrl());
conf.set(OHConstants.HBASE_OCEANBASE_FULL_USER_NAME, getUsername());
conf.set(OHConstants.HBASE_OCEANBASE_PASSWORD, getPassword());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_USER_NAME, OB_SERVER.getSysUsername());
conf.set(OHConstants.HBASE_OCEANBASE_SYS_PASSWORD, OB_SERVER.getSysPassword());

OHTableClient client = new OHTableClient(hTable, conf);
OHTableClient client = new OHTableClient(getTestTable(), conf);
client.init();

Function<KeyValue, String> valueFunc =
Expand Down
3 changes: 2 additions & 1 deletion flink-connector-obkv-hbase/src/test/resources/sql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
-- specific language governing permissions and limitations
-- under the License.

use test;
CREATE DATABASE IF NOT EXISTS test;
USE test;

CREATE TABLE `htable$family1`
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,36 @@

import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class OceanBaseTestBase extends TestLogger {
public abstract class OceanBaseTestBase extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(OceanBaseTestBase.class);

protected OceanBaseContainer obServer;
protected String imageTag = "4.2.1_bp2";

@Before
public void before() {
obServer =
new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME + ":" + imageTag)
.withNetworkMode("host")
.withSysPassword("123456")
.withCopyFileToContainer(
MountableFile.forClasspathResource("sql/init.sql"),
"/root/boot/init.d/init.sql")
.withLogConsumer(new Slf4jLogConsumer(LOG));

Startables.deepStart(obServer).join();
}
public static final String IMAGE_TAG = "4.2.1_bp2";

@After
public void after() {
if (obServer != null) {
obServer.close();
}
}
@ClassRule
public static final OceanBaseContainer OB_SERVER =
new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME + ":" + IMAGE_TAG)
.withNetworkMode("host")
.withSysPassword("123456")
.withCopyFileToContainer(
MountableFile.forClasspathResource("sql/init.sql"),
"/root/boot/init.d/init.sql")
.withLogConsumer(new Slf4jLogConsumer(LOG));

public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
Expand All @@ -68,6 +56,35 @@ public static void assertEqualsInAnyOrder(List<String> expected, List<String> ac
actual.stream().sorted().collect(Collectors.toList()));
}

protected String getUrl() {
return OB_SERVER.getJdbcUrl();
}

protected abstract String getTestTable();

protected String getUsername() {
return OB_SERVER.getUsername();
}

protected String getPassword() {
return OB_SERVER.getPassword();
}

protected Map<String, String> getCommonOptions() {
Map<String, String> options = new HashMap<>();
options.put("url", getUrl());
options.put("table-name", getTestTable());
options.put("username", getUsername());
options.put("password", getPassword());
return options;
}

protected String getCommonOptionsString() {
return getCommonOptions().entrySet().stream()
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(","));
}

public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
Expand Down
Loading

0 comments on commit 3bbba60

Please sign in to comment.