Skip to content

Commit

Permalink
add ObParams
Browse files Browse the repository at this point in the history
  • Loading branch information
HexyinUESTC committed Aug 30, 2024
1 parent 9507c7d commit ccc6123
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 71 deletions.
106 changes: 58 additions & 48 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import com.alipay.oceanbase.hbase.result.ClientStreamScanner;
import com.alipay.oceanbase.hbase.util.*;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
import com.alipay.oceanbase.rpc.table.ObKVParamsBase;
import com.alipay.oceanbase.rpc.table.ObKVParams;
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
import com.alipay.oceanbase.rpc.property.Property;
Expand Down Expand Up @@ -166,6 +169,8 @@ public class OHTable implements HTableInterface {
*/
private final Configuration configuration;

private int scannerTimeout;

/**
* Creates an object to access a HBase table.
* Shares oceanbase table obTableClient and other resources with other OHTable instances
Expand All @@ -189,6 +194,9 @@ public OHTable(Configuration configuration, String tableName) throws IOException
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
HBaseConfiguration.getInt(configuration, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
this.obTableClient = ObTableClientManager
.getOrCreateObTableClient(new OHConnectionConfiguration(configuration));
Expand Down Expand Up @@ -388,7 +396,7 @@ public HTableDescriptor getTableDescriptor() {
public boolean exists(Get get) throws IOException {
get.setCheckExistenceOnly(true);
Result r = get(get);
return !r.isEmpty();
return r.getExists();
}

@Override
Expand All @@ -401,8 +409,8 @@ public boolean[] existsAll(List<Get> list) throws IOException {
}
Result[] r = get(list);
boolean[] ret = new boolean[r.length];
for (int i = 0; i < r.length; ++i){
ret[i] = !r[i].isEmpty();
for (int i = 0; i < list.size(); ++i) {
ret[i] = exists(list.get(i));
}
return ret;
}
Expand Down Expand Up @@ -497,11 +505,16 @@ public Result call() throws IOException {
get.getMaxVersions(), null);
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(),
true);
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString));
obTableQuery.setObKVParams(buildObHBaseParams(null, get));
request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString));

clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
if (get.isCheckExistenceOnly() ) {
Result result = new Result();
result.setExists(clientQueryStreamResult.getCacheRows().size() != 0);
return result;
}
getKeyValueFromResult(clientQueryStreamResult, keyValueList, true, family);
} else {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
Expand All @@ -512,19 +525,25 @@ public Result call() throws IOException {

obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true);
// if (get.isClosestRowBefore()) {
// obTableQuery = buildObTableQuery(filter, null, false,
// get.getRow(), true, 1);
// obTableQuery.setScanOrder(ObScanOrder.Reverse);
// } else {
// obTableQuery = buildObTableQuery(filter, get.getRow(), true,
// get.getRow(), true, -1);
// }

if (get.isClosestRowBefore()) {
obTableQuery = buildObTableQuery(filter, null, false,
get.getRow(), true);
obTableQuery.setScanOrder(ObScanOrder.Reverse);
} else {
obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true);
}

obTableQuery.setObKVParams(buildObHBaseParams(null, get));
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
if (get.isCheckExistenceOnly() ) {
Result result = new Result();
result.setExists(clientQueryStreamResult.getCacheRows().size() != 0);
return result;
}
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
family);
}
Expand Down Expand Up @@ -583,24 +602,10 @@ public ResultScanner call() throws IOException {
if (scan.getFamilyMap().keySet().isEmpty()) {
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
scan.getMaxVersions(), null);
// obTableQuery = buildObTableQuery(filter, scan);
//
// request = buildObTableQueryAsyncRequest(obTableQuery,
// getTargetTableName(tableNameString));
// if (scan.isReversed()) {
// obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
// scan.getStartRow(), true, scan.getBatch());
// } else {
// obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
// scan.getStopRow(), false, scan.getBatch());
// }
// if (scan.isReversed()) { // reverse scan 时设置为逆序
// obTableQuery.setScanOrder(ObScanOrder.Reverse);
// }
// obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
// HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
// HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
// request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString));
obTableQuery = buildObTableQuery(filter, scan);

request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
.execute(request);
return new ClientStreamScanner(clientQueryAsyncStreamResult,
Expand All @@ -612,22 +617,6 @@ public ResultScanner call() throws IOException {
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
scan.getMaxVersions(), entry.getValue());
obTableQuery = buildObTableQuery(filter, scan);
// if (scan.isReversed()) {
// obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
// scan.getStartRow(), true, scan.getBatch());
// } else {
// obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
// scan.getStopRow(), false, scan.getBatch());
// }
// if (scan.isReversed()) { // reverse scan 时设置为逆序
// obTableQuery.setScanOrder(ObScanOrder.Reverse);
// }
//
// // no support set maxResultSize.
// obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
// HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
// HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));

request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
Expand All @@ -649,6 +638,23 @@ public ResultScanner call() throws IOException {
return executeServerCallable(serverCallable);
}

public ObKVParams buildObHBaseParams(Scan scan, Get get) {
ObKVParams obKVParams = new ObKVParams();
ObHBaseParams obHBaseParams = new ObHBaseParams();
if (scan != null) {
obHBaseParams.setCaching(scan.getCaching());
obHBaseParams.setCallTimeout(scannerTimeout);
obHBaseParams.setCacheBlock(scan.isGetScan());
obHBaseParams.setAllowPartialResults(scan.getAllowPartialResults());
}
if (get != null) {
obHBaseParams.setCheckExistenceOnly(get.isCheckExistenceOnly());
obHBaseParams.setCacheBlock(get.getCacheBlocks());
}
obKVParams.setObParamsBase(obHBaseParams);
return obKVParams;
}

@Override
public ResultScanner getScanner(final byte[] family) throws IOException {
Scan scan = new Scan();
Expand Down Expand Up @@ -1452,6 +1458,10 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
if (scan.getBatch() > 0) {
obTableQuery.setBatchSize(scan.getBatch());
}
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : configuration.getLong(
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
obTableQuery.setObKVParams(buildObHBaseParams(scan, null));
return obTableQuery;
}

Expand Down
22 changes: 0 additions & 22 deletions src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,28 +229,6 @@ public int getRpcTimeout() {
return ohTable.getRpcTimeout();
}

@Override
public void setOperationTimeout(int operationTimeout) {
checkStatus();
ohTable.setOperationTimeout(operationTimeout);
}

@Override
public int getOperationTimeout() {
checkStatus();
return ohTable.getOperationTimeout();
}

@Override
public void setRpcTimeout(int rpcTimeout) {
conf.set(Property.RPC_EXECUTE_TIMEOUT.getKey(), String.valueOf(rpcTimeout));
}

@Override
public int getRpcTimeout() {
return Integer.parseInt(conf.get(Property.RPC_EXECUTE_TIMEOUT.getKey()));
}

@Override
public byte[] getTableName() {
return tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ public Result next() throws IOException {
List<KeyValue> keyValues = new ArrayList<KeyValue>();
keyValues.add(startKeyValue);

while (streamNext = streamResult.next()) {
int size = streamResult.getCacheRows().size();
int current = 0;
while (streamNext = streamResult.next() && current < size){
List<ObObj> row = streamResult.getRow();
if (this.isTableGroup) {
// split family and qualifier
Expand All @@ -126,6 +128,7 @@ public Result next() throws IOException {
if (Arrays.equals(sk, k)) {
// when rowKey is equal to the previous rowKey ,merge the result into the same result
keyValues.add(new KeyValue(k, family, q, t, v));
current++;
} else {
break;
}
Expand Down
105 changes: 105 additions & 0 deletions src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.alipay.oceanbase.hbase;

import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;

import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
Expand Down Expand Up @@ -50,6 +52,109 @@ public abstract class HTableTestBase {

protected Table hTable;

@Test
public void testScanWithObParams() throws Exception {
String key1 = "scanKey1x";
String key2 = "scanKey2x";
String key3 = "scanKey3x";
String key4 = "scanKey4x";
String column1 = "column1";
String column2 = "column2";
String value1 = "value1";
String value2 = "value2";
String family = "family1";

// delete previous data
Delete deleteKey1Family = new Delete(toBytes(key1));
deleteKey1Family.deleteFamily(toBytes(family));
Delete deleteKey2Family = new Delete(toBytes(key2));
deleteKey2Family.deleteFamily(toBytes(family));
Delete deleteKey3Family = new Delete(toBytes(key3));
deleteKey3Family.deleteFamily(toBytes(family));
Delete deleteKey4Family = new Delete(toBytes(key4));
deleteKey4Family.deleteFamily(toBytes(family));

hTable.delete(deleteKey1Family);
hTable.delete(deleteKey2Family);
hTable.delete(deleteKey3Family);
hTable.delete(deleteKey4Family);

Put putKey1Column1Value1 = new Put(toBytes(key1));
putKey1Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));

Put putKey1Column1Value2 = new Put(toBytes(key1));
putKey1Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2));

Put putKey1Column2Value2 = new Put(toBytes(key1));
putKey1Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2));

Put putKey1Column2Value1 = new Put(toBytes(key1));
putKey1Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));

Put putKey2Column1Value1 = new Put(toBytes(key2));
putKey2Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));

Put putKey2Column1Value2 = new Put(toBytes(key2));
putKey2Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2));

Put putKey2Column2Value2 = new Put(toBytes(key2));
putKey2Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2));

Put putKey2Column2Value1 = new Put(toBytes(key2));
putKey2Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));

Put putKey3Column1Value1 = new Put(toBytes(key3));
putKey3Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));

Put putKey4Column1Value1 = new Put(toBytes(key4));
putKey4Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));

tryPut(hTable, putKey1Column1Value1);
tryPut(hTable, putKey1Column1Value2);
tryPut(hTable, putKey1Column1Value1); // 2 * putKey1Column1Value1
tryPut(hTable, putKey1Column2Value1);
tryPut(hTable, putKey1Column2Value2);
tryPut(hTable, putKey1Column2Value1); // 2 * putKey1Column2Value1
tryPut(hTable, putKey1Column2Value2); // 2 * putKey1Column2Value2
tryPut(hTable, putKey2Column2Value1);
tryPut(hTable, putKey2Column2Value2);
tryPut(hTable, putKey3Column1Value1);
tryPut(hTable, putKey4Column1Value1);

Scan scan;

scan = new Scan();
scan.addFamily(family.getBytes());
scan.setStartRow("scanKey1x".getBytes());
scan.setStopRow("scanKey5x".getBytes());
scan.setMaxVersions(10);
scan.setCaching(1);
scan.setBatch(3);
ResultScanner scanner = hTable.getScanner(scan);
Result result = scanner.next();
Assert.assertEquals(3, result.size());
scanner.close();

scan.setMaxResultSize(10);
scan.setBatch(-1);
ResultScanner scanner1 = hTable.getScanner(scan);
result = scanner1.next();
Assert.assertEquals(7, result.size()); // 返回第一行全部数据,因为不允许行内部分返回

scanner1.close();

scan.setAllowPartialResults(true);
ResultScanner scanner2 = hTable.getScanner(scan);
result = scanner2.next();
Assert.assertEquals(1, result.size());


hTable.delete(deleteKey1Family);
hTable.delete(deleteKey2Family);
hTable.delete(deleteKey3Family);
hTable.delete(deleteKey4Family);
}

@Test
public void testTableGroup() throws IOError, IOException {
/*
Expand Down

0 comments on commit ccc6123

Please sign in to comment.