Skip to content

Commit

Permalink
test: refactor obkv-hbase test cases (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe authored Sep 19, 2024
1 parent b9600d7 commit 52c8898
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 220 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,20 @@

import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.testcontainers.lifecycle.Startables;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class OBKVHBaseConnectorITCase extends OceanBaseMySQLTestBase {

@Override
public Map<String, String> getOptions() {
Map<String, String> options = new HashMap<>();
options.put("url", getSysParameter("obconfig_url"));
options.put("sys.username", getSysUsername());
options.put("sys.password", getSysPassword());
options.put("username", getUsername() + "#" + getClusterName());
options.put("password", getPassword());
options.put("schema-name", getSchemaName());
Expand All @@ -45,6 +44,30 @@ public Map<String, String> getOptions() {

@Test
public void testSink() throws Exception {
Map<String, String> options = getOptions();
options.put("url", getSysParameter("obconfig_url"));
options.put("sys.username", getSysUsername());
options.put("sys.password", getSysPassword());

testSinkToHTable(options);
}

@Test
public void testSinkWithODP() throws Exception {
createSysUser("proxyro", getSysPassword());
try (OceanBaseProxyContainer odpContainer =
createOdpContainer(getRsListForODP(), getSysPassword())) {
Startables.deepStart(Stream.of(odpContainer)).join();

Map<String, String> options = getOptions();
options.put("odp-mode", "true");
options.put("url", odpContainer.getHost() + ":" + odpContainer.getRpcPort());

testSinkToHTable(options);
}
}

private void testSinkToHTable(Map<String, String> options) throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(1);
StreamTableEnvironment tEnv =
Expand All @@ -62,7 +85,7 @@ public void testSink() throws Exception {
+ ") with ("
+ " 'connector'='obkv-hbase',"
+ " 'table-name'='htable',"
+ getOptionsString()
+ getOptionsString(options)
+ ");");

String insertSql =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.oceanbase.OceanBaseCEContainer;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;

public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase {
Expand All @@ -32,16 +35,13 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase {

private static final int SQL_PORT = 2881;
private static final int RPC_PORT = 2882;
private static final int ODP_PORT = 2883;
private static final int ODP_RPC_PORT = 2885;
private static final int CONFIG_SERVER_PORT = 8080;

private static final String CLUSTER_NAME = "flink-oceanbase-ci";
private static final String TEST_TENANT = "flink";
private static final String SYS_PASSWORD = "123456";
private static final String TEST_PASSWORD = "654321";
private static final String APP_NAME = "odp_test";
protected GenericContainer<?> obproxy;

private static final Network NETWORK = Network.newNetwork();

@ClassRule
Expand All @@ -59,17 +59,28 @@ public abstract class OceanBaseMySQLTestBase extends OceanBaseTestBase {
.withStartupTimeout(Duration.ofMinutes(4))
.withLogConsumer(new Slf4jLogConsumer(LOG));

GenericContainer<?> initObProxyContainer(String rsList) {
return new GenericContainer<>("oceanbase/obproxy-ce:4.3.1.0-4")
@SuppressWarnings("resource")
public OceanBaseProxyContainer createOdpContainer(String rsList, String password) {
return new OceanBaseProxyContainer("4.3.1.0-4")
.withNetwork(NETWORK)
.withEnv("PROXYRO_PASSWORD", SYS_PASSWORD)
.withEnv("OB_CLUSTER", CLUSTER_NAME)
.withEnv("APP_NAME", APP_NAME)
.withEnv("RS_LIST", rsList)
.withExposedPorts(ODP_PORT, ODP_RPC_PORT)
.withClusterName(CLUSTER_NAME)
.withRsList(rsList)
.withPassword(password)
.withLogConsumer(new Slf4jLogConsumer(LOG));
}

public String getRsListForODP() throws SQLException {
try (Connection connection = getSysJdbcConnection();
Statement statement = connection.createStatement()) {
String sql = "SELECT svr_ip, inner_port FROM oceanbase.__all_server;";
ResultSet rs = statement.executeQuery(sql);
if (rs.next()) {
return rs.getString("svr_ip") + ":" + rs.getString("inner_port");
}
throw new RuntimeException("Server ip and port not found");
}
}

@Override
public String getHost() {
return CONTAINER.getHost();
Expand Down Expand Up @@ -125,10 +136,4 @@ public String getUsername() {
public String getPassword() {
return CONTAINER.getPassword();
}

public String getODPUrl() {
String odpUrl = obproxy.getHost() + ":" + obproxy.getMappedPort(ODP_RPC_PORT);
LOG.info("odp url value is: {}", odpUrl);
return odpUrl;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2024 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.connector.flink;

import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class OceanBaseProxyContainer extends GenericContainer<OceanBaseProxyContainer> {

private static final String IMAGE = "oceanbase/obproxy-ce";

private static final int SQL_PORT = 2883;
private static final int RPC_PORT = 2885;
private static final String APP_NAME = "flink_oceanbase_test";

private String clusterName = "obcluster";
private String rsList;
private String password;

public OceanBaseProxyContainer(String version) {
super(DockerImageName.parse(IMAGE + ":" + version));
addExposedPorts(SQL_PORT, RPC_PORT);
}

@Override
protected void configure() {
assert rsList != null && password != null;
addEnv("APP_NAME", APP_NAME);
addEnv("OB_CLUSTER", clusterName);
addEnv("RS_LIST", rsList);
addEnv("PROXYRO_PASSWORD", password);
}

public @NotNull Set<Integer> getLivenessCheckPortNumbers() {
return new HashSet<>(
Arrays.asList(this.getMappedPort(SQL_PORT), this.getMappedPort(RPC_PORT)));
}

public OceanBaseProxyContainer withClusterName(String clusterName) {
this.clusterName = clusterName;
return this;
}

public OceanBaseProxyContainer withRsList(String rsList) {
this.rsList = rsList;
return this;
}

public OceanBaseProxyContainer withPassword(String password) {
this.password = password;
return this;
}

public int getSqlPort() {
return getMappedPort(SQL_PORT);
}

public int getRpcPort() {
return getMappedPort(RPC_PORT);
}
}
Loading

0 comments on commit 52c8898

Please sign in to comment.