Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBase 1.x compatible #47

Open
wants to merge 2 commits into
base: HBase_1.x_compatible
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 76 additions & 15 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import com.alipay.oceanbase.hbase.result.ClientStreamScanner;
import com.alipay.oceanbase.hbase.util.*;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
import com.alipay.oceanbase.rpc.table.ObKVParamsBase;
import com.alipay.oceanbase.rpc.table.ObKVParams;
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
import com.alipay.oceanbase.rpc.property.Property;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
Expand Down Expand Up @@ -165,6 +169,8 @@ public class OHTable implements HTableInterface {
*/
private final Configuration configuration;

private int scannerTimeout;

/**
* Creates an object to access a HBase table.
* Shares oceanbase table obTableClient and other resources with other OHTable instances
Expand All @@ -188,6 +194,9 @@ public OHTable(Configuration configuration, String tableName) throws IOException
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
HBaseConfiguration.getInt(configuration, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
this.obTableClient = ObTableClientManager
.getOrCreateObTableClient(new OHConnectionConfiguration(configuration));
Expand Down Expand Up @@ -385,18 +394,34 @@ public HTableDescriptor getTableDescriptor() {
*/
@Override
public boolean exists(Get get) throws IOException {
get.setCheckExistenceOnly(true);
Result r = get(get);
return !r.isEmpty();
return r.getExists();
}

@Override
public boolean[] existsAll(List<Get> list) throws IOException {
throw new FeatureNotSupportedException("not supported yet.");
if (list.isEmpty()) {
return new boolean[]{};
}
if (list.size() == 1) {
return new boolean[]{exists(list.get(0))};
}
Result[] r = get(list);
boolean[] ret = new boolean[r.length];
for (int i = 0; i < list.size(); ++i) {
ret[i] = exists(list.get(i));
}
return ret;
}

@Override
public Boolean[] exists(List<Get> gets) throws IOException {
throw new FeatureNotSupportedException("not supported yet'");
Boolean[] result = new Boolean[gets.size()];
boolean[] exists = existsAll(gets);
for (int i = 0; i < gets.size(); ++i) {
result[i] = exists[i];
}
return result;
}

@Override
Expand Down Expand Up @@ -480,11 +505,16 @@ public Result call() throws IOException {
get.getMaxVersions(), null);
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(),
true);
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString));
obTableQuery.setObKVParams(buildObHBaseParams(null, get));
request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString));

clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
if (get.isCheckExistenceOnly() ) {
Result result = new Result();
result.setExists(clientQueryStreamResult.getCacheRows().size() != 0);
return result;
}
getKeyValueFromResult(clientQueryStreamResult, keyValueList, true, family);
} else {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
Expand All @@ -495,11 +525,25 @@ public Result call() throws IOException {

obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true);

if (get.isClosestRowBefore()) {
HexyinUESTC marked this conversation as resolved.
Show resolved Hide resolved
obTableQuery = buildObTableQuery(filter, null, false,
get.getRow(), true);
obTableQuery.setScanOrder(ObScanOrder.Reverse);
} else {
obTableQuery = buildObTableQuery(filter, get.getRow(), true,
get.getRow(), true);
}

obTableQuery.setObKVParams(buildObHBaseParams(null, get));
request = buildObTableQueryRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
.execute(request);
if (get.isCheckExistenceOnly() ) {
Result result = new Result();
result.setExists(clientQueryStreamResult.getCacheRows().size() != 0);
return result;
}
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
family);
}
Expand Down Expand Up @@ -573,7 +617,6 @@ public ResultScanner call() throws IOException {
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
scan.getMaxVersions(), entry.getValue());
obTableQuery = buildObTableQuery(filter, scan);

request = buildObTableQueryAsyncRequest(obTableQuery,
getTargetTableName(tableNameString, Bytes.toString(family)));
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
Expand All @@ -595,6 +638,23 @@ public ResultScanner call() throws IOException {
return executeServerCallable(serverCallable);
}

public ObKVParams buildObHBaseParams(Scan scan, Get get) {
ObKVParams obKVParams = new ObKVParams();
ObHBaseParams obHBaseParams = new ObHBaseParams();
if (scan != null) {
obHBaseParams.setCaching(scan.getCaching());
obHBaseParams.setCallTimeout(scannerTimeout);
obHBaseParams.setCacheBlock(scan.isGetScan());
obHBaseParams.setAllowPartialResults(scan.getAllowPartialResults());
}
if (get != null) {
obHBaseParams.setCheckExistenceOnly(get.isCheckExistenceOnly());
obHBaseParams.setCacheBlock(get.getCacheBlocks());
}
obKVParams.setObParamsBase(obHBaseParams);
return obKVParams;
}

@Override
public ResultScanner getScanner(final byte[] family) throws IOException {
Scan scan = new Scan();
Expand Down Expand Up @@ -1202,22 +1262,19 @@ public void setOperationTimeout(int operationTimeout) {
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
}

// todo
@Override
public int getOperationTimeout() {
throw new FeatureNotSupportedException("not supported yet.");
return operationTimeout;
}

//todo
// rpcTimeout means server max execute time, equal Table API rpc_execute_time, it must be set before OHTable init; please pass this parameter through conf
@Override
public void setRpcTimeout(int i) {
throw new FeatureNotSupportedException("not supported yet.");
public void setRpcTimeout(int rpcTimeout) {
}

// todo
@Override
public int getRpcTimeout() {
throw new FeatureNotSupportedException("not supported yet.");
return Integer.parseInt(configuration.get(Property.RPC_EXECUTE_TIMEOUT.getKey()));
}

public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
Expand Down Expand Up @@ -1401,6 +1458,10 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
if (scan.getBatch() > 0) {
obTableQuery.setBatchSize(scan.getBatch());
}
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : configuration.getLong(
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
obTableQuery.setObKVParams(buildObHBaseParams(scan, null));
return obTableQuery;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.alipay.oceanbase.hbase.core.Lifecycle;
import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
import com.alipay.oceanbase.rpc.property.Property;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ public Result next() throws IOException {
List<KeyValue> keyValues = new ArrayList<KeyValue>();
keyValues.add(startKeyValue);

while (streamNext = streamResult.next()) {
int size = streamResult.getCacheRows().size();
int current = 0;
while (streamNext = streamResult.next() && current < size){
List<ObObj> row = streamResult.getRow();
if (this.isTableGroup) {
// split family and qualifier
Expand All @@ -126,6 +128,7 @@ public Result next() throws IOException {
if (Arrays.equals(sk, k)) {
// when rowKey is equal to the previous rowKey ,merge the result into the same result
keyValues.add(new KeyValue(k, family, q, t, v));
current++;
} else {
break;
}
Expand Down
137 changes: 136 additions & 1 deletion src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.alipay.oceanbase.hbase;

import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;

import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
Expand Down Expand Up @@ -50,6 +52,109 @@ public abstract class HTableTestBase {

protected Table hTable;

@Test
public void testScanWithObParams() throws Exception {
String key1 = "scanKey1x";
String key2 = "scanKey2x";
String key3 = "scanKey3x";
String key4 = "scanKey4x";
String column1 = "column1";
String column2 = "column2";
String value1 = "value1";
String value2 = "value2";
String family = "family1";

// delete previous data
Delete deleteKey1Family = new Delete(toBytes(key1));
deleteKey1Family.deleteFamily(toBytes(family));
Delete deleteKey2Family = new Delete(toBytes(key2));
deleteKey2Family.deleteFamily(toBytes(family));
Delete deleteKey3Family = new Delete(toBytes(key3));
deleteKey3Family.deleteFamily(toBytes(family));
Delete deleteKey4Family = new Delete(toBytes(key4));
deleteKey4Family.deleteFamily(toBytes(family));

hTable.delete(deleteKey1Family);
hTable.delete(deleteKey2Family);
hTable.delete(deleteKey3Family);
hTable.delete(deleteKey4Family);

Put putKey1Column1Value1 = new Put(toBytes(key1));
putKey1Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));

Put putKey1Column1Value2 = new Put(toBytes(key1));
putKey1Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2));

Put putKey1Column2Value2 = new Put(toBytes(key1));
putKey1Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2));

Put putKey1Column2Value1 = new Put(toBytes(key1));
putKey1Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));

Put putKey2Column1Value1 = new Put(toBytes(key2));
putKey2Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));

Put putKey2Column1Value2 = new Put(toBytes(key2));
putKey2Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2));

Put putKey2Column2Value2 = new Put(toBytes(key2));
putKey2Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2));

Put putKey2Column2Value1 = new Put(toBytes(key2));
putKey2Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));

Put putKey3Column1Value1 = new Put(toBytes(key3));
putKey3Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));

Put putKey4Column1Value1 = new Put(toBytes(key4));
putKey4Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));

tryPut(hTable, putKey1Column1Value1);
tryPut(hTable, putKey1Column1Value2);
tryPut(hTable, putKey1Column1Value1); // 2 * putKey1Column1Value1
tryPut(hTable, putKey1Column2Value1);
tryPut(hTable, putKey1Column2Value2);
tryPut(hTable, putKey1Column2Value1); // 2 * putKey1Column2Value1
tryPut(hTable, putKey1Column2Value2); // 2 * putKey1Column2Value2
tryPut(hTable, putKey2Column2Value1);
tryPut(hTable, putKey2Column2Value2);
tryPut(hTable, putKey3Column1Value1);
tryPut(hTable, putKey4Column1Value1);

Scan scan;

scan = new Scan();
scan.addFamily(family.getBytes());
scan.setStartRow("scanKey1x".getBytes());
scan.setStopRow("scanKey5x".getBytes());
scan.setMaxVersions(10);
scan.setCaching(1);
scan.setBatch(3);
ResultScanner scanner = hTable.getScanner(scan);
Result result = scanner.next();
Assert.assertEquals(3, result.size());
scanner.close();

scan.setMaxResultSize(10);
scan.setBatch(-1);
ResultScanner scanner1 = hTable.getScanner(scan);
result = scanner1.next();
Assert.assertEquals(7, result.size()); // 返回第一行全部数据,因为不允许行内部分返回

scanner1.close();

scan.setAllowPartialResults(true);
ResultScanner scanner2 = hTable.getScanner(scan);
result = scanner2.next();
Assert.assertEquals(1, result.size());


hTable.delete(deleteKey1Family);
hTable.delete(deleteKey2Family);
hTable.delete(deleteKey3Family);
hTable.delete(deleteKey4Family);
}

@Test
public void testTableGroup() throws IOError, IOException {
/*
Expand Down Expand Up @@ -2222,11 +2327,23 @@ public void testCheckAndPut() throws IOException, InterruptedException {
boolean ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
value.getBytes(), put);
Assert.assertTrue(ret);
ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.GREATER, "value1".getBytes(), put);
Assert.assertFalse(ret);
ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.GREATER_OR_EQUAL, "value1".getBytes(), put);
Assert.assertTrue(ret);
ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.LESS, "".getBytes(), put);
Assert.assertFalse(ret);
ret = hTable.checkAndPut(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.LESS_OR_EQUAL, "".getBytes(), put);
Assert.assertFalse(ret);
get = new Get(key.getBytes());
get.setMaxVersions(Integer.MAX_VALUE);
get.addColumn(family.getBytes(), column.getBytes());
r = hTable.get(get);
Assert.assertEquals(2, r.raw().length);
Assert.assertEquals(3, r.raw().length);
Assert.assertEquals("value1", Bytes.toString(r.raw()[0].getValue()));
}

Expand All @@ -2251,6 +2368,24 @@ public void testCheckAndDelete() throws IOException {
boolean ret = hTable.checkAndDelete(key.getBytes(), family.getBytes(), column.getBytes(),
value.getBytes(), delete);
Assert.assertTrue(ret);
put.add(family.getBytes(), column.getBytes(), "value6".getBytes());
hTable.put(put);
ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.GREATER, "value5".getBytes(), delete);
Assert.assertTrue(ret);
put.add(family.getBytes(), column.getBytes(), "value5".getBytes());
hTable.put(put);
ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.GREATER_OR_EQUAL, "value5".getBytes(), delete);
Assert.assertTrue(ret);
put.add(family.getBytes(), column.getBytes(), "value1".getBytes());
hTable.put(put);
ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.LESS, "value1".getBytes(), delete);
Assert.assertFalse(ret);
ret = hTable.checkAndDelete(key.getBytes(), "family1".getBytes(), column.getBytes(),
CompareFilter.CompareOp.LESS_OR_EQUAL, "value1".getBytes(), delete);
Assert.assertTrue(ret);
Get get = new Get(key.getBytes());
get.setMaxVersions(Integer.MAX_VALUE);
get.addColumn(family.getBytes(), column.getBytes());
Expand Down
1 change: 1 addition & 0 deletions src/test/java/com/alipay/oceanbase/hbase/OHTableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.exception.ObTableNotExistException;
import com.alipay.oceanbase.rpc.property.Property;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
Expand Down